Hi, I am trying to create a Debezium MySQL connector with a transformation that extracts the record key.
Before adding any key transformation, the connector is defined as follows:

create source connector mysql with( "connector.class" = 'io.debezium.connector.mysql.MySqlConnector', "database.hostname" = 'mysql', "tasks.max" = '1', "database.port" = '3306', "database.user" = 'debezium', "database.password" = 'dbz', "database.server.id" = '42', "database.server.name" = 'before', "table.whitelist" = 'deepprices.deepprices', "database.history.kafka.bootstrap.servers" = 'kafka:29092', "database.history.kafka.topic" = 'dbz.deepprices', "include.schema.changes" = 'true', "transforms" = 'unwrap', "transforms.unwrap.type" = 'io.debezium.transforms.UnwrapFromEnvelope' );

The resulting topic records look like this:

rowtime: 2020/05/20 16:47:23.354 Z, key: [St@5778462697648631933/8247607644536792125], value: {"id": "P195910", "price": "1511.64"}

When the key.converter is set to JSON, the key becomes {"id": "P195910"}.

*I want to extract id from the key and make it a plain string key:*

*Expected result*:

rowtime: 2020/05/20 16:47:23.354 Z, key: 'P195910', value: {"id": "P195910", "price": "1511.64"}

When I try to use the "ExtractField" or "ValueToKey" transformation, I get "DataException: Field does not exist: id":

drop connector mysql; create source connector mysql with( "connector.class" = 'io.debezium.connector.mysql.MySqlConnector', "database.hostname" = 'mysql', "tasks.max" = '1', "database.port" = '3306', "database.user" = 'debezium', "database.password" = 'dbz', "database.server.id" = '42', "database.server.name" = 'after', "table.whitelist" = 'deepprices.deepprices', "database.history.kafka.bootstrap.servers" = 'kafka:29092', "database.history.kafka.topic" = 'dbz.deepprices', "include.schema.changes" = 'true', "key.converter" = 'org.apache.kafka.connect.json.JsonConverter', "key.converter.schemas.enable" = 'TRUE', "value.converter" = 'org.apache.kafka.connect.json.JsonConverter', "value.converter.schemas.enable" = 'TRUE', "transforms" = 'unwrap,createkey', "transforms.unwrap.type" = 'io.debezium.transforms.UnwrapFromEnvelope', "transforms.createkey.type" = 'org.apache.kafka.connect.transforms.ValueToKey', "transforms.createkey.fields" = 'id' );

-- 
Mohammed Ait Haddou
Linkedin.com/in/medait
+212697937189