Hello all,

I'm trying to use Flink-SQL to monitor a Kafka topic that's populated by
Debezium, which is in turn monitoring a MS-SQL CDC table. For some reason,
Flink-SQL shows a new row when I update the boolean field, but updates the
row in place when I update the text field, and I don't understand why
this happens. My ultimate goal is to use Flink-SQL to join records
that come from both sides of a 1:N relation in the foreign database, to
expose a more ready-to-consume JSON object to downstream consumers.

The source table is defined like this in MS-SQL:

    CREATE TABLE todo_list (
        id int IDENTITY NOT NULL,
        done bit NOT NULL DEFAULT 0,
        name varchar(MAX) NOT NULL,
        CONSTRAINT PK_todo_list PRIMARY KEY (id)
    );

This is the configuration I'm sending to Debezium. Note that I'm not
including the JSON schema in either the keys or the values:

    {
        "name": "todo-connector",
        "config": {
            "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
            "tasks.max": "1",
            "database.server.name": "mssql",
            "database.hostname": "10.88.10.1",
            "database.port": "1433",
            "database.user": "sa",
            "database.password": "...",
            "database.dbname": "todo",
            "database.history.kafka.bootstrap.servers": "10.88.10.10:9092",
            "database.history.kafka.topic": "schema-changes.todo",
            "key.converter": "org.apache.kafka.connect.json.JsonConverter",
            "key.converter.schemas.enable": false,
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schemas.enable": false
        }
    }
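For reference, the two `schemas.enable` settings change the message shape:
with `schemas.enable` set to true, Kafka Connect's JsonConverter wraps each
key and value as {"schema": ..., "payload": ...}, and with false it writes
only the payload, which is what the 'value.debezium-json.schema-include'
option on the Flink side has to agree with. A small Python sketch that
accepts either shape (the helper name `unwrap_connect_json` is made up for
illustration):

```python
import json


def unwrap_connect_json(raw: bytes, schemas_enabled: bool) -> dict:
    """Return the data part of a Kafka Connect JsonConverter message.

    Hypothetical helper, for illustration only.
    """
    message = json.loads(raw)
    if schemas_enabled:
        # schemas.enable=true: the converter wraps the data as
        # {"schema": {...}, "payload": {...}}.
        return message["payload"]
    # schemas.enable=false (as in the config above): the message is the data.
    return message


plain = b'{"id": 3}'
wrapped = b'{"schema": {"type": "struct", "optional": false}, "payload": {"id": 3}}'
print(unwrap_connect_json(plain, schemas_enabled=False))   # {'id': 3}
print(unwrap_connect_json(wrapped, schemas_enabled=True))  # {'id': 3}
```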

So Debezium is publishing events to Kafka with keys like this:

    {"id":3}

And values like this (whitespace added for readability); this one is an
update of the 'name' field:

    {
      "before": {
        "id": 3,
        "done": false,
        "name": "test"
      },
      "after": {
        "id": 3,
        "done": false,
        "name": "test2"
      },
      "source": {
        "version": "1.9.0.Final",
        "connector": "sqlserver",
        "name": "mssql",
        "ts_ms": 1651497653043,
        "snapshot": "false",
        "db": "todo",
        "sequence": null,
        "schema": "dbo",
        "table": "todo_list",
        "change_lsn": "00000025:00000d58:0002",
        "commit_lsn": "00000025:00000d58:0003",
        "event_serial_no": 2
      },
      "op": "u",
      "ts_ms": 1651497654127,
      "transaction": null
    }

(I verified this using a Python script that follows the relevant Kafka topic.)
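As far as I understand Flink's debezium-json format, each envelope becomes
one or more changelog rows: "c" and "r" (snapshot read) become an insert
built from "after", "u" becomes an update-before row built from "before"
followed by an update-after row built from "after", and "d" becomes a delete
built from "before". A rough Python model of that mapping, applied to the
event above (the function name and the (kind, row) tuples are illustrative,
not Flink's API):

```python
import json


def envelope_to_changelog(envelope: dict) -> list[tuple[str, dict]]:
    """Model how a Debezium envelope maps to changelog rows.

    Row kinds use Flink's short notation: +I insert, -U update-before,
    +U update-after, -D delete.
    """
    op = envelope["op"]
    if op in ("c", "r"):  # create, or read during the initial snapshot
        return [("+I", envelope["after"])]
    if op == "u":
        # An update retracts the old image, then emits the new image.
        return [("-U", envelope["before"]), ("+U", envelope["after"])]
    if op == "d":
        return [("-D", envelope["before"])]
    raise ValueError(f"unknown Debezium op: {op!r}")


event = json.loads("""
{"before": {"id": 3, "done": false, "name": "test"},
 "after":  {"id": 3, "done": false, "name": "test2"},
 "op": "u"}
""")
for kind, row in envelope_to_changelog(event):
    print(kind, row)
# -U {'id': 3, 'done': False, 'name': 'test'}
# +U {'id': 3, 'done': False, 'name': 'test2'}
```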

Next, I'm trying to follow this CDC stream in Flink by adding the Kafka
connector for Flink SQL, defining a source table, and starting a job in the
Flink-SQL CLI:

    ADD JAR '/opt/flink/opt/flink-sql-connector-kafka_2.11-1.14.4.jar';

    CREATE TABLE todo_list (
        k_id BIGINT,
        done BOOLEAN,
        name STRING
    )
    WITH (
        'connector'='kafka',
        'topic'='mssql.dbo.todo_list',
        'properties.bootstrap.servers'='10.88.10.10:9092',
        'properties.group.id'='flinksql-todo-list',
        'scan.startup.mode'='earliest-offset',
        'key.format'='json',
        'key.fields-prefix'='k_',
        'key.fields'='k_id',
        'value.format'='debezium-json',
        'value.debezium-json.schema-include'='false',
        'value.fields-include'='EXCEPT_KEY'
    );

    SELECT * FROM todo_list;
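My understanding of the key options in this table definition: the Kafka key
is decoded with 'key.format', and its JSON field "id" maps to column k_id
because the 'key.fields-prefix' is stripped from the key's field names;
because of 'value.fields-include'='EXCEPT_KEY', done and name come from the
value, and each changelog row decoded from the value is combined with the
same key row. A rough Python model of that projection (all names here are
hypothetical, and the value rows are written out by hand):

```python
import json

KEY_PREFIX = "k_"                 # mirrors 'key.fields-prefix'
KEY_COLUMNS = ["k_id"]            # mirrors 'key.fields'
VALUE_COLUMNS = ["done", "name"]  # remaining columns, per 'EXCEPT_KEY'


def project_record(raw_key: bytes, value_rows: list[tuple[str, dict]]) -> list[tuple]:
    """Combine the decoded Kafka key with each changelog row from the value.

    value_rows holds (row_kind, fields) pairs, e.g. the -U/+U pair that a
    Debezium "u" event turns into.
    """
    key = json.loads(raw_key)
    # The key format sees column names without the prefix: k_id -> "id".
    key_values = [key[col[len(KEY_PREFIX):]] for col in KEY_COLUMNS]
    result = []
    for kind, fields in value_rows:
        # Only the declared value columns are read; the "id" inside the
        # before/after image is not used for k_id.
        result.append((kind, *key_values, *(fields[c] for c in VALUE_COLUMNS)))
    return result


rows = project_record(
    b'{"id": 3}',
    [("-U", {"id": 3, "done": False, "name": "test"}),
     ("+U", {"id": 3, "done": False, "name": "test2"})],
)
for row in rows:
    print(row)
# ('-U', 3, False, 'test')
# ('+U', 3, False, 'test2')
```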

Now, when I perform a query like this in the MS-SQL database:

    UPDATE todo_list SET name='test2' WHERE id=3;

I see that the Flink-SQL client updates the row with id=3 to have the new
value "test2" for the 'name' field, as I was expecting. However, when I
update the 'done' field to a different value, Flink-SQL seems to leave
the old row with values (3, False, 'test2') intact, and shows a new row with
values (3, True, 'test2').
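To reason about "updates in place" versus "shows a new row", it may help to
model the CLI's table view as a multiset of rows that adds a row on +I/+U and
removes one exactly equal row on -U/-D. This is a simplified assumption about
the client, not its actual code. Under that model, a retraction that matches
the displayed row gives an in-place update, while a retraction that matches
nothing leaves the old row and the +U shows up next to it (the "other" value
below is hypothetical):

```python
from collections import Counter


def materialize(changelog: list[tuple]) -> Counter:
    """Apply (kind, *fields) changelog rows to a multiset of displayed rows.

    Simplified model, assuming retractions match rows by full equality.
    """
    table = Counter()
    for kind, *fields in changelog:
        row = tuple(fields)
        if kind in ("+I", "+U"):
            table[row] += 1
        elif kind in ("-U", "-D"):
            # Retract one exactly equal row; an unmatched retraction is ignored.
            if table[row] > 0:
                table[row] -= 1
                if table[row] == 0:
                    del table[row]
        else:
            raise ValueError(f"unknown row kind: {kind!r}")
    return table


matching = materialize([
    ("+I", 3, False, "test2"),
    ("-U", 3, False, "test2"),
    ("+U", 3, True, "test2"),
])
print(sorted(matching))    # [(3, True, 'test2')]

mismatched = materialize([
    ("+I", 3, False, "test2"),
    ("-U", 3, False, "other"),  # hypothetical: equals no displayed row
    ("+U", 3, True, "test2"),
])
print(sorted(mismatched))  # [(3, False, 'test2'), (3, True, 'test2')]
```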

I tried adding a `PRIMARY KEY (k_id) NOT ENFORCED` line to the column list of
the CREATE TABLE statement, but this seems to make no difference, except in
the output of `DESCRIBE todo_list` in Flink-SQL.

I have no idea why the boolean field would cause different behavior than the
text field. Am I missing some piece of configuration, or are my expectations
wrong?


Regards,
Joost Molenaar
