This is an automated email from the ASF dual-hosted git repository.
yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 5828a7def5 [flink][cdc] Add support for retry cnt instead of busy wait and additional support to skip corrupt records in cdc writer (#4295)
5828a7def5 is described below
commit 5828a7def5a8270b04a9efa971814c6b0457d76e
Author: Ashish Khatkar <[email protected]>
AuthorDate: Mon Dec 23 08:44:47 2024 +0000
[flink][cdc] Add support for retry cnt instead of busy wait and additional support to skip corrupt records in cdc writer (#4295)
---
.../sink/cdc/CdcDynamicBucketWriteOperator.java | 26 +++++++++++++---
.../sink/cdc/CdcRecordStoreMultiWriteOperator.java | 23 +++++++++++---
.../sink/cdc/CdcRecordStoreWriteOperator.java | 36 +++++++++++++++++++---
3 files changed, 70 insertions(+), 15 deletions(-)
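The diff below introduces two new table options that the CDC write operators read: cdc.max-retry-num-times (int, default 100) and cdc.skip-corrupt-record (boolean, default false). As a rough, hypothetical sketch (the map-based setup here is illustrative only and not part of this change), they can be supplied like any other Paimon table option:

import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: the option keys and defaults come from the diff below;
// how the options map is ultimately handed to the table is not shown here.
public class CdcWriterOptionsSketch {
    public static void main(String[] args) {
        Map<String, String> tableOptions = new HashMap<>();
        // Bound the schema-refresh retries (default 100, see MAX_RETRY_NUM_TIMES below).
        tableOptions.put("cdc.max-retry-num-times", "50");
        // Skip records that still cannot be parsed instead of failing the job
        // (default false, see SKIP_CORRUPT_RECORD below).
        tableOptions.put("cdc.skip-corrupt-record", "true");
        tableOptions.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}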
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperator.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperator.java
index b0b135b361..5637e8b127 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperator.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperator.java
@@ -35,7 +35,9 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import java.io.IOException;
import java.util.Optional;
+import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.MAX_RETRY_NUM_TIMES;
import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.RETRY_SLEEP_TIME;
+import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.SKIP_CORRUPT_RECORD;
import static org.apache.paimon.flink.sink.cdc.CdcRecordUtils.toGenericRow;
/**
@@ -47,6 +49,10 @@ public class CdcDynamicBucketWriteOperator extends TableWriteOperator<Tuple2<Cdc
private final long retrySleepMillis;
+ private final int maxRetryNumTimes;
+
+ private final boolean skipCorruptRecord;
+
private CdcDynamicBucketWriteOperator(
StreamOperatorParameters<Committable> parameters,
FileStoreTable table,
@@ -55,6 +61,8 @@ public class CdcDynamicBucketWriteOperator extends TableWriteOperator<Tuple2<Cdc
super(parameters, table, storeSinkWriteProvider, initialCommitUser);
this.retrySleepMillis = table.coreOptions().toConfiguration().get(RETRY_SLEEP_TIME).toMillis();
+ this.maxRetryNumTimes = table.coreOptions().toConfiguration().get(MAX_RETRY_NUM_TIMES);
+ this.skipCorruptRecord = table.coreOptions().toConfiguration().get(SKIP_CORRUPT_RECORD);
}
@Override
@@ -73,7 +81,7 @@ public class CdcDynamicBucketWriteOperator extends TableWriteOperator<Tuple2<Cdc
Tuple2<CdcRecord, Integer> record = element.getValue();
Optional<GenericRow> optionalConverted = toGenericRow(record.f0, table.schema().fields());
if (!optionalConverted.isPresent()) {
- while (true) {
+ for (int retry = 0; retry < maxRetryNumTimes; ++retry) {
table = table.copyWithLatestSchema();
optionalConverted = toGenericRow(record.f0, table.schema().fields());
if (optionalConverted.isPresent()) {
@@ -84,10 +92,18 @@ public class CdcDynamicBucketWriteOperator extends TableWriteOperator<Tuple2<Cdc
write.replace(table);
}
- try {
- write.write(optionalConverted.get(), record.f1);
- } catch (Exception e) {
- throw new IOException(e);
+ if (!optionalConverted.isPresent()) {
+ if (skipCorruptRecord) {
+ LOG.warn("Skipping corrupt or unparsable record {}", record);
+ } else {
+ throw new RuntimeException("Unable to process element. Possibly a corrupt record");
+ }
+ } else {
+ try {
+ write.write(optionalConverted.get(), record.f1);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
}
}
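In plain terms, the hunks above replace the unbounded while (true) schema-refresh loop with a retry loop bounded by maxRetryNumTimes. A condensed, standalone sketch of that control flow is shown below; tryConvert and refreshSchema are hypothetical stand-ins for toGenericRow and copyWithLatestSchema, and the real operator also sleeps retrySleepMillis between attempts.

import java.util.Optional;
import java.util.function.Supplier;

// Standalone sketch of the bounded retry introduced above (not the actual operator code).
final class BoundedRetrySketch {
    static <T> Optional<T> convertWithRetry(
            Supplier<Optional<T>> tryConvert, Runnable refreshSchema, int maxRetryNumTimes) {
        Optional<T> converted = tryConvert.get();
        for (int retry = 0; retry < maxRetryNumTimes && !converted.isPresent(); ++retry) {
            refreshSchema.run();          // pull the latest table schema
            converted = tryConvert.get(); // retry the conversion against it
        }
        // An empty result here means the retry budget is exhausted; whether the
        // record is skipped or the job fails is governed by cdc.skip-corrupt-record.
        return converted;
    }
}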
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
index a4b4e82840..a6c55c55f1 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
@@ -54,7 +54,9 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
+import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.MAX_RETRY_NUM_TIMES;
import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.RETRY_SLEEP_TIME;
+import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.SKIP_CORRUPT_RECORD;
import static org.apache.paimon.flink.sink.cdc.CdcRecordUtils.toGenericRow;
/**
@@ -123,6 +125,9 @@ public class CdcRecordStoreMultiWriteOperator
FileStoreTable table = getTable(tableId);
+ int retryCnt = table.coreOptions().toConfiguration().get(MAX_RETRY_NUM_TIMES);
+ boolean skipCorruptRecord = table.coreOptions().toConfiguration().get(SKIP_CORRUPT_RECORD);
+
// all table write should share one write buffer so that writers can preempt memory
// from those of other tables
if (memoryPoolFactory == null) {
@@ -154,7 +159,7 @@ public class CdcRecordStoreMultiWriteOperator
toGenericRow(record.record(), table.schema().fields());
if (!optionalConverted.isPresent()) {
FileStoreTable latestTable = table;
- while (true) {
+ for (int retry = 0; retry < retryCnt; ++retry) {
latestTable = latestTable.copyWithLatestSchema();
tables.put(tableId, latestTable);
optionalConverted = toGenericRow(record.record(), latestTable.schema().fields());
@@ -171,10 +176,18 @@ public class CdcRecordStoreMultiWriteOperator
write.replace(latestTable);
}
- try {
- write.write(optionalConverted.get());
- } catch (Exception e) {
- throw new IOException("Exception occurs for writing record to table: " + tableId, e);
+ if (!optionalConverted.isPresent()) {
+ if (skipCorruptRecord) {
+ LOG.warn("Skipping corrupt or unparsable record {}", record);
+ } else {
+ throw new RuntimeException("Unable to process element. Possibly a corrupt record");
+ }
+ } else {
+ try {
+ write.write(optionalConverted.get());
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
}
}
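Note that the multi-write operator resolves retryCnt and skipCorruptRecord per record from the target table's own options, so different tables in the same CDC job can mix behaviors. A hypothetical sketch of that per-table resolution, using the defaults declared in the file below:

import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch only: real resolution goes through table.coreOptions(), not a
// raw string map; the defaults mirror MAX_RETRY_NUM_TIMES and SKIP_CORRUPT_RECORD.
final class PerTableOptionSketch {
    static int maxRetryNumTimes(Map<String, String> tableOptions) {
        return Integer.parseInt(tableOptions.getOrDefault("cdc.max-retry-num-times", "100"));
    }

    static boolean skipCorruptRecord(Map<String, String> tableOptions) {
        return Boolean.parseBoolean(tableOptions.getOrDefault("cdc.skip-corrupt-record", "false"));
    }

    public static void main(String[] args) {
        Map<String, String> ordersOptions = new HashMap<>();
        ordersOptions.put("cdc.skip-corrupt-record", "true");
        System.out.println(maxRetryNumTimes(ordersOptions));  // 100 (default)
        System.out.println(skipCorruptRecord(ordersOptions)); // true
    }
}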
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java
index 195e683daa..8a8233842d 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java
@@ -52,8 +52,24 @@ public class CdcRecordStoreWriteOperator extends TableWriteOperator<CdcRecord> {
.durationType()
.defaultValue(Duration.ofMillis(500));
+ public static final ConfigOption<Integer> MAX_RETRY_NUM_TIMES =
+ ConfigOptions.key("cdc.max-retry-num-times")
+ .intType()
+ .defaultValue(100)
+ .withDescription("Max retry count for updating table before failing loudly");
+
+ public static final ConfigOption<Boolean> SKIP_CORRUPT_RECORD =
+ ConfigOptions.key("cdc.skip-corrupt-record")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Skip corrupt record if we fail to parse it");
+
private final long retrySleepMillis;
+ private final int maxRetryNumTimes;
+
+ private final boolean skipCorruptRecord;
+
protected CdcRecordStoreWriteOperator(
StreamOperatorParameters<Committable> parameters,
FileStoreTable table,
@@ -62,6 +78,8 @@ public class CdcRecordStoreWriteOperator extends TableWriteOperator<CdcRecord> {
super(parameters, table, storeSinkWriteProvider, initialCommitUser);
this.retrySleepMillis = table.coreOptions().toConfiguration().get(RETRY_SLEEP_TIME).toMillis();
+ this.maxRetryNumTimes = table.coreOptions().toConfiguration().get(MAX_RETRY_NUM_TIMES);
+ this.skipCorruptRecord = table.coreOptions().toConfiguration().get(SKIP_CORRUPT_RECORD);
}
@Override
@@ -80,7 +98,7 @@ public class CdcRecordStoreWriteOperator extends TableWriteOperator<CdcRecord> {
CdcRecord record = element.getValue();
Optional<GenericRow> optionalConverted = toGenericRow(record, table.schema().fields());
if (!optionalConverted.isPresent()) {
- while (true) {
+ for (int retry = 0; retry < maxRetryNumTimes; ++retry) {
table = table.copyWithLatestSchema();
optionalConverted = toGenericRow(record, table.schema().fields());
if (optionalConverted.isPresent()) {
@@ -91,10 +109,18 @@ public class CdcRecordStoreWriteOperator extends TableWriteOperator<CdcRecord> {
write.replace(table);
}
- try {
- write.write(optionalConverted.get());
- } catch (Exception e) {
- throw new IOException(e);
+ if (!optionalConverted.isPresent()) {
+ if (skipCorruptRecord) {
+ LOG.warn("Skipping corrupt or unparsable record {}", record);
+ } else {
+ throw new RuntimeException("Unable to process element. Possibly a corrupt record");
+ }
+ } else {
+ try {
+ write.write(optionalConverted.get());
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
}
}
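All three operators now end with the same skip-or-fail branch once conversion keeps failing. The sketch below isolates that branch; only the warning message and the option semantics mirror the diff, while the surrounding scaffolding is made up for illustration.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Sketch of the shared skip-or-fail branch (illustrative, not the actual operator code).
final class SkipOrFailSketch {
    private static final Logger LOG = LoggerFactory.getLogger(SkipOrFailSketch.class);

    static void handleUnconvertible(Object record, boolean skipCorruptRecord) {
        if (skipCorruptRecord) {
            // cdc.skip-corrupt-record = true: warn and drop the record, keep the job running.
            LOG.warn("Skipping corrupt or unparsable record {}", record);
        } else {
            // cdc.skip-corrupt-record = false (default): fail once retries are exhausted.
            throw new RuntimeException("Unable to process element. Possibly a corrupt record");
        }
    }
}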