yulongz opened a new issue, #6519:
URL: https://github.com/apache/iceberg/issues/6519

   ### Apache Iceberg version
   
   None
   
   ### Query engine
   
   Flink
   
   ### Please describe the bug 🐞
   
   The Iceberg table has two non-adjacent primary key columns (`name`, `partition_key`), both of type varchar/StringData; the other columns are int/double.
   ```sql
   CREATE TABLE IF NOT EXISTS default_catalog.default_database.aaa (
   `name` varchar,
   `class` int,
   `age` int,
   `score` double,
   `partition_key` varchar,
   PRIMARY KEY (`name`,`partition_key`) NOT ENFORCED)
   WITH (
   'write.upsert.enabled'='true',
   'write.metadata.delete-after-commit.enabled'='true',
   'write.metadata.previous-versions-max'='10',
   'commit.manifest.min-count-to-merge'='5',
   'table.dynamic-table-options.enabled'='true',
   'format-version'='2');
   ```
   When I feed this table from Flink CDC in upsert mode and a `-D` (DELETE) row arrives, IcebergStreamWriter throws a ClassCastException:
   `BaseDeltaTaskWriter.write(), Row Kind: -D, Row: -D(a1,1,33,77.0,b)`
   The exception log is below:
   ```
   2023-01-03 15:00:35,653 WARN  org.apache.flink.runtime.taskmanager.Task - Source: TableSourceScan(table=[[default_catalog, default_database, aaa]], fields=[name, class, age, score, partition_key]) -> NotNullEnforcer(fields=[name, partition_key]) -> IcebergStreamWriter (1/1)#0 (5a04dd71dcf69ba92a2cbc1844b359d2) switched from RUNNING to FAILED with failure cause:
   java.lang.ClassCastException: java.lang.Integer cannot be cast to org.apache.flink.table.data.StringData
       at org.apache.flink.table.data.GenericRowData.getString(GenericRowData.java:169)
       at org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$1(RowData.java:228)
       at org.apache.iceberg.flink.data.FlinkParquetWriters$RowDataWriter.get(FlinkParquetWriters.java:469)
       at org.apache.iceberg.flink.data.FlinkParquetWriters$RowDataWriter.get(FlinkParquetWriters.java:453)
       at org.apache.iceberg.parquet.ParquetValueWriters$StructWriter.write(ParquetValueWriters.java:566)
       at org.apache.iceberg.parquet.ParquetWriter.add(ParquetWriter.java:130)
       at org.apache.iceberg.deletes.EqualityDeleteWriter.delete(EqualityDeleteWriter.java:82)
       at org.apache.iceberg.io.BaseTaskWriter$RollingEqDeleteWriter.write(BaseTaskWriter.java:368)
       at org.apache.iceberg.io.BaseTaskWriter$RollingEqDeleteWriter.write(BaseTaskWriter.java:351)
       at org.apache.iceberg.io.BaseTaskWriter$BaseRollingWriter.write(BaseTaskWriter.java:263)
       at org.apache.iceberg.io.BaseTaskWriter$BaseEqualityDeltaWriter.delete(BaseTaskWriter.java:173)
       at org.apache.iceberg.flink.sink.BaseDeltaTaskWriter.write(BaseDeltaTaskWriter.java:105)
       at org.apache.iceberg.flink.sink.BaseDeltaTaskWriter.write(BaseDeltaTaskWriter.java:43)
       at org.apache.iceberg.flink.sink.IcebergStreamWriter.processElement(IcebergStreamWriter.java:72)
       at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
       at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
       at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
       at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
       at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
       at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
       at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
       at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
       at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
       at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:196)
       at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
       at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101)
       at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter$OutputCollector.collect(MySqlRecordEmitter.java:143)
       at com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.emit(RowDataDebeziumDeserializeSchema.java:157)
       at com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.deserialize(RowDataDebeziumDeserializeSchema.java:129)
       at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitElement(MySqlRecordEmitter.java:118)
       at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:100)
       at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:54)
       at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)
       at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:351)
       at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
       at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
       at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
       at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
       at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
       at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
       at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
       at java.lang.Thread.run(Thread.java:748)
   ```
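   The root cause: in upsert mode the equality-delete writer is built against the projected key schema (`name`, `partition_key`), but the current DELETE branch passes the full five-column row to `writer.delete(row)`. `RowData` field getters are positional, so the getter for the second key field reads position 1 of the full row, which holds `class` (an Integer) rather than `partition_key` (a StringData). The mismatch can be shown outside Iceberg with a minimal sketch (the `CastRepro` class name is hypothetical):
   ```java
   import org.apache.flink.table.data.GenericRowData;
   import org.apache.flink.table.data.StringData;

   // Hypothetical standalone reproduction of the failing cast (not Iceberg code).
   public class CastRepro {
     public static void main(String[] args) {
       // Full-width row matching the table layout: (name, class, age, score, partition_key).
       GenericRowData row = GenericRowData.of(
           StringData.fromString("a1"), 1, 33, 77.0, StringData.fromString("b"));

       // The equality-delete writer is built for the two-field key schema
       // (name, partition_key), so its field getters read positions 0 and 1.
       // Position 1 of the unprojected row holds `class`, an Integer, so the
       // String getter fails exactly as in the stack trace above.
       row.getString(1); // ClassCastException: Integer cannot be cast to StringData
     }
   }
   ```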
   Proposed solution: in `BaseDeltaTaskWriter.write()`, call `deleteKey()` with the key projection for DELETE rows in upsert mode, instead of passing the full row to `delete()`:
   ```java
     @Override
     public void write(RowData row) throws IOException {
       RowDataDeltaWriter writer = route(row);

       switch (row.getRowKind()) {
         case INSERT:
         case UPDATE_AFTER:
           if (upsert) {
             writer.deleteKey(keyProjection.wrap(row));
           }
           writer.write(row);
           break;

         case UPDATE_BEFORE:
           if (upsert) {
             // UPDATE_BEFORE is not necessary in upsert mode: the UPDATE_AFTER
             // branch already deletes by key, so skipping it avoids deleting
             // the same row twice.
             break;
           }
           writer.delete(row);
           break;

         case DELETE:
           if (upsert) {
             // Project the row down to the equality key fields so the delete
             // writer's positional getters match the key schema.
             writer.deleteKey(keyProjection.wrap(row));
           } else {
             writer.delete(row);
           }
           break;

         default:
           throw new UnsupportedOperationException("Unknown row kind: " + row.getRowKind());
       }
     }
   ```
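   For reference, `keyProjection.wrap(row)` re-maps the key columns of the full row onto the positions of the key schema, so the delete writer's positional getters line up. Conceptually, for this table it behaves like the simplified sketch below (illustration only, not Iceberg's actual `RowDataProjection`):
   ```java
   import org.apache.flink.table.data.GenericRowData;
   import org.apache.flink.table.data.RowData;

   // Illustration only: remap the key columns {0, 4} of the full
   // (name, class, age, score, partition_key) row onto positions {0, 1} of a
   // key-only (name, partition_key) row, so positional getters match the key schema.
   public class KeyProjectionSketch {
     static RowData projectKey(RowData fullRow) {
       GenericRowData key = GenericRowData.of(fullRow.getString(0), fullRow.getString(4));
       key.setRowKind(fullRow.getRowKind()); // preserve the change-log kind (-D here)
       return key;
     }
   }
   ```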

