Hello all,

I'm trying to use Flink SQL to monitor a Kafka topic that's populated by Debezium, which is in turn monitoring an MS-SQL CDC table. For some reason, Flink SQL shows a new row when I update the boolean field, but updates the row in place when I update the text field, and I don't understand why this happens. My ultimate goal is to use Flink SQL to join records that come from both sides of a 1:N relation in the foreign database, to expose a more ready-to-consume JSON object to downstream consumers.
The source table is defined like this in MS-SQL:

    CREATE TABLE todo_list (
        id int IDENTITY NOT NULL,
        done bit NOT NULL DEFAULT 0,
        name varchar(MAX) NOT NULL,
        CONSTRAINT PK_todo_list PRIMARY KEY (id)
    );

This is the configuration I'm sending to Debezium; note that I'm not including the JSON schema in either the keys or the values:

    {
      "name": "todo-connector",
      "config": {
        "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
        "tasks.max": "1",
        "database.server.name": "mssql",
        "database.hostname": "10.88.10.1",
        "database.port": "1433",
        "database.user": "sa",
        "database.password": "...",
        "database.dbname": "todo",
        "database.history.kafka.bootstrap.servers": "10.88.10.10:9092",
        "database.history.kafka.topic": "schema-changes.todo",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": false,
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": false
      }
    }

So Debezium is publishing events to Kafka with keys like this:

    {"id":3}

And values like this (whitespace added for readability); this particular event updates the 'name' field:

    {
      "before": {"id": 3, "done": false, "name": "test"},
      "after": {"id": 3, "done": false, "name": "test2"},
      "source": {
        "version": "1.9.0.Final",
        "connector": "sqlserver",
        "name": "mssql",
        "ts_ms": 1651497653043,
        "snapshot": "false",
        "db": "todo",
        "sequence": null,
        "schema": "dbo",
        "table": "todo_list",
        "change_lsn": "00000025:00000d58:0002",
        "commit_lsn": "00000025:00000d58:0003",
        "event_serial_no": 2
      },
      "op": "u",
      "ts_ms": 1651497654127,
      "transaction": null
    }

(I verified this using a Python script that follows the relevant Kafka topic.)
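For reference, the verification script is roughly like this (a minimal sketch; it assumes the kafka-python package and the broker/topic names from my setup above):

```python
import json

def decode_event(key_bytes, value_bytes):
    """Decode one schema-less Debezium JSON message into (id, op, before, after)."""
    key = json.loads(key_bytes)
    value = json.loads(value_bytes)
    return key["id"], value["op"], value["before"], value["after"]

def follow_topic(topic="mssql.dbo.todo_list", servers="10.88.10.10:9092"):
    """Print every change event on the topic (requires kafka-python)."""
    from kafka import KafkaConsumer  # imported here so decode_event stays dependency-free
    consumer = KafkaConsumer(topic,
                             bootstrap_servers=servers,
                             auto_offset_reset="earliest")
    for msg in consumer:
        print(decode_event(msg.key, msg.value))
```

Running follow_topic() is how I confirmed the key/value shapes shown above.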
Next, I'm trying to follow this CDC stream in Flink by adding the Kafka connector for Flink SQL, defining a source table, and starting a job in the Flink SQL CLI:

    ADD JAR '/opt/flink/opt/flink-sql-connector-kafka_2.11-1.14.4.jar';

    CREATE TABLE todo_list (
        k_id BIGINT,
        done BOOLEAN,
        name STRING
    ) WITH (
        'connector'='kafka',
        'topic'='mssql.dbo.todo_list',
        'properties.bootstrap.servers'='10.88.10.10:9092',
        'properties.group.id'='flinksql-todo-list',
        'scan.startup.mode'='earliest-offset',
        'key.format'='json',
        'key.fields-prefix'='k_',
        'key.fields'='k_id',
        'value.format'='debezium-json',
        'value.debezium-json.schema-include'='false',
        'value.fields-include'='EXCEPT_KEY'
    );

    SELECT * FROM todo_list;

Now, when I run an update like this in the MS-SQL database:

    UPDATE todo_list SET name='test2' WHERE id=3;

I see that the Flink SQL client updates the row with id=3 to the new value "test2" for the 'name' field, as I was expecting. However, when I update the 'done' field to a different value, Flink SQL seems to leave the old row with values (3, False, 'test2') intact, and shows a new row with values (3, True, 'test2'). I tried appending a `PRIMARY KEY (k_id) NOT ENFORCED` line inside the column list of the CREATE TABLE statement, but this seems to make no difference, except in the output of `DESCRIBE todo_list`. I have no idea why the boolean field would cause different behavior than the text field. Am I missing some piece of configuration, or are my expectations wrong?

Regards,
Joost Molenaar
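P.S. To make my expectation concrete: I expect the Debezium stream to behave as a changelog keyed by id, so that every update replaces the existing row no matter which column changed. In Python terms that would be roughly the following (a sketch of the semantics I expect, not of Flink's actual implementation):

```python
def apply_debezium_event(state, event):
    """Apply one schema-less Debezium change event to a dict keyed by row id.

    Mirrors the upsert semantics I expect from Flink SQL: create ('c'),
    snapshot read ('r') and update ('u') replace the row under its id;
    delete ('d') removes it.
    """
    op = event["op"]
    if op in ("c", "r", "u"):
        row = event["after"]
        state[row["id"]] = row
    elif op == "d":
        state.pop(event["before"]["id"], None)
    return state

state = {}
# Update the 'name' field, then the 'done' field, as in my experiments:
apply_debezium_event(state, {"op": "u",
                             "before": {"id": 3, "done": False, "name": "test"},
                             "after":  {"id": 3, "done": False, "name": "test2"}})
apply_debezium_event(state, {"op": "u",
                             "before": {"id": 3, "done": False, "name": "test2"},
                             "after":  {"id": 3, "done": True,  "name": "test2"}})
print(state)  # a single row: {3: {'id': 3, 'done': True, 'name': 'test2'}}
```

After both updates I'd expect exactly one row for id=3, whereas the Flink SQL client shows me two after the boolean update.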