This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 5828a7def5 [flink][cdc] Add support for retry cnt instead of busy wait and additional support to skip corrupt records in cdc writer (#4295)
5828a7def5 is described below

commit 5828a7def5a8270b04a9efa971814c6b0457d76e
Author: Ashish Khatkar <[email protected]>
AuthorDate: Mon Dec 23 08:44:47 2024 +0000

    [flink][cdc] Add support for retry cnt instead of busy wait and additional support to skip corrupt records in cdc writer (#4295)
---
 .../sink/cdc/CdcDynamicBucketWriteOperator.java    | 26 +++++++++++++---
 .../sink/cdc/CdcRecordStoreMultiWriteOperator.java | 23 +++++++++++---
 .../sink/cdc/CdcRecordStoreWriteOperator.java      | 36 +++++++++++++++++++---
 3 files changed, 70 insertions(+), 15 deletions(-)
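
All three operators below apply the same change: the unbounded while (true) schema-refresh loop becomes a bounded retry governed by cdc.max-retry-num-times, and a record that still cannot be parsed afterwards is either skipped with a warning (cdc.skip-corrupt-record = true) or fails the job. The following is a minimal standalone sketch of that control flow; BoundedRetrySketch, tryParse, and the schemaVersion counter are hypothetical stand-ins for the operator, CdcRecordUtils.toGenericRow, and table.copyWithLatestSchema(), and the sleep placement is assumed from the pre-existing RETRY_SLEEP_TIME option rather than shown in the hunks.

    import java.util.Optional;

    public class BoundedRetrySketch {
        // Hypothetical stand-in for CdcRecordUtils.toGenericRow: parsing succeeds
        // once the locally cached schema has caught up with the incoming record.
        static Optional<String> tryParse(String raw, int schemaVersion) {
            return schemaVersion >= 2 ? Optional.of(raw) : Optional.empty();
        }

        public static void main(String[] args) throws InterruptedException {
            int maxRetryNumTimes = 100;        // cdc.max-retry-num-times, default 100
            boolean skipCorruptRecord = false; // cdc.skip-corrupt-record, default false
            long retrySleepMillis = 500;       // pre-existing RETRY_SLEEP_TIME option

            int schemaVersion = 0; // stand-in for the table's cached schema
            Optional<String> converted = tryParse("payload", schemaVersion);
            // Previously: while (true) { ... } -- now bounded by maxRetryNumTimes.
            for (int retry = 0; !converted.isPresent() && retry < maxRetryNumTimes; ++retry) {
                schemaVersion++; // stand-in for table = table.copyWithLatestSchema()
                converted = tryParse("payload", schemaVersion);
                if (!converted.isPresent()) {
                    Thread.sleep(retrySleepMillis); // back off between schema refreshes
                }
            }

            if (!converted.isPresent()) {
                if (skipCorruptRecord) {
                    System.out.println("Skipping corrupt or unparsable record");
                } else {
                    throw new RuntimeException("Unable to process element. Possibly a corrupt record");
                }
            } else {
                System.out.println("write(" + converted.get() + ")"); // stand-in for write.write(row)
            }
        }
    }

The trade-off: the bound turns a schema mismatch that can never resolve from an infinite busy wait into a loud failure, and the skip flag lets operators prioritize pipeline availability over completeness when corrupt records are expected.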

diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperator.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperator.java
index b0b135b361..5637e8b127 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperator.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperator.java
@@ -35,7 +35,9 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import java.io.IOException;
 import java.util.Optional;
 
+import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.MAX_RETRY_NUM_TIMES;
 import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.RETRY_SLEEP_TIME;
+import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.SKIP_CORRUPT_RECORD;
 import static org.apache.paimon.flink.sink.cdc.CdcRecordUtils.toGenericRow;
 
 /**
@@ -47,6 +49,10 @@ public class CdcDynamicBucketWriteOperator extends TableWriteOperator<Tuple2<Cdc
 
     private final long retrySleepMillis;
 
+    private final int maxRetryNumTimes;
+
+    private final boolean skipCorruptRecord;
+
     private CdcDynamicBucketWriteOperator(
             StreamOperatorParameters<Committable> parameters,
             FileStoreTable table,
@@ -55,6 +61,8 @@ public class CdcDynamicBucketWriteOperator extends TableWriteOperator<Tuple2<Cdc
         super(parameters, table, storeSinkWriteProvider, initialCommitUser);
         this.retrySleepMillis =
                 table.coreOptions().toConfiguration().get(RETRY_SLEEP_TIME).toMillis();
+        this.maxRetryNumTimes = table.coreOptions().toConfiguration().get(MAX_RETRY_NUM_TIMES);
+        this.skipCorruptRecord = table.coreOptions().toConfiguration().get(SKIP_CORRUPT_RECORD);
     }
 
     @Override
@@ -73,7 +81,7 @@ public class CdcDynamicBucketWriteOperator extends TableWriteOperator<Tuple2<Cdc
         Tuple2<CdcRecord, Integer> record = element.getValue();
         Optional<GenericRow> optionalConverted = toGenericRow(record.f0, table.schema().fields());
         if (!optionalConverted.isPresent()) {
-            while (true) {
+            for (int retry = 0; retry < maxRetryNumTimes; ++retry) {
                 table = table.copyWithLatestSchema();
                 optionalConverted = toGenericRow(record.f0, table.schema().fields());
                 if (optionalConverted.isPresent()) {
@@ -84,10 +92,18 @@ public class CdcDynamicBucketWriteOperator extends TableWriteOperator<Tuple2<Cdc
             write.replace(table);
         }
 
-        try {
-            write.write(optionalConverted.get(), record.f1);
-        } catch (Exception e) {
-            throw new IOException(e);
+        if (!optionalConverted.isPresent()) {
+            if (skipCorruptRecord) {
+                LOG.warn("Skipping corrupt or unparsable record {}", record);
+            } else {
+                throw new RuntimeException("Unable to process element. Possibly a corrupt record");
+            }
+        } else {
+            try {
+                write.write(optionalConverted.get(), record.f1);
+            } catch (Exception e) {
+                throw new IOException(e);
+            }
         }
     }
 
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
index a4b4e82840..a6c55c55f1 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
@@ -54,7 +54,9 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.MAX_RETRY_NUM_TIMES;
 import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.RETRY_SLEEP_TIME;
+import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.SKIP_CORRUPT_RECORD;
 import static org.apache.paimon.flink.sink.cdc.CdcRecordUtils.toGenericRow;
 
 /**
@@ -123,6 +125,9 @@ public class CdcRecordStoreMultiWriteOperator
 
         FileStoreTable table = getTable(tableId);
 
+        int retryCnt = table.coreOptions().toConfiguration().get(MAX_RETRY_NUM_TIMES);
+        boolean skipCorruptRecord = table.coreOptions().toConfiguration().get(SKIP_CORRUPT_RECORD);
+
         // all table write should share one write buffer so that writers can preempt memory
         // from those of other tables
         if (memoryPoolFactory == null) {
@@ -154,7 +159,7 @@ public class CdcRecordStoreMultiWriteOperator
                 toGenericRow(record.record(), table.schema().fields());
         if (!optionalConverted.isPresent()) {
             FileStoreTable latestTable = table;
-            while (true) {
+            for (int retry = 0; retry < retryCnt; ++retry) {
                 latestTable = latestTable.copyWithLatestSchema();
                 tables.put(tableId, latestTable);
                 optionalConverted = toGenericRow(record.record(), latestTable.schema().fields());
@@ -171,10 +176,18 @@ public class CdcRecordStoreMultiWriteOperator
             write.replace(latestTable);
         }
 
-        try {
-            write.write(optionalConverted.get());
-        } catch (Exception e) {
-            throw new IOException("Exception occurs for writing record to table: " + tableId, e);
+        if (!optionalConverted.isPresent()) {
+            if (skipCorruptRecord) {
+                LOG.warn("Skipping corrupt or unparsable record {}", record);
+            } else {
+                throw new RuntimeException("Unable to process element. Possibly a corrupt record");
+            }
+        } else {
+            try {
+                write.write(optionalConverted.get());
+            } catch (Exception e) {
+                throw new IOException(e);
+            }
         }
     }
 
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java
index 195e683daa..8a8233842d 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java
@@ -52,8 +52,24 @@ public class CdcRecordStoreWriteOperator extends TableWriteOperator<CdcRecord> {
                     .durationType()
                     .defaultValue(Duration.ofMillis(500));
 
+    public static final ConfigOption<Integer> MAX_RETRY_NUM_TIMES =
+            ConfigOptions.key("cdc.max-retry-num-times")
+                    .intType()
+                    .defaultValue(100)
+                    .withDescription("Max retry count for updating table before failing loudly");
+
+    public static final ConfigOption<Boolean> SKIP_CORRUPT_RECORD =
+            ConfigOptions.key("cdc.skip-corrupt-record")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Skip corrupt record if we fail to parse it");
+
     private final long retrySleepMillis;
 
+    private final int maxRetryNumTimes;
+
+    private final boolean skipCorruptRecord;
+
     protected CdcRecordStoreWriteOperator(
             StreamOperatorParameters<Committable> parameters,
             FileStoreTable table,
@@ -62,6 +78,8 @@ public class CdcRecordStoreWriteOperator extends TableWriteOperator<CdcRecord> {
         super(parameters, table, storeSinkWriteProvider, initialCommitUser);
         this.retrySleepMillis =
                 table.coreOptions().toConfiguration().get(RETRY_SLEEP_TIME).toMillis();
+        this.maxRetryNumTimes = table.coreOptions().toConfiguration().get(MAX_RETRY_NUM_TIMES);
+        this.skipCorruptRecord = table.coreOptions().toConfiguration().get(SKIP_CORRUPT_RECORD);
     }
 
     @Override
@@ -80,7 +98,7 @@ public class CdcRecordStoreWriteOperator extends TableWriteOperator<CdcRecord> {
         CdcRecord record = element.getValue();
         Optional<GenericRow> optionalConverted = toGenericRow(record, table.schema().fields());
         if (!optionalConverted.isPresent()) {
-            while (true) {
+            for (int retry = 0; retry < maxRetryNumTimes; ++retry) {
                 table = table.copyWithLatestSchema();
                 optionalConverted = toGenericRow(record, table.schema().fields());
                 if (optionalConverted.isPresent()) {
@@ -91,10 +109,18 @@ public class CdcRecordStoreWriteOperator extends TableWriteOperator<CdcRecord> {
             write.replace(table);
         }
 
-        try {
-            write.write(optionalConverted.get());
-        } catch (Exception e) {
-            throw new IOException(e);
+        if (!optionalConverted.isPresent()) {
+            if (skipCorruptRecord) {
+                LOG.warn("Skipping corrupt or unparsable record {}", record);
+            } else {
+                throw new RuntimeException("Unable to process element. Possibly a corrupt record");
+            }
+        } else {
+            try {
+                write.write(optionalConverted.get());
+            } catch (Exception e) {
+                throw new IOException(e);
+            }
         }
     }
 

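A hedged usage sketch for the new knobs: the keys and defaults are taken from CdcRecordStoreWriteOperator above, and both are read through table.coreOptions(), so they are ordinary table options; the plain map below only illustrates the key/value shapes and is not a specific Paimon configuration API.

    import java.util.HashMap;
    import java.util.Map;

    public class CdcWriterOptionsExample {
        public static void main(String[] args) {
            // Keys and defaults as defined in CdcRecordStoreWriteOperator; how the
            // map reaches the table (catalog DDL, table properties, etc.) is up to
            // the deployment and is not prescribed by this commit.
            Map<String, String> tableOptions = new HashMap<>();
            tableOptions.put("cdc.max-retry-num-times", "20");   // default 100
            tableOptions.put("cdc.skip-corrupt-record", "true"); // default false: fail instead of skipping
            tableOptions.forEach((k, v) -> System.out.println(k + " = " + v));
        }
    }

Note that CdcRecordStoreMultiWriteOperator resolves both options per table inside processElement (after getTable(tableId)) rather than once in the constructor, since that operator serves many tables whose options may differ.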