This reference document follows the original Debezium documentation. It explains how to use Debezium Server's configuration file, application.properties, to configure Debezium Server. You will find similar explanations inside the application.properties file generated by the scaffold command:

redis-di scaffold --db-type cassandra|mysql|oracle|postgresql|sqlserver --dir <PATH_TO_DIR>

Target (sink) connector

The target connector is Redis.

Basic configuration

debezium.sink.type=redis
debezium.sink.redis.address=<REDIS_DI_BDB_HOST>:12001
debezium.sink.redis.password=<REDIS_DI_PASSWORD>

Preventing data loss

To prevent data loss, the Debezium Redis sink connector must be configured to wait for write acknowledgment from the RDI database replica shard.

Use the following property to achieve that:

debezium.sink.redis.wait.retry.enabled=true

In addition, you can configure the timeout for waiting for the replica shard acknowledgment and the delay between write retries (both 1000 milliseconds by default). See the Redis Data Integration configuration reference for more information.
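As an illustration, the following sketch combines these settings; the timeout and delay values below are examples, not recommendations, and should be tuned for your environment:

```properties
# Require write acknowledgment from the replica shard
debezium.sink.redis.wait.enabled=true
# Wait up to 2 seconds for the acknowledgment (default: 1000 ms)
debezium.sink.redis.wait.timeout.ms=2000
# Retry the wait on failure instead of dropping the event
debezium.sink.redis.wait.retry.enabled=true
# Wait 500 ms between retries (default: 1000 ms)
debezium.sink.redis.wait.retry.delay.ms=500
```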

Log rotation

Add the following properties to the application.properties file and restart Debezium Server:

  quarkus.log.file.enable=true
  quarkus.log.file.path=<LOG_FILE_PATH>
  # The maximum file size of the log file after which a rotation is executed.
  quarkus.log.file.rotation.max-file-size=<MAX_FILE_SIZE>
  # Indicates whether to rotate log files on server initialization.
  quarkus.log.file.rotation.rotate-on-boot=true
  # File handler rotation file suffix. When used, the file will be rotated based on its suffix.
  quarkus.log.file.rotation.file-suffix=<LOG_FILE_SUFFIX>
  # The maximum number of backups to keep.
  quarkus.log.file.rotation.max-backup-index=<MAX_BACKUP_INDEX>

Example

quarkus.log.file.path=/home/debezium-server/logs/debezium.log
quarkus.log.file.rotation.max-file-size=5M
quarkus.log.file.rotation.file-suffix=.yyyy-MM-dd.gz
quarkus.log.file.rotation.max-backup-index=3

With this configuration, the log file debezium.log will be created in the directory /home/debezium-server/logs/. When the log file size reaches 5M, it will be renamed to debezium.log.2023-01-22.1.gz and a new log file, debezium.log, will be created. If the size of the log file reaches 5M and there are already 3 backup log files, the first backup file, debezium.log.2023-01-22.1.gz, will be deleted. At any point in time there will be at most 3 backup log files: debezium.log.2023-01-22.1.gz, debezium.log.2023-01-22.2.gz, debezium.log.2023-01-22.3.gz, and one active log file: debezium.log.

Redis Data Integration configuration reference

| Property | Default | Description |
| --- | --- | --- |
| debezium.sink.type | | Must be set to redis. |
| debezium.sink.redis.address | | An address, formatted as host:port, at which the Redis target streams are provided. |
| debezium.sink.redis.user | | (Optional) A username used to communicate with Redis. |
| debezium.sink.redis.password | | (Optional) A password (of the respective user) used to communicate with Redis. A password must be set if a user is set. |
| debezium.sink.redis.ssl.enabled | false | (Optional) Use SSL to communicate with Redis. |
| debezium.sink.redis.null.key | | Redis does not support the notion of data without a key, so this string will be used as the key for records without a primary key. |
| debezium.sink.redis.null.value | | Redis does not support the notion of null payloads, as is the case with tombstone events. This string will be used as the value for records without a payload. |
| debezium.sink.redis.batch.size | 500 | Number of change records to insert in a single batch write (pipelined transaction). |
| debezium.sink.redis.retry.initial.delay.ms | 300 | Initial retry delay when encountering Redis connection or OOM issues. This value will be doubled upon every retry but won't exceed debezium.sink.redis.retry.max.delay.ms. |
| debezium.sink.redis.retry.max.delay.ms | 10000 | Max delay when encountering Redis connection or OOM issues. |
| debezium.sink.redis.memory.limit.mb | 80 | Debezium stops sending events when Redis size exceeds this threshold. |
| debezium.sink.redis.wait.enabled | false | If Redis is configured with a replica shard, this lets you verify that the data has been written to the replica. |
| debezium.sink.redis.wait.timeout.ms | 1000 | Defines the timeout in milliseconds when waiting for the replica. |
| debezium.sink.redis.wait.retry.enabled | false | Enables retry on wait for replica failure. |
| debezium.sink.redis.wait.retry.delay.ms | 1000 | Defines the delay of retry on wait for replica failure. |

Notes

  • When using Redis to store schema history and offsets, the values of the properties debezium.source.offset.storage.redis.* and debezium.source.schema.history.internal.redis.* will be inherited from the corresponding debezium.sink.redis.* properties.
  • In case you would like to override any of these defaults inherited from the sink, add them explicitly as debezium.source.offset.storage.redis.* and/or debezium.source.schema.history.internal.redis.* properties.
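For example, assuming you want the offsets and schema history written to a different Redis endpoint than the sink (the host and password placeholders below are illustrative), you could override the inherited values explicitly:

```properties
# Sink connection (inherited by offset and schema history storage by default)
debezium.sink.redis.address=<REDIS_DI_BDB_HOST>:12001
debezium.sink.redis.password=<REDIS_DI_PASSWORD>
# Explicit overrides take precedence over the inherited sink values
debezium.source.offset.storage.redis.address=<OTHER_REDIS_HOST>:12002
debezium.source.offset.storage.redis.password=<OTHER_REDIS_PASSWORD>
debezium.source.schema.history.internal.redis.address=<OTHER_REDIS_HOST>:12002
debezium.source.schema.history.internal.redis.password=<OTHER_REDIS_PASSWORD>
```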

Source (database) connector

The source connector depends on the database you get data from. The basic configuration is the same for all database types except for the connector class.

Essential source properties

Note: Add debezium.source. prefix to the listed properties when using in application.properties.

| Property | Default | Source Databases | Description |
| --- | --- | --- | --- |
| connector.class | | Cassandra, MySQL, Oracle, PostgreSQL, SQLServer | Choose from the following, according to the source database: io.debezium.connector.postgresql.PostgresConnector, io.debezium.connector.mysql.MySqlConnector, io.debezium.connector.oracle.OracleConnector, io.debezium.connector.sqlserver.SqlServerConnector, io.debezium.connector.cassandra.Cassandra4Connector |
| database.hostname | | MySQL, Oracle, PostgreSQL, SQLServer | The address of the database instance. |
| database.port | | MySQL, Oracle, PostgreSQL, SQLServer | The port number of the database instance. |
| database.user | | MySQL, Oracle, PostgreSQL, SQLServer | Username to use when connecting to the database server. |
| database.password | | MySQL, Oracle, PostgreSQL, SQLServer | Password to use when connecting to the database server. |
| database.dbname | | Oracle, PostgreSQL | The name of the database from which to stream the changes. |
| database.names | | SQLServer | The comma-separated list of the SQL Server database names from which to stream the changes. |
| database.pdb.name | ORCLPDB1 | Oracle | The name of the Oracle Pluggable Database that the connector captures changes from. For a non-CDB installation, do not specify this property. |
| database.encrypt | false | SQLServer | If SSL is enabled for a SQL Server database, enable SSL by setting the value of this property to true. |
| database.server.id | 1 | MySQL | A numeric ID of this database client, which must be unique across all currently-running database processes in the MySQL cluster. |
| schema.include.list | | Oracle, PostgreSQL, SQLServer | An optional, comma-separated list of regular expressions that match names of schemas for which you want to capture changes. Any schema name not included in schema.include.list is excluded from having its changes captured. By default, all non-system schemas have their changes captured. If you include this property in the configuration, do not also set the schema.exclude.list property. |
| schema.exclude.list | | Oracle, PostgreSQL, SQLServer | An optional, comma-separated list of regular expressions that match names of schemas for which you do not want to capture changes. Any schema whose name is not included in schema.exclude.list has its changes captured, with the exception of system schemas. If you include this property in the configuration, do not also set the schema.include.list property. |
| database.include.list | | MySQL | An optional, comma-separated list of regular expressions that match the names of the databases for which to capture changes. The connector does not capture changes in any database whose name is not in database.include.list. By default, the connector captures changes in all databases. If you include this property in the configuration, do not also set the database.exclude.list property. |
| database.exclude.list | | MySQL | An optional, comma-separated list of regular expressions that match the names of databases for which you do not want to capture changes. The connector captures changes in any database whose name is not in database.exclude.list. If you include this property in the configuration, do not also set the database.include.list property. |
| table.include.list | | MySQL, Oracle, PostgreSQL, SQLServer | An optional, comma-separated list of regular expressions that match fully-qualified table identifiers for tables that you want Debezium to capture; any table that is not included in table.include.list is excluded from capture. Each identifier is of the form schemaName.tableName. By default, the connector captures all non-system tables for the designated schemas. Must not be used with table.exclude.list. |
| table.exclude.list | | MySQL, Oracle, PostgreSQL, SQLServer | An optional, comma-separated list of regular expressions that match fully-qualified table identifiers for the tables that you want to exclude from being captured; Debezium captures all tables that are not included in table.exclude.list. Each identifier is of the form schemaName.tableName. Must not be used with table.include.list. |
| column.include.list | empty string | MySQL, Oracle, PostgreSQL, SQLServer | An optional, comma-separated list of regular expressions that match the fully-qualified names of columns that should be included in the change event message values. Fully-qualified names for columns are of the form schemaName.tableName.columnName. Note that primary key columns are always included in the event's key, even if not included in the value. Do not also set the column.exclude.list property. |
| column.exclude.list | empty string | MySQL, Oracle, PostgreSQL, SQLServer | An optional, comma-separated list of regular expressions that match the fully-qualified names of columns that should be excluded from change event message values. Fully-qualified names for columns are of the form schemaName.tableName.columnName. Note that primary key columns are always included in the event's key, even if excluded from the value. Do not also set the column.include.list property. |
| topic.prefix | | Cassandra, MySQL, Oracle, PostgreSQL, SQLServer | A prefix for all topic names that receive events emitted by this connector. |
| cassandra.node.id | | Cassandra | The name of the Cassandra node. |
| cassandra.hosts | localhost | Cassandra | One or more addresses of Cassandra nodes, separated by ",". |
| cassandra.port | 9042 | Cassandra | The port used to connect to the Cassandra host(s). |
| cassandra.config | | Cassandra | The absolute path of the YAML config file used by a Cassandra node. |
| http.port | 8000 | Cassandra | The port used by the HTTP server for ping, health check, and build info. |
| commit.log.relocation.dir | | Cassandra | The local directory to which commit logs are relocated from the cdc_raw directory once processed. |
| commit.log.real.time.processing.enabled | false | Cassandra | Only applicable to Cassandra 4. If set to true, the Cassandra connector agent reads commit logs incrementally by watching for updates in commit log index files and streams data in real time, at a frequency determined by commit.log.marked.complete.poll.interval.ms. If set to false, the Cassandra 4 connector waits for commit log files to be marked Completed before processing them. |
| commit.log.marked.complete.poll.interval.ms | 10000 | Cassandra | Only applicable to Cassandra 4, and only when real-time streaming is enabled by commit.log.real.time.processing.enabled. This config determines the frequency at which the commit log index file is polled for updates in the offset value. |
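Putting the essential properties together, a minimal PostgreSQL source section might look like the following sketch; the host, credentials, database name, and topic prefix are placeholders:

```properties
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.database.hostname=<POSTGRES_HOST>
debezium.source.database.port=5432
debezium.source.database.user=<POSTGRES_USER>
debezium.source.database.password=<POSTGRES_PASSWORD>
debezium.source.database.dbname=<POSTGRES_DBNAME>
debezium.source.topic.prefix=<TOPIC_PREFIX>
```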

Advanced source properties

Note: Add debezium.source. prefix to the listed properties when using in application.properties.

| Property | Default | Source Databases | Description |
| --- | --- | --- | --- |
| value.converter.schemas.enable | true | Cassandra, MySQL, Oracle, PostgreSQL, SQLServer | If set to false, the schema payload will be excluded from each change event record. |

Debezium Connectors for Various Databases

For additional properties, consult the Debezium documentation for your specific connector (Cassandra, MySQL, Oracle, PostgreSQL, or SQL Server).

Configuring initial snapshot without filtering queries

Specify the tables to include in the initial snapshot with the debezium.source.table.include.list property, as a comma-separated list of fully-qualified table names.
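For example, to snapshot two tables from the (illustrative) chinook schema:

```properties
debezium.source.table.include.list=chinook.customer,chinook.invoice
```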

Using queries in initial snapshot

  • If you want a snapshot to include only a subset of the rows in a table, add the property debezium.source.snapshot.select.statement.overrides with a comma-separated list of fully-qualified table names. The list should include every table for which you want to add a SELECT statement.

  • For each table in the list above, add a further configuration property that specifies the SELECT statement for the connector to run on the table when it takes a snapshot.

    The specified SELECT statement determines the subset of table rows to include in the snapshot.

    Use the following format to specify the name of this SELECT statement property:

    • Oracle, SQLServer, PostgreSQL: snapshot.select.statement.overrides.<SCHEMA_NAME>.<TABLE_NAME>
    • MySQL: snapshot.select.statement.overrides.<DATABASE_NAME>.<TABLE_NAME>
  • Add a comma-separated list of fully-qualified column names that are included in the SELECT statement: debezium.source.column.include.list = <DATABASE_NAME>.<TABLE_NAME>.<COLUMN_NAME1>,<DATABASE_NAME>.<TABLE_NAME>.<COLUMN_NAME2>,<DATABASE_NAME>.<TABLE_NAME>.<COLUMN_NAME3>...

  • If you want to include all the table columns in the SELECT statement, you can use a regular expression in the form <DATABASE_NAME>.<TABLE_NAME>.* instead of adding each table column to the debezium.source.column.include.list property.

NOTE: Add all tables as a comma-separated list of fully-qualified table names to the property debezium.source.table.include.list.

Example

To select the columns CustomerId, FirstName, and LastName from the customer table and join it with the invoice table to get customers with total invoices greater than 8000, add the following properties to the application.properties file:

debezium.source.table.include.list = chinook.customer

debezium.source.column.include.list = chinook.customer.CustomerId,chinook.customer.FirstName,chinook.customer.LastName

debezium.source.snapshot.select.statement.overrides=chinook.customer

debezium.source.snapshot.select.statement.overrides.chinook.customer = SELECT c.CustomerId, c.FirstName, c.LastName \
FROM chinook.customer c INNER JOIN chinook.invoice inv \
ON c.CustomerId = inv.CustomerId  \
WHERE inv.total > 8000

Form custom message keys for change event records

  • By default, Debezium uses the primary key column(s) of a table as the message key for records that it emits. In place of the default, or to specify a key for tables that lack a primary key, you can configure custom message keys based on one or more columns.

  • To establish a custom message key for a table, list the table followed by the column to use as the message key. Each list entry takes the following format:

    debezium.source.message.key.columns=<databaseName>.<tableName>:<columnName>
    
  • To base a table key on multiple column names, insert commas between the column names:

    debezium.source.message.key.columns=<databaseName>.<tableName>:<columnName1>,<columnName2>...
    
  • The property can include entries for multiple tables. Use a semicolon to separate table entries in the list:

    debezium.source.message.key.columns=<databaseName>.<tableName1>:<columnName1>,<columnName2>;<databaseName>.<tableName2>:<columnName1>,<columnName2>
    

Note: If the property column.include.list is defined in your application.properties file, make sure it includes all the column names that are specified in the message.key.columns property.
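For instance, with the illustrative chinook.customer table, a custom key and a consistent column include list might look like this:

```properties
# Use FirstName and LastName as the message key for chinook.customer
debezium.source.message.key.columns=chinook.customer:FirstName,LastName
# The include list must cover every column named in message.key.columns
debezium.source.column.include.list=chinook.customer.FirstName,chinook.customer.LastName
```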

Fully-qualified table name

In this document we refer to the fully-qualified table name as <databaseName>.<tableName>. This format applies to MySQL databases. For Oracle, SQLServer, and PostgreSQL databases, use <schemaName>.<tableName> instead.

| Database Type | Fully-qualified Table Name |
| --- | --- |
| Oracle, SQLServer, PostgreSQL | <schemaName>.<tableName> |
| MySQL | <databaseName>.<tableName> |

Notes

  • You can specify the fully-qualified table name <databaseName>.<tableName> as a regular expression instead of providing the full name of the databaseName and tableName.
  • There is no limit to the number of columns that can be used to create custom message keys. However, it’s best to use the minimum required number of columns to specify a unique key.

Examples

  • The primary key of the tables customer and employee is CustomerID.

    To establish custom message keys based on FirstName and LastName for the tables customer and employee, add the following property to the application.properties file:

    debezium.source.message.key.columns=chinook.customer:FirstName,LastName;chinook.employee:FirstName,LastName
    
  • To specify the columns FirstName and LastName as the message key for the table chinook.customer, and for the employee table in any database, add the following property to the application.properties file:

    debezium.source.message.key.columns=chinook.customer:FirstName,LastName;(.*).employee:FirstName,LastName
    

Configuring Debezium Connector to fetch source database secrets from secret store

Providing the source database password in clear text is not an acceptable option. Fortunately, Debezium supports fetching secrets from environment variables or the file system. This is done using a Quarkus feature that provides property expression expansion on configuration values.

An expression string is a mix of plain strings and expression segments, which are wrapped by the sequence ${…​}.

Using Environment Variables

Property expressions also work with environment variables, for example:

...
database.password="${MYSQL_PASSWORD}"
...

Getting secrets from file system

In addition, Debezium application.properties supports the ${file:path:key} variable syntax, where path is the path to the file and key is the property key.

The file storing the secrets should contain a single key=value pair per line, for example:

...
debezium.source.config.providers=file
debezium.source.config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
database.password="${file:/tmp/debezium/secrets.txt:db-password}"
...
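With the configuration above, the secrets file /tmp/debezium/secrets.txt would hold the referenced key in key=value format, one pair per line; the value shown here is a placeholder:

```properties
db-password=<DB_PASSWORD>
```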