Postgres is a robust data platform. Yes, it's more than a boring old relational database: it has rich indexing, a wide range of data types (including JSON), and much more. It also supports a variety of extensions that can further broaden its already great functionality. Two of those extensions, coupled together, make Postgres a very compelling option for IoT architectures. Today we're going to start from the ground up on how you would design your architecture with Postgres along with the Citus and pg_partman extensions.
Citus is an extension that allows you to shard your database across multiple nodes, while allowing your application to remain largely unaware. Citus can be beneficial to your app if:
- You expect to outgrow the performance a single Postgres instance can deliver
- Your schema/data model can be mapped cleanly to Citus
- The queries/workload pattern can be mapped cleanly to Citus
Lucky for us, IoT workloads check all of the boxes above.
We're going to begin with a simple schema that relates to vehicles and tracks a few basic measurements against them. We'll also have a table that tracks the location of the vehicle at the time of each sensor sample.
```sql
CREATE TABLE sensor_data (
    id SERIAL,
    car_id VARCHAR(17) NOT NULL,
    sensor_type VARCHAR(20) NOT NULL,
    sensor_value INT NOT NULL,
    timestamp TIMESTAMP WITH TIME ZONE NOT NULL
);

CREATE TABLE location_data (
    id SERIAL,
    car_id VARCHAR(17) NOT NULL,
    latitude float8,
    longitude float8,
    timestamp TIMESTAMP WITH TIME ZONE NOT NULL
);
```
While our schema above is simple, it's not far off from many IoT data models, though yours could be more complex.
The key to sharding is that you can push down most of your joins to the node where the data is located. If you have to move data between nodes in order to join, your performance will suffer. For IoT workloads, the device identifier (car_id in our schema) is a very common choice of sharding key.
To turn this into a sharded database with Citus installed we simply need to run:
```sql
SELECT create_distributed_table('sensor_data', 'car_id');
SELECT create_distributed_table('location_data', 'car_id');
```
By default Citus will co-locate the two tables because they're sharded on the same value and have the same number of shards. Citus creates 32 shards by default, but this is configurable if you need more or fewer. It's worth noting that shards are separate from the number of nodes/instances: in a Citus configuration of 1 coordinator and 2 workers, each worker would receive 16 shards of sensor_data and 16 shards of location_data.
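If the default of 32 shards doesn't fit your workload, the shard count can be set at distribution time. A hedged sketch (the value 64 is illustrative; check the parameter names against your installed Citus version):

```sql
-- Option 1: set the default shard count before distributing
SET citus.shard_count = 64;
SELECT create_distributed_table('sensor_data', 'car_id');

-- Option 2 (newer Citus releases): pass the shard count explicitly
SELECT create_distributed_table('location_data', 'car_id', shard_count := 64);
```

A common rule of thumb is to pick a shard count that leaves room to add workers later, since shards rebalance across nodes more easily than they can be re-split.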
Now that the tables are sharded, we can see how Citus behaves. Let's run two different queries and see how Citus handles each:
```sql
SELECT sensor_data.car_id, max(sensor_value)
FROM sensor_data
WHERE sensor_type = 'temperature'
GROUP BY 1
ORDER BY 2 DESC;
```
In the above case Citus will parallelize the query, running 32 queries in total, one against each shard, then bring the results back to the coordinator and compute the final result. This means each individual query you run fans out into 32 connections from your coordinator. This is great for parallelism, but the big trade-off is reduced concurrency: the fan-out limits how many such queries you can run at once.
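You can see this fan-out with EXPLAIN. The exact output varies by Citus version, but the distributed plan reports one task per shard, along these lines:

```sql
EXPLAIN SELECT car_id, max(sensor_value)
FROM sensor_data
WHERE sensor_type = 'temperature'
GROUP BY 1 ORDER BY 2 DESC;

--  Custom Scan (Citus Adaptive)
--    Task Count: 32
--    ...
```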
However, if we modify the query to include the car_id, Citus will execute a single query on the worker node where the data lives.
```sql
SELECT sensor_data.car_id, max(sensor_value)
FROM sensor_data
WHERE sensor_type = 'temperature'
  AND car_id = '433P2C2S7TJ654181'
GROUP BY 1;
```
Even if we were to expand the query to return the location data as well, because the data is co-located Citus knows it can push down the join to that single node.
```sql
SELECT sensor_data.car_id, max(sensor_value),
       location_data.latitude, location_data.longitude
FROM sensor_data, location_data
WHERE sensor_type = 'temperature'
  AND sensor_data.car_id = '433P2C2S7TJ654181'
  AND location_data.car_id = '433P2C2S7TJ654181'
  AND sensor_data.car_id = location_data.car_id
  AND sensor_data.timestamp = location_data.timestamp
GROUP BY 1, 3, 4;
```
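Conversely, EXPLAIN on a car_id-filtered query should report a single task, confirming the whole query (join included) was routed to one shard:

```sql
EXPLAIN SELECT sensor_data.car_id, max(sensor_value)
FROM sensor_data
WHERE sensor_type = 'temperature'
  AND car_id = '433P2C2S7TJ654181'
GROUP BY 1;

--  Custom Scan (Citus Adaptive)
--    Task Count: 1
```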
To recap: if you anticipate a large data volume and challenges scaling performance, your data model can be structured to shard cleanly, and your query workload fits well into Citus, then Citus gives you a lot of peace of mind to scale out. But where does time series come in?
Postgres itself already has partitioning built-in, but we often recommend coupling it with pg_partman, which extends the native partitioning with helper utilities that make it easier to work with. Partitioning is the process of separating data by particular buckets into separate tables. In an IoT scenario you may want to retain data on all of your vehicles for the past year, but in most cases you're only querying the data for the last week. In that case you could easily partition your data by week: the smaller data set for the last week or two can more readily be kept in memory, and the corresponding indexes are also smaller and easier to maintain.
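The win shows up as partition pruning: a query constrained to a recent time range only scans the partitions that overlap it. A quick sketch of how you'd confirm this against a sensor_data table range-partitioned on timestamp:

```sql
-- The planner (or executor, for now()-based filters) prunes partitions
-- that can't contain rows from the last 7 days; EXPLAIN lists only the
-- child tables it actually needs to scan.
EXPLAIN SELECT count(*)
FROM sensor_data
WHERE timestamp >= now() - interval '7 days';
```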
In order to set up pg_partman with Citus, we're actually going to start fresh and create our tables as partitioned tables. Here we can see the end-to-end setup, similar to earlier, but this time with partitioned tables:
```sql
CREATE TABLE sensor_data (
    id SERIAL,
    car_id VARCHAR(17) NOT NULL,
    sensor_type VARCHAR(20) NOT NULL,
    sensor_value INT NOT NULL,
    timestamp TIMESTAMP WITH TIME ZONE NOT NULL
) PARTITION BY RANGE (timestamp);

CREATE TABLE location_data (
    id SERIAL,
    car_id VARCHAR(17) NOT NULL,
    latitude float8,
    longitude float8,
    timestamp TIMESTAMP WITH TIME ZONE NOT NULL
) PARTITION BY RANGE (timestamp);

SELECT create_distributed_table('sensor_data', 'car_id');
SELECT create_distributed_table('location_data', 'car_id');
```
Now if we look at our database, it still contains just a few tables:
```
\d
                        List of relations
 Schema |          Name           |       Type        |       Owner
--------+-------------------------+-------------------+-------------------
 public | location_data           | partitioned table | application
 public | location_data_id_seq    | sequence          | application
 public | pg_stat_statements      | view              | crunchy_superuser
 public | pg_stat_statements_info | view              | crunchy_superuser
 public | sensor_data             | partitioned table | application
 public | sensor_data_id_seq      | sequence          | application
(6 rows)
```
While we've created sensor_data and location_data as partitioned tables, we haven't done anything to set up the initial partitions. Here we're going to use pg_partman to create the partitions. We're going to have it create monthly partitions; you could have this be weekly, daily, or some other granularity. We're going to have it create partitions starting at 1 month ago:
```sql
SELECT partman.create_parent('public.sensor_data', 'timestamp', 'native', 'monthly',
    p_start_partition := (now() - interval '1 month')::date::text);
SELECT partman.create_parent('public.location_data', 'timestamp', 'native', 'monthly',
    p_start_partition := (now() - interval '1 month')::date::text);

-- Configure partman to continue creating partitions
UPDATE partman.part_config SET infinite_time_partitions = true;

-- Configure partman to regularly run to create new partitions
SELECT cron.schedule('@hourly', $$SELECT partman.run_maintenance()$$);
```
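To sanity-check what pg_partman just created, you can list the child partitions it now manages with its show_partitions helper:

```sql
-- List the child partitions partman manages for sensor_data
SELECT partition_schemaname, partition_tablename
FROM partman.show_partitions('public.sensor_data');
```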
So now we're running partitioned data inside each of our Citus shards.
The above approach to partitioning and sharding works great for building your application and keeping it performant. But then there's the cost management side of the equation. Retaining all data forever would be fine if storage were free, but keeping all of your data easily queryable isn't actually free. Enter Citus columnar support, which comes with a few caveats:
- No support for updates or deletes
- No support for logical replication or decoding
Fortunately for us, our IoT use case can still fully take advantage of the columnar format, which provides:
- Great storage compression
- Faster querying when scanning lots of sequential data
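Converting existing partitions (shown next) isn't the only route, by the way; a table can also be created columnar from the start, which can be handy for dedicated append-only archive tables. A sketch using Citus's columnar access method (the table name here is hypothetical):

```sql
-- Append-only archive table created directly with the columnar access method
CREATE TABLE sensor_data_archive (
    car_id VARCHAR(17) NOT NULL,
    sensor_type VARCHAR(20) NOT NULL,
    sensor_value INT NOT NULL,
    timestamp TIMESTAMP WITH TIME ZONE NOT NULL
) USING columnar;
```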
Let's look at turning a table into a columnar one:
```sql
SELECT alter_table_set_access_method('sensor_data_2023_oct', 'columnar');
```
This will change the partition for sensor_data in October into a columnar format.
We can now run VACUUM VERBOSE on the table and see that we have a 10.20x compression rate!

```
VACUUM VERBOSE sensor_data;
INFO:  statistics for "sensor_data":
storage id: 10000000068
total file size: 64897024, total data size: 64252933
compression rate: 10.20x
total row count: 11999999, stripe count: 80, average rows per stripe: 149999
chunk count: 6000, containing data for dropped columns: 0, zstd compressed: 6000
```
Because our IoT data generally arrives within a set window and is immutable after a certain date, we can compress partitions once they reach a certain age. In this case we're going to convert all partitions older than 3 months. Bear with us, because the PL/pgSQL incantation for it is a bit gnarly, but it gets the job done:
```sql
DO $accessmethod$
DECLARE
    v_row_partitions record;
    v_row_info record;
    v_sql text;
BEGIN
    FOR v_row_partitions IN
        SELECT partition_schemaname || '.' || partition_tablename AS partition_name
        FROM partman.show_partitions('public.sensor_data')
    LOOP
        FOR v_row_info IN
            SELECT child_start_time, child_end_time
            FROM partman.show_partition_info(v_row_partitions.partition_name)
        LOOP
            IF v_row_info.child_end_time < CURRENT_TIMESTAMP - '3 months'::interval THEN
                v_sql := format('SELECT alter_table_set_access_method(%L, %L)',
                                v_row_partitions.partition_name, 'columnar');
                EXECUTE v_sql;
            END IF;
        END LOOP;
    END LOOP;
END
$accessmethod$;
```
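If you'd rather not run that by hand, one option is to wrap the same loop in a function and schedule it with pg_cron, just like the partman maintenance job. This is a sketch: the function name and @weekly schedule are assumptions, and it doesn't skip partitions that are already columnar, so you may want to add that check before adopting it.

```sql
-- Wrap the conversion loop in a function so pg_cron can invoke it
CREATE OR REPLACE FUNCTION public.compress_old_partitions()
RETURNS void
LANGUAGE plpgsql
AS $func$
DECLARE
    v_part record;
    v_info record;
BEGIN
    FOR v_part IN
        SELECT partition_schemaname || '.' || partition_tablename AS partition_name
        FROM partman.show_partitions('public.sensor_data')
    LOOP
        SELECT child_start_time, child_end_time INTO v_info
        FROM partman.show_partition_info(v_part.partition_name);
        -- Convert partitions whose time window ended over 3 months ago
        IF v_info.child_end_time < CURRENT_TIMESTAMP - interval '3 months' THEN
            EXECUTE format('SELECT alter_table_set_access_method(%L, %L)',
                           v_part.partition_name, 'columnar');
        END IF;
    END LOOP;
END
$func$;

-- Schedule the compression pass (schedule string is illustrative)
SELECT cron.schedule('@weekly', $$SELECT public.compress_old_partitions()$$);
```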
And there we have it, a horizontally scalable database for an IoT workload, driven by:
- Citus based sharding for seamless scaling and performance
- pg_partman for native time-series partitioning, giving us faster query recall and reporting
- Columnar compression to help us better manage storage and longer term retention
November 17, 2023