This document closely follows Debezium’s documentation. It explains how to configure Debezium Server through its application.properties configuration file. You will find similar explanations inside the application.properties file generated by the scaffold command:

redis-di scaffold --db-type cassandra|mysql|oracle|postgresql|sqlserver --dir <PATH_TO_DIR>

Target (sink) connector

The target connector is Redis.

Basic configuration

debezium.sink.type=redis
debezium.sink.redis.address=<REDIS_DI_BDB_HOST>:12001
debezium.sink.redis.password=<REDIS_DI_PASSWORD>

Preventing data loss

To prevent data loss, configure the Debezium Redis Sink Connector to wait for write acknowledgment from the RDI database replica shard. Use the following property to achieve this:

debezium.sink.redis.wait.enabled=true

Additionally, you can enable retries and configure the wait timeout for replica shard acknowledgment and the delay between write retries (both 1000 milliseconds by default). See the configuration reference in this document for the full list of properties.
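
As a sketch, the acknowledgment settings described above can be spelled out explicitly. The property names are taken from the reference table in this document; the timeout and delay values are the documented defaults, and enabling retries is an assumption about what a deployment might want rather than a requirement.

```properties
# Wait for the replica shard to acknowledge each write.
debezium.sink.redis.wait.enabled=true
# How long to wait for the acknowledgment (default: 1000 ms).
debezium.sink.redis.wait.timeout.ms=1000
# Retry when waiting for the replica fails.
debezium.sink.redis.wait.retry.enabled=true
# Delay between retries (default: 1000 ms).
debezium.sink.redis.wait.retry.delay.ms=1000
```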

Redis data integration configuration reference

| Property | Default | Description |
|---|---|---|
| debezium.sink.type | | Must be set to redis. |
| debezium.sink.redis.address | | An address, formatted as host:port, at which the Redis target streams are provided. |
| debezium.sink.redis.user | | (Optional) A user name used to communicate with Redis. |
| debezium.sink.redis.password | | (Optional) A user password used to communicate with Redis. A password must be set if a user is set. |
| debezium.sink.redis.ssl.enabled | false | (Optional) Use SSL to communicate with Redis. |
| debezium.sink.redis.null.key | | Redis does not support the notion of data without keys, so this string will be used as the key name for records without a primary key. |
| debezium.sink.redis.null.value | | Redis does not support the notion of null payloads, as is the case with tombstone events. This string will be used as the value for records without a payload. |
| debezium.sink.redis.batch.size | 500 | The number of change records to insert into a single batch write (pipelined transaction). |
| debezium.sink.redis.retry.initial.delay.ms | 300 | The initial retry delay, in milliseconds, when encountering Redis connection or OOM issues. This value is doubled on every retry but won’t exceed debezium.sink.redis.retry.max.delay.ms. |
| debezium.sink.redis.retry.max.delay.ms | 10000 | The maximum delay, in milliseconds, when encountering Redis connection or OOM issues. |
| debezium.sink.redis.memory.limit.mb | 80 | Debezium stops sending events when Redis size exceeds this threshold (in MB). |
| debezium.sink.redis.wait.enabled | false | If Redis is configured with a replica shard, this setting allows Debezium to verify that the data has been written to the replica. |
| debezium.sink.redis.wait.timeout.ms | 1000 | Defines the timeout in milliseconds when waiting for replica writes. |
| debezium.sink.redis.wait.retry.enabled | false | Enables retry on wait for replica failure. |
| debezium.sink.redis.wait.retry.delay.ms | 1000 | Defines the delay, in milliseconds, between retries on wait for replica failure. |
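
To illustrate how the batching and retry properties from the table fit together, here is a minimal sketch. The batch size is an arbitrary illustrative value, not a recommendation; the retry values are the documented defaults, shown only so the doubling behavior can be traced.

```properties
# Illustrative values only; tune them for your own workload.
debezium.sink.redis.ssl.enabled=true
debezium.sink.redis.batch.size=1000
# Retry delays double on each attempt: 300, 600, 1200, 2400, 4800, 9600,
# then stay capped at retry.max.delay.ms (10000).
debezium.sink.redis.retry.initial.delay.ms=300
debezium.sink.redis.retry.max.delay.ms=10000
debezium.sink.redis.memory.limit.mb=80
```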

Notes

  • When using Redis to store schema history and offsets, the values of the properties debezium.source.offset.storage.redis.* and debezium.source.schema.history.internal.redis.* will be inherited from the corresponding debezium.sink.redis.* properties.
  • If you want to override any of these inherited defaults, add them explicitly as debezium.source.offset.storage.redis.* and/or debezium.source.schema.history.internal.redis.* properties.
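
The override described in these notes might look like the following sketch. It assumes the offset storage properties use the same address and password suffixes as the sink properties; the host, port, and password placeholders for the second database are hypothetical.

```properties
# Sink connection; offsets and schema history inherit these by default.
debezium.sink.redis.address=<REDIS_DI_BDB_HOST>:12001
debezium.sink.redis.password=<REDIS_DI_PASSWORD>

# Hypothetical override: keep offsets in a different Redis database.
debezium.source.offset.storage.redis.address=<OFFSETS_REDIS_HOST>:<PORT>
debezium.source.offset.storage.redis.password=<OFFSETS_REDIS_PASSWORD>
```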

Source (database) connector

The source connector depends on the type of database used as the source. The basic configurations are the same for all database types except for the connector class.

Essential source properties

Note: Add the debezium.source. prefix to the listed properties when used in application.properties.

| Property | Default | Source Databases | Description |
|---|---|---|---|
| connector.class | | | Choose from the following: io.debezium.connector.postgresql.PostgresConnector, io.debezium.connector.mongodb.MongoDbConnector, io.debezium.connector.mysql.MySqlConnector, io.debezium.connector.oracle.OracleConnector, io.debezium.connector.sqlserver.SqlServerConnector, or io.debezium.connector.cassandra.Cassandra4Connector. |
| database.hostname | | MySQL, Oracle, PostgreSQL, SQLServer | The address of the database instance. |
| database.port | | MySQL, Oracle, PostgreSQL, SQLServer | The port number of the database instance. |
| database.user | | MySQL, Oracle, PostgreSQL, SQLServer | Username to use when connecting to the database server. |
| database.password | | MySQL, Oracle, PostgreSQL, SQLServer | Password to use when connecting to the database server. |
| database.dbname | | Oracle, PostgreSQL | The name of the database from which to stream the changes. |
| database.names | | SQLServer | The comma-separated list of the SQL Server database names from which to stream the changes. |
| database.pdb.name | ORCLPDB1 | Oracle | The name of the Oracle Pluggable Database that the connector captures changes from. For non-CDB installations, do not specify this property. |
| lob.enabled | false | Oracle | Enables capturing and serialization of large object (CLOB, NCLOB, and BLOB) column values in change events. |
| unavailable.value.placeholder | __debezium_unavailable_value | Oracle | Specifies the constant that the connector provides to indicate that the original value is unchanged and not provided by the database. |
| database.encrypt | false | SQLServer | If SSL is enabled for a SQL Server database, set this property to true. |
| database.server.id | 1 | MySQL | A numeric ID of this database client, which must be unique across all currently-running database processes in the MySQL cluster. |
| schema.include.list | | Oracle, PostgreSQL, SQLServer | An optional, comma-separated list of regular expressions that match names of schemas for which you want to capture changes. Any schema name not included in schema.include.list is excluded from having its changes captured. By default, all non-system schemas have their changes captured. If you include this property in the configuration, do not also set the schema.exclude.list property. |
| schema.exclude.list | | Oracle, PostgreSQL, SQLServer | An optional, comma-separated list of regular expressions that match names of schemas for which you do not want to capture changes. Any schema whose name is not included in schema.exclude.list has its changes captured, with the exception of system schemas. If you include this property in the configuration, do not also set the schema.include.list property. |
| database.include.list | | MongoDB, MySQL | An optional, comma-separated list of regular expressions that match the names of the databases for which to capture changes. The connector does not capture changes in any database whose name is not in database.include.list. By default, the connector captures changes in all databases. If you include this property in the configuration, do not also set the database.exclude.list property. |
| database.exclude.list | | MongoDB, MySQL | An optional, comma-separated list of regular expressions that match the names of databases for which you do not want to capture changes. The connector captures changes in any database whose name is not in database.exclude.list. If you include this property in the configuration, do not also set the database.include.list property. |
| table.include.list | | MySQL, Oracle, PostgreSQL, SQLServer | An optional, comma-separated list of regular expressions that match fully-qualified table identifiers for tables that you want Debezium to capture. Any table that is not included in table.include.list is excluded from capture. Each identifier is of the form schemaName.tableName. By default, the connector captures all non-system tables for the designated schemas. Must not be used with table.exclude.list. |
| table.exclude.list | | MySQL, Oracle, PostgreSQL, SQLServer | An optional, comma-separated list of regular expressions that match fully-qualified table identifiers for the tables that you want to exclude from being captured; Debezium captures all tables that are not included in table.exclude.list. Each identifier is of the form schemaName.tableName. Must not be used with table.include.list. |
| column.include.list | empty string | MySQL, Oracle, PostgreSQL, SQLServer | An optional, comma-separated list of regular expressions that match the fully-qualified names of columns that should be included in the change event message values. Fully-qualified names for columns are of the form schemaName.tableName.columnName. Note that primary key columns are always included in the event’s key, even if not included in the value. Do not also set the column.exclude.list property. |
| column.exclude.list | empty string | MySQL, Oracle, PostgreSQL, SQLServer | An optional, comma-separated list of regular expressions that match the fully-qualified names of columns that should be excluded from change event message values. Fully-qualified names for columns are of the form schemaName.tableName.columnName. Note that primary key columns are always included in the event’s key, even if excluded from the value. Do not also set the column.include.list property. |
| topic.prefix | | Cassandra, MySQL, Oracle, PostgreSQL, SQLServer | A prefix for all topic names that receive events emitted by this connector. |
| cassandra.node.id | | Cassandra | The name of the Cassandra node. |
| cassandra.hosts | localhost | Cassandra | One or more addresses of Cassandra nodes separated by “,”. |
| cassandra.port | 9042 | Cassandra | The port used to connect to the Cassandra host(s). |
| cassandra.config | | Cassandra | The absolute path of the YAML config file used by a Cassandra node. |
| http.port | 8000 | Cassandra | The port used by the HTTP server for ping, health check, and build information. |
| commit.log.relocation.dir | | Cassandra | The local directory to which commit logs are relocated from the cdc_raw directory after processing. |
| commit.log.real.time.processing.enabled | false | Cassandra | Only applicable to Cassandra 4. If set to true, the Cassandra connector agent reads commit logs incrementally by watching for updates in commit log index files and streams data in real time, at the frequency determined by commit.log.marked.complete.poll.interval.ms. If set to false, the Cassandra 4 connector waits for commit log files to be marked Completed before processing them. |
| commit.log.marked.complete.poll.interval.ms | 10000 | Cassandra | Only applicable to Cassandra 4, and only when real-time streaming is enabled by commit.log.real.time.processing.enabled. Determines the frequency at which the commit log index file is polled for updates in offset value. |
| mongodb.hosts | | MongoDB | The connection string to use to connect to the MongoDB replica set. |
| mongodb.connection.mode | replica_set | MongoDB | Specifies the connectivity mode of the MongoDB database. If you are using a replica set, set the mode to replica_set. If you are using a sharded cluster, set the mode to sharded. |
| collection.include.list | | MongoDB | An optional, comma-separated list of regular expressions that match fully-qualified namespaces for MongoDB collections to be monitored. By default, the connector monitors all collections except those in the local and admin databases. Fully-qualified names are of the form databaseName.collectionName. |
| collection.exclude.list | | MongoDB | An optional, comma-separated list of regular expressions that match fully-qualified namespaces for MongoDB collections to be excluded from monitoring. Fully-qualified names are of the form databaseName.collectionName. |
| field.exclude.list | | MongoDB | An optional, comma-separated list of the fully-qualified names of fields that should be excluded from change event message values. Fully-qualified names for fields are of the form databaseName.collectionName.fieldName. |
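
As an illustration of the prefix rule, a PostgreSQL source might be configured along the lines of the sketch below. All values in angle brackets are placeholders, the inventory schema is hypothetical, and 5432 is simply PostgreSQL's usual default port rather than a required value.

```properties
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.database.hostname=<SOURCE_DB_HOST>
debezium.source.database.port=5432
debezium.source.database.user=<SOURCE_DB_USER>
debezium.source.database.password=<SOURCE_DB_PASSWORD>
debezium.source.database.dbname=<SOURCE_DB_NAME>
debezium.source.topic.prefix=<TOPIC_PREFIX>
# Capture only the (hypothetical) inventory schema.
debezium.source.schema.include.list=inventory
```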

Advanced source properties

Note: Add a debezium.source. prefix to the listed properties when used in application.properties.

| Property | Default | Source Databases | Description |
|---|---|---|---|
| value.converter.schemas.enable | true | Cassandra, MongoDB, MySQL, Oracle, PostgreSQL, SQLServer | If set to false, the schema is excluded from the value of each change event record. |
| key.converter.schemas.enable | true | Cassandra, MongoDB, MySQL, Oracle, PostgreSQL, SQLServer | If set to false, the schema is excluded from the key of each change event record. |
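
For example, the following fragment sets both converter properties to false, using the debezium.source. prefix required in application.properties. Whether you want this depends on what consumes the records, so treat it as a sketch rather than a recommended setting.

```properties
# Leave the schema out of change event record values.
debezium.source.value.converter.schemas.enable=false
# Apply the same setting to change event record keys.
debezium.source.key.converter.schemas.enable=false
```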

Debezium connectors for various databases

For additional properties, consult Debezium’s documentation for the specific connector you are using.

Configure an initial snapshot without filtering queries (relevant for MySQL, Oracle, PostgreSQL and SQLServer)

Tables to be included in the initial snapshot require the property debezium.source.table.include.list. They should be specified as a comma-separated list of fully-qualified table names.
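
A minimal sketch of such a list, using the chinook.customer and chinook.invoice tables that appear in the examples later in this document:

```properties
# Snapshot (and then stream) only these two tables.
debezium.source.table.include.list=chinook.customer,chinook.invoice
```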

Use queries in initial snapshot (relevant for MySQL, Oracle, PostgreSQL and SQLServer)

  • If you want a snapshot to include only a subset of the rows in a table, add the property debezium.source.snapshot.select.statement.overrides and set it to a comma-separated list of fully-qualified table names. The list should include every table for which you want to add a SELECT statement.

  • For each table in the list above, add a further configuration property that specifies the SELECT statement for the connector to run on the table when it takes a snapshot.

    The specified SELECT statement determines the subset of table rows to include in the snapshot.

    Use the following format to specify the name of this SELECT statement property:

    • Oracle, SQLServer, PostgreSQL: snapshot.select.statement.overrides.<schemaName>.<tableName>
    • MySQL: snapshot.select.statement.overrides.<databaseName>.<tableName>
  • Add a comma-separated list of fully-qualified column names that are included in the SELECT statement: debezium.source.column.include.list = <databaseName>.<tableName>.<columnName1>,<databaseName>.<tableName>.<columnName2>,<databaseName>.<tableName>.<columnName3>...

  • If you want to include all the table columns in the SELECT statement, you can use a regular expression of the form <databaseName>.<tableName>.* instead of adding each of the table columns to the debezium.source.column.include.list property.

NOTE: Add all tables as a comma-separated list of fully-qualified table names to the property debezium.source.table.include.list.

Example

To select the columns CustomerId, FirstName, and LastName from the customer table and join it with the invoice table to get customers with invoice totals greater than 8000, add the following properties to the application.properties file:

debezium.source.table.include.list = chinook.customer

debezium.source.column.include.list = chinook.customer.CustomerID,chinook.customer.FirstName,chinook.customer.LastName

debezium.source.snapshot.select.statement.overrides=chinook.customer

debezium.source.snapshot.select.statement.overrides.chinook.customer = SELECT c.CustomerId, c.FirstName, c.LastName \
FROM chinook.customer c INNER JOIN chinook.invoice inv \
ON c.CustomerId = inv.CustomerId  \
WHERE inv.total > 8000
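
A variant of the same idea, sketched for the case where every column of a table is wanted: the column list uses the regular expression form described earlier, and the override selects all columns. It reuses the chinook.invoice table and its total column from the example above; the threshold is arbitrary.

```properties
debezium.source.table.include.list=chinook.invoice
# Regular expression form: include every column of chinook.invoice.
debezium.source.column.include.list=chinook.invoice.*
debezium.source.snapshot.select.statement.overrides=chinook.invoice
# Trailing backslashes continue the value on the next line.
debezium.source.snapshot.select.statement.overrides.chinook.invoice=SELECT * \
FROM chinook.invoice \
WHERE total > 8000
```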

Form custom message key(s) for change event records

  • By default, Debezium uses the primary key column(s) of a table as the message key for records that it emits. In place of the default, or to specify a key for tables that lack a primary key, you can configure custom message keys based on one or more columns.

  • To establish a custom message key for a table, list the table followed by the column to use as the message key. Each list entry takes the following format:

    debezium.source.message.key.columns=<databaseName>.<tableName>:<columnName>
    
  • To base a table key on multiple column names, insert commas between the column names:

    debezium.source.message.key.columns=<databaseName>.<tableName>:<columnName1>,<columnName2>...
    
  • The property can include entries for multiple tables. Use a semicolon to separate table entries in the list:

    debezium.source.message.key.columns=<databaseName>.<tableName1>:<columnName1>,<columnName2>;<databaseName>.<tableName2>:<columnName1>,<columnName2>
    

Note: If the property column.include.list is defined in your application.properties file, make sure it includes all the column names that are specified in the property message.key.columns.

Fully-qualified table name

In this document we refer to the fully-qualified table name as <databaseName>.<tableName>. This format is for MySQL databases. For Oracle, SQLServer, and PostgreSQL databases use <schemaName>.<tableName> instead.

| Database Type | Fully-qualified Table Name |
|---|---|
| Oracle, SQLServer, PostgreSQL | <schemaName>.<tableName> |
| MySQL | <databaseName>.<tableName> |
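
To make the difference concrete, here is a sketch of the same custom message key written both ways. It assumes a customer table that lives in PostgreSQL's default public schema on one side and in a MySQL database named chinook on the other; both placements are illustrative.

```properties
# PostgreSQL, Oracle, SQL Server: <schemaName>.<tableName>
debezium.source.message.key.columns=public.customer:FirstName,LastName
# MySQL: <databaseName>.<tableName>
# debezium.source.message.key.columns=chinook.customer:FirstName,LastName
```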

Notes

  • You can specify the fully-qualified table name <databaseName>.<tableName> as a regular expression instead of providing the full name of the databaseName and tableName.
  • There is no limit to the number of columns that can be used to create custom message keys. However, it’s best to use the minimum required number of columns to specify a unique key.

Examples

  • The primary key of the tables customer and employee is CustomerID.

    To establish custom message keys based on FirstName and LastName for the tables customer and employee, add the following property to the application.properties file:

    debezium.source.message.key.columns=chinook.customer:FirstName,LastName;chinook.employee:FirstName,LastName
    
  • To specify the columns FirstName and LastName as the message keys for the table chinook.customer and for the table employee in any database, add the following property to the application.properties file:

    debezium.source.message.key.columns=chinook.customer:FirstName,LastName;(.*).employee:FirstName,LastName
    

Configuring the Debezium connector to fetch source database secrets from a secret store

Providing the source database password in clear text is not an acceptable option. Fortunately, Debezium supports fetching secrets from environment variables or from a file. This is done using a Quarkus feature that provides property expression expansion on configuration values.

An expression string is a mix of plain strings and expression segments, which are wrapped by the sequence ${…}.

Using environment variables

Property Expressions also work with environment variables, for example:

...
debezium.source.database.password=${MYSQL_PASSWORD}
...
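
Because an expression string can mix plain text with expression segments, one value can combine several variables. The sketch below assumes hypothetical environment variables named REDIS_HOST, REDIS_PORT, and REDIS_DI_PASSWORD, and relies on the Quarkus ${NAME:default} form to supply a fallback when a variable is not set.

```properties
# REDIS_HOST and REDIS_PORT are hypothetical environment variables.
# The text after the colon inside ${...} is the default used when REDIS_PORT is unset.
debezium.sink.redis.address=${REDIS_HOST}:${REDIS_PORT:12001}
debezium.sink.redis.password=${REDIS_DI_PASSWORD}
```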

Getting secrets from a file

In addition, Debezium’s application.properties supports the ${file:path:key} variable syntax, where path is the absolute path to the file and key is the property key within that file.

The file storing the secrets should contain a single key=value entry per line. The following application.properties entries configure the file provider and read the db-password key from the secrets file:

...
debezium.source.config.providers=file
debezium.source.config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
debezium.source.database.password=${file:/tmp/debezium/secrets.txt:db-password}
...
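
For reference, the secrets file read by the configuration above could look like the following sketch. The path and key name match the example; the value is a placeholder.

```properties
# Contents of /tmp/debezium/secrets.txt: one key=value entry per line.
db-password=<SOURCE_DB_PASSWORD>
```

Restricting read access on this file to the account that runs Debezium Server is a sensible precaution, since the password is stored in it as plain text.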