This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new e59a17006 [FLINK-37352][pipeline-connector/paimon] Supports write full change log to Paimon table
e59a17006 is described below
commit e59a170064051051088c3275fe44b208fbd5763a
Author: Kunni <[email protected]>
AuthorDate: Mon Mar 10 15:37:02 2025 +0800
[FLINK-37352][pipeline-connector/paimon] Supports write full change log to Paimon table
This closes #3935
---
.../cdc/connectors/paimon/sink/v2/PaimonEvent.java | 30 +++++++++------
.../sink/v2/PaimonRecordEventSerializer.java | 7 ++--
.../connectors/paimon/sink/v2/PaimonWriter.java | 7 +++-
.../paimon/sink/v2/PaimonWriterHelper.java | 45 ++++++++++++++++++++++
.../paimon/sink/v2/PaimonSinkITCase.java | 13 +++++--
5 files changed, 81 insertions(+), 21 deletions(-)
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEvent.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEvent.java
index d23ca7e76..1ac14698e 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEvent.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEvent.java
@@ -20,35 +20,41 @@ package org.apache.flink.cdc.connectors.paimon.sink.v2;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.GenericRow;
+import java.util.List;
+
/** Contains the data to be written for {@link PaimonWriter}. */
public class PaimonEvent {
// Identifier for the Paimon table to be written.
Identifier tableId;
- // The actual record to be written to Paimon table.
- GenericRow genericRow;
+ // The actual records to be written to Paimon table, contains full changelog(before/after).
+ List<GenericRow> genericRows;
// if true, means that table schema has changed right before this genericRow.
boolean shouldRefreshSchema;
int bucket;
- public PaimonEvent(Identifier tableId, GenericRow genericRow) {
+ public PaimonEvent(Identifier tableId, List<GenericRow> genericRows) {
this.tableId = tableId;
- this.genericRow = genericRow;
+ this.genericRows = genericRows;
this.shouldRefreshSchema = false;
}
- public PaimonEvent(Identifier tableId, GenericRow genericRow, boolean shouldRefreshSchema) {
+ public PaimonEvent(
+ Identifier tableId, List<GenericRow> genericRows, boolean shouldRefreshSchema) {
this.tableId = tableId;
- this.genericRow = genericRow;
+ this.genericRows = genericRows;
this.shouldRefreshSchema = shouldRefreshSchema;
}
public PaimonEvent(
- Identifier tableId, GenericRow genericRow, boolean shouldRefreshSchema, int bucket) {
+ Identifier tableId,
+ List<GenericRow> genericRows,
+ boolean shouldRefreshSchema,
+ int bucket) {
this.tableId = tableId;
- this.genericRow = genericRow;
+ this.genericRows = genericRows;
this.shouldRefreshSchema = shouldRefreshSchema;
this.bucket = bucket;
}
@@ -69,12 +75,12 @@ public class PaimonEvent {
this.shouldRefreshSchema = shouldRefreshSchema;
}
- public GenericRow getGenericRow() {
- return genericRow;
+ public List<GenericRow> getGenericRows() {
+ return genericRows;
}
- public void setGenericRow(GenericRow genericRow) {
- this.genericRow = genericRow;
+ public void setGenericRows(List<GenericRow> genericRows) {
+ this.genericRows = genericRows;
}
public int getBucket() {
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonRecordEventSerializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonRecordEventSerializer.java
index 983a680bd..d0eceabcd 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonRecordEventSerializer.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonRecordEventSerializer.java
@@ -32,6 +32,7 @@ import org.apache.paimon.data.GenericRow;
import java.time.ZoneId;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
/**
@@ -79,11 +80,11 @@ public class PaimonRecordEventSerializer implements PaimonRecordSerializer<Event
return new PaimonEvent(tableId, null, true);
} else if (event instanceof DataChangeEvent) {
DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
- GenericRow genericRow =
- PaimonWriterHelper.convertEventToGenericRow(
+ List<GenericRow> genericRows =
+ PaimonWriterHelper.convertEventToFullGenericRows(
dataChangeEvent,
schemaMaps.get(dataChangeEvent.tableId()).getFieldGetters());
- return new PaimonEvent(tableId, genericRow, false, bucket);
+ return new PaimonEvent(tableId, genericRows, false, bucket);
} else {
throw new IllegalArgumentException(
"failed to convert Input into PaimonEvent, unsupported event: " + event);
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java
index fcf522468..d1c3fa561 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java
@@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.GenericRow;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.sink.MultiTableCommittable;
import org.apache.paimon.flink.sink.StoreSinkWrite;
@@ -126,7 +127,7 @@ public class PaimonWriter<InputT>
throw new RuntimeException(e);
}
}
- if (paimonEvent.getGenericRow() != null) {
+ if (paimonEvent.getGenericRows() != null) {
FileStoreTable table;
table = getTable(tableId);
if (memoryPoolFactory == null) {
@@ -155,7 +156,9 @@ public class PaimonWriter<InputT>
return storeSinkWrite;
});
try {
- write.write(paimonEvent.getGenericRow(), paimonEvent.getBucket());
+ for (GenericRow genericRow : paimonEvent.getGenericRows()) {
+ write.write(genericRow, paimonEvent.getBucket());
+ }
} catch (Exception e) {
throw new IOException(e);
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelper.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelper.java
index 986125780..36bb9fbbc 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelper.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelper.java
@@ -179,6 +179,51 @@ public class PaimonWriterHelper {
return genericRow;
}
+ /** create full {@link GenericRow}s from a {@link DataChangeEvent} for {@link PaimonWriter}. */
+ public static List<GenericRow> convertEventToFullGenericRows(
+ DataChangeEvent dataChangeEvent, List<RecordData.FieldGetter> fieldGetters) {
+ List<GenericRow> fullGenericRows = new ArrayList<>();
+ switch (dataChangeEvent.op()) {
+ case INSERT:
+ {
+ fullGenericRows.add(
+ convertRecordDataToGenericRow(
+ dataChangeEvent.after(), fieldGetters, RowKind.INSERT));
+ break;
+ }
+ case UPDATE:
+ case REPLACE:
+ {
+ fullGenericRows.add(
+ convertRecordDataToGenericRow(
+ dataChangeEvent.before(), fieldGetters, RowKind.UPDATE_BEFORE));
+ fullGenericRows.add(
+ convertRecordDataToGenericRow(
+ dataChangeEvent.after(), fieldGetters, RowKind.UPDATE_AFTER));
+ break;
+ }
+ case DELETE:
+ {
+ fullGenericRows.add(
+ convertRecordDataToGenericRow(
+ dataChangeEvent.before(), fieldGetters, RowKind.DELETE));
+ break;
+ }
+ default:
+ throw new IllegalArgumentException("don't support type of " + dataChangeEvent.op());
+ }
+ return fullGenericRows;
+ }
+
+ private static GenericRow convertRecordDataToGenericRow(
+ RecordData recordData, List<RecordData.FieldGetter> fieldGetters, RowKind rowKind) {
+ GenericRow genericRow = new GenericRow(rowKind, recordData.getArity());
+ for (int i = 0; i < recordData.getArity(); i++) {
+ genericRow.setField(i, fieldGetters.get(i).getFieldOrNull(recordData));
+ }
+ return genericRow;
+ }
+
/** A helper class for {@link PaimonWriter} to create FieldGetter and GenericRow. */
public static class BinaryFieldDataGetter implements RecordData.FieldGetter {
private final int fieldPos;
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
index bc257d0db..adb43483d 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
@@ -209,19 +209,24 @@ public class PaimonSinkITCase {
table1,
Arrays.asList(Tuple2.of(STRING(), "2"), Tuple2.of(STRING(), "2")),
Arrays.asList(Tuple2.of(STRING(), "2"), Tuple2.of(STRING(), "x"))));
- Assertions.assertThat(fetchResults(table1))
- .containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, "2", "x"));
+ if (enableDeleteVector) {
+ Assertions.assertThat(fetchResults(table1))
+ .containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, "2", "x"));
+ } else {
+ Assertions.assertThat(fetchResults(table1))
+ .containsExactlyInAnyOrder(Row.ofKind(RowKind.UPDATE_AFTER, "2", "x"));
+ }
if (enableDeleteVector) {
Assertions.assertThat(fetchMaxSequenceNumber(table1.getTableName()))
.containsExactlyInAnyOrder(
- Row.ofKind(RowKind.INSERT, 1L), Row.ofKind(RowKind.INSERT, 3L));
+ Row.ofKind(RowKind.INSERT, 1L), Row.ofKind(RowKind.INSERT, 4L));
} else {
Assertions.assertThat(fetchMaxSequenceNumber(table1.getTableName()))
.containsExactlyInAnyOrder(
Row.ofKind(RowKind.INSERT, 1L),
Row.ofKind(RowKind.INSERT, 2L),
- Row.ofKind(RowKind.INSERT, 3L));
+ Row.ofKind(RowKind.INSERT, 4L));
}
}