[jira] [Updated] (KAFKA-14665) my custom SMT that converts Int to String does not work for primary keys

2023-01-30 Thread Fuxin Hao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fuxin Hao updated KAFKA-14665:
--
Description: 
I'm using {{io.debezium.connector.postgresql.PostgresConnector}} and
{{io.confluent.connect.jdbc.JdbcSinkConnector}} to sync data between two
PostgreSQL databases, and I set {{time.precision.mode=adaptive}} in the
[Debezium
config|https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-temporal-types],
which serializes PostgreSQL temporal types to {{Integer}} or {{Long}} values
that {{JdbcSinkConnector}} cannot handle. So I wrote an SMT to transform these
fields from numeric types to strings.
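A minimal sketch of what such a transform does (the full implementation is
linked later in this description; the class name and structure here are
illustrative): it rebuilds the value schema, mapping
{{io.debezium.time.MicroTimestamp}} {{int64}} fields to ISO-8601 strings.
{code:java}
import java.time.Instant;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;

public class MicroTimestampToString<R extends ConnectRecord<R>> implements Transformation<R> {

    private static final String MICRO_TS = "io.debezium.time.MicroTimestamp";

    @Override
    public R apply(R record) {
        if (!(record.value() instanceof Struct)) {
            return record;
        }
        Struct value = (Struct) record.value();
        Schema newSchema = convertSchema(value.schema());
        Struct newValue = convertStruct(value, newSchema);
        // Note: only the value is rewritten; the record key passes through untouched.
        return record.newRecord(record.topic(), record.kafkaPartition(),
                record.keySchema(), record.key(), newSchema, newValue, record.timestamp());
    }

    // Rebuild the schema, turning MicroTimestamp int64 fields into optional strings.
    private Schema convertSchema(Schema schema) {
        SchemaBuilder builder = SchemaBuilder.struct().name(schema.name());
        for (Field f : schema.fields()) {
            boolean isMicroTs = MICRO_TS.equals(f.schema().name());
            builder.field(f.name(), isMicroTs ? Schema.OPTIONAL_STRING_SCHEMA : f.schema());
        }
        return builder.build();
    }

    // Copy fields over, formatting MicroTimestamp values as ISO-8601 strings.
    private Struct convertStruct(Struct struct, Schema newSchema) {
        Struct result = new Struct(newSchema);
        for (Field f : struct.schema().fields()) {
            Object v = struct.get(f);
            if (MICRO_TS.equals(f.schema().name()) && v != null) {
                long micros = (Long) v;
                // MicroTimestamp counts microseconds since the epoch.
                v = Instant.ofEpochSecond(micros / 1_000_000L, (micros % 1_000_000L) * 1_000L).toString();
            }
            result.put(f.name(), v);
        }
        return result;
    }

    @Override
    public ConfigDef config() { return new ConfigDef(); }

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public void close() { }
}
{code}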

 

Say I have the following table:
{code:sql}
CREATE TABLE pk_created_at (
    created_at timestamp without time zone DEFAULT current_timestamp not null,
    PRIMARY KEY (created_at)
);
insert into pk_created_at values (current_timestamp); {code}
 

My source connector configuration:
{code:json}
{
  "name": "test-connector",
  "config": {
    "snapshot.mode": "always",
    "plugin.name": "pgoutput",
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "source",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "test",
    "database.server.name": "test",
    "slot.name": "test",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enabled": true,
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enabled": true,
    "decimal.handling.mode": "string",
    "time.precision.mode": "adaptive",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
  }
} {code}
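
The sink configuration isn't included in this report; for reference, a setup
consistent with the log further down would look roughly like this (connection
details, {{insert.mode}}, and {{pk.mode}} are assumptions; {{insert.mode=upsert}}
would explain the {{ON CONFLICT ... DO NOTHING}} in the failed statement, since
every column of {{pk_created_at}} is part of the key):
{code:json}
{
  "name": "test-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "test.public.pk_created_at",
    "connection.url": "jdbc:postgresql://sink:5432/test",
    "connection.user": "postgres",
    "connection.password": "postgres",
    "insert.mode": "upsert",
    "pk.mode": "record_key",
    "auto.create": "false",
    "transforms": "timestampConverter",
    "transforms.timestampConverter.type": "com.github.haofuxin.kafka.connect.DebeziumTimestampConverter"
  }
} {code}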
 

And the messages in the Kafka topic {{test.public.pk_created_at}} would be:
{code:json}
# bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic test.public.pk_created_at --from-beginning
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int64",
        "optional": false,
        "name": "io.debezium.time.MicroTimestamp",
        "version": 1,
        "field": "created_at"
      }
    ],
    "optional": false,
    "name": "test.public.pk_created_at.Value"
  },
  "payload": {
    "created_at": 1669354751764130
  }
}{code}
 

After applying my SMT, the messages would be like this:
{code:json}
# bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic test.public.pk_created_at --from-beginning
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "string",
        "optional": true,
        "field": "created_at"
      }
    ],
    "optional": false,
    "name": "test.public.pk_created_at.Value"
  },
  "payload": {
    "created_at": "2022-11-25T05:39:11.764130Z"
  }
}{code}

My
[SMT|https://github.com/FX-HAO/kafka-connect-debezium-tranforms/blob/master/src/main/java/com/github/haofuxin/kafka/connect/DebeziumTimestampConverter.java]
worked great as long as {{created_at}} was not part of the primary key; no
error occurred. But the primary keys on some of my tables are composed of
{{id}} and {{created_at}}, like this: {{PRIMARY KEY (id, created_at)}}. Then
{{JdbcSinkConnector}} raised the exception below:
{code}
2022-11-25 06:57:01,450 INFO || Attempting to open connection #1 to PostgreSql [io.confluent.connect.jdbc.util.CachedConnectionProvider]
2022-11-25 06:57:01,459 INFO || Maximum table name length for database is 63 bytes [io.confluent.connect.jdbc.dialect.PostgreSqlDatabaseDialect]
2022-11-25 06:57:01,459 INFO || JdbcDbWriter Connected [io.confluent.connect.jdbc.sink.JdbcDbWriter]
2022-11-25 06:57:01,472 INFO || Checking PostgreSql dialect for existence of TABLE "pk_created_at" [io.confluent.connect.jdbc.dialect.GenericDatabaseDialect]
2022-11-25 06:57:01,484 INFO || Using PostgreSql dialect TABLE "pk_created_at" present [io.confluent.connect.jdbc.dialect.GenericDatabaseDialect]
2022-11-25 06:57:01,505 INFO || Checking PostgreSql dialect for type of TABLE "pk_created_at" [io.confluent.connect.jdbc.dialect.GenericDatabaseDialect]
2022-11-25 06:57:01,508 INFO || Setting metadata for table "pk_created_at" to Table{name='"pk_created_at"', type=TABLE columns=[Column{'created_at', isPrimaryKey=true, allowsNull=false, sqlType=timestamp}]} [io.confluent.connect.jdbc.util.TableDefinitions]
2022-11-25 06:57:01,510 WARN || Write of 2 records failed, remainingRetries=0 [io.confluent.connect.jdbc.sink.JdbcSinkTask]
java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "pk_created_at" ("created_at") VALUES (1669359291990398) ON CONFLICT ("created_at") DO NOTHING was aborted: ERROR: column "created_at" is of type timestamp without time zone but expression is of type bigint
  Hint: You will need to rewrite or cast the expression.
  Position: 52
Call getNextException to see other errors in the batch.{code}
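
Note that the failed statement binds the raw microsecond value
({{1669359291990398}}) to the PK column, so the primary-key path never saw the
converted string. If the sink takes PK columns from the record key
({{pk.mode=record_key}}, an assumption), the transform would also have to
rewrite the key, along the lines of this hypothetical variant of the sketch
above:
{code:java}
// Hypothetical key-side variant of apply(): rebuild the key instead of the value,
// reusing convertSchema()/convertStruct() from the earlier sketch, so that PK
// columns are converted before the JDBC sink binds them.
@Override
public R apply(R record) {
    if (!(record.key() instanceof Struct)) {
        return record;
    }
    Struct key = (Struct) record.key();
    Schema newKeySchema = convertSchema(key.schema());
    Struct newKey = convertStruct(key, newKeySchema);
    return record.newRecord(record.topic(), record.kafkaPartition(),
            newKeySchema, newKey, record.valueSchema(), record.value(), record.timestamp());
}
{code}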

[jira] [Created] (KAFKA-14665) my custom SMT that converts Int to String does not work for primary keys

2023-01-30 Thread Fuxin Hao (Jira)
Fuxin Hao created KAFKA-14665:
-

 Summary: my custom SMT that converts Int to String does not work 
for primary keys
 Key: KAFKA-14665
 URL: https://issues.apache.org/jira/browse/KAFKA-14665
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 3.1.0
Reporter: Fuxin Hao

