PostgreSQL Change Data Capture With Debezium

Dave Cramer


As you can see from my previous blogs (A Guide to Building an Active-Active PostgreSQL Cluster), I’m interested in the ways that we can replicate data in PostgreSQL. For this post, I've decided to write about a product that enables replication between heterogeneous databases.

Through my involvement in the PostgreSQL JDBC project, I’ve had the opportunity to help out the folks in the Debezium project. Debezium is more than just another heterogeneous replication solution.

Debezium is built upon the Apache Kafka project and uses Kafka to transport the changes from one system to another. So let’s look at how this works.

The most interesting aspect of Debezium is that at the core it is using Change Data Capture (CDC) to capture the data and push it into Kafka. The advantage of this is that the source database remains untouched in the sense that we don’t have to add triggers or log tables. This is a huge advantage as triggers and log tables degrade performance.

Historically, data was kept in one huge monolithic datastore, and all of the services read from or wrote to this datastore. Newer systems are trending towards microservices, where the processing of data is broken up into smaller discrete tasks. The challenge at that point is making sure that each microservice has an up-to-date copy of the data. CDC shines at this as it:

  • Uses the write ahead logs to track the changes
  • Uses the datastore to manage the changes (don’t lose data if offline)
  • Pushes changes immediately

This makes the system much more flexible. If you want to add a new microservice, simply subscribe to the topic in Kafka that is pertinent to the service.

To picture what such a system might look like, imagine some source databases on the left, source connectors, Kafka in the middle, and a JDBC sink on the right pushing data to an analytics database:

[Figure: Debezium PostgreSQL source connectors and a JDBC sink connector]

This is just one example of how one could use Debezium; in this case we are using it to aggregate data from our production databases to an analytics database. The output could just as easily be going to Solr or Elasticsearch.

So let’s dive in and see how to actually make this work.

Setting up Debezium with PostgreSQL

We are using Debezium version 0.10, which was recently released, but everything in this example should work the same with 0.9.

We will be using Docker images to make it simpler to get started. The images are provided by the Debezium project (https://debezium.io/) and can be found at https://github.com/debezium/docker-images

First we need to start ZooKeeper, which is a distributed configuration store. Kafka uses it to keep track of which Kafka node is the controller, and it also stores the configuration for topics. The record of which data has already been read is stored durably too (Kafka Connect keeps its offsets in a Kafka topic, the OFFSET_STORAGE_TOPIC we set when starting the connect container), so that if we stop and start we don’t lose any data.

docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:0.10

There are a couple of things here that should be explained. -it means run the container interactively: this attaches to the console and redirects the input and output to the screen. --rm removes the container when it is stopped. This is useful for demos but is ill-advised for production. We also name it zookeeper (--name) so that other containers can refer to it by name. The -p options expose the ports that ZooKeeper uses to communicate with Kafka and other ZooKeeper instances. Finally, we are using the image debezium/zookeeper version 0.10.

Next, start Kafka. Open another console and run:

docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:0.10

Similar to ZooKeeper, it is run interactively using -it and will remove itself when it finishes (--rm). The name of the running container is kafka and we expose port 9092. Finally, --link tells it that the ZooKeeper instance can be found in the container named zookeeper.

Before we start the source and sink connectors, we need a database to store data. This is the database that will create change events whenever we make a data modification operation. Additionally for simplicity we are going to send the change events to a database in the same container.

For the PostgreSQL container we will be using the Docker images from the Crunchy Data containers project https://github.com/CrunchyData/crunchy-containers.

There are a number of configuration parameters that need to be modified to enable logical replication. The Crunchy Data Docker container makes it easy to modify the PostgreSQL configuration file. Here we create a configuration file that we can load when we start the container.

# we need to customize the postgresql.conf file to ensure wal_level=logical
# create the directory first so the redirect below does not fail
mkdir -p pgconf
cat << EOF > pgconf/postgresql.conf
# here are some sane defaults given we will be unable to use the container
# variables
# general connection
listen_addresses = '*'
port = 5432
max_connections = 20
# memory
shared_buffers = 128MB
temp_buffers = 8MB
work_mem = 4MB
# WAL / replication
wal_level = logical
max_wal_senders = 3
# these shared libraries are available in the Crunchy PostgreSQL container
shared_preload_libraries = 'pgaudit.so,pg_stat_statements.so'
EOF
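
Since Debezium depends on logical decoding, it can be worth checking the generated file before starting the container. The following is a minimal sketch and not part of the original setup: the helper name has_logical_wal is hypothetical, and it only inspects the text of the file (it does not query a running server).

```shell
# Hypothetical helper (not part of Debezium or the Crunchy image):
# succeeds if the given postgresql.conf sets wal_level to logical
# on an uncommented line, with or without single quotes around the value.
has_logical_wal() {
    grep -Eq "^[[:space:]]*wal_level[[:space:]]*=[[:space:]]*'?logical'?([[:space:]]|#|\$)" "$1"
}

# Check the file written above; does nothing if the file is missing.
if [ -f pgconf/postgresql.conf ]; then
    if has_logical_wal pgconf/postgresql.conf; then
        echo "wal_level is logical"
    else
        echo "wal_level is NOT logical" >&2
    fi
fi
```

Once the container is up, running SHOW wal_level; in psql is the authoritative check, since it reports what the server actually loaded.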

Additionally we need a number of environment variables to pass into the container. Creating a file makes it easy to change.

# setup the environment file to build the container.
# we don't really need the PG_USER as we will use the postgres user for replication
# some of these are not needed based on the custom configuration
cat << EOF > pg-env.list
PG_MODE=primary
PG_PRIMARY_PORT=5432
PG_PRIMARY_USER=postgres
PG_DATABASE=testdb
PG_PRIMARY_PASSWORD=debezium
PG_PASSWORD=not
PG_ROOT_PASSWORD=matter
PG_USER=debezium
EOF

The command to run the container uses the pg-env.list created above for environment variables and adds the postgresql.conf file created above by mounting the local pgconf dir as a volume in the container.

docker run -it --rm --name=pgsql --env-file=pg-env.list --volume=`pwd`/pgconf:/pgconf -d crunchydata/crunchy-postgres:centos7-11.4-2.4.1

Then we can use psql to create some tables.

As we are running Postgres in a Docker container, we need to get the IP address to connect to. There are a few ways to do this:

  1. Ask docker for the IP address of the container using docker inspect pgsql | grep IPAddress (note this works because we named the container pgsql). The result is:

    $ docker inspect pgsql | grep IPAddress
    "SecondaryIPAddresses": null,
    "IPAddress": "172.17.0.2",
    "IPAddress": "172.17.0.2"
    
  2. Attach a console to the pgsql container using the label to reference the running container.

    docker exec -it pgsql /bin/bash
    su postgres
    psql postgres
    
  3. Run a new container that has psql in it using:

    docker run -it --rm --link pgsql:pg11  crunchydata/crunchy-postgres:centos7-11.4-2.4.1 psql -h pg11 -U postgres
    

Some interesting things here. The --link pgsql:pg11 allows the use of pg11 as the host address. You will also have to supply the PG_PRIMARY_PASSWORD we specified (debezium) in the environment variable file when we started the container.
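
For the first option, the grep output still has to be read by eye. As a small sketch, the address can be pulled out of that output with sed. The function name container_ip_from_inspect is made up for this example, and it assumes the output contains an "IPAddress" line in the format shown above.

```shell
# Hypothetical helper: read `docker inspect` output on stdin and print the
# first non-empty "IPAddress" value it finds.
container_ip_from_inspect() {
    sed -n 's/.*"IPAddress": "\([^"][^"]*\)".*/\1/p' | head -n 1
}

# Demonstration with the sample output shown above (no Docker needed).
printf '%s\n' \
    '"SecondaryIPAddresses": null,' \
    '"IPAddress": "172.17.0.2",' \
    '"IPAddress": "172.17.0.2"' | container_ip_from_inspect
# prints 172.17.0.2

# With a running container:
#   docker inspect pgsql | container_ip_from_inspect
```

If psql is installed on the host (an assumption; the post otherwise runs it from a container), the result can be passed straight to psql -h.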

So however you get into the postgres cluster you can use the following to create a table.

CREATE TABLE customers (id int GENERATED ALWAYS AS IDENTITY PRIMARY KEY, name text);
ALTER TABLE customers REPLICA IDENTITY USING INDEX customers_pkey;
INSERT INTO customers (name) VALUES ('joe'), ('bob'), ('sue');

This is a very simple table, but for demonstration purposes it is all we need. There is, however, a nuance that requires explaining. In order to minimize the amount of data that is stored on the server for logical replication and transferred through Debezium, we set the REPLICA IDENTITY to the primary key index, so that only the key columns of the old row are logged for updates and deletes.
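
To confirm which replica identity a table ended up with, PostgreSQL records it as a one-letter code in the relreplident column of pg_class. The following sketch wraps that lookup. The function names are hypothetical, and the psql invocation assumes the pg11 host alias from the third connection option above.

```shell
# Hypothetical helper: translate the one-letter pg_class.relreplident code
# into the name used by ALTER TABLE ... REPLICA IDENTITY.
describe_replident() {
    case "$1" in
        d) echo "default (primary key)" ;;
        n) echo "nothing" ;;
        f) echo "full" ;;
        i) echo "index" ;;
        *) echo "unknown" ;;
    esac
}

# Hypothetical helper: look up the code for one table with psql.
# Assumes psql can reach the server as host pg11, user postgres.
replident_of() {
    psql -h pg11 -U postgres -At \
        -c "SELECT relreplident FROM pg_class WHERE oid = '$1'::regclass"
}

# Usage (needs the running database):
#   describe_replident "$(replident_of public.customers)"
```

After the ALTER TABLE statement above, this should report index.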

I also create another database to send the changes to.

CREATE DATABASE customers;

Note I did not create any tables here.

Now we can bring up the connector image. However, we will have to make sure the jdbc-sink jar is in the connector image. A simple way to do this is to use the Debezium end-to-end JDBC example found here: https://github.com/debezium/debezium-examples/tree/master/end-to-end-demo/debezium-jdbc

From this directory run docker build . (note the trailing dot). The output from this will end with Successfully built 62b583dce71b, where the hash at the end will be unique to your environment.

Once the image is built you can run it by referring to its hash, as in the docker run command below. Alternatively, we could have tagged it using

docker tag 62b583dce71b jdbc-sink

and then used the name jdbc-sink in place of the hash. Either way, the command to start the connect container is:

docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link pgsql:pgsql 62b583dce71b

Kafka Connect has a REST endpoint which we can use to find out things like which connectors are enabled in the container.

curl -H "Accept:application/json" localhost:8083/connectors/

As we haven’t created any yet, the result is []

So let’s create a source connector.

We require a bit of JSON to send to the REST API to configure the source connector:

{
	"name": "inventory-connector",
	"config": {
		"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
		"database.hostname": "pgsql",
		"plugin.name": "pgoutput",
		"database.port": "5432",
		"database.user": "postgres",
		"database.password": "debezium",
		"database.dbname": "postgres",
		"database.server.name": "fullfillment",
		"table.whitelist": "public.customers"
	}
}

This provides a name for the connector, how to connect to the database, and which table to read. With this saved as postgresql-connect.json, we post it to the REST API:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d @postgresql-connect.json

This starts a connector which will read the customers table out of the postgres database.
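
Debezium's PostgreSQL connector names each topic after the logical server name, the schema, and the table, which is why the sink configuration further down subscribes to fullfillment.public.customers. As a sketch, the name can be derived and the topic watched with the watch-topic utility that ships in the debezium/kafka image. The function names here are made up for illustration.

```shell
# Hypothetical helper: build the topic name Debezium uses for a table,
# i.e. <database.server.name>.<schema>.<table>.
debezium_topic() {
    printf '%s.%s.%s\n' "$1" "$2" "$3"
}

# Hypothetical helper: tail a topic from the beginning (-a) and print
# message keys (-k), using the same links as the other containers.
watch_changes() {
    docker run -it --rm --name watcher \
        --link zookeeper:zookeeper --link kafka:kafka \
        debezium/kafka:0.10 watch-topic -a -k "$1"
}

debezium_topic fullfillment public customers
# prints fullfillment.public.customers

# To watch the change events (needs the running containers):
#   watch_changes "$(debezium_topic fullfillment public customers)"
```

Watching the topic in a spare console is a convenient way to see the change events arrive before any sink is attached.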

Similarly, we create a JSON file (jdbc-sink.json) to configure the sink connector:

{
	"name": "jdbc-sink",
	"config": {
		"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
		"tasks.max": "1",
		"topics": "fullfillment.public.customers",
		"dialect.name": "PostgreSqlDatabaseDialect",
		"table.name.format": "customers",
		"connection.url": "jdbc:postgresql://pgsql:5432/customers?user=postgres&password=debezium",
		"transforms": "unwrap",
		"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
		"transforms.unwrap.drop.tombstones": "false",
		"auto.create": "true",
		"insert.mode": "upsert",
		"pk.fields": "id",
		"pk.mode": "record_key",
		"delete.enabled": "true"
	}
}

Note: the Kafka JDBC sink defaults to creating the destination table with the same name as the topic, which in this case is fullfillment.public.customers. I’m not sure about other databases, but in PostgreSQL this creates a table whose name needs to be double quoted to use. I tend to avoid these, so I added "table.name.format": "customers" to force it to create a table named customers.

And similarly we enable the connector with:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d @jdbc-sink.json
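
Before querying the databases, it can help to confirm that both connectors are actually running. Kafka Connect's REST API exposes a status resource per connector. The sketch below assumes the API is on localhost:8083 as above, and the helper names are hypothetical.

```shell
# Assumption: Kafka Connect's REST API listens on localhost:8083;
# override with CONNECT_URL if it is mapped elsewhere.
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Hypothetical helper: URL of the status resource for one connector.
status_url() {
    printf '%s/connectors/%s/status\n' "$CONNECT_URL" "$1"
}

# Hypothetical helper: fetch the status JSON for one connector.
connector_status() {
    curl -s -H "Accept:application/json" "$(status_url "$1")"
}

# Usage (needs the connect container running):
#   connector_status inventory-connector
#   connector_status jdbc-sink
```

The response reports a state for the connector and for each of its tasks, so a task that failed (for example because of a bad connection URL) shows up here rather than as silently missing rows.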

And using psql again we see:

docker run -it --rm --link pgsql:pg11 debezium/postgres:11 psql -h pg11 -U postgres
table customers;
 id | name
----+------
  1 | joe
  2 | bob
  3 | sue

(3 rows)

\c customers
You are now connected to database "customers" as user "postgres".

table customers;
 name | id
------+----
  joe | 1
  bob | 2
  sue | 3

(3 rows)

So the first command shows us the customers table in the postgres database, and the second shows the customers table in the customers database.

The most interesting thing is that the table was created and synced for us (auto.create in the sink configuration takes care of creating it). Any changes we make now will be propagated to the customers table on the second database.

Additionally, if we update data in the customers table in the postgres database, we will see the change propagated to the customers database:

update customers set name='paul' where id=1;
UPDATE 1
\c customers
You are now connected to database "customers" as user "postgres".
customers=# table customers;
name | id
-----+----
bob  | 2
sue  | 3
paul | 1

(3 rows)

Deletes work similarly.

delete from customers where id=1;
DELETE 1
\c customers
You are now connected to database "customers" as user "postgres".
customers=# table customers;
name | id
-----+----
bob  | 2
sue  | 3

(2 rows)

One thing to note is that since logical replication in PostgreSQL does not provide any information about sequences, we do not replicate sequences. They will need to be handled by some other process.

All in all, this is pretty easy to set up and fairly impressive.

I'd like to thank the Debezium team for this impressive framework!

Written by Dave Cramer, October 1, 2019