indiejames opened a new issue, #15342:
URL: https://github.com/apache/iceberg/issues/15342

   ### Query engine
   
   N/A
   
   ### Question
   
   How can I configure the Kafka Iceberg sink connector to avoid duplicate 
rows when it processes Debezium update and delete messages from Kafka? I am 
trying to use the Debezium transform with the Kafka Iceberg sink connector, 
which works for inserts, but updates and deletes create duplicate rows in my 
Iceberg V2 table.  
   
   I have the following sink connector configuration:
   
   ```
   {
     "name": "ice",
     "config": {
       "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
       "tasks.max": "1",
       "topics": "dbserver1.public.executions",
       "iceberg.tables": "cumulus_iceberg.executions",
       "iceberg.control.topic": "control-iceberg",
       "iceberg.control.group.id": "iceberg-control-v10",
       "iceberg.control.commit.interval-ms": "30000",
       "iceberg.control.commit.timeout-ms": "60000",
   
       "iceberg.tables.upsert-mode-enabled": "true",
       "iceberg.tables.default-id-columns": "cumulus_id",
   
       "transforms": "debezium",
       "transforms.debezium.type": "org.apache.iceberg.connect.transforms.DebeziumTransform",
   
       "key.converter": "org.apache.kafka.connect.json.JsonConverter",
       "key.converter.schemas.enable": "false",
       "value.converter": "org.apache.kafka.connect.json.JsonConverter",
       "value.converter.schemas.enable": "false",
   
       "iceberg.catalog.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
       "iceberg.catalog.warehouse": "s3a://my-warehouse-bucket",
       "iceberg.catalog.client.region": "us-east-1"
     }
   }
   ```
   
   This is a sample Kafka message created by Debezium:
   
   ```
   {
     "before": {
       "cumulus_id": 597053050,
       "arn": "arn92",
       "async_operation_cumulus_id": null,
       "collection_cumulus_id": 6,
       "parent_cumulus_id": null,
       "cumulus_version": null,
       "url": "http://example92.com",
       "status": "completed",
       "tasks": "{}",
       "error": "{\"Cause\": \"None\", \"Error\": \"Unknown Error\"}",
       "workflow_name": "ICESat-2_Ingest_CISS",
       "duration": 51.517,
       "timestamp": "2021-06-27T19:52:01.611000Z",
       "created_at": "2021-06-27T19:51:09.664000Z",
       "updated_at": "2021-06-27T19:52:01.611000Z",
       "archived": true
     },
     "after": null,
     "source": {
       "version": "3.4.1.Final",
       "connector": "postgresql",
       "name": "dbserver1",
       "ts_ms": 1771274367820,
       "snapshot": "false",
       "db": "nsidc_cumulus_prod_db",
       "sequence": "[\"14634119792\",\"14634140008\"]",
       "ts_us": 1771274367820463,
       "ts_ns": 1771274367820463000,
       "schema": "public",
       "table": "executions",
       "txId": 2522768715,
       "lsn": 14634140008,
       "xmin": null
     },
     "transaction": null,
     "op": "d",
     "ts_ms": 1771274368136,
     "ts_us": 1771274368136554,
     "ts_ns": 1771274368136554920
   }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to