This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 4a2e27291c [Feature][Connector-v2] Support streaming read for paimon
(#7681)
4a2e27291c is described below
commit 4a2e27291c4d16d36ff4edaf70a67242ff78e9bf
Author: dailai <[email protected]>
AuthorDate: Fri Sep 20 13:16:09 2024 +0800
[Feature][Connector-v2] Support streaming read for paimon (#7681)
---
docs/en/connector-v2/source/Paimon.md | 29 ++-
.../seatunnel/paimon/source/PaimonSource.java | 58 ++++-
.../paimon/source/PaimonSourceReader.java | 47 ++--
.../seatunnel/paimon/source/PaimonSourceSplit.java | 16 +-
.../paimon/source/PaimonSourceSplitEnumerator.java | 180 ---------------
.../paimon/source/PaimonSourceSplitGenerator.java | 60 +++++
.../seatunnel/paimon/source/PaimonSourceState.java | 19 +-
.../source/enumerator/AbstractSplitEnumerator.java | 249 +++++++++++++++++++++
.../PaimonBatchSourceSplitEnumerator.java | 63 ++++++
.../PaimonStreamSourceSplitEnumerator.java | 51 +++++
.../seatunnel/paimon/utils/RowKindConverter.java | 28 ++-
.../connector/paimon/PaimonRecordWithFullType.java | 49 ++++
.../e2e/connector/paimon/PaimonSinkCDCIT.java | 2 +-
.../e2e/connector/paimon/PaimonStreamReadIT.java | 152 +++++++++++++
.../fake_to_paimon_with_full_type_cdc_data.conf | 79 +++++++
.../src/test/resources/paimon_to_paimon.conf | 38 ++++
16 files changed, 883 insertions(+), 237 deletions(-)
diff --git a/docs/en/connector-v2/source/Paimon.md
b/docs/en/connector-v2/source/Paimon.md
index 32155abde0..e586a4fd9d 100644
--- a/docs/en/connector-v2/source/Paimon.md
+++ b/docs/en/connector-v2/source/Paimon.md
@@ -9,7 +9,7 @@ Read data from Apache Paimon.
## Key features
- [x] [batch](../../concept/connector-v2-features.md)
-- [ ] [stream](../../concept/connector-v2-features.md)
+- [x] [stream](../../concept/connector-v2-features.md)
- [ ] [exactly-once](../../concept/connector-v2-features.md)
- [ ] [column projection](../../concept/connector-v2-features.md)
- [ ] [parallelism](../../concept/connector-v2-features.md)
@@ -157,9 +157,30 @@ source {
```
## Changelog
+To read the full changelog (including `-U` rows) with this connector, the Paimon table being read must be created with the option `changelog-producer=input`. For details, refer to [Paimon changelog](https://paimon.apache.org/docs/master/primary-key-table/changelog-producer/).
+Currently, only the `input` and `none` changelog-producer modes are supported. If the changelog producer is `input`, streaming reads of the connector generate `+I`, `-U`, `+U` and `-D` rows. If the changelog producer is `none`, streaming reads of the connector generate `+I`, `+U` and `-D` rows.
-### next version
+### Streaming read example
+```hocon
+env {
+ parallelism = 1
+ job.mode = "Streaming"
+}
-- Add Paimon Source Connector
-- Support projection for Paimon Source
+source {
+ Paimon {
+ warehouse = "/tmp/paimon"
+ database = "full_type"
+ table = "st_test"
+ }
+}
+sink {
+ Paimon {
+ warehouse = "/tmp/paimon"
+ database = "full_type"
+ table = "st_test_sink"
+ paimon.table.primary-keys = "c_tinyint"
+ }
+}
+```
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java
index d0a0c4a793..d5c31ff235 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.paimon.source;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
@@ -26,18 +27,23 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalog;
import
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSourceConfig;
import
org.apache.seatunnel.connectors.seatunnel.paimon.source.converter.SqlToPaimonPredicateConverter;
+import
org.apache.seatunnel.connectors.seatunnel.paimon.source.enumerator.PaimonBatchSourceSplitEnumerator;
+import
org.apache.seatunnel.connectors.seatunnel.paimon.source.enumerator.PaimonStreamSourceSplitEnumerator;
import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowTypeConverter;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.types.RowType;
import net.sf.jsqlparser.statement.select.PlainSelect;
import java.util.Collections;
+import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
@@ -58,12 +64,12 @@ public class PaimonSource
private Table paimonTable;
- private Predicate predicate;
-
- private int[] projectionIndex;
+ private JobContext jobContext;
private CatalogTable catalogTable;
+ protected final ReadBuilder readBuilder;
+
public PaimonSource(ReadonlyConfig readonlyConfig, PaimonCatalog
paimonCatalog) {
this.readonlyConfig = readonlyConfig;
PaimonSourceConfig paimonSourceConfig = new
PaimonSourceConfig(readonlyConfig);
@@ -76,17 +82,22 @@ public class PaimonSource
PlainSelect plainSelect = convertToPlainSelect(filterSql);
RowType paimonRowType = this.paimonTable.rowType();
String[] filedNames = paimonRowType.getFieldNames().toArray(new
String[0]);
+
+ Predicate predicate = null;
+ int[] projectionIndex = null;
if (!Objects.isNull(plainSelect)) {
- this.projectionIndex =
convertSqlSelectToPaimonProjectionIndex(filedNames, plainSelect);
+ projectionIndex =
convertSqlSelectToPaimonProjectionIndex(filedNames, plainSelect);
if (!Objects.isNull(projectionIndex)) {
this.catalogTable =
paimonCatalog.getTableWithProjection(tablePath,
projectionIndex);
}
- this.predicate =
+ predicate =
SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate(
paimonRowType, plainSelect);
}
- seaTunnelRowType = RowTypeConverter.convert(paimonRowType,
projectionIndex);
+ this.seaTunnelRowType = RowTypeConverter.convert(paimonRowType,
projectionIndex);
+ this.readBuilder =
+
paimonTable.newReadBuilder().withProjection(projectionIndex).withFilter(predicate);
}
@Override
@@ -99,23 +110,34 @@ public class PaimonSource
return Collections.singletonList(catalogTable);
}
+ @Override
+ public void setJobContext(JobContext jobContext) {
+ this.jobContext = jobContext;
+ }
+
@Override
public Boundedness getBoundedness() {
- return Boundedness.BOUNDED;
+ return JobMode.BATCH.equals(jobContext.getJobMode())
+ ? Boundedness.BOUNDED
+ : Boundedness.UNBOUNDED;
}
@Override
public SourceReader<SeaTunnelRow, PaimonSourceSplit> createReader(
SourceReader.Context readerContext) throws Exception {
return new PaimonSourceReader(
- readerContext, paimonTable, seaTunnelRowType, predicate,
projectionIndex);
+ readerContext, paimonTable, seaTunnelRowType,
readBuilder.newRead());
}
@Override
public SourceSplitEnumerator<PaimonSourceSplit, PaimonSourceState>
createEnumerator(
SourceSplitEnumerator.Context<PaimonSourceSplit>
enumeratorContext) throws Exception {
- return new PaimonSourceSplitEnumerator(
- enumeratorContext, paimonTable, predicate, projectionIndex);
+ if (getBoundedness() == Boundedness.BOUNDED) {
+ return new PaimonBatchSourceSplitEnumerator(
+ enumeratorContext, new LinkedList<>(), null,
readBuilder.newScan(), 1);
+ }
+ return new PaimonStreamSourceSplitEnumerator(
+ enumeratorContext, new LinkedList<>(), null,
readBuilder.newStreamScan(), 1);
}
@Override
@@ -123,7 +145,19 @@ public class PaimonSource
SourceSplitEnumerator.Context<PaimonSourceSplit> enumeratorContext,
PaimonSourceState checkpointState)
throws Exception {
- return new PaimonSourceSplitEnumerator(
- enumeratorContext, paimonTable, checkpointState, predicate,
projectionIndex);
+ if (getBoundedness() == Boundedness.BOUNDED) {
+ return new PaimonBatchSourceSplitEnumerator(
+ enumeratorContext,
+ checkpointState.getAssignedSplits(),
+ checkpointState.getCurrentSnapshotId(),
+ readBuilder.newScan(),
+ 1);
+ }
+ return new PaimonStreamSourceSplitEnumerator(
+ enumeratorContext,
+ checkpointState.getAssignedSplits(),
+ checkpointState.getCurrentSnapshotId(),
+ readBuilder.newStreamScan(),
+ 1);
}
}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java
index 3cfa5ee8b9..50de479b4b 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java
@@ -17,18 +17,21 @@
package org.apache.seatunnel.connectors.seatunnel.paimon.source;
+import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowConverter;
+import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowKindConverter;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.TableRead;
import lombok.extern.slf4j.Slf4j;
@@ -48,20 +51,14 @@ public class PaimonSourceReader implements
SourceReader<SeaTunnelRow, PaimonSour
private final Table table;
private final SeaTunnelRowType seaTunnelRowType;
private volatile boolean noMoreSplit;
- private final Predicate predicate;
- private int[] projection;
+ private final TableRead tableRead;
public PaimonSourceReader(
- Context context,
- Table table,
- SeaTunnelRowType seaTunnelRowType,
- Predicate predicate,
- int[] projection) {
+ Context context, Table table, SeaTunnelRowType seaTunnelRowType,
TableRead tableRead) {
this.context = context;
this.table = table;
this.seaTunnelRowType = seaTunnelRowType;
- this.predicate = predicate;
- this.projection = projection;
+ this.tableRead = tableRead;
}
@Override
@@ -81,12 +78,7 @@ public class PaimonSourceReader implements
SourceReader<SeaTunnelRow, PaimonSour
if (Objects.nonNull(split)) {
// read logic
try (final RecordReader<InternalRow> reader =
- table.newReadBuilder()
- .withProjection(projection)
- .withFilter(predicate)
- .newRead()
- .executeFilter()
- .createReader(split.getSplit())) {
+
tableRead.executeFilter().createReader(split.getSplit())) {
final RecordReaderIterator<InternalRow> rowIterator =
new RecordReaderIterator<>(reader);
while (rowIterator.hasNext()) {
@@ -94,16 +86,31 @@ public class PaimonSourceReader implements
SourceReader<SeaTunnelRow, PaimonSour
final SeaTunnelRow seaTunnelRow =
RowConverter.convert(
row, seaTunnelRowType,
((FileStoreTable) table).schema());
+ if
(Boundedness.UNBOUNDED.equals(context.getBoundedness())) {
+ RowKind rowKind =
+
RowKindConverter.convertPaimonRowKind2SeatunnelRowkind(
+ row.getRowKind());
+ if (rowKind != null) {
+ seaTunnelRow.setRowKind(rowKind);
+ }
+ }
output.collect(seaTunnelRow);
}
}
- } else if (noMoreSplit && sourceSplits.isEmpty()) {
+ }
+
+ if (noMoreSplit
+ && sourceSplits.isEmpty()
+ && Boundedness.BOUNDED.equals(context.getBoundedness())) {
// signal to the source that we have reached the end of the
data.
- log.info("Closed the bounded flink table store source");
+ log.info("Closed the bounded table store source");
context.signalNoMoreElement();
} else {
- log.warn("Waiting for flink table source split, sleeping 1s");
- Thread.sleep(1000L);
+ context.sendSplitRequest();
+ if (sourceSplits.isEmpty()) {
+ log.debug("Waiting for table source split, sleeping 1s");
+ Thread.sleep(1000L);
+ }
}
}
}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplit.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplit.java
index d7b7b96f48..eba167eadd 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplit.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplit.java
@@ -21,22 +21,22 @@ import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.paimon.table.source.Split;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
/** Paimon source split, wrapped the {@link Split} of paimon table. */
+@Getter
+@AllArgsConstructor
public class PaimonSourceSplit implements SourceSplit {
private static final long serialVersionUID = 1L;
- private final Split split;
+ /** The unique ID of the split. Unique within the scope of this source. */
+ private final String id;
- public PaimonSourceSplit(Split split) {
- this.split = split;
- }
+ private final Split split;
@Override
public String splitId() {
return split.toString();
}
-
- public Split getSplit() {
- return split;
- }
}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplitEnumerator.java
deleted file mode 100644
index 7b0f14c3ab..0000000000
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplitEnumerator.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.paimon.source;
-
-import org.apache.seatunnel.api.source.SourceSplitEnumerator;
-
-import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.table.Table;
-import org.apache.paimon.table.source.Split;
-
-import lombok.extern.slf4j.Slf4j;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-/** Paimon source split enumerator, used to calculate the splits for every
reader. */
-@Slf4j
-public class PaimonSourceSplitEnumerator
- implements SourceSplitEnumerator<PaimonSourceSplit, PaimonSourceState>
{
-
- /** Source split enumerator context */
- private final Context<PaimonSourceSplit> context;
-
- /** The splits that has assigned */
- private final Set<PaimonSourceSplit> assignedSplit;
-
- /** The splits that have not assigned */
- private Set<PaimonSourceSplit> pendingSplit;
-
- /** The table that wants to read */
- private final Table table;
-
- private final Predicate predicate;
-
- private int[] projection;
-
- public PaimonSourceSplitEnumerator(
- Context<PaimonSourceSplit> context,
- Table table,
- Predicate predicate,
- int[] projection) {
- this.context = context;
- this.table = table;
- this.assignedSplit = new HashSet<>();
- this.predicate = predicate;
- this.projection = projection;
- }
-
- public PaimonSourceSplitEnumerator(
- Context<PaimonSourceSplit> context,
- Table table,
- PaimonSourceState sourceState,
- Predicate predicate,
- int[] projection) {
- this.context = context;
- this.table = table;
- this.assignedSplit = sourceState.getAssignedSplits();
- this.predicate = predicate;
- this.projection = projection;
- }
-
- @Override
- public void open() {
- this.pendingSplit = new HashSet<>();
- }
-
- @Override
- public void run() throws Exception {
- // do nothing
- }
-
- @Override
- public void close() throws IOException {
- // do nothing
- }
-
- @Override
- public void addSplitsBack(List<PaimonSourceSplit> splits, int subtaskId) {
- if (!splits.isEmpty()) {
- pendingSplit.addAll(splits);
- assignSplit(subtaskId);
- }
- }
-
- @Override
- public int currentUnassignedSplitSize() {
- return pendingSplit.size();
- }
-
- @Override
- public void registerReader(int subtaskId) {
- pendingSplit = getTableSplits();
- assignSplit(subtaskId);
- }
-
- @Override
- public PaimonSourceState snapshotState(long checkpointId) throws Exception
{
- return new PaimonSourceState(assignedSplit);
- }
-
- @Override
- public void notifyCheckpointComplete(long checkpointId) throws Exception {
- // do nothing
- }
-
- @Override
- public void handleSplitRequest(int subtaskId) {
- // do nothing
- }
-
- /** Assign split by reader task id */
- private void assignSplit(int taskId) {
- ArrayList<PaimonSourceSplit> currentTaskSplits = new ArrayList<>();
- if (context.currentParallelism() == 1) {
- // if parallelism == 1, we should assign all the splits to reader
- currentTaskSplits.addAll(pendingSplit);
- } else {
- // if parallelism > 1, according to hashCode of split's id to
determine whether to
- // allocate the current task
- for (PaimonSourceSplit fileSourceSplit : pendingSplit) {
- final int splitOwner =
- getSplitOwner(fileSourceSplit.splitId(),
context.currentParallelism());
- if (splitOwner == taskId) {
- currentTaskSplits.add(fileSourceSplit);
- }
- }
- }
- // assign splits
- context.assignSplit(taskId, currentTaskSplits);
- // save the state of assigned splits
- assignedSplit.addAll(currentTaskSplits);
- // remove the assigned splits from pending splits
- currentTaskSplits.forEach(split -> pendingSplit.remove(split));
- log.info(
- "SubTask {} is assigned to [{}]",
- taskId,
- currentTaskSplits.stream()
- .map(PaimonSourceSplit::splitId)
- .collect(Collectors.joining(",")));
- context.signalNoMoreSplits(taskId);
- }
-
- /** Get all splits of table */
- private Set<PaimonSourceSplit> getTableSplits() {
- final Set<PaimonSourceSplit> tableSplits = new HashSet<>();
- final List<Split> splits =
- table.newReadBuilder()
- .withProjection(projection)
- .withFilter(predicate)
- .newScan()
- .plan()
- .splits();
- splits.forEach(split -> tableSplits.add(new PaimonSourceSplit(split)));
- return tableSplits;
- }
-
- /** Hash algorithm for assigning splits to readers */
- private static int getSplitOwner(String tp, int numReaders) {
- return (tp.hashCode() & Integer.MAX_VALUE) % numReaders;
- }
-}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplitGenerator.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplitGenerator.java
new file mode 100644
index 0000000000..93b64c13a2
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplitGenerator.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.source;
+
+import org.apache.paimon.table.source.TableScan;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Wraps the splits of a Paimon scan plan into {@link PaimonSourceSplit}s carrying sequential ids. */
+public class PaimonSourceSplitGenerator {
+    /**
+     * Last issued id, kept as a mutable ten-digit decimal string. Ten digits exceed the range of an
+     * {@code int}, so the counter is not expected to overflow in practice.
+     */
+    private final char[] currentId = "0000000000".toCharArray();
+
+    /** Wraps every split of {@code plan}, in plan order, with a freshly issued id. */
+    public List<PaimonSourceSplit> createSplits(TableScan.Plan plan) {
+        return plan.splits().stream().map(this::wrap).collect(Collectors.toList());
+    }
+
+    /** Pairs one Paimon split with the next id. */
+    private PaimonSourceSplit wrap(org.apache.paimon.table.source.Split paimonSplit) {
+        return new PaimonSourceSplit(getNextId(), paimonSplit);
+    }
+
+    protected final String getNextId() {
+        // Bump the decimal text in place instead of formatting an integer on every call.
+        incrementCharArrayByOne(currentId, currentId.length - 1);
+        return new String(currentId);
+    }
+
+    /**
+     * Adds one to the decimal number stored in {@code array[0..pos]}.
+     *
+     * @throws RuntimeException if every digit is already at its maximum; the array is left unchanged
+     */
+    private static void incrementCharArrayByOne(char[] array, int pos) {
+        // Walk left past every digit that would carry.
+        int carryPos = pos;
+        while (carryPos >= 0 && array[carryPos] >= '9') {
+            carryPos--;
+        }
+        if (carryPos < 0) {
+            throw new RuntimeException("Produce too many splits.");
+        }
+        array[carryPos]++;
+        // Every digit to the right of the absorbing position rolls over to zero.
+        for (int i = carryPos + 1; i <= pos; i++) {
+            array[i] = '0';
+        }
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceState.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceState.java
index c8336b0d03..db6392520c 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceState.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceState.java
@@ -17,21 +17,22 @@
package org.apache.seatunnel.connectors.seatunnel.paimon.source;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+import javax.annotation.Nullable;
+
import java.io.Serializable;
-import java.util.Set;
+import java.util.Deque;
/** Paimon connector source state, saves the splits has assigned to readers. */
+@Getter
+@AllArgsConstructor
public class PaimonSourceState implements Serializable {
private static final long serialVersionUID = 1L;
- private final Set<PaimonSourceSplit> assignedSplits;
-
- public PaimonSourceState(Set<PaimonSourceSplit> assignedSplits) {
- this.assignedSplits = assignedSplits;
- }
+ private final Deque<PaimonSourceSplit> assignedSplits;
- public Set<PaimonSourceSplit> getAssignedSplits() {
- return assignedSplits;
- }
+ private final @Nullable Long currentSnapshotId;
}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/AbstractSplitEnumerator.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/AbstractSplitEnumerator.java
new file mode 100644
index 0000000000..278381a24a
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/AbstractSplitEnumerator.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.source.enumerator;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
+import
org.apache.seatunnel.connectors.seatunnel.paimon.source.PaimonSourceSplit;
+import
org.apache.seatunnel.connectors.seatunnel.paimon.source.PaimonSourceSplitGenerator;
+import
org.apache.seatunnel.connectors.seatunnel.paimon.source.PaimonSourceState;
+
+import org.apache.paimon.table.source.EndOfScanException;
+import org.apache.paimon.table.source.StreamTableScan;
+import org.apache.paimon.table.source.TableScan;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Base split enumerator for the Paimon source. Splits planned by a {@link TableScan} are queued in
+ * {@link #pendingSplits} and handed to registered readers by hashing each split id.
+ */
+@Slf4j
+public abstract class AbstractSplitEnumerator
+        implements SourceSplitEnumerator<PaimonSourceSplit, PaimonSourceState> {
+
+    /** Source split enumerator context */
+    protected final Context<PaimonSourceSplit> context;
+
+    /** Readers that registered with this enumerator and may receive splits. */
+    protected final Set<Integer> readersAwaitingSplit;
+
+    /** Issues ids for newly discovered Paimon splits. */
+    protected final PaimonSourceSplitGenerator splitGenerator;
+
+    /** The splits that have not been assigned yet. Guarded by {@code this}. */
+    protected Deque<PaimonSourceSplit> pendingSplits;
+
+    protected final TableScan tableScan;
+
+    /** Scanning is skipped while at least this many splits are still pending. */
+    private final int splitMaxNum;
+
+    /**
+     * Checkpoint of the stream scan; stays {@code null} unless {@link #tableScan} is a {@link
+     * StreamTableScan}. Guarded by {@code this}.
+     */
+    @Nullable protected Long nextSnapshotId;
+
+    protected boolean finished = false;
+
+    private final ExecutorService executorService;
+
+    /**
+     * @param context enumerator context
+     * @param pendingSplits splits restored from a checkpoint (copied, may be empty)
+     * @param nextSnapshotId position a {@link StreamTableScan} is restored to, or {@code null}
+     * @param tableScan the Paimon scan that plans new splits
+     * @param splitMaxPerTask scanning pauses once {@code parallelism * splitMaxPerTask} splits
+     *     are pending
+     */
+    public AbstractSplitEnumerator(
+            Context<PaimonSourceSplit> context,
+            Deque<PaimonSourceSplit> pendingSplits,
+            @Nullable Long nextSnapshotId,
+            TableScan tableScan,
+            int splitMaxPerTask) {
+        this.context = context;
+        this.pendingSplits = new LinkedList<>(pendingSplits);
+        this.nextSnapshotId = nextSnapshotId;
+        this.readersAwaitingSplit = new LinkedHashSet<>();
+        this.splitGenerator = new PaimonSourceSplitGenerator();
+        this.tableScan = tableScan;
+        this.splitMaxNum = context.currentParallelism() * splitMaxPerTask;
+        this.executorService =
+                Executors.newCachedThreadPool(
+                        new ThreadFactoryBuilder()
+                                .setNameFormat("Seatunnel-PaimonSourceSplitEnumerator-%d")
+                                .build());
+        if (tableScan instanceof StreamTableScan && nextSnapshotId != null) {
+            ((StreamTableScan) tableScan).restore(nextSnapshotId);
+        }
+    }
+
+    @Override
+    public void open() {}
+
+    @Override
+    public void run() throws Exception {
+        loadNewSplits();
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (Objects.nonNull(executorService) && !executorService.isShutdown()) {
+            executorService.shutdown();
+        }
+    }
+
+    @Override
+    public void addSplitsBack(List<PaimonSourceSplit> splits, int subtaskId) {
+        log.debug("Paimon Source Enumerator adds splits back: {}", splits);
+        // Go through the synchronized helper: discovery may be appending concurrently.
+        addSplits(splits);
+        if (context.registeredReaders().contains(subtaskId)) {
+            assignSplits();
+        }
+    }
+
+    @Override
+    public int currentUnassignedSplitSize() {
+        return pendingSplits.size();
+    }
+
+    @Override
+    public void registerReader(int subtaskId) {
+        readersAwaitingSplit.add(subtaskId);
+    }
+
+    /**
+     * Captures the pending splits and the stream position. The deque is copied under the lock so
+     * the returned state is consistent with {@link #nextSnapshotId} and is not mutated afterwards
+     * by concurrent discovery.
+     */
+    @Override
+    public synchronized PaimonSourceState snapshotState(long checkpointId) throws Exception {
+        return new PaimonSourceState(new LinkedList<>(pendingSplits), nextSnapshotId);
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {}
+
+    private synchronized void addSplits(Collection<PaimonSourceSplit> newSplits) {
+        this.pendingSplits.addAll(newSplits);
+    }
+
+    /**
+     * Method should be synchronized because {@link #handleSplitRequest} and {@link
+     * #processDiscoveredSplits} have thread conflicts.
+     */
+    protected synchronized void assignSplits() {
+        Iterator<Integer> pendingReaderIterator = readersAwaitingSplit.iterator();
+        while (pendingReaderIterator.hasNext()) {
+            Integer pendingReader = pendingReaderIterator.next();
+            if (!context.registeredReaders().contains(pendingReader)) {
+                pendingReaderIterator.remove();
+                continue;
+            }
+            LinkedList<PaimonSourceSplit> assignedTaskSplits = new LinkedList<>();
+            for (PaimonSourceSplit fileSourceSplit : pendingSplits) {
+                final int splitOwner =
+                        getSplitOwner(fileSourceSplit.splitId(), context.currentParallelism());
+                if (splitOwner == pendingReader) {
+                    assignedTaskSplits.add(fileSourceSplit);
+                }
+            }
+
+            if (!assignedTaskSplits.isEmpty()) {
+                log.info("Assign splits {} to reader {}", assignedTaskSplits, pendingReader);
+                try {
+                    context.assignSplit(pendingReader, assignedTaskSplits);
+                    // remove the assigned splits from pending splits only after a successful
+                    // assignment
+                    assignedTaskSplits.forEach(pendingSplits::remove);
+                } catch (Exception e) {
+                    // The splits were never removed from pendingSplits, so they are retried on
+                    // the next round; adding them again here would duplicate them.
+                    log.error(
+                            "Failed to assign splits {} to reader {}",
+                            assignedTaskSplits,
+                            pendingReader,
+                            e);
+                }
+            }
+        }
+    }
+
+    /** Plans the next batch of splits on the executor and processes the result asynchronously. */
+    protected void loadNewSplits() {
+        // NOTE(review): the future returned by whenComplete is discarded, so an exception thrown
+        // from processDiscoveredSplits on this path is only visible through the error log.
+        CompletableFuture.supplyAsync(this::scanNextSnapshot, executorService)
+                .whenComplete(this::processDiscoveredSplits);
+    }
+
+    /** Hash algorithm for assigning splits to readers */
+    protected static int getSplitOwner(String tp, int numReaders) {
+        return (tp.hashCode() & Integer.MAX_VALUE) % numReaders;
+    }
+
+    // ------------------------------------------------------------------------
+
+    // This need to be synchronized because scan object is not thread safe. handleSplitRequest and
+    // CompletableFuture.supplyAsync will invoke this.
+    protected synchronized Optional<PlanWithNextSnapshotId> scanNextSnapshot() {
+        if (pendingSplits.size() >= splitMaxNum) {
+            return Optional.empty();
+        }
+        TableScan.Plan plan = tableScan.plan();
+        Long nextSnapshotId = null;
+        if (tableScan instanceof StreamTableScan) {
+            nextSnapshotId = ((StreamTableScan) tableScan).checkpoint();
+        }
+        return Optional.of(new PlanWithNextSnapshotId(plan, nextSnapshotId));
+    }
+
+    /**
+     * Applies a scan result: records the new stream position, queues the discovered splits and
+     * assigns them. Called from the scan executor thread (via {@code whenComplete}) or directly by
+     * a subclass with a {@code null} error.
+     *
+     * @param planWithNextSnapshotIdOptional scan result, empty when scanning was skipped
+     * @param error failure raised by the scan, possibly wrapped in a {@code CompletionException}
+     * @throws SeaTunnelException if the scan failed for any reason other than end of scan
+     */
+    protected void processDiscoveredSplits(
+            Optional<PlanWithNextSnapshotId> planWithNextSnapshotIdOptional, Throwable error) {
+        if (error != null) {
+            // Failures from CompletableFuture.supplyAsync reach whenComplete wrapped in a
+            // CompletionException; unwrap so EndOfScanException is actually recognised.
+            Throwable cause = error;
+            if (cause instanceof java.util.concurrent.CompletionException
+                    && cause.getCause() != null) {
+                cause = cause.getCause();
+            }
+            if (cause instanceof EndOfScanException) {
+                log.debug("Catching EndOfStreamException, the stream is finished.");
+                finished = true;
+                assignSplits();
+            } else {
+                log.error("Failed to enumerate files", cause);
+                throw new SeaTunnelException(cause);
+            }
+            return;
+        }
+        if (!planWithNextSnapshotIdOptional.isPresent()) {
+            return;
+        }
+        PlanWithNextSnapshotId planWithNextSnapshotId = planWithNextSnapshotIdOptional.get();
+        TableScan.Plan plan = planWithNextSnapshotId.plan;
+        // Advance the position and queue its splits atomically with respect to snapshotState,
+        // so a checkpoint never records the new snapshot id without the splits that belong to it.
+        synchronized (this) {
+            nextSnapshotId = planWithNextSnapshotId.nextSnapshotId;
+            if (plan.splits().isEmpty()) {
+                return;
+            }
+            addSplits(splitGenerator.createSplits(plan));
+        }
+        assignSplits();
+    }
+
+    /** The result of scan. */
+    @Getter
+    protected static class PlanWithNextSnapshotId {
+        private final TableScan.Plan plan;
+        private final Long nextSnapshotId;
+
+        public PlanWithNextSnapshotId(TableScan.Plan plan, Long nextSnapshotId) {
+            this.plan = plan;
+            this.nextSnapshotId = nextSnapshotId;
+        }
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/PaimonBatchSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/PaimonBatchSourceSplitEnumerator.java
new file mode 100644
index 0000000000..b00b38587a
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/PaimonBatchSourceSplitEnumerator.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.source.enumerator;
+
+import
org.apache.seatunnel.connectors.seatunnel.paimon.source.PaimonSourceSplit;
+import
org.apache.seatunnel.connectors.seatunnel.paimon.source.PaimonSourceState;
+
+import org.apache.paimon.table.source.TableScan;
+
+import lombok.extern.slf4j.Slf4j;
+
+import javax.annotation.Nullable;
+
+import java.util.Deque;
+import java.util.Set;
+
+/**
+ * Split enumerator for batch reads. {@link #run()} performs one table scan, hands the resulting
+ * splits to {@code processDiscoveredSplits} and then signals every registered reader that no
+ * further splits will follow.
+ */
+@Slf4j
+public class PaimonBatchSourceSplitEnumerator extends AbstractSplitEnumerator {
+
+    public PaimonBatchSourceSplitEnumerator(
+            Context<PaimonSourceSplit> enumeratorContext,
+            Deque<PaimonSourceSplit> splitsToAssign,
+            @Nullable Long snapshotId,
+            TableScan scan,
+            int maxSplitsPerTask) {
+        super(enumeratorContext, splitsToAssign, snapshotId, scan, maxSplitsPerTask);
+    }
+
+    /** Scans the table once, dispatches the splits and ends the input of all registered readers. */
+    @Override
+    public void run() throws Exception {
+        processDiscoveredSplits(scanNextSnapshot(), null);
+        Set<Integer> registeredReaders = context.registeredReaders();
+        log.debug(
+                "No more splits to assign. Sending NoMoreSplitsEvent to reader {}.",
+                registeredReaders);
+        for (int readerId : registeredReaders) {
+            context.signalNoMoreSplits(readerId);
+        }
+    }
+
+    /** Checkpoints the pending splits; batch mode records no snapshot id. */
+    @Override
+    public PaimonSourceState snapshotState(long checkpointId) throws Exception {
+        return new PaimonSourceState(pendingSplits, null);
+    }
+
+    /** Intentionally a no-op: in batch mode splits are assigned from {@link #run()}. */
+    @Override
+    public void handleSplitRequest(int subtaskId) {
+        // nothing to do on request; run() already pushed the splits
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/PaimonStreamSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/PaimonStreamSourceSplitEnumerator.java
new file mode 100644
index 0000000000..2cce57be93
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/PaimonStreamSourceSplitEnumerator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.source.enumerator;
+
+import
org.apache.seatunnel.connectors.seatunnel.paimon.source.PaimonSourceSplit;
+
+import org.apache.paimon.table.source.TableScan;
+
+import lombok.extern.slf4j.Slf4j;
+
+import javax.annotation.Nullable;
+
+import java.util.Deque;
+
+/**
+ * Split enumerator for streaming reads. Readers request work through {@link
+ * #handleSplitRequest(int)}; if a requesting reader is still waiting after {@code assignSplits()},
+ * {@code loadNewSplits()} is invoked.
+ */
+@Slf4j
+public class PaimonStreamSourceSplitEnumerator extends AbstractSplitEnumerator {
+
+    public PaimonStreamSourceSplitEnumerator(
+            Context<PaimonSourceSplit> enumeratorContext,
+            Deque<PaimonSourceSplit> splitsToAssign,
+            @Nullable Long snapshotId,
+            TableScan scan,
+            int maxSplitsPerTask) {
+        super(enumeratorContext, splitsToAssign, snapshotId, scan, maxSplitsPerTask);
+    }
+
+    /**
+     * Marks the reader as awaiting a split and calls {@code assignSplits()}; when the reader is
+     * still marked as waiting afterwards, calls {@code loadNewSplits()}.
+     */
+    @Override
+    public void handleSplitRequest(int subtaskId) {
+        readersAwaitingSplit.add(subtaskId);
+        assignSplits();
+        boolean stillWaiting = readersAwaitingSplit.contains(subtaskId);
+        if (stillWaiting) {
+            loadNewSplits();
+        }
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowKindConverter.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowKindConverter.java
index adb77c637d..4a3833e6a0 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowKindConverter.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowKindConverter.java
@@ -19,12 +19,11 @@ package
org.apache.seatunnel.connectors.seatunnel.paimon.utils;
import org.apache.seatunnel.api.table.type.RowKind;
-import org.apache.paimon.data.InternalRow;
-
public class RowKindConverter {
/**
- * Convert SeaTunnel RowKind {@link RowKind} to Paimon RowKind {@link
InternalRow}
+ * Convert SeaTunnel RowKind {@link RowKind} to Paimon RowKind {@link
+ * org.apache.paimon.types.RowKind}
*
* @param seaTunnelRowKind The kind of change that a row describes in a
changelog.
* @return
@@ -44,4 +43,27 @@ public class RowKindConverter {
return null;
}
}
+
+    /**
+     * Convert Paimon RowKind {@link org.apache.paimon.types.RowKind} to SeaTunnel RowKind {@link
+     * RowKind}.
+     *
+     * @param paimonRowKind the Paimon change kind of a row; a {@code null} argument makes the
+     *     switch throw a {@link NullPointerException}
+     * @return the SeaTunnel row kind with the same name, or {@code null} for any constant not
+     *     listed in the switch
+     */
+    public static RowKind convertPaimonRowKind2SeatunnelRowkind(
+            org.apache.paimon.types.RowKind paimonRowKind) {
+        RowKind seaTunnelRowKind;
+        switch (paimonRowKind) {
+            case INSERT:
+                seaTunnelRowKind = RowKind.INSERT;
+                break;
+            case UPDATE_BEFORE:
+                seaTunnelRowKind = RowKind.UPDATE_BEFORE;
+                break;
+            case UPDATE_AFTER:
+                seaTunnelRowKind = RowKind.UPDATE_AFTER;
+                break;
+            case DELETE:
+                seaTunnelRowKind = RowKind.DELETE;
+                break;
+            default:
+                seaTunnelRowKind = null;
+                break;
+        }
+        return seaTunnelRowKind;
+    }
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonRecordWithFullType.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonRecordWithFullType.java
new file mode 100644
index 0000000000..7da7afaa0b
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonRecordWithFullType.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.seatunnel.e2e.connector.paimon;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.Timestamp;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.util.Map;
+
+/**
+ * One row of a {@code full_type} Paimon test table, used by the e2e tests to compare the contents
+ * of a source table and a sink table.
+ *
+ * <p>Lombok {@code @Data} generates {@code equals}/{@code hashCode} over all fields (array fields
+ * are compared by content), so lists of these records can be checked with {@code
+ * assertIterableEquals}.
+ */
+@Data
+@AllArgsConstructor
+public class PaimonRecordWithFullType {
+    // Parameterized instead of the raw Map type; keys and values are whatever the reader puts in.
+    public Map<Object, Object> c_map;
+    public int[] c_array;
+    public BinaryString c_string;
+    public boolean c_boolean;
+    public short c_tinyint;
+    public short c_smallint;
+    public int c_int;
+    public long c_bigint;
+    public float c_float;
+    public double c_double;
+    public Decimal c_decimal;
+    public BinaryString c_bytes;
+    public int c_date;
+    public Timestamp c_timestamp;
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
index 0168cc8f53..dc6bfc9eba 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
@@ -637,7 +637,7 @@ public class PaimonSinkCDCIT extends TestSuiteBase
implements TestResource {
return result;
}
- private Table getTable(String dbName, String tbName) {
+ protected Table getTable(String dbName, String tbName) {
try {
return getCatalog().getTable(getIdentifier(dbName, tbName));
} catch (Catalog.TableNotExistException e) {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonStreamReadIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonStreamReadIT.java
new file mode 100644
index 0000000000..ede9f7c3b3
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonStreamReadIT.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.paimon;
+
+import org.apache.seatunnel.common.utils.SeaTunnelException;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.table.source.TableScan;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.awaitility.Awaitility.given;
+
+@DisabledOnContainer(
+        value = {},
+        type = {EngineType.SPARK, EngineType.FLINK},
+        disabledReason =
+                "Spark and Flink engine can not auto create paimon table on worker node in local file(e.g flink tm) by savemode feature which can lead error")
+@Slf4j
+public class PaimonStreamReadIT extends PaimonSinkCDCIT {
+
+    /**
+     * Runs {@code fake_to_paimon_with_full_type.conf} (presumably populating {@code
+     * full_type.st_test}), starts the streaming copy job {@code paimon_to_paimon.conf} and waits
+     * until {@code full_type.st_test_sink} matches the source. It then writes change-log rows to
+     * the source and waits until the sink matches again.
+     */
+    @TestTemplate
+    public void testStreamReadPaimon(TestContainer container) throws Exception {
+        Container.ExecResult writeResult =
+                container.executeJob("/fake_to_paimon_with_full_type.conf");
+        Assertions.assertEquals(0, writeResult.getExitCode(), writeResult.getStderr());
+
+        // paimon_to_paimon.conf is a streaming job that does not return, so run it in background.
+        CompletableFuture.runAsync(
+                () -> {
+                    try {
+                        container.executeJob("/paimon_to_paimon.conf");
+                    } catch (Exception e) {
+                        throw new SeaTunnelException(e);
+                    }
+                });
+
+        assertSinkEventuallyMatchesSource(container);
+
+        // write cdc data
+        Container.ExecResult cdcWriteResult =
+                container.executeJob("/fake_to_paimon_with_full_type_cdc_data.conf");
+        Assertions.assertEquals(0, cdcWriteResult.getExitCode(), cdcWriteResult.getStderr());
+
+        assertSinkEventuallyMatchesSource(container);
+    }
+
+    /**
+     * Polls until {@code full_type.st_test_sink} contains exactly the same rows, in the same
+     * order, as {@code full_type.st_test}.
+     */
+    private void assertSinkEventuallyMatchesSource(TestContainer container) {
+        given().ignoreExceptions()
+                .await()
+                .atLeast(100L, TimeUnit.MILLISECONDS)
+                .atMost(400L, TimeUnit.SECONDS)
+                .untilAsserted(
+                        () -> {
+                            container.executeExtraCommands(containerExtendedFactory);
+                            List<PaimonRecordWithFullType> paimonSourceRecords =
+                                    loadPaimonDataWithFullType("full_type", "st_test");
+                            List<PaimonRecordWithFullType> paimonSinkRecords =
+                                    loadPaimonDataWithFullType("full_type", "st_test_sink");
+                            Assertions.assertEquals(
+                                    paimonSourceRecords.size(), paimonSinkRecords.size());
+                            Assertions.assertIterableEquals(
+                                    paimonSourceRecords, paimonSinkRecords);
+                        });
+    }
+
+    /**
+     * Reads every row of the given Paimon table into {@link PaimonRecordWithFullType} instances,
+     * in the order produced by the table reader.
+     *
+     * <p>Columns are read by position (0 = c_map ... 13 = c_timestamp), matching the field order
+     * of the {@code full_type} schema.
+     *
+     * @param dbName Paimon database name
+     * @param tbName Paimon table name
+     * @return the table contents as records
+     * @throws SeaTunnelException if reading the table fails
+     */
+    protected List<PaimonRecordWithFullType> loadPaimonDataWithFullType(
+            String dbName, String tbName) {
+        FileStoreTable table = (FileStoreTable) getTable(dbName, tbName);
+        ReadBuilder readBuilder = table.newReadBuilder();
+        TableScan.Plan plan = readBuilder.newScan().plan();
+        TableRead tableRead = readBuilder.newRead();
+        List<PaimonRecordWithFullType> result = new ArrayList<>();
+        try (RecordReader<InternalRow> reader = tableRead.createReader(plan)) {
+            reader.forEachRemaining(
+                    row -> {
+                        InternalMap internalMap = row.getMap(0);
+                        InternalArray keyArray = internalMap.keyArray();
+                        InternalArray valueArray = internalMap.valueArray();
+                        HashMap<Object, Object> map = new HashMap<>(internalMap.size());
+                        for (int i = 0; i < internalMap.size(); i++) {
+                            map.put(keyArray.getString(i), valueArray.getString(i));
+                        }
+                        InternalArray internalArray = row.getArray(1);
+                        int[] intArray = internalArray.toIntArray();
+                        PaimonRecordWithFullType paimonRecordWithFullType =
+                                new PaimonRecordWithFullType(
+                                        map,
+                                        intArray,
+                                        row.getString(2),
+                                        row.getBoolean(3),
+                                        row.getShort(4),
+                                        row.getShort(5),
+                                        row.getInt(6),
+                                        row.getLong(7),
+                                        row.getFloat(8),
+                                        row.getDouble(9),
+                                        row.getDecimal(10, 30, 8),
+                                        row.getString(11),
+                                        row.getInt(12),
+                                        row.getTimestamp(13, 6));
+                        result.add(paimonRecordWithFullType);
+                    });
+        } catch (IOException e) {
+            throw new SeaTunnelException(e);
+        }
+        return result;
+    }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_full_type_cdc_data.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_full_type_cdc_data.conf
new file mode 100644
index 0000000000..c5b881c2ce
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_full_type_cdc_data.conf
@@ -0,0 +1,79 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Batch job that writes change-log rows into full_type.st_test, the table that
+# paimon_to_paimon.conf streams into full_type.st_test_sink (see PaimonStreamReadIT).
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ }
+ # c_tinyint is the primary key, so the UPDATE_BEFORE/UPDATE_AFTER pair below (key 117)
+ # describes a change to the same record.
+ primaryKey {
+ name = "c_tinyint"
+ columnNames = [c_tinyint]
+ }
+ }
+ # Values in each "fields" list map positionally onto the schema fields above.
+ # The rows cover every change kind: INSERT, UPDATE_BEFORE, UPDATE_AFTER and DELETE.
+ rows = [
+ {
+ kind = INSERT
+ fields = [{"a": "b"}, [101], "c_string", true, 121, 15987, 563873951,
7084913402530365001, 1.21, 1.231, "2924137191386439303744.39292211",
"bWlJWmo=", "2023-04-21", "2023-04-21T23:20:58"]
+ }
+ {
+ kind = INSERT
+ fields = [{"a": "b"}, [101], "c_string1", true, 122, 15987, 563873952,
7084913402530365001, 1.21, 1.231, "2924137191386439303744.39292211",
"bWlJWmo=", "2023-04-25", "2023-04-25T23:20:58"]
+ }
+ {
+ kind = UPDATE_BEFORE
+ fields = [{"a": "c"}, [102], "c_string2", true, 117, 15987, 563873953,
7084913402530365002, 1.22, 1.232, "2924137191386439303744.39292212",
"bWlJWmo=", "2023-04-26", "2023-04-26T23:20:58"]
+ }
+ {
+ kind = UPDATE_AFTER
+ fields = [{"a": "e"}, [103], "c_string3", false, 117, 15989,
563873951, 7084913402530365003, 1.23, 1.233, "2924137191386439303744.39292213",
"bWlJWmo=", "2023-04-27", "2023-04-27T23:20:58"]
+ }
+ {
+ kind = DELETE
+ fields = [{"a": "e"}, [103], "c_string2", true, 119, 15987, 563873953,
7084913402530365003, 1.23, 1.233, "2924137191386439303744.39292213",
"bWlJWmo=", "2023-04-23", "2023-04-23T23:20:58"]
+ }
+ ]
+ result_table_name = "fake"
+ }
+}
+
+sink {
+ Paimon {
+ # Same table that paimon_to_paimon.conf reads from.
+ warehouse = "/tmp/paimon"
+ database = "full_type"
+ table = "st_test"
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_paimon.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_paimon.conf
new file mode 100644
index 0000000000..50728871af
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_paimon.conf
@@ -0,0 +1,38 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Streaming job used by PaimonStreamReadIT: copies full_type.st_test into
+# full_type.st_test_sink so the test can compare both tables.
+env {
+  parallelism = 1
+  # Upper case, matching the JobMode constant and the "BATCH" spelling used by the other configs.
+  job.mode = "STREAMING"
+}
+
+source {
+  Paimon {
+    warehouse = "/tmp/paimon"
+    database = "full_type"
+    table = "st_test"
+  }
+}
+
+sink {
+  Paimon {
+    warehouse = "/tmp/paimon"
+    database = "full_type"
+    table = "st_test_sink"
+    # Key the sink table on c_tinyint, the primary key declared for the source test data.
+    paimon.table.primary-keys = "c_tinyint"
+  }
+}