Active-Active PostgreSQL Federation on Kubernetes
What if I told you that you can create an out-of-the-box active-active, federated PostgreSQL cluster on Kubernetes?
Since logical decoding was introduced in PostgreSQL 9.4, I have been fascinated by the various applications it has. In fact, I've used this feature to apply the concepts of change data capture both in theory and practice to the benefit of both applications and users. Logical replication and native partitioning support, introduced in Postgres 10, offered even more possibilities on distributing application load, in particular for geographically distributed workloads.
While I've seen the increased need for turnkey high availability for applications (and thanks to the improvements in cloud technologies, something I've embraced), a less common use case has been cropping up more often: applications that require multiple write nodes. The cases I've seen range from geographically distributed applications that have large amounts of writes within their region to embedded applications that may be disconnected from the Internet for some time and need to sync up later.
Additionally, while these use cases may want to distribute writes, they want to ensure that a single node can view all of the data. This is a classic example of data federation, and a concept that PostgreSQL has supported for a long time via foreign data wrappers. However, we can make a federated PostgreSQL cluster much more efficient at reading by using some features in modern PostgreSQL versions, such as logical replication and partitioning!
Let's walk through how we can set up a federated Postgres cluster with three primary nodes that avoids conflicts, using the Crunchy Postgres Operator and Kubernetes.
Setting up the Environment
For convenience, I am going to create the environment using the PostgreSQL Operator. Setting up the Postgres Operator is out of scope for this blog, but if you are following along at home, I suggest that you use the quickstart. By default, all PostgreSQL clusters will be deployed into the pgo namespace.
Let's create three PostgreSQL clusters, specifically three instances that are running PostgreSQL 13. I've segmented the instances into "east", "central", and "west", and on each instance, I am going to create a user named hippo with a password of datalake, as well as a database named hippo.
pgo create cluster hippo-east \
--ccp-image-tag=centos8-13.1-4.6.0 \
--username=hippo \
--password=datalake \
--password-superuser=superdatalake \
--database=hippo
pgo create cluster hippo-central \
--ccp-image-tag=centos8-13.1-4.6.0 \
--username=hippo \
--password=datalake \
--password-superuser=superdatalake \
--database=hippo
pgo create cluster hippo-west \
--ccp-image-tag=centos8-13.1-4.6.0 \
--username=hippo \
--password=datalake \
--password-superuser=superdatalake \
--database=hippo
These clusters may take a few moments to be provisioned.
We are going to need a way to identify each node in our federated cluster. The pgnodemx extension, included with the Postgres Operator, provides a convenient way to look up the name of the node. While the Postgres Operator by default enables the shared library for pgnodemx, you will still need to add the extension to the database to use it.
We need to add this extension to the hippo database as the Postgres superuser, which is named postgres. There are several ways to connect to a PostgreSQL database running on Kubernetes, as described in the Postgres Operator documentation. Additionally, we will need to run commands on all three Postgres instances, so please pay attention to where you are executing the below!
Recall that when we provisioned our three clusters, we set a superuser password of superdatalake. First, let's connect to the hippo-east Postgres cluster and into the hippo database. If you are using the port-forward technique, your connection string may look like this:
PGPASSWORD=superdatalake psql -h localhost -U postgres hippo
Once you are logged in, create the pgnodemx extension and the following function that will get the node name, then grant permission to the hippo user to be able to execute it:
CREATE EXTENSION IF NOT EXISTS pgnodemx;
CREATE OR REPLACE FUNCTION hippo.get_node_name()
RETURNS text
AS $$
SELECT val FROM kdapi_setof_kv('labels') WHERE key='pg-cluster';
$$ LANGUAGE SQL SECURITY DEFINER STABLE;
GRANT EXECUTE ON FUNCTION hippo.get_node_name() TO hippo;
Repeat the same steps on hippo-central and hippo-west.
Test out the function as the hippo user! For example, try logging into the hippo database as the hippo user in the hippo-east cluster, following similar steps to how you logged in above. For instance, using the port-forward method:
PGPASSWORD=datalake psql -h localhost -U hippo hippo
Try executing the function:
SELECT get_node_name();
get_node_name
---------------
hippo-east
(1 row)
Success! If you are not using Kubernetes, you can achieve a similar setup by setting a PostgreSQL customized option (e.g. node.node_name) and referencing it in later parts of this example.
The base configuration for setting up our three node writable PostgreSQL cluster is complete. Now we need to set up our actual data structure!
Creating the Data Structure
As mentioned above, there are several ways to connect to a PostgreSQL database running on Kubernetes, as described in the Postgres Operator documentation. To set up our data structure, we will need to log in as the hippo user using the password that we set up in the earlier step (datalake). Additionally, we will need to run commands on all three Postgres instances, so please pay attention to where you are executing the below!
First, let's connect to the hippo-east Postgres cluster and into the hippo database. If you are using the port-forward technique, your connection string may look like this:
PGPASSWORD=datalake psql -h localhost -U hippo hippo
Our data structure is going to collect observed values and the time at which they were observed. Execute the following SQL commands while connected to "hippo-east":
CREATE TABLE hippos (
id uuid DEFAULT gen_random_uuid() NOT NULL,
node_name text,
value numeric,
created_at timestamptz
) PARTITION BY LIST (node_name);
CREATE TABLE hippo_default PARTITION OF hippos (PRIMARY KEY (id)) DEFAULT;
CREATE TABLE hippo_east PARTITION OF hippos (PRIMARY KEY (id)) FOR VALUES IN ('hippo-east');
CREATE TABLE hippo_central PARTITION OF hippos (PRIMARY KEY (id)) FOR VALUES IN ('hippo-central');
CREATE TABLE hippo_west PARTITION OF hippos (PRIMARY KEY (id)) FOR VALUES IN ('hippo-west');
CREATE OR REPLACE FUNCTION add_node_name()
RETURNS trigger AS $$
BEGIN
UPDATE hippos
SET node_name = hippo.get_node_name()
WHERE node_name IS NULL;
RETURN NULL;
END
$$ LANGUAGE plpgsql;
CREATE TRIGGER add_node_name
AFTER INSERT ON hippos
FOR EACH STATEMENT
EXECUTE FUNCTION add_node_name();
Notice that the value that will be used as the primary key uses the gen_random_uuid() function that is available in PostgreSQL 13 -- older versions of PostgreSQL need to run CREATE EXTENSION pgcrypto; as a superuser to allow use of that function.
As the hippo user, connect to hippo-central and hippo-west and execute the same commands.
So, what did we just do?
First, we created a partitioned table with a partition key corresponding to our three node names: hippo-east, hippo-central, and hippo-west. These will be used to segment the writes from the different nodes. Also notice that we created a default partition: this is important for appropriately routing each inserted row into the correct partition.
PostgreSQL 13 introduced the ability to use BEFORE row triggers on partitioned tables, though they do not allow you to modify the partition key. However, this is exactly what we need to do! Instead, after a row is inserted (and routed to the default partition), we run a statement trigger that moves any row in the default partition that has no node name into the partition representing the current node (which we get from the get_node_name() function we created).
In other words, the above schema ensures that all inserted rows are routed to a partition representing that specific node!
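To make the two-step routing concrete, here is a small Python model of it. This is only a sketch and not how PostgreSQL is implemented: the dictionary of lists stands in for the partitions, and the names new_table, insert_row, and statement_trigger are illustrative helpers that mimic the default-partition routing and the AFTER INSERT statement trigger described above.

```python
# Toy model of the partition routing described above; illustrative only.
NODES = ["hippo-east", "hippo-central", "hippo-west"]


def new_table():
    """Create one list per node partition, plus the default partition."""
    partitions = {name: [] for name in NODES}
    partitions["default"] = []
    return partitions


def insert_row(partitions, row):
    """Route a row by node_name; a missing or unknown value lands in the default partition."""
    key = row.get("node_name")
    target = key if key in NODES else "default"
    partitions[target].append(row)


def statement_trigger(partitions, current_node):
    """Mimic the statement trigger: stamp rows that have no node_name with the
    current node and move them into that node's partition."""
    remaining = []
    for row in partitions["default"]:
        if row.get("node_name") is None:
            row["node_name"] = current_node
            partitions[current_node].append(row)
        else:
            remaining.append(row)
    partitions["default"] = remaining


if __name__ == "__main__":
    table = new_table()
    insert_row(table, {"value": 0.27})       # no node_name, so it goes to "default"
    statement_trigger(table, "hippo-east")   # the trigger moves it to hippo-east
    print({name: len(rows) for name, rows in table.items()})
```

The point of the model is that the application never supplies node_name: the row first lands in the default partition, and the trigger relocates it once the statement completes.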
Now, let's get the nodes talking to each other.
Sharing Writes Across Nodes
For the next step, we need to set up logical replication between the nodes. To do this, we need to be a Postgres superuser, in this case postgres. Recall that when we provisioned our three clusters, we set a superuser password of superdatalake. To set up logical replication, we will need to perform the following steps:
- Grant the hippo user the REPLICATION privilege. Note that this is a near-superuser privilege but we are doing this for the convenience of our demo.
- Create a publication on each of the clusters to advertise the specific partition that collects its writes. For example, on hippo-east this would be the hippo_east table.
- After all of the publications are created, create subscriptions on each of the clusters to the other clusters to read in all of the changes.
This time, we will have to run distinct commands on each PostgreSQL cluster, so pay attention!
First, connect to hippo-east. An example of this connection string might look like:
PGPASSWORD=superdatalake psql -h localhost -U postgres hippo
Execute the following commands on hippo-east:
ALTER ROLE hippo REPLICATION;
CREATE PUBLICATION pub_hippo_east FOR TABLE hippo.hippo_east;
As described above, this provides the hippo PostgreSQL user with the replication privilege and defines a logical replication publication for the hippo_east table.
Now, execute the following commands on hippo-central:
ALTER ROLE hippo REPLICATION;
CREATE PUBLICATION pub_hippo_central FOR TABLE hippo.hippo_central;
and hippo-west:
ALTER ROLE hippo REPLICATION;
CREATE PUBLICATION pub_hippo_west FOR TABLE hippo.hippo_west;
Now we are ready to set up the logical replication subscriptions. Each node needs to subscribe to the other nodes in order to receive all of the changes. For example, hippo-east needs to subscribe to the publishers on hippo-central and hippo-west to receive changes. Log into hippo-east as the PostgreSQL superuser ("postgres") and execute the following commands:
CREATE SUBSCRIPTION sub_hippo_east_hippo_central
CONNECTION 'dbname=hippo host=hippo-central.pgo user=hippo password=datalake'
PUBLICATION pub_hippo_central;
CREATE SUBSCRIPTION sub_hippo_east_hippo_west
CONNECTION 'dbname=hippo host=hippo-west.pgo user=hippo password=datalake'
PUBLICATION pub_hippo_west;
(Note that I am deriving the host name using the Kubernetes Services that the Postgres Operator creates. Read more about Kubernetes network names).
Log into hippo-central and execute the following commands:
CREATE SUBSCRIPTION sub_hippo_central_hippo_east
CONNECTION 'dbname=hippo host=hippo-east.pgo user=hippo password=datalake'
PUBLICATION pub_hippo_east;
CREATE SUBSCRIPTION sub_hippo_central_hippo_west
CONNECTION 'dbname=hippo host=hippo-west.pgo user=hippo password=datalake'
PUBLICATION pub_hippo_west;
And finally, hippo-west:
CREATE SUBSCRIPTION sub_hippo_west_hippo_east
CONNECTION 'dbname=hippo host=hippo-east.pgo user=hippo password=datalake'
PUBLICATION pub_hippo_east;
CREATE SUBSCRIPTION sub_hippo_west_hippo_central
CONNECTION 'dbname=hippo host=hippo-central.pgo user=hippo password=datalake'
PUBLICATION pub_hippo_central;
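The statements above follow a regular pattern, so they can also be generated. Below is a minimal Python sketch that prints them for any list of node names. It assumes the naming scheme used in this example (pub_<node>, sub_<node>_<peer>), the pgo namespace, and the demo credentials; the function names ident, publish_sql, subscribe_sql, and mesh_plan are made up for illustration.

```python
# Sketch: generate the publication and subscription statements for a full mesh.
# The naming scheme and connection settings mirror the examples above;
# adjust them for your environment.


def ident(node):
    """Convert a node name such as 'hippo-east' to the identifier 'hippo_east'."""
    return node.replace("-", "_")


def publish_sql(node):
    """Statements to run on `node` first, as a superuser."""
    return [
        "ALTER ROLE hippo REPLICATION;",
        f"CREATE PUBLICATION pub_{ident(node)} FOR TABLE hippo.{ident(node)};",
    ]


def subscribe_sql(node, peer, namespace="pgo"):
    """Statement to run on `node` so that it receives the partition published by `peer`."""
    return (
        f"CREATE SUBSCRIPTION sub_{ident(node)}_{ident(peer)}\n"
        f"CONNECTION 'dbname=hippo host={peer}.{namespace} user=hippo password=datalake'\n"
        f"PUBLICATION pub_{ident(peer)};"
    )


def mesh_plan(nodes):
    """Return (publish, subscribe) dicts keyed by node name.

    The two phases are kept separate because this walkthrough creates every
    publication before it creates any subscription.
    """
    publish = {node: publish_sql(node) for node in nodes}
    subscribe = {
        node: [subscribe_sql(node, peer) for peer in nodes if peer != node]
        for node in nodes
    }
    return publish, subscribe


if __name__ == "__main__":
    publish, subscribe = mesh_plan(["hippo-east", "hippo-central", "hippo-west"])
    for phase in (publish, subscribe):
        for node, statements in phase.items():
            print(f"-- run on {node}")
            print("\n".join(statements))
```

Run the first phase on every node before starting the second one. The script only prints SQL; you still execute it with psql as the postgres user on the node named in each comment.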
Your three node cluster is now all set up -- let's test it out!
Testing the Cluster: Writing to All Nodes
To test out the cluster, try writing some data to all three nodes. Here is an example to get you started; adjust the settings to match your environment:
# hippo-east
PGPASSWORD=datalake psql -h hippo-east.pgo -U hippo hippo -c 'INSERT INTO hippos (value, created_at) VALUES (random(), CURRENT_TIMESTAMP);'
# hippo-central
PGPASSWORD=datalake psql -h hippo-central.pgo -U hippo hippo -c 'INSERT INTO hippos (value, created_at) VALUES (random(), CURRENT_TIMESTAMP);'
# hippo-west
PGPASSWORD=datalake psql -h hippo-west.pgo -U hippo hippo -c 'INSERT INTO hippos (value, created_at) VALUES (random(), CURRENT_TIMESTAMP);'
Now, log into hippo-east as the hippo user and inspect the hippos table. This is what I saw:
hippo=> TABLE hippos;
id | node_name | value | created_at
--------------------------------------+---------------+-------------------+-------------------------------
4608b3a8-0f34-4837-8456-5944a61d15de | hippo-central | 0.484604634620151 | 2020-12-19 16:41:08.707359+00
eefa9a61-cc7e-44bc-a427-4b26c2564d24 | hippo-east | 0.270977731568895 | 2020-12-19 16:41:16.837468+00
37cf93dd-c7a1-44be-9eab-a58c73a14740 | hippo-west | 0.509173376067992 | 2020-12-19 16:40:59.949198+00
(3 rows)
Pretty cool, so everything replicated over! And to ensure that this was not all an illusion, I inspected each of the partition tables to see what rows were in them:
hippo=> TABLE hippo_east;
id | node_name | value | created_at
--------------------------------------+------------+-------------------+-------------------------------
eefa9a61-cc7e-44bc-a427-4b26c2564d24 | hippo-east | 0.270977731568895 | 2020-12-19 16:41:16.837468+00
(1 row)
hippo=> TABLE hippo_central;
id | node_name | value | created_at
--------------------------------------+---------------+-------------------+-------------------------------
4608b3a8-0f34-4837-8456-5944a61d15de | hippo-central | 0.484604634620151 | 2020-12-19 16:41:08.707359+00
(1 row)
hippo=> TABLE hippo_west;
id | node_name | value | created_at
--------------------------------------+------------+-------------------+-------------------------------
37cf93dd-c7a1-44be-9eab-a58c73a14740 | hippo-west | 0.509173376067992 | 2020-12-19 16:40:59.949198+00
(1 row)
What about the other nodes? Log into hippo-central or hippo-west -- you should see something similar to this:
hippo=> TABLE hippos;
id | node_name | value | created_at
--------------------------------------+---------------+-------------------+-------------------------------
4608b3a8-0f34-4837-8456-5944a61d15de | hippo-central | 0.484604634620151 | 2020-12-19 16:41:08.707359+00
eefa9a61-cc7e-44bc-a427-4b26c2564d24 | hippo-east | 0.270977731568895 | 2020-12-19 16:41:16.837468+00
37cf93dd-c7a1-44be-9eab-a58c73a14740 | hippo-west | 0.509173376067992 | 2020-12-19 16:40:59.949198+00
Success -- we created a three node Postgres cluster on Kubernetes where each node can safely accept writes!
Scaling Up: Adding Another Node
It looks like our application is going to need a node in the south, so how do we add another node?
This is actually not too complicated if we follow the above steps. Note that as you add more nodes, you may need to increase PostgreSQL parameters such as max_wal_senders and max_replication_slots.
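As a rough guide to how those parameters grow, here is a back-of-the-envelope sketch in Python. It assumes one subscription per pair of nodes (as in this example), that each subscription holds one replication slot and one WAL sender on the publishing node, and that any streaming replicas used for HA also take one of each. The function name mesh_requirements and its physical_replicas and headroom parameters are illustrative, and the result is a lower bound rather than a tuning recommendation.

```python
# Back-of-the-envelope sizing for a full mesh of logical replication.
# Assumption: one subscription per (subscriber, publisher) pair, as in this article.


def mesh_requirements(node_count, physical_replicas=0, headroom=2):
    """Estimate per-node minimums for a mesh of `node_count` writable nodes.

    physical_replicas: streaming replicas per node (for HA); assumed to use
        one WAL sender and one replication slot each.
    headroom: spare capacity, for example for the temporary slots used
        during the initial table synchronization.
    """
    if node_count < 2:
        raise ValueError("a mesh needs at least two nodes")
    peers = node_count - 1
    needed = peers + physical_replicas + headroom
    return {
        "subscriptions_per_node": peers,
        "total_subscriptions": node_count * peers,
        "max_replication_slots": needed,
        "max_wal_senders": needed,
    }


if __name__ == "__main__":
    for n in (3, 4, 10):
        print(n, mesh_requirements(n, physical_replicas=1))
```

Both max_wal_senders and max_replication_slots default to 10 in PostgreSQL 13, so a small mesh fits comfortably, while the total number of subscriptions grows quadratically with the number of nodes.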
First, add a new Postgres cluster:
pgo create cluster hippo-south \
--ccp-image-tag=centos8-13.1-4.6.0 \
--username=hippo \
--password=datalake \
--password-superuser=superdatalake \
--database=hippo
Similar to the beginning of the example, log in as the postgres user into the hippo database and create the pgnodemx extension and the get_node_name function. For convenience, here are those commands again:
CREATE EXTENSION IF NOT EXISTS pgnodemx;
CREATE OR REPLACE FUNCTION hippo.get_node_name()
RETURNS text
AS $$
SELECT val FROM kdapi_setof_kv('labels') WHERE key='pg-cluster';
$$ LANGUAGE SQL SECURITY DEFINER STABLE;
GRANT EXECUTE ON FUNCTION hippo.get_node_name() TO hippo;
Now log into hippo-south as the hippo user and add the schema, now with an additional partition for hippo-south:
CREATE TABLE hippos (
id uuid DEFAULT gen_random_uuid() NOT NULL,
node_name text,
value numeric,
created_at timestamptz
) PARTITION BY LIST (node_name);
CREATE TABLE hippo_default PARTITION OF hippos (PRIMARY KEY (id)) DEFAULT;
CREATE TABLE hippo_east PARTITION OF hippos (PRIMARY KEY (id)) FOR VALUES IN ('hippo-east');
CREATE TABLE hippo_central PARTITION OF hippos (PRIMARY KEY (id)) FOR VALUES IN ('hippo-central');
CREATE TABLE hippo_west PARTITION OF hippos (PRIMARY KEY (id)) FOR VALUES IN ('hippo-west');
CREATE TABLE hippo_south PARTITION OF hippos (PRIMARY KEY (id)) FOR VALUES IN ('hippo-south');
CREATE OR REPLACE FUNCTION add_node_name()
RETURNS trigger AS $$
BEGIN
UPDATE hippos
SET node_name = hippo.get_node_name()
WHERE node_name IS NULL;
RETURN NULL;
END
$$ LANGUAGE plpgsql;
CREATE TRIGGER add_node_name
AFTER INSERT ON hippos
FOR EACH STATEMENT
EXECUTE FUNCTION add_node_name();
Log into hippo-south as a Postgres superuser and grant the REPLICATION privilege to hippo, create the publication for the hippo_south partition, and create the subscriptions:
ALTER ROLE hippo REPLICATION;
CREATE PUBLICATION pub_hippo_south FOR TABLE hippo.hippo_south;
CREATE SUBSCRIPTION sub_hippo_south_hippo_east
CONNECTION 'dbname=hippo host=hippo-east.pgo user=hippo password=datalake'
PUBLICATION pub_hippo_east;
CREATE SUBSCRIPTION sub_hippo_south_hippo_central
CONNECTION 'dbname=hippo host=hippo-central.pgo user=hippo password=datalake'
PUBLICATION pub_hippo_central;
CREATE SUBSCRIPTION sub_hippo_south_hippo_west
CONNECTION 'dbname=hippo host=hippo-west.pgo user=hippo password=datalake'
PUBLICATION pub_hippo_west;
As the hippo user, log into hippo-east, hippo-central, and hippo-west and add the hippo_south partition:
CREATE TABLE hippo_south PARTITION OF hippos (PRIMARY KEY (id)) FOR VALUES IN ('hippo-south');
Now as a Postgres superuser, log into each of the nodes below and execute the following, starting with hippo-east:
CREATE SUBSCRIPTION sub_hippo_east_hippo_south
CONNECTION 'dbname=hippo host=hippo-south.pgo user=hippo password=datalake'
PUBLICATION pub_hippo_south;
Followed by hippo-central:
CREATE SUBSCRIPTION sub_hippo_central_hippo_south
CONNECTION 'dbname=hippo host=hippo-south.pgo user=hippo password=datalake'
PUBLICATION pub_hippo_south;
And finally hippo-west:
CREATE SUBSCRIPTION sub_hippo_west_hippo_south
CONNECTION 'dbname=hippo host=hippo-south.pgo user=hippo password=datalake'
PUBLICATION pub_hippo_south;
With that, you have added an additional node to your federated PostgreSQL cluster! Test it out and see the results.
Understanding the Solution & Next Steps
One question that comes up with systems with multiple write nodes is how to handle conflicts. In the above example, the conflict is actually handled at each individual node: the primary key is set in each partition, so if a node generates a UUID that already exists in the partition it writes to, the row will not be inserted and therefore will not be replicated. This does not prevent two different nodes from generating the same UUID: we could leverage UUIDv5 or the like to help prevent this conflict, or ensure there is some other natural key we can use as a lookup for our source of truth.
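To illustrate the UUIDv5 idea, here is a minimal Python sketch. UUIDv5 hashes a namespace together with a name, so giving every node its own namespace keeps identifiers from different nodes apart even when they hash the same name. The helper names node_namespace and row_id, and the natural key string used below, are assumptions made for this illustration; inside PostgreSQL, the uuid-ossp extension offers uuid_generate_v5() for the same purpose.

```python
import uuid


def node_namespace(node_name):
    """Derive a stable, node-specific namespace UUID from the node name."""
    return uuid.uuid5(uuid.NAMESPACE_DNS, node_name)


def row_id(node_name, natural_key):
    """Build a deterministic row identifier from the node and a natural key.

    `natural_key` is a hypothetical application-level key, for example a
    sensor name combined with the observation time.
    """
    return uuid.uuid5(node_namespace(node_name), natural_key)


if __name__ == "__main__":
    key = "sensor-42/2020-12-19T16:41:16Z"  # hypothetical natural key
    print(row_id("hippo-east", key))
    print(row_id("hippo-west", key))
```

The same node and key always yield the same identifier, which makes retries idempotent, while two nodes hashing the same key get different identifiers. The trade-off is that the application must supply a key that is unique within a node.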
As a safety measure, we could further lock down the schema to prevent a node from inadvertently adding data to a partition that it is not allowed to write into.
If we do not intend to create additional nodes, another solution for the primary key conflicts is to coordinate sequences. For example, using our three node cluster, we could design a schema that looks similar to this:
-- hippo-east
CREATE TABLE hippos (
id bigint GENERATED BY DEFAULT AS IDENTITY (START WITH 1 INCREMENT BY 3) NOT NULL,
value numeric,
created_at timestamptz
) PARTITION BY LIST (((id - 1) % 3));
CREATE TABLE hippo_east PARTITION OF hippos (PRIMARY KEY (id)) FOR VALUES IN (0);
CREATE TABLE hippo_central PARTITION OF hippos (PRIMARY KEY (id)) FOR VALUES IN (1);
CREATE TABLE hippo_west PARTITION OF hippos (PRIMARY KEY (id)) FOR VALUES IN (2);
-- hippo-central
CREATE TABLE hippos (
id bigint GENERATED BY DEFAULT AS IDENTITY (START WITH 2 INCREMENT BY 3) NOT NULL,
value numeric,
created_at timestamptz
) PARTITION BY LIST (((id - 1) % 3));
CREATE TABLE hippo_east PARTITION OF hippos (PRIMARY KEY (id)) FOR VALUES IN (0);
CREATE TABLE hippo_central PARTITION OF hippos (PRIMARY KEY (id)) FOR VALUES IN (1);
CREATE TABLE hippo_west PARTITION OF hippos (PRIMARY KEY (id)) FOR VALUES IN (2);
-- hippo-west
CREATE TABLE hippos (
id bigint GENERATED BY DEFAULT AS IDENTITY (START WITH 3 INCREMENT BY 3) NOT NULL,
value numeric,
created_at timestamptz
) PARTITION BY LIST (((id - 1) % 3));
CREATE TABLE hippo_east PARTITION OF hippos (PRIMARY KEY (id)) FOR VALUES IN (0);
CREATE TABLE hippo_central PARTITION OF hippos (PRIMARY KEY (id)) FOR VALUES IN (1);
CREATE TABLE hippo_west PARTITION OF hippos (PRIMARY KEY (id)) FOR VALUES IN (2);
This will prevent the primary keys from conflicting and remove the need for the statement trigger, but adding additional nodes will take a nontrivial amount of effort.
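The arithmetic behind the interleaved sequences can be checked with a short Python simulation. It is only a model of the identity columns and the partition expression ((id - 1) % 3) from the schema above; the names identity_values and partition_for are illustrative.

```python
from itertools import count, islice

# Remainder of (id - 1) % 3 mapped to the partition that accepts it,
# matching the FOR VALUES IN (...) clauses above.
PARTITIONS = {0: "hippo_east", 1: "hippo_central", 2: "hippo_west"}

# START WITH value of the identity column on each node.
STARTS = {"hippo-east": 1, "hippo-central": 2, "hippo-west": 3}


def identity_values(start, increment=3):
    """Model GENERATED ... AS IDENTITY (START WITH start INCREMENT BY increment)."""
    return count(start, increment)


def partition_for(row_id, node_count=3):
    """Model PARTITION BY LIST (((id - 1) % 3))."""
    return PARTITIONS[(row_id - 1) % node_count]


if __name__ == "__main__":
    for node, start in STARTS.items():
        ids = list(islice(identity_values(start), 4))
        print(node, ids, sorted({partition_for(i) for i in ids}))
```

Each node only ever produces ids with a single remainder, so its rows always land in its own partition and no two nodes can generate the same id. It also shows why adding a fourth node is hard: both the increment and the modulus are fixed at 3 in the existing data.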
How about high availability? Each node needs to have its own HA. The Postgres Operator simplifies this as you can deploy HA clusters by default, and convert clusters to use HA with the pgo scale command. The PostgreSQL Operator also has the added benefit of built-in backups with pgBackRest, though note that each node may have a different view of the world at the time of backup.
While this solution presents a way to achieve write scaling with PostgreSQL, note that it is not a "set and forget" solution: you do need to monitor your nodes, in particular for the situation when a node is unable to rejoin the cluster for a prolonged period of time, because its replication slots on the other nodes keep retaining WAL in the meantime. PostgreSQL 13 also adds a configuration parameter, max_slot_wal_keep_size, that limits how much WAL a replication slot may retain; a slot that falls too far behind is invalidated, which means you would have to resync the partition on the affected node.
Any system that requires multiple writable nodes has challenges. The recent advancements in PostgreSQL make it possible to build out more complex architectures that solve complex data access and distribution problems while providing the robust developer functionality that drew me to Postgres in the first place!