[ https://issues.apache.org/jira/browse/KAFKA-14665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Fuxin Hao updated KAFKA-14665:
------------------------------
Description: 

I'm using {{io.debezium.connector.postgresql.PostgresConnector}} and {{io.confluent.connect.jdbc.JdbcSinkConnector}} to sync data between two PostgreSQL databases, and I set {{time.precision.mode=adaptive}} in the [Debezium config|https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-temporal-types]. In that mode Debezium serializes PostgreSQL temporal columns as {{Integer}} or {{Long}} values, which {{JdbcSinkConnector}} cannot write back into timestamp columns, so I wrote an SMT that converts these values from numeric types to strings.

Say I have the following table:
{code:java}
CREATE TABLE pk_created_at (
    created_at timestamp without time zone DEFAULT current_timestamp not null,
    PRIMARY KEY (created_at)
);

insert into pk_created_at values(current_timestamp);
{code}
My source connector configuration:
{code:java}
{
    "name": "test-connector",
    "config": {
        "snapshot.mode": "always",
        "plugin.name": "pgoutput",
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.hostname": "source",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname": "test",
        "database.server.name": "test",
        "slot.name": "test",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enabled": true,
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enabled": true,
        "decimal.handling.mode": "string",
        "time.precision.mode": "adaptive",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
    }
}
{code}
The messages in the Kafka topic {{test.public.pk_created_at}} then look like this:
{code:java}
# bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic test.public.pk_created_at --from-beginning
{
  "schema":{
    "type":"struct",
    "fields":[
      {
        "type":"int64",
        "optional":false,
        "name":"io.debezium.time.MicroTimestamp",
        "version":1,
        "field":"created_at"
      }
    ],
    "optional":false,
    "name":"test.public.pk_created_at.Value"
  },
  "payload":{
    "created_at":1669354751764130
  }
}
{code}
After applying my SMT, the messages look like this:
{code:java}
# bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic test.public.pk_created_at --from-beginning
{
  "schema":{
    "type":"struct",
    "fields":[
      {
        "type":"string",
        "optional":true,
        "field":"created_at"
      }
    ],
    "optional":false,
    "name":"test.public.pk_created_at.Value"
  },
  "payload":{
    "created_at":"2022-11-25T05:39:11.764130Z"
  }
}
{code}
It worked great if {{created_at}} is not part of a primary key; no error occurred. But the primary key on some of my tables is composed of {{id}} and {{created_at}}, like this: {{PRIMARY KEY (id, created_at)}}.
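The sink connector configuration isn't reproduced in this report. For context, a {{JdbcSinkConnector}} configuration along the following lines would fit the pipeline described above; every value here is illustrative rather than taken from the actual setup, in particular {{pk.mode}} and {{pk.fields}}:
{code:java}
{
    "name": "test-sink-connector",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "test.public.pk_created_at",
        "table.name.format": "pk_created_at",
        "connection.url": "jdbc:postgresql://sink:5432/test",
        "connection.user": "postgres",
        "connection.password": "postgres",
        "insert.mode": "insert",
        "pk.mode": "record_key",
        "pk.fields": "created_at",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": true,
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": true
    }
}
{code}
With {{pk.mode=record_key}}, the sink takes the primary-key columns from the record key rather than from the record value; this distinction is worth keeping in mind for the failure below.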
Then it raised an exception in {{JdbcSinkConnector}}, as below:
{code:java}
2022-11-25 06:57:01,450 INFO || Attempting to open connection #1 to PostgreSql [io.confluent.connect.jdbc.util.CachedConnectionProvider]
2022-11-25 06:57:01,459 INFO || Maximum table name length for database is 63 bytes [io.confluent.connect.jdbc.dialect.PostgreSqlDatabaseDialect]
2022-11-25 06:57:01,459 INFO || JdbcDbWriter Connected [io.confluent.connect.jdbc.sink.JdbcDbWriter]
2022-11-25 06:57:01,472 INFO || Checking PostgreSql dialect for existence of TABLE "pk_created_at" [io.confluent.connect.jdbc.dialect.GenericDatabaseDialect]
2022-11-25 06:57:01,484 INFO || Using PostgreSql dialect TABLE "pk_created_at" present [io.confluent.connect.jdbc.dialect.GenericDatabaseDialect]
2022-11-25 06:57:01,505 INFO || Checking PostgreSql dialect for type of TABLE "pk_created_at" [io.confluent.connect.jdbc.dialect.GenericDatabaseDialect]
2022-11-25 06:57:01,508 INFO || Setting metadata for table "pk_created_at" to Table{name='"pk_created_at"', type=TABLE columns=[Column{'created_at', isPrimaryKey=true, allowsNull=false, sqlType=timestamp}]} [io.confluent.connect.jdbc.util.TableDefinitions]
2022-11-25 06:57:01,510 WARN || Write of 2 records failed, remainingRetries=0 [io.confluent.connect.jdbc.sink.JdbcSinkTask]
java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "pk_created_at" ("created_at") VALUES (1669359291990398) ON CONFLICT ("created_at") DO NOTHING was aborted: ERROR: column "created_at" is of type timestamp without time zone but expression is of type bigint
  Hint: You will need to rewrite or cast the expression.
  Position: 52  Call getNextException to see other errors in the batch.
    at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:165)
    at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:871)
    at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:910)
    at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1638)
    at io.confluent.connect.jdbc.sink.BufferedRecords.executeUpdates(BufferedRecords.java:196)
    at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:186)
    at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:80)
    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:84)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.postgresql.util.PSQLException: ERROR: column "created_at" is of type timestamp without time zone but expression is of type bigint
  Hint: You will need to rewrite or cast the expression.
  Position: 52
    at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2675)
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2365)
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:355)
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:315)
    at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:868)
    ... 17 more
{code}
The error suggests the sink connector was still trying to insert {{created_at}} as the numeric value {{1669359291990398}}, even though I verified that the messages in the Kafka topic had been transformed into strings. It works if {{created_at}} is not a primary key.

[My SMT|https://github.com/FX-HAO/kafka-connect-debezium-tranforms/blob/master/src/main/java/com/github/haofuxin/kafka/connect/DebeziumTimestampConverter.java]:
{code:java}
import io.debezium.time.Date;
import io.debezium.time.MicroTime;
import io.debezium.time.MicroTimestamp;
import io.debezium.time.Time;
import io.debezium.time.Timestamp;
import org.apache.kafka.common.cache.Cache;
import org.apache.kafka.common.cache.LRUCache;
import org.apache.kafka.common.cache.SynchronizedCache;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.SchemaUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Map;

import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;

public class DebeziumTimestampConverter<R extends ConnectRecord<R>> implements Transformation<R> {
    private static final Logger LOG = LoggerFactory.getLogger(DebeziumTimestampConverter.class);

    private Cache<Schema, Schema> schemaUpdateCache;

    private static final String PURPOSE = "convert io.debezium.time.MicroTimestamp into String";

    @Override
    public void configure(Map<String, ?> props) {
        schemaUpdateCache = new SynchronizedCache<>(new LRUCache<Schema, Schema>(16));
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void close() {
    }

    // The transform operates on the record value.
    protected Schema operatingSchema(R record) {
        return record.valueSchema();
    }

    protected Object operatingValue(R record) {
        return record.value();
    }

    // io.debezium.time.Date: days since epoch -> ISO-8601 date string
    private String formatDate(Integer epoch) {
        if (epoch == null) {
            return "";
        }
        LocalDate d = LocalDate.ofEpochDay(epoch);
        return d.toString();
    }

    // io.debezium.time.Time: milliseconds past midnight -> "HH:mm:ss.SSS"
    private String formatTime(Integer epoch) {
        if (epoch == null) {
            return "";
        }
        java.util.Date date = new java.util.Date(epoch);
        return new SimpleDateFormat("HH:mm:ss.SSS").format(date);
    }

    // io.debezium.time.MicroTime: microseconds past midnight -> "HH:mm:ss.SSSSSS"
    private String formatMicroTime(Long epochMicroSeconds) {
        if (epochMicroSeconds == null) {
            return "";
        }
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSSSSS").withZone(ZoneId.from(ZoneOffset.UTC));
        long epochSeconds = epochMicroSeconds / 1000000L;
        long nanoOffset = (epochMicroSeconds % 1000000L) * 1000L;
        Instant instant = Instant.ofEpochSecond(epochSeconds, nanoOffset);
        return formatter.format(instant);
    }

    // io.debezium.time.Timestamp: milliseconds since epoch -> ISO-8601 string
    private String formatTimestamp(Long epochMilliSeconds) {
        if (epochMilliSeconds == null) {
            return "";
        }
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'").withZone(ZoneId.from(ZoneOffset.UTC));
        Instant instant = Instant.ofEpochMilli(epochMilliSeconds);
        return formatter.format(instant);
    }

    // io.debezium.time.MicroTimestamp: microseconds since epoch -> ISO-8601 string
    private String formatMicroTimestamp(Long epochMicroSeconds) {
        if (epochMicroSeconds == null) {
            return "";
        }
        long epochSeconds = epochMicroSeconds / 1000000L;
        long nanoOffset = (epochMicroSeconds % 1000000L) * 1000L;
        Instant instant = Instant.ofEpochSecond(epochSeconds, nanoOffset);
        return instant.toString();
    }

    // Copy the schema, replacing Debezium temporal fields with optional strings.
    private Schema makeUpdatedSchema(Schema schema) {
        final SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct());
        for (Field field : schema.fields()) {
            if (field.schema().type() != Schema.Type.STRING && (
                    MicroTimestamp.SCHEMA_NAME.equals(field.schema().name())
                    || Date.SCHEMA_NAME.equals(field.schema().name())
                    || Time.SCHEMA_NAME.equals(field.schema().name())
                    || MicroTime.SCHEMA_NAME.equals(field.schema().name())
                    || Timestamp.SCHEMA_NAME.equals(field.schema().name()))) {
                builder.field(field.name(), Schema.OPTIONAL_STRING_SCHEMA);
            } else {
                builder.field(field.name(), field.schema());
            }
        }
        return builder.build();
    }

    protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
        return record.newRecord(
                record.topic(),
                record.kafkaPartition(),
                record.keySchema(),
                record.key(),
                updatedSchema,
                updatedValue,
                record.timestamp()
        );
    }

    // Rebuild the value struct, formatting Debezium temporal fields as strings.
    private R applyWithSchema(R r) {
        final Struct struct = requireStruct(operatingValue(r), PURPOSE);
        Schema updatedSchema = schemaUpdateCache.get(struct.schema());
        if (updatedSchema == null) {
            updatedSchema = makeUpdatedSchema(struct.schema());
            schemaUpdateCache.put(struct.schema(), updatedSchema);
        }
        final Struct updatedValue = new Struct(updatedSchema);
        for (Field field : struct.schema().fields()) {
            if (field.schema().type() != Schema.Type.STRING && field.schema().name() != null) {
                switch (field.schema().name()) {
                    case Date.SCHEMA_NAME:
                        Object value = struct.get(field);
                        if (value == null) {
                            updatedValue.put(field.name(), null);
                            continue;
                        }
                        if (value instanceof Integer) {
                            updatedValue.put(field.name(), formatDate((Integer) value));
                        } else {
                            updatedValue.put(field.name(), value);
                        }
                        break;
                    case Time.SCHEMA_NAME:
                        value = struct.get(field);
                        if (value == null) {
                            updatedValue.put(field.name(), null);
                            continue;
                        }
                        if (value instanceof Integer) {
                            updatedValue.put(field.name(), formatTime((Integer) value));
                        } else {
                            updatedValue.put(field.name(), value);
                        }
                        break;
                    case MicroTime.SCHEMA_NAME:
                        value = struct.get(field);
                        if (value == null) {
                            updatedValue.put(field.name(), null);
                            continue;
                        }
                        if (value instanceof Long) {
                            updatedValue.put(field.name(), formatMicroTime((Long) value));
                        } else {
                            updatedValue.put(field.name(), value);
                        }
                        break;
                    case Timestamp.SCHEMA_NAME:
                        value = struct.get(field);
                        if (value == null) {
                            updatedValue.put(field.name(), null);
                            continue;
                        }
                        if (value instanceof Long) {
                            updatedValue.put(field.name(), formatTimestamp((Long) value));
                        } else {
                            updatedValue.put(field.name(), value);
                        }
                        break;
                    case MicroTimestamp.SCHEMA_NAME:
                        value = struct.get(field);
                        if (value == null) {
                            updatedValue.put(field.name(), null);
                            continue;
                        }
                        if (value instanceof Long) {
                            updatedValue.put(field.name(), formatMicroTimestamp((Long) value));
                        } else {
                            updatedValue.put(field.name(), value);
                        }
                        break;
                }
            } else {
                updatedValue.put(field.name(), struct.get(field));
            }
        }
        return newRecord(r, updatedSchema, updatedValue);
    }

    @Override
    public R apply(R record) {
        if (operatingSchema(record) == null) {
            return record;
        } else {
            return applyWithSchema(record);
        }
    }
}
{code}
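One detail worth noting about the transform above: {{operatingSchema()}} and {{operatingValue()}} return {{record.valueSchema()}} and {{record.value()}}, so only the record value is rewritten (and the console output above shows only record values). The record key that Debezium produces for the primary-key columns still carries the numeric {{io.debezium.time.MicroTimestamp}} field. If the sink connector takes its primary-key columns from the record key (e.g. {{pk.mode=record_key}}), that untouched key would explain an INSERT that still binds the numeric value even though the value side of the message is a string. Kafka's built-in transforms handle this by shipping key and value variants of the same transformation (for example {{org.apache.kafka.connect.transforms.TimestampConverter$Key}} and {{...$Value}}). A minimal sketch of a key-side variant, assuming the class above is used as its base class, could look like this (the class name and wiring below are hypothetical, not part of the reported setup):
{code:java}
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;

// Sketch only: reuses all conversion logic from DebeziumTimestampConverter but
// redirects it at the record key, so primary-key fields such as created_at are
// converted to strings as well.
public class DebeziumTimestampConverterKey<R extends ConnectRecord<R>>
        extends DebeziumTimestampConverter<R> {

    @Override
    protected Schema operatingSchema(R record) {
        // operate on the key schema instead of the value schema
        return record.keySchema();
    }

    @Override
    protected Object operatingValue(R record) {
        // operate on the key struct instead of the value struct
        return record.key();
    }

    @Override
    protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
        // put the rewritten struct back as the key; leave the value untouched
        return record.newRecord(
                record.topic(),
                record.kafkaPartition(),
                updatedSchema,
                updatedValue,
                record.valueSchema(),
                record.value(),
                record.timestamp());
    }
}
{code}
Both variants could then be chained in the connector configuration, e.g. {{"transforms": "unwrap,convertKey,convertValue"}} with {{transforms.convertKey.type}} pointing at the key-side class (the aliases here are likewise hypothetical).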
> my custom SMT that converts Int to String does not work for primary keys
> ------------------------------------------------------------------------
>
>                 Key: KAFKA-14665
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14665
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 3.1.0
>            Reporter: Fuxin Hao
>            Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)