Configure Debezium Server
This reference follows the original Debezium documentation. It explains how to use Debezium Server's configuration file, `application.properties`, to configure Debezium Server.
You will find similar explanations inside the `application.properties` file generated by the scaffold command:
redis-di scaffold --db-type cassandra|mysql|oracle|postgresql|sqlserver --dir <PATH_TO_DIR>
Target (sink) connector
The target connector is Redis.
Basic configuration
debezium.sink.type=redis
debezium.sink.redis.address=<REDIS_DI_BDB_HOST>:12001
debezium.sink.redis.password=<REDIS_DI_PASSWORD>
Preventing data loss
To prevent data loss, the Debezium Redis sink connector needs to be configured to wait for write acknowledgment from the RDI database replica shard. Use the following property to achieve that:
debezium.sink.redis.wait.retry.enabled=true
In addition, you can configure the timeout for waiting for the replica shard acknowledgment and the delay between write retries (both 1000 milliseconds by default). See the Redis Data Integration configuration reference below for more information.
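Putting these together, the replica-wait behavior can be sketched as follows (the values shown are the defaults from the configuration reference below, included here only for illustration):

```properties
# Verify that data has been written to the replica shard
debezium.sink.redis.wait.enabled=true
# Retry instead of failing when the replica does not acknowledge in time
debezium.sink.redis.wait.retry.enabled=true
# Timeout (in milliseconds) when waiting for the replica acknowledgment
debezium.sink.redis.wait.timeout.ms=1000
# Delay (in milliseconds) between write retries
debezium.sink.redis.wait.retry.delay.ms=1000
```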
Log rotation
Add the following properties to the application.properties
file and restart Debezium Server:
quarkus.log.file.enable=true
quarkus.log.file.path=<LOG_FILE_PATH>
# The maximum file size of the log file after which a rotation is executed.
quarkus.log.file.rotation.max-file-size=<MAX_FILE_SIZE>
# Indicates whether to rotate log files on server initialization.
quarkus.log.file.rotation.rotate-on-boot=true
# File handler rotation file suffix. When used, the file will be rotated based on its suffix.
quarkus.log.file.rotation.file-suffix=<LOG_FILE_SUFFIX>
# The maximum number of backups to keep.
quarkus.log.file.rotation.max-backup-index=<MAX_BACKUP_INDEX>
Example
quarkus.log.file.path=/home/debezium-server/logs/debezium.log
quarkus.log.file.rotation.max-file-size=5M
quarkus.log.file.rotation.file-suffix=.yyyy-MM-dd.gz
quarkus.log.file.rotation.max-backup-index=3
With this configuration, the log file `debezium.log` will be created in the directory `/home/debezium-server/logs/`.
When the log file size reaches 5M, it will be renamed to `debezium.log.2023-01-22.1.gz` and a new log file, `debezium.log`, will be created.
If the size of the log file reaches 5M and there are already 3 backup log files, the first backup file, `debezium.log.2023-01-22.1.gz`, will be deleted.
At any point in time there will be at most 3 backup log files (`debezium.log.2023-01-22.1.gz`, `debezium.log.2023-01-22.2.gz`, `debezium.log.2023-01-22.3.gz`) and one active log file, `debezium.log`.
Redis Data Integration configuration reference
Property | Default | Description |
---|---|---|
debezium.sink.type | | Must be set to redis. |
debezium.sink.redis.address | | An address, formatted as host:port, at which the Redis target streams are provided. |
debezium.sink.redis.user | | (Optional) A username used to communicate with Redis. |
debezium.sink.redis.password | | (Optional) A password (of the respective user) used to communicate with Redis. A password must be set if a user is set. |
debezium.sink.redis.ssl.enabled | false | (Optional) Use SSL to communicate with Redis. |
debezium.sink.redis.null.key | | Redis does not support the notion of data without a key, so this string will be used as the key for records without a primary key. |
debezium.sink.redis.null.value | | Redis does not support the notion of null payloads, as is the case with tombstone events. This string will be used as the value for records without a payload. |
debezium.sink.redis.batch.size | 500 | Number of change records to insert in a single batch write (pipelined transaction). |
debezium.sink.redis.retry.initial.delay.ms | 300 | Initial retry delay when encountering Redis connection or OOM issues. This value is doubled upon every retry but won't exceed debezium.sink.redis.retry.max.delay.ms. |
debezium.sink.redis.retry.max.delay.ms | 10000 | Maximum delay when encountering Redis connection or OOM issues. |
debezium.sink.redis.memory.limit.mb | 80 | Debezium stops sending events when Redis memory usage exceeds this threshold. |
debezium.sink.redis.wait.enabled | false | If Redis is configured with a replica shard, this allows you to verify that the data has been written to the replica. |
debezium.sink.redis.wait.timeout.ms | 1000 | Defines the timeout in milliseconds when waiting for the replica. |
debezium.sink.redis.wait.retry.enabled | false | Enables retry on wait for replica failure. |
debezium.sink.redis.wait.retry.delay.ms | 1000 | Defines the delay of retry on wait for replica failure. |
Notes
- When using Redis to store schema history and offsets, the values of the properties `debezium.source.offset.storage.redis.*` and `debezium.source.schema.history.internal.redis.*` are inherited from the corresponding `debezium.sink.redis.*` properties.
- If you would like to override any of these defaults inherited from the sink, add them explicitly as `debezium.source.offset.storage.redis.*` and/or `debezium.source.schema.history.internal.redis.*` properties.
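For example, to point offset storage at a different Redis endpoint than the sink, you could override the inherited values explicitly (the hostname, port, and password here are placeholders, not defaults):

```properties
# Overrides the address and password inherited from debezium.sink.redis.*
debezium.source.offset.storage.redis.address=<OFFSETS_REDIS_HOST>:12000
debezium.source.offset.storage.redis.password=<OFFSETS_REDIS_PASSWORD>
```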
Source (database) connector
The source connector depends on the database you get data from. The basic configuration is the same for all database types except for the connector class.
Essential source properties
Note: Add the `debezium.source.` prefix to the listed properties when using them in `application.properties`.
Property | Default | Source Databases | Description |
---|---|---|---|
connector.class | | | Choose from the following: io.debezium.connector.postgresql.PostgresConnector, io.debezium.connector.mysql.MySqlConnector, io.debezium.connector.oracle.OracleConnector, io.debezium.connector.sqlserver.SqlServerConnector, io.debezium.connector.cassandra.Cassandra4Connector |
database.hostname | | MySQL, Oracle, PostgreSQL, SQLServer | The address of the database instance. |
database.port | | MySQL, Oracle, PostgreSQL, SQLServer | The port number of the database instance. |
database.user | | MySQL, Oracle, PostgreSQL, SQLServer | Username to use when connecting to the database server. |
database.password | | MySQL, Oracle, PostgreSQL, SQLServer | Password to use when connecting to the database server. |
database.dbname | | Oracle, PostgreSQL | The name of the database from which to stream the changes. |
database.names | | SQLServer | The comma-separated list of the SQL Server database names from which to stream the changes. |
database.pdb.name | ORCLPDB1 | Oracle | The name of the Oracle Pluggable Database that the connector captures changes from. For a non-CDB installation, do not specify this property. |
database.encrypt | false | SQLServer | If SSL is enabled for a SQL Server database, enable SSL by setting the value of this property to true. |
database.server.id | 1 | MySQL | A numeric ID of this database client, which must be unique across all currently running database processes in the MySQL cluster. |
schema.include.list | | Oracle, PostgreSQL, SQLServer | An optional, comma-separated list of regular expressions that match names of schemas for which you want to capture changes. Any schema name not included in schema.include.list is excluded from having its changes captured. By default, all non-system schemas have their changes captured. If you include this property in the configuration, do not also set the schema.exclude.list property. |
schema.exclude.list | | Oracle, PostgreSQL, SQLServer | An optional, comma-separated list of regular expressions that match names of schemas for which you do not want to capture changes. Any schema whose name is not included in schema.exclude.list has its changes captured, with the exception of system schemas. If you include this property in the configuration, do not also set the schema.include.list property. |
database.include.list | | MySQL | An optional, comma-separated list of regular expressions that match the names of the databases for which to capture changes. The connector does not capture changes in any database whose name is not in database.include.list. By default, the connector captures changes in all databases. If you include this property in the configuration, do not also set the database.exclude.list property. |
database.exclude.list | | MySQL | An optional, comma-separated list of regular expressions that match the names of databases for which you do not want to capture changes. The connector captures changes in any database whose name is not in database.exclude.list. If you include this property in the configuration, do not also set the database.include.list property. |
table.include.list | | MySQL, Oracle, PostgreSQL, SQLServer | An optional, comma-separated list of regular expressions that match fully-qualified table identifiers for tables that you want Debezium to capture; any table not included in table.include.list is excluded from capture. Each identifier is of the form schemaName.tableName. By default, the connector captures all non-system tables for the designated schemas. Must not be used with table.exclude.list. |
table.exclude.list | | MySQL, Oracle, PostgreSQL, SQLServer | An optional, comma-separated list of regular expressions that match fully-qualified table identifiers for the tables that you want to exclude from being captured; Debezium captures all tables not included in table.exclude.list. Each identifier is of the form schemaName.tableName. Must not be used with table.include.list. |
column.include.list | empty string | MySQL, Oracle, PostgreSQL, SQLServer | An optional, comma-separated list of regular expressions that match the fully-qualified names of columns that should be included in the change event message values. Fully-qualified names for columns are of the form schemaName.tableName.columnName. Note that primary key columns are always included in the event's key, even if not included in the value. Do not also set the column.exclude.list property. |
column.exclude.list | empty string | MySQL, Oracle, PostgreSQL, SQLServer | An optional, comma-separated list of regular expressions that match the fully-qualified names of columns that should be excluded from change event message values. Fully-qualified names for columns are of the form schemaName.tableName.columnName. Note that primary key columns are always included in the event's key, even if excluded from the value. Do not also set the column.include.list property. |
topic.prefix | | Cassandra, MySQL, Oracle, PostgreSQL, SQLServer | A prefix for all topic names that receive events emitted by this connector. |
cassandra.node.id | | Cassandra | The name of the Cassandra node. |
cassandra.hosts | localhost | Cassandra | One or more addresses of Cassandra nodes, separated by ",". |
cassandra.port | 9042 | Cassandra | The port used to connect to the Cassandra host(s). |
cassandra.config | | Cassandra | The absolute path of the YAML config file used by a Cassandra node. |
http.port | 8000 | Cassandra | The port used by the HTTP server for ping, health check, and build info. |
commit.log.relocation.dir | | Cassandra | The local directory to which commit logs are relocated from the cdc_raw directory once processed. |
commit.log.real.time.processing.enabled | false | Cassandra | Only applicable to Cassandra 4. If set to true, the Cassandra connector agent reads commit logs incrementally by watching for updates in commit log index files and streams data in real time, at a frequency determined by commit.log.marked.complete.poll.interval.ms. If set to false, the Cassandra 4 connector waits for commit log files to be marked Completed before processing them. |
commit.log.marked.complete.poll.interval.ms | 10000 | Cassandra | Only applicable to Cassandra 4 and when real-time streaming is enabled by commit.log.real.time.processing.enabled. This config determines the frequency at which the commit log index file is polled for updates in the offset value. |
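Put together with the `debezium.source.` prefix rule above, a minimal PostgreSQL source section in `application.properties` might look like the following sketch (host, port, credentials, database name, and topic prefix are placeholders, not defaults):

```properties
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.database.hostname=<POSTGRES_HOST>
debezium.source.database.port=5432
debezium.source.database.user=<POSTGRES_USER>
debezium.source.database.password=<POSTGRES_PASSWORD>
debezium.source.database.dbname=<POSTGRES_DBNAME>
debezium.source.topic.prefix=<TOPIC_PREFIX>
```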
Advanced source properties
Note: Add the `debezium.source.` prefix to the listed properties when using them in `application.properties`.
Property | Default | Source Databases | Description |
---|---|---|---|
value.converter.schemas.enable | true | Cassandra, MySQL, Oracle, PostgreSQL, SQLServer | If set to false, the schema payload will be excluded from each change event record. |
Debezium Connectors for Various Databases
For additional properties, consult the Debezium documentation for the specific connector.
Configuring initial snapshot without filtering queries
Tables to be included in the initial snapshot must be listed in the property `debezium.source.table.include.list`, specified as a comma-separated list of fully-qualified table names.
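For example, to include two tables in the initial snapshot (the table names here are illustrative, reusing the chinook schema from the examples below):

```properties
debezium.source.table.include.list=chinook.customer,chinook.invoice
```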
Using queries in initial snapshot
1. If you want a snapshot to include only a subset of the rows in a table, add the property `debezium.source.snapshot.select.statement.overrides` with a comma-separated list of fully-qualified table names. The list should include every table for which you want to add a SELECT statement.
2. For each table in the list above, add a further configuration property that specifies the SELECT statement for the connector to run on the table when it takes a snapshot. The specified SELECT statement determines the subset of table rows to include in the snapshot. Use the following format to specify the name of this SELECT statement property:
   - Oracle, SQLServer, PostgreSQL: `snapshot.select.statement.overrides.<SCHEMA_NAME>.<TABLE_NAME>`
   - MySQL: `snapshot.select.statement.overrides.<DATABASE_NAME>.<TABLE_NAME>`
3. Add a comma-separated list of the fully-qualified column names that are included in the SELECT statement:
   `debezium.source.column.include.list = <DATABASE_NAME>.<TABLE_NAME>.<COLUMN_NAME1>,<DATABASE_NAME>.<TABLE_NAME>.<COLUMN_NAME2>,<DATABASE_NAME>.<TABLE_NAME>.<COLUMN_NAME3>...`
4. If you want to include all the table columns in the SELECT statement, you can use a regular expression of the form `<DATABASE_NAME>.<TABLE_NAME>.*` instead of adding each table column to the `debezium.source.column.include.list` property.

NOTE: Add all tables as a comma-separated list of fully-qualified table names to the property `debezium.source.table.include.list`.
Example
To select the columns `CustomerId`, `FirstName`, and `LastName` from the `customer` table, joined with the `invoice` table to get the customers with total invoices greater than 8000, add the following properties to the `application.properties` file:
debezium.source.table.include.list = chinook.customer
debezium.source.column.include.list = chinook.customer.CustomerID,chinook.customer.FirstName,chinook.customer.LastName
debezium.source.snapshot.select.statement.overrides=chinook.customer
debezium.source.snapshot.select.statement.overrides.chinook.customer = SELECT c.CustomerId, c.FirstName, c.LastName \
FROM chinook.customer c INNER JOIN chinook.invoice inv \
ON c.CustomerId = inv.CustomerId \
WHERE inv.total > 8000
Form custom message keys for change event records
By default, Debezium uses the primary key column(s) of a table as the message key for the records that it emits. In place of the default, or to specify a key for tables that lack a primary key, you can configure custom message keys based on one or more columns.

- To establish a custom message key for a table, list the table, followed by the column to use as the message key. Each list entry takes the following format:
  `debezium.source.message.key.columns=<databaseName>.<tableName>:<columnName>`
- To base a table key on multiple column names, insert commas between the column names:
  `debezium.source.message.key.columns=<databaseName>.<tableName>:<columnName1>,<columnName2>...`
- The property can include entries for multiple tables. Use a semicolon to separate table entries in the list:
  `debezium.source.message.key.columns=<databaseName>.<tableName1>:<columnName1>,<columnName2>;<databaseName>.<tableName2>:<columnName1>,<columnName2>`

Note: If the property `column.include.list` is defined in your `application.properties` file, make sure it includes all the column names that are specified in the property `message.key.columns`.
Fully-qualified table name
In this document we refer to the fully-qualified table name as `<databaseName>.<tableName>`. This format is for MySQL databases. For Oracle, SQLServer, and PostgreSQL databases, use `<schemaName>.<tableName>` instead.
Database Type | Fully-qualified Table Name |
---|---|
Oracle, SQLServer, PostgreSQL | <schemaName>.<tableName> |
MySQL | <databaseName>.<tableName> |
Notes
- You can specify the fully-qualified table name `<databaseName>.<tableName>` as a regular expression instead of providing the full name of the `databaseName` and `tableName`.
- There is no limit to the number of columns that can be used to create custom message keys. However, it's best to use the minimum number of columns required to specify a unique key.
Examples
- The primary key of the tables `customer` and `employee` is `CustomerID`. To establish custom message keys based on `FirstName` and `LastName` for the tables `customer` and `employee`, add the following property to the `application.properties` file:
  `debezium.source.message.key.columns=chinook.customer:FirstName,LastName;chinook.employee:FirstName,LastName`
- To specify the columns `FirstName` and `LastName` as the message keys for the `customer` table in the `chinook` database, and for the `employee` table in any database, add the following property to the `application.properties` file:
  `debezium.source.message.key.columns=chinook.customer:FirstName,LastName;(.*).employee:FirstName,LastName`
Configuring the Debezium connector to fetch source database secrets from a secret store
Providing the source database password in clear text is not an acceptable option. Fortunately, Debezium supports fetching secrets from environment variables or the file system. This is done using a Quarkus feature that provides property expression expansion on configuration values.
An expression string is a mix of plain strings and expression segments, which are wrapped by the sequence `${...}`.
Using Environment Variables
Property expressions also work with environment variables, for example:
...
database.password="${MYSQL_PASSWORD}"
...
Getting secrets from the file system
In addition, Debezium `application.properties` supports the `${file:path:key}` variable syntax, where `path` is the path to the file and `key` is the property key.
The file storing the secrets should contain a single `key=value` pair per line, for example:
...
debezium.source.config.providers=file
debezium.source.config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
database.password="${file:/tmp/debezium/secrets.txt:db-password}"
...
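With this configuration, the referenced secrets file (`/tmp/debezium/secrets.txt` in the snippet above) would contain the `db-password` key on its own line; the value below is a placeholder:

```properties
db-password=<DB_PASSWORD_VALUE>
```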