[jira] [Updated] (KAFKA-14665) my custom SMT that converts Int to String does not work for primary keys
[ https://issues.apache.org/jira/browse/KAFKA-14665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fuxin Hao updated KAFKA-14665: -- Description: I'm using {{`io.debezium.connector.postgresql.PostgresConnector}} and {{io.confluent.connect.jdbc.JdbcSinkConnector`}} to sync data between two PostgreSQL databases. And I set `{{{}time.precision.mode=adaptive`{}}} in [Debezium config|https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-temporal-types]. which would serialize PostgreSQL time data type to {{Integer}} or {{Long}} and it's incompatible with {{{}JdbcSinkConnector{}}}. So I wrote an SMT to transform these data from numeric types to strings. Say I have the following table: {code:java} CREATE TABLE pk_created_at ( created_at timestamp without time zone DEFAULT current_timestamp not null, PRIMARY KEY (created_at) ); insert into pk_created_at values(current_timestamp); {code} My source connector configuration: {code:java} { "name": "test-connector", "config": { "snapshot.mode": "always", "plugin.name": "pgoutput", "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "tasks.max": "1", "database.hostname": "source", "database.port": "5432", "database.user": "postgres", "database.password": "postgres", "database.dbname" : "test", "database.server.name": "test", "slot.name" : "test", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.enabled": true, "value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enabled": true, "decimal.handling.mode": "string", "time.precision.mode": "adaptive", "transforms": "unwrap", "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState" } } {code} And the messages in kafka topic `{{{}test.public.pk_created_at`{}}} would be: {code:java} # bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic test.public.pk_created_at --from-beginning { "schema":{ "type":"struct", "fields":[ { "type":"int64", "optional":false, "name":"io.debezium.time.MicroTimestamp", "version":1, "field":"created_at" } ], "optional":false, "name":"test.public.pk_created_at.Value" }, "payload":{ "created_at":1669354751764130 } }{code} After applying my SMT, the messages would be like this: {code:java} # bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic test.public.pk_created_at --from-beginning { "schema":{ "type":"struct", "fields":[ { "type":"string", "optional":true, "field":"created_at" } ], "optional":false, "name":"test.public.pk_created_at.Value" }, "payload":{ "created_at":"2022-11-25T05:39:11.764130Z" } }{code} {{ }} It worke[d great if |https://github.com/FX-HAO/kafka-connect-debezium-tranforms/blob/master/src/main/java/com/github/haofuxin/kafka/connect/DebeziumTimestampConverter.java]{{`created_at`}} is not a part of primary keys. No error occurred. But the primary keys on some of my tables are composed of `{{{}id`{}}} and `{{{}created_at`{}}} like this: `{{{}PRIMARY KEY (id, created_at)`{}}}. Then it raised an exception in `{{{}JdbcSinkConnector`{}}} as below: {code:java} 2022-11-25 06:57:01,450 INFO || Attempting to open connection #1 to PostgreSql [io.confluent.connect.jdbc.util.CachedConnectionProvider] 2022-11-25 06:57:01,459 INFO || Maximum table name length for database is 63 bytes [io.confluent.connect.jdbc.dialect.PostgreSqlDatabaseDialect] 2022-11-25 06:57:01,459 INFO || JdbcDbWriter Connected [io.confluent.connect.jdbc.sink.JdbcDbWriter] 2022-11-25 06:57:01,472 INFO || Checking PostgreSql dialect for existence of TABLE "pk_created_at" [io.confluent.connect.jdbc.dialect.GenericDatabaseDialect] 2022-11-25 06:57:01,484 INFO || Using PostgreSql dialect TABLE "pk_created_at" present [io.confluent.connect.jdbc.dialect.GenericDatabaseDialect] 2022-11-25 06:57:01,505 INFO || Checking PostgreSql dialect for type of TABLE "pk_created_at" [io.confluent.connect.jdbc.dialect.GenericDatabaseDialect] 2022-11-25 06:57:01,508 INFO || Setting metadata for table "pk_created_at" to Table{name='"pk_created_at"', type=TABLE columns=[Column{'created_at', isPrimaryKey=true, allowsNull=false, sqlType=timestamp}]} [io.confluent.connect.jdbc.util.TableDefinitions] 2022-11-25 06:57:01,510 WARN || Write of 2 records failed, remainingRetries=0 [io.confluent.connect.jdbc.sink.JdbcSinkTask] java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "pk_created_at" ("created_at") VALUES (1669359291990398) ON CONFLICT ("created_at") DO NOTHING was aborted: ERROR: column "created_at" is of type timestamp without time zone but expression is of type bigint Hint: You will need to rewrite or cast the expression. Position: 52 Call getNextException to see other errors in the
[jira] [Created] (KAFKA-14665) my custom SMT that converts Int to String does not work for primary keys
Fuxin Hao created KAFKA-14665: - Summary: my custom SMT that converts Int to String does not work for primary keys Key: KAFKA-14665 URL: https://issues.apache.org/jira/browse/KAFKA-14665 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 3.1.0 Reporter: Fuxin Hao I'm using {{`io.debezium.connector.postgresql.PostgresConnector}} and {{io.confluent.connect.jdbc.JdbcSinkConnector`}} to sync data between two PostgreSQL databases. And I set `{{{}time.precision.mode=adaptive`{}}} in [Debezium config|https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-temporal-types]. which would serialize PostgreSQL time data type to {{Integer}} or {{Long}} and it's incompatible with {{{}JdbcSinkConnector{}}}. So I wrote an SMT to transform these data from numeric types to strings. Say I have the following table: {code:java} CREATE TABLE pk_created_at ( created_at timestamp without time zone DEFAULT current_timestamp not null, PRIMARY KEY (created_at) ); insert into pk_created_at values(current_timestamp); {code} {{}} {{}} My source connector configuration: {code:java} { "name": "test-connector", "config": { "snapshot.mode": "always", "plugin.name": "pgoutput", "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "tasks.max": "1", "database.hostname": "source", "database.port": "5432", "database.user": "postgres", "database.password": "postgres", "database.dbname" : "test", "database.server.name": "test", "slot.name" : "test", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.enabled": true, "value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enabled": true, "decimal.handling.mode": "string", "time.precision.mode": "adaptive", "transforms": "unwrap", "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState" } } {code} {{}} {{}} And the messages in kafka topic `{{{}test.public.pk_created_at`{}}} would be: {{}} {code:java} # bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic test.public.pk_created_at --from-beginning { "schema":{ "type":"struct", "fields":[ { "type":"int64", "optional":false, "name":"io.debezium.time.MicroTimestamp", "version":1, "field":"created_at" } ], "optional":false, "name":"test.public.pk_created_at.Value" }, "payload":{ "created_at":1669354751764130 } }{code} {{}} After applying my SMT, the messages would be like this: {{}} {code:java} # bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic test.public.pk_created_at --from-beginning { "schema":{ "type":"struct", "fields":[ { "type":"string", "optional":true, "field":"created_at" } ], "optional":false, "name":"test.public.pk_created_at.Value" }, "payload":{ "created_at":"2022-11-25T05:39:11.764130Z" } }{code} {{ }} It worke[d great if |https://github.com/FX-HAO/kafka-connect-debezium-tranforms/blob/master/src/main/java/com/github/haofuxin/kafka/connect/DebeziumTimestampConverter.java]{{`created_at`}} is not a part of primary keys. No error occurred. But the primary keys on some of my tables are composed of `{{{}id`{}}} and `{{{}created_at`{}}} like this: `{{{}PRIMARY KEY (id, created_at)`{}}}. Then it raised an exception in `{{{}JdbcSinkConnector`{}}} as below: {{}} {code:java} 2022-11-25 06:57:01,450 INFO || Attempting to open connection #1 to PostgreSql [io.confluent.connect.jdbc.util.CachedConnectionProvider] 2022-11-25 06:57:01,459 INFO || Maximum table name length for database is 63 bytes [io.confluent.connect.jdbc.dialect.PostgreSqlDatabaseDialect] 2022-11-25 06:57:01,459 INFO || JdbcDbWriter Connected [io.confluent.connect.jdbc.sink.JdbcDbWriter] 2022-11-25 06:57:01,472 INFO || Checking PostgreSql dialect for existence of TABLE "pk_created_at" [io.confluent.connect.jdbc.dialect.GenericDatabaseDialect] 2022-11-25 06:57:01,484 INFO || Using PostgreSql dialect TABLE "pk_created_at" present [io.confluent.connect.jdbc.dialect.GenericDatabaseDialect] 2022-11-25 06:57:01,505 INFO || Checking PostgreSql dialect for type of TABLE "pk_created_at" [io.confluent.connect.jdbc.dialect.GenericDatabaseDialect] 2022-11-25 06:57:01,508 INFO || Setting metadata for table "pk_created_at" to Table{name='"pk_created_at"', type=TABLE columns=[Column{'created_at', isPrimaryKey=true, allowsNull=false, sqlType=timestamp}]} [io.confluent.connect.jdbc.util.TableDefinitions] 2022-11-25 06:57:01,510 WARN || Write of 2 records failed, remainingRetries=0 [io.confluent.connect.jdbc.sink.JdbcSinkTask] java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "pk_created_at" ("created_at") VALUES (1669359291990398) ON CONFLICT