This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4a2e27291c [Feature][Connector-v2] Support streaming read for paimon 
(#7681)
4a2e27291c is described below

commit 4a2e27291c4d16d36ff4edaf70a67242ff78e9bf
Author: dailai <[email protected]>
AuthorDate: Fri Sep 20 13:16:09 2024 +0800

    [Feature][Connector-v2] Support streaming read for paimon (#7681)
---
 docs/en/connector-v2/source/Paimon.md              |  29 ++-
 .../seatunnel/paimon/source/PaimonSource.java      |  58 ++++-
 .../paimon/source/PaimonSourceReader.java          |  47 ++--
 .../seatunnel/paimon/source/PaimonSourceSplit.java |  16 +-
 .../paimon/source/PaimonSourceSplitEnumerator.java | 180 ---------------
 .../paimon/source/PaimonSourceSplitGenerator.java  |  60 +++++
 .../seatunnel/paimon/source/PaimonSourceState.java |  19 +-
 .../source/enumerator/AbstractSplitEnumerator.java | 249 +++++++++++++++++++++
 .../PaimonBatchSourceSplitEnumerator.java          |  63 ++++++
 .../PaimonStreamSourceSplitEnumerator.java         |  51 +++++
 .../seatunnel/paimon/utils/RowKindConverter.java   |  28 ++-
 .../connector/paimon/PaimonRecordWithFullType.java |  49 ++++
 .../e2e/connector/paimon/PaimonSinkCDCIT.java      |   2 +-
 .../e2e/connector/paimon/PaimonStreamReadIT.java   | 152 +++++++++++++
 .../fake_to_paimon_with_full_type_cdc_data.conf    |  79 +++++++
 .../src/test/resources/paimon_to_paimon.conf       |  38 ++++
 16 files changed, 883 insertions(+), 237 deletions(-)

diff --git a/docs/en/connector-v2/source/Paimon.md 
b/docs/en/connector-v2/source/Paimon.md
index 32155abde0..e586a4fd9d 100644
--- a/docs/en/connector-v2/source/Paimon.md
+++ b/docs/en/connector-v2/source/Paimon.md
@@ -9,7 +9,7 @@ Read data from Apache Paimon.
 ## Key features
 
 - [x] [batch](../../concept/connector-v2-features.md)
-- [ ] [stream](../../concept/connector-v2-features.md)
+- [x] [stream](../../concept/connector-v2-features.md)
 - [ ] [exactly-once](../../concept/connector-v2-features.md)
 - [ ] [column projection](../../concept/connector-v2-features.md)
 - [ ] [parallelism](../../concept/connector-v2-features.md)
@@ -157,9 +157,30 @@ source {
 ```
 
 ## Changelog
+If you want to read the changelog with this connector, the Paimon table 
written by your sink must have the option `changelog-producer=input` set. For 
details, refer to [Paimon 
changelog](https://paimon.apache.org/docs/master/primary-key-table/changelog-producer/).
+Currently, only the `input` and `none` modes of the changelog producer are 
supported. If the changelog producer is `input`, a streaming read of the 
connector generates -U, +U, +I and -D data. If the changelog producer is 
`none`, a streaming read of the connector generates +I, +U and -D data.
 
-### next version
+### Streaming read example
+```hocon
+env {
+  parallelism = 1
+  job.mode = "Streaming"
+}
 
-- Add Paimon Source Connector
-- Support projection for Paimon Source
+source {
+  Paimon {
+    warehouse = "/tmp/paimon"
+    database = "full_type"
+    table = "st_test"
+  }
+}
 
+sink {
+  Paimon {
+    warehouse = "/tmp/paimon"
+    database = "full_type"
+    table = "st_test_sink"
+    paimon.table.primary-keys = "c_tinyint"
+  }
+}
+```
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java
index d0a0c4a793..d5c31ff235 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.paimon.source;
 
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
@@ -26,18 +27,23 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalog;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSourceConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.source.converter.SqlToPaimonPredicateConverter;
+import 
org.apache.seatunnel.connectors.seatunnel.paimon.source.enumerator.PaimonBatchSourceSplitEnumerator;
+import 
org.apache.seatunnel.connectors.seatunnel.paimon.source.enumerator.PaimonStreamSourceSplitEnumerator;
 import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowTypeConverter;
 
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.types.RowType;
 
 import net.sf.jsqlparser.statement.select.PlainSelect;
 
 import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Objects;
 
@@ -58,12 +64,12 @@ public class PaimonSource
 
     private Table paimonTable;
 
-    private Predicate predicate;
-
-    private int[] projectionIndex;
+    private JobContext jobContext;
 
     private CatalogTable catalogTable;
 
+    protected final ReadBuilder readBuilder;
+
     public PaimonSource(ReadonlyConfig readonlyConfig, PaimonCatalog 
paimonCatalog) {
         this.readonlyConfig = readonlyConfig;
         PaimonSourceConfig paimonSourceConfig = new 
PaimonSourceConfig(readonlyConfig);
@@ -76,17 +82,22 @@ public class PaimonSource
         PlainSelect plainSelect = convertToPlainSelect(filterSql);
         RowType paimonRowType = this.paimonTable.rowType();
         String[] filedNames = paimonRowType.getFieldNames().toArray(new 
String[0]);
+
+        Predicate predicate = null;
+        int[] projectionIndex = null;
         if (!Objects.isNull(plainSelect)) {
-            this.projectionIndex = 
convertSqlSelectToPaimonProjectionIndex(filedNames, plainSelect);
+            projectionIndex = 
convertSqlSelectToPaimonProjectionIndex(filedNames, plainSelect);
             if (!Objects.isNull(projectionIndex)) {
                 this.catalogTable =
                         paimonCatalog.getTableWithProjection(tablePath, 
projectionIndex);
             }
-            this.predicate =
+            predicate =
                     
SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate(
                             paimonRowType, plainSelect);
         }
-        seaTunnelRowType = RowTypeConverter.convert(paimonRowType, 
projectionIndex);
+        this.seaTunnelRowType = RowTypeConverter.convert(paimonRowType, 
projectionIndex);
+        this.readBuilder =
+                
paimonTable.newReadBuilder().withProjection(projectionIndex).withFilter(predicate);
     }
 
     @Override
@@ -99,23 +110,34 @@ public class PaimonSource
         return Collections.singletonList(catalogTable);
     }
 
+    @Override
+    public void setJobContext(JobContext jobContext) {
+        this.jobContext = jobContext;
+    }
+
     @Override
     public Boundedness getBoundedness() {
-        return Boundedness.BOUNDED;
+        return JobMode.BATCH.equals(jobContext.getJobMode())
+                ? Boundedness.BOUNDED
+                : Boundedness.UNBOUNDED;
     }
 
     @Override
     public SourceReader<SeaTunnelRow, PaimonSourceSplit> createReader(
             SourceReader.Context readerContext) throws Exception {
         return new PaimonSourceReader(
-                readerContext, paimonTable, seaTunnelRowType, predicate, 
projectionIndex);
+                readerContext, paimonTable, seaTunnelRowType, 
readBuilder.newRead());
     }
 
     @Override
     public SourceSplitEnumerator<PaimonSourceSplit, PaimonSourceState> 
createEnumerator(
             SourceSplitEnumerator.Context<PaimonSourceSplit> 
enumeratorContext) throws Exception {
-        return new PaimonSourceSplitEnumerator(
-                enumeratorContext, paimonTable, predicate, projectionIndex);
+        if (getBoundedness() == Boundedness.BOUNDED) {
+            return new PaimonBatchSourceSplitEnumerator(
+                    enumeratorContext, new LinkedList<>(), null, 
readBuilder.newScan(), 1);
+        }
+        return new PaimonStreamSourceSplitEnumerator(
+                enumeratorContext, new LinkedList<>(), null, 
readBuilder.newStreamScan(), 1);
     }
 
     @Override
@@ -123,7 +145,19 @@ public class PaimonSource
             SourceSplitEnumerator.Context<PaimonSourceSplit> enumeratorContext,
             PaimonSourceState checkpointState)
             throws Exception {
-        return new PaimonSourceSplitEnumerator(
-                enumeratorContext, paimonTable, checkpointState, predicate, 
projectionIndex);
+        if (getBoundedness() == Boundedness.BOUNDED) {
+            return new PaimonBatchSourceSplitEnumerator(
+                    enumeratorContext,
+                    checkpointState.getAssignedSplits(),
+                    checkpointState.getCurrentSnapshotId(),
+                    readBuilder.newScan(),
+                    1);
+        }
+        return new PaimonStreamSourceSplitEnumerator(
+                enumeratorContext,
+                checkpointState.getAssignedSplits(),
+                checkpointState.getCurrentSnapshotId(),
+                readBuilder.newStreamScan(),
+                1);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java
index 3cfa5ee8b9..50de479b4b 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java
@@ -17,18 +17,21 @@
 
 package org.apache.seatunnel.connectors.seatunnel.paimon.source;
 
+import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.RowKind;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowConverter;
+import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowKindConverter;
 
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.reader.RecordReaderIterator;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.TableRead;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -48,20 +51,14 @@ public class PaimonSourceReader implements 
SourceReader<SeaTunnelRow, PaimonSour
     private final Table table;
     private final SeaTunnelRowType seaTunnelRowType;
     private volatile boolean noMoreSplit;
-    private final Predicate predicate;
-    private int[] projection;
+    private final TableRead tableRead;
 
     public PaimonSourceReader(
-            Context context,
-            Table table,
-            SeaTunnelRowType seaTunnelRowType,
-            Predicate predicate,
-            int[] projection) {
+            Context context, Table table, SeaTunnelRowType seaTunnelRowType, 
TableRead tableRead) {
         this.context = context;
         this.table = table;
         this.seaTunnelRowType = seaTunnelRowType;
-        this.predicate = predicate;
-        this.projection = projection;
+        this.tableRead = tableRead;
     }
 
     @Override
@@ -81,12 +78,7 @@ public class PaimonSourceReader implements 
SourceReader<SeaTunnelRow, PaimonSour
             if (Objects.nonNull(split)) {
                 // read logic
                 try (final RecordReader<InternalRow> reader =
-                        table.newReadBuilder()
-                                .withProjection(projection)
-                                .withFilter(predicate)
-                                .newRead()
-                                .executeFilter()
-                                .createReader(split.getSplit())) {
+                        
tableRead.executeFilter().createReader(split.getSplit())) {
                     final RecordReaderIterator<InternalRow> rowIterator =
                             new RecordReaderIterator<>(reader);
                     while (rowIterator.hasNext()) {
@@ -94,16 +86,31 @@ public class PaimonSourceReader implements 
SourceReader<SeaTunnelRow, PaimonSour
                         final SeaTunnelRow seaTunnelRow =
                                 RowConverter.convert(
                                         row, seaTunnelRowType, 
((FileStoreTable) table).schema());
+                        if 
(Boundedness.UNBOUNDED.equals(context.getBoundedness())) {
+                            RowKind rowKind =
+                                    
RowKindConverter.convertPaimonRowKind2SeatunnelRowkind(
+                                            row.getRowKind());
+                            if (rowKind != null) {
+                                seaTunnelRow.setRowKind(rowKind);
+                            }
+                        }
                         output.collect(seaTunnelRow);
                     }
                 }
-            } else if (noMoreSplit && sourceSplits.isEmpty()) {
+            }
+
+            if (noMoreSplit
+                    && sourceSplits.isEmpty()
+                    && Boundedness.BOUNDED.equals(context.getBoundedness())) {
                 // signal to the source that we have reached the end of the 
data.
-                log.info("Closed the bounded flink table store source");
+                log.info("Closed the bounded table store source");
                 context.signalNoMoreElement();
             } else {
-                log.warn("Waiting for flink table source split, sleeping 1s");
-                Thread.sleep(1000L);
+                context.sendSplitRequest();
+                if (sourceSplits.isEmpty()) {
+                    log.debug("Waiting for table source split, sleeping 1s");
+                    Thread.sleep(1000L);
+                }
             }
         }
     }
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplit.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplit.java
index d7b7b96f48..eba167eadd 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplit.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplit.java
@@ -21,22 +21,22 @@ import org.apache.seatunnel.api.source.SourceSplit;
 
 import org.apache.paimon.table.source.Split;
 
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
 /** Paimon source split, wrapped the {@link Split} of paimon table. */
+@Getter
+@AllArgsConstructor
 public class PaimonSourceSplit implements SourceSplit {
     private static final long serialVersionUID = 1L;
 
-    private final Split split;
+    /** The unique ID of the split. Unique within the scope of this source. */
+    private final String id;
 
-    public PaimonSourceSplit(Split split) {
-        this.split = split;
-    }
+    private final Split split;
 
     @Override
     public String splitId() {
         return split.toString();
     }
-
-    public Split getSplit() {
-        return split;
-    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplitEnumerator.java
deleted file mode 100644
index 7b0f14c3ab..0000000000
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplitEnumerator.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.paimon.source;
-
-import org.apache.seatunnel.api.source.SourceSplitEnumerator;
-
-import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.table.Table;
-import org.apache.paimon.table.source.Split;
-
-import lombok.extern.slf4j.Slf4j;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-/** Paimon source split enumerator, used to calculate the splits for every 
reader. */
-@Slf4j
-public class PaimonSourceSplitEnumerator
-        implements SourceSplitEnumerator<PaimonSourceSplit, PaimonSourceState> 
{
-
-    /** Source split enumerator context */
-    private final Context<PaimonSourceSplit> context;
-
-    /** The splits that has assigned */
-    private final Set<PaimonSourceSplit> assignedSplit;
-
-    /** The splits that have not assigned */
-    private Set<PaimonSourceSplit> pendingSplit;
-
-    /** The table that wants to read */
-    private final Table table;
-
-    private final Predicate predicate;
-
-    private int[] projection;
-
-    public PaimonSourceSplitEnumerator(
-            Context<PaimonSourceSplit> context,
-            Table table,
-            Predicate predicate,
-            int[] projection) {
-        this.context = context;
-        this.table = table;
-        this.assignedSplit = new HashSet<>();
-        this.predicate = predicate;
-        this.projection = projection;
-    }
-
-    public PaimonSourceSplitEnumerator(
-            Context<PaimonSourceSplit> context,
-            Table table,
-            PaimonSourceState sourceState,
-            Predicate predicate,
-            int[] projection) {
-        this.context = context;
-        this.table = table;
-        this.assignedSplit = sourceState.getAssignedSplits();
-        this.predicate = predicate;
-        this.projection = projection;
-    }
-
-    @Override
-    public void open() {
-        this.pendingSplit = new HashSet<>();
-    }
-
-    @Override
-    public void run() throws Exception {
-        // do nothing
-    }
-
-    @Override
-    public void close() throws IOException {
-        // do nothing
-    }
-
-    @Override
-    public void addSplitsBack(List<PaimonSourceSplit> splits, int subtaskId) {
-        if (!splits.isEmpty()) {
-            pendingSplit.addAll(splits);
-            assignSplit(subtaskId);
-        }
-    }
-
-    @Override
-    public int currentUnassignedSplitSize() {
-        return pendingSplit.size();
-    }
-
-    @Override
-    public void registerReader(int subtaskId) {
-        pendingSplit = getTableSplits();
-        assignSplit(subtaskId);
-    }
-
-    @Override
-    public PaimonSourceState snapshotState(long checkpointId) throws Exception 
{
-        return new PaimonSourceState(assignedSplit);
-    }
-
-    @Override
-    public void notifyCheckpointComplete(long checkpointId) throws Exception {
-        // do nothing
-    }
-
-    @Override
-    public void handleSplitRequest(int subtaskId) {
-        // do nothing
-    }
-
-    /** Assign split by reader task id */
-    private void assignSplit(int taskId) {
-        ArrayList<PaimonSourceSplit> currentTaskSplits = new ArrayList<>();
-        if (context.currentParallelism() == 1) {
-            // if parallelism == 1, we should assign all the splits to reader
-            currentTaskSplits.addAll(pendingSplit);
-        } else {
-            // if parallelism > 1, according to hashCode of split's id to 
determine whether to
-            // allocate the current task
-            for (PaimonSourceSplit fileSourceSplit : pendingSplit) {
-                final int splitOwner =
-                        getSplitOwner(fileSourceSplit.splitId(), 
context.currentParallelism());
-                if (splitOwner == taskId) {
-                    currentTaskSplits.add(fileSourceSplit);
-                }
-            }
-        }
-        // assign splits
-        context.assignSplit(taskId, currentTaskSplits);
-        // save the state of assigned splits
-        assignedSplit.addAll(currentTaskSplits);
-        // remove the assigned splits from pending splits
-        currentTaskSplits.forEach(split -> pendingSplit.remove(split));
-        log.info(
-                "SubTask {} is assigned to [{}]",
-                taskId,
-                currentTaskSplits.stream()
-                        .map(PaimonSourceSplit::splitId)
-                        .collect(Collectors.joining(",")));
-        context.signalNoMoreSplits(taskId);
-    }
-
-    /** Get all splits of table */
-    private Set<PaimonSourceSplit> getTableSplits() {
-        final Set<PaimonSourceSplit> tableSplits = new HashSet<>();
-        final List<Split> splits =
-                table.newReadBuilder()
-                        .withProjection(projection)
-                        .withFilter(predicate)
-                        .newScan()
-                        .plan()
-                        .splits();
-        splits.forEach(split -> tableSplits.add(new PaimonSourceSplit(split)));
-        return tableSplits;
-    }
-
-    /** Hash algorithm for assigning splits to readers */
-    private static int getSplitOwner(String tp, int numReaders) {
-        return (tp.hashCode() & Integer.MAX_VALUE) % numReaders;
-    }
-}
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplitGenerator.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplitGenerator.java
new file mode 100644
index 0000000000..93b64c13a2
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplitGenerator.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.source;
+
+import org.apache.paimon.table.source.TableScan;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class PaimonSourceSplitGenerator {
+    /**
+     * The current Id as a mutable string representation. This covers more 
values than the integer
+     * value range, so we should never overflow.
+     */
+    private final char[] currentId = "0000000000".toCharArray();
+
+    public List<PaimonSourceSplit> createSplits(TableScan.Plan plan) {
+        return plan.splits().stream()
+                .map(s -> new PaimonSourceSplit(getNextId(), s))
+                .collect(Collectors.toList());
+    }
+
+    protected final String getNextId() {
+        // because we just increment numbers, we increment the char 
representation directly,
+        // rather than incrementing an integer and converting it to a string 
representation
+        // every time again (requires quite some expensive conversion logic).
+        incrementCharArrayByOne(currentId, currentId.length - 1);
+        return new String(currentId);
+    }
+
+    private static void incrementCharArrayByOne(char[] array, int pos) {
+        if (pos < 0) {
+            throw new RuntimeException("Produce too many splits.");
+        }
+
+        char c = array[pos];
+        c++;
+
+        if (c > '9') {
+            c = '0';
+            incrementCharArrayByOne(array, pos - 1);
+        }
+        array[pos] = c;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceState.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceState.java
index c8336b0d03..db6392520c 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceState.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceState.java
@@ -17,21 +17,22 @@
 
 package org.apache.seatunnel.connectors.seatunnel.paimon.source;
 
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+import javax.annotation.Nullable;
+
 import java.io.Serializable;
-import java.util.Set;
+import java.util.Deque;
 
 /** Paimon connector source state, saves the splits has assigned to readers. */
+@Getter
+@AllArgsConstructor
 public class PaimonSourceState implements Serializable {
 
     private static final long serialVersionUID = 1L;
 
-    private final Set<PaimonSourceSplit> assignedSplits;
-
-    public PaimonSourceState(Set<PaimonSourceSplit> assignedSplits) {
-        this.assignedSplits = assignedSplits;
-    }
+    private final Deque<PaimonSourceSplit> assignedSplits;
 
-    public Set<PaimonSourceSplit> getAssignedSplits() {
-        return assignedSplits;
-    }
+    private final @Nullable Long currentSnapshotId;
 }
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/AbstractSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/AbstractSplitEnumerator.java
new file mode 100644
index 0000000000..278381a24a
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/AbstractSplitEnumerator.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.source.enumerator;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
+import 
org.apache.seatunnel.connectors.seatunnel.paimon.source.PaimonSourceSplit;
+import 
org.apache.seatunnel.connectors.seatunnel.paimon.source.PaimonSourceSplitGenerator;
+import 
org.apache.seatunnel.connectors.seatunnel.paimon.source.PaimonSourceState;
+
+import org.apache.paimon.table.source.EndOfScanException;
+import org.apache.paimon.table.source.StreamTableScan;
+import org.apache.paimon.table.source.TableScan;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Base split enumerator shared by the Paimon batch and stream enumerators.
+ *
+ * <p>It keeps the queue of splits that were planned but not yet handed to a reader, plans new
+ * splits from the {@link TableScan} and assigns every split to the reader selected by {@link
+ * #getSplitOwner(String, int)}.
+ */
+@Slf4j
+public abstract class AbstractSplitEnumerator
+        implements SourceSplitEnumerator<PaimonSourceSplit, PaimonSourceState> {
+
+    /** Source split enumerator context */
+    protected final Context<PaimonSourceSplit> context;
+
+    /** Readers considered on every {@link #assignSplits()} pass. */
+    protected final Set<Integer> readersAwaitingSplit;
+
+    protected final PaimonSourceSplitGenerator splitGenerator;
+
+    /** The splits that have not been assigned yet. */
+    protected Deque<PaimonSourceSplit> pendingSplits;
+
+    protected final TableScan tableScan;
+
+    /** Once this many splits are pending, {@link #scanNextSnapshot()} stops planning new ones. */
+    private final int splitMaxNum;
+
+    /** Result of {@link StreamTableScan#checkpoint()} taken with the last processed plan. */
+    @Nullable protected Long nextSnapshotId;
+
+    protected boolean finished = false;
+
+    private ExecutorService executorService;
+
+    /**
+     * @param context enumerator context used to look up readers and to assign splits
+     * @param pendingSplits splits to start with; copied, the argument is not retained
+     * @param nextSnapshotId snapshot id to restore a {@link StreamTableScan} to, may be null
+     * @param tableScan scan used to plan splits
+     * @param splitMaxPerTask per-reader factor of the pending split limit
+     */
+    public AbstractSplitEnumerator(
+            Context<PaimonSourceSplit> context,
+            Deque<PaimonSourceSplit> pendingSplits,
+            @Nullable Long nextSnapshotId,
+            TableScan tableScan,
+            int splitMaxPerTask) {
+        this.context = context;
+        this.pendingSplits = new LinkedList<>(pendingSplits);
+        this.nextSnapshotId = nextSnapshotId;
+        this.readersAwaitingSplit = new LinkedHashSet<>();
+        this.splitGenerator = new PaimonSourceSplitGenerator();
+        this.tableScan = tableScan;
+        this.splitMaxNum = context.currentParallelism() * splitMaxPerTask;
+        this.executorService =
+                Executors.newCachedThreadPool(
+                        new ThreadFactoryBuilder()
+                                .setNameFormat("Seatunnel-PaimonSourceSplitEnumerator-%d")
+                                .build());
+        if (tableScan instanceof StreamTableScan && nextSnapshotId != null) {
+            ((StreamTableScan) tableScan).restore(nextSnapshotId);
+        }
+    }
+
+    @Override
+    public void open() {}
+
+    @Override
+    public void run() throws Exception {
+        loadNewSplits();
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (Objects.nonNull(executorService) && !executorService.isShutdown()) {
+            executorService.shutdown();
+        }
+    }
+
+    @Override
+    public void addSplitsBack(List<PaimonSourceSplit> splits, int subtaskId) {
+        log.debug("Paimon Source Enumerator adds splits back: {}", splits);
+        this.pendingSplits.addAll(splits);
+        if (context.registeredReaders().contains(subtaskId)) {
+            assignSplits();
+        }
+    }
+
+    @Override
+    public int currentUnassignedSplitSize() {
+        return pendingSplits.size();
+    }
+
+    @Override
+    public void registerReader(int subtaskId) {
+        readersAwaitingSplit.add(subtaskId);
+    }
+
+    @Override
+    public PaimonSourceState snapshotState(long checkpointId) throws Exception {
+        // Hand out a copy so that later assignments cannot change an already taken snapshot.
+        return new PaimonSourceState(new LinkedList<>(pendingSplits), nextSnapshotId);
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {}
+
+    private void addSplits(Collection<PaimonSourceSplit> newSplits) {
+        this.pendingSplits.addAll(newSplits);
+    }
+
+    /**
+     * Assigns every pending split to its owning reader, for all readers that are awaiting splits
+     * and still registered. Readers that are no longer registered are dropped from the waiting
+     * set.
+     *
+     * <p>Method should be synchronized because {@link #handleSplitRequest} and {@link
+     * #processDiscoveredSplits} have thread conflicts.
+     */
+    protected synchronized void assignSplits() {
+        Iterator<Integer> pendingReaderIterator = readersAwaitingSplit.iterator();
+        while (pendingReaderIterator.hasNext()) {
+            Integer pendingReader = pendingReaderIterator.next();
+            if (!context.registeredReaders().contains(pendingReader)) {
+                pendingReaderIterator.remove();
+                continue;
+            }
+
+            // Move the splits owned by this reader out of the pending queue before assigning
+            // them, so that a failed assignment can put them back without creating duplicates.
+            List<PaimonSourceSplit> assignedTaskSplits = new LinkedList<>();
+            Iterator<PaimonSourceSplit> pendingSplitIterator = pendingSplits.iterator();
+            while (pendingSplitIterator.hasNext()) {
+                PaimonSourceSplit fileSourceSplit = pendingSplitIterator.next();
+                final int splitOwner =
+                        getSplitOwner(fileSourceSplit.splitId(), context.currentParallelism());
+                if (splitOwner == pendingReader) {
+                    assignedTaskSplits.add(fileSourceSplit);
+                    pendingSplitIterator.remove();
+                }
+            }
+
+            if (!assignedTaskSplits.isEmpty()) {
+                log.info("Assign splits {} to reader {}", assignedTaskSplits, pendingReader);
+                try {
+                    context.assignSplit(pendingReader, assignedTaskSplits);
+                } catch (Exception e) {
+                    log.error(
+                            "Failed to assign splits {} to reader {}",
+                            assignedTaskSplits,
+                            pendingReader,
+                            e);
+                    // keep the splits pending so that a later pass can retry them
+                    pendingSplits.addAll(assignedTaskSplits);
+                }
+            }
+        }
+    }
+
+    /**
+     * Plans the next snapshot on the executor and feeds the result to {@link
+     * #processDiscoveredSplits}. The returned future is not kept, so an exception thrown by the
+     * callback is not propagated to the caller of this method.
+     */
+    protected void loadNewSplits() {
+        CompletableFuture.supplyAsync(this::scanNextSnapshot, executorService)
+                .whenComplete(this::processDiscoveredSplits);
+    }
+
+    /** Hash algorithm for assigning splits to readers */
+    protected static int getSplitOwner(String tp, int numReaders) {
+        return (tp.hashCode() & Integer.MAX_VALUE) % numReaders;
+    }
+
+    // ------------------------------------------------------------------------
+
+    // This needs to be synchronized because the scan object is not thread safe.
+    // handleSplitRequest and CompletableFuture.supplyAsync will invoke this.
+    protected synchronized Optional<PlanWithNextSnapshotId> scanNextSnapshot() {
+        if (pendingSplits.size() >= splitMaxNum) {
+            return Optional.empty();
+        }
+        TableScan.Plan plan = tableScan.plan();
+        Long nextSnapshotId = null;
+        if (tableScan instanceof StreamTableScan) {
+            nextSnapshotId = ((StreamTableScan) tableScan).checkpoint();
+        }
+        return Optional.of(new PlanWithNextSnapshotId(plan, nextSnapshotId));
+    }
+
+    /**
+     * Handles the outcome of {@link #scanNextSnapshot()}: records the next snapshot id, queues the
+     * planned splits and assigns them. An {@link EndOfScanException} marks the enumerator as
+     * finished; any other error is logged and rethrown as {@link SeaTunnelException}.
+     *
+     * <p>NOTE(review): an earlier comment stated that this runs in the coordinator thread. When
+     * called through {@link #loadNewSplits()} it is a {@code whenComplete} callback of a future
+     * supplied on {@code executorService} - confirm the threading assumptions.
+     *
+     * @param planWithNextSnapshotIdOptional the scan result, empty when nothing was planned
+     * @param error the scan failure, or null on success
+     */
+    protected void processDiscoveredSplits(
+            Optional<PlanWithNextSnapshotId> planWithNextSnapshotIdOptional, Throwable error) {
+        if (error != null) {
+            // A failure raised inside supplyAsync reaches whenComplete wrapped in a
+            // CompletionException, therefore the cause chain is inspected as well.
+            if (isEndOfScan(error)) {
+                log.debug("Catching EndOfScanException, the stream is finished.");
+                finished = true;
+                assignSplits();
+            } else {
+                log.error("Failed to enumerate files", error);
+                throw new SeaTunnelException(error);
+            }
+            return;
+        }
+        if (!planWithNextSnapshotIdOptional.isPresent()) {
+            return;
+        }
+        PlanWithNextSnapshotId planWithNextSnapshotId = planWithNextSnapshotIdOptional.get();
+        nextSnapshotId = planWithNextSnapshotId.nextSnapshotId;
+        TableScan.Plan plan = planWithNextSnapshotId.plan;
+
+        if (plan.splits().isEmpty()) {
+            return;
+        }
+
+        addSplits(splitGenerator.createSplits(plan));
+        assignSplits();
+    }
+
+    /** Returns true when the error or one of its causes is an {@link EndOfScanException}. */
+    private static boolean isEndOfScan(Throwable error) {
+        Throwable current = error;
+        while (current != null) {
+            if (current instanceof EndOfScanException) {
+                return true;
+            }
+            current = current.getCause();
+        }
+        return false;
+    }
+
+    /** The result of scan. */
+    @Getter
+    protected static class PlanWithNextSnapshotId {
+        private final TableScan.Plan plan;
+        private final Long nextSnapshotId;
+
+        public PlanWithNextSnapshotId(TableScan.Plan plan, Long nextSnapshotId) {
+            this.plan = plan;
+            this.nextSnapshotId = nextSnapshotId;
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/PaimonBatchSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/PaimonBatchSourceSplitEnumerator.java
new file mode 100644
index 0000000000..b00b38587a
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/PaimonBatchSourceSplitEnumerator.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.source.enumerator;
+
+import 
org.apache.seatunnel.connectors.seatunnel.paimon.source.PaimonSourceSplit;
+import 
org.apache.seatunnel.connectors.seatunnel.paimon.source.PaimonSourceState;
+
+import org.apache.paimon.table.source.TableScan;
+
+import lombok.extern.slf4j.Slf4j;
+
+import javax.annotation.Nullable;
+
+import java.util.Deque;
+import java.util.Set;
+
+/**
+ * Split enumerator for bounded reads: it plans the table once, passes the plan on for assignment
+ * and then signals every registered reader that no more splits will follow.
+ */
+@Slf4j
+public class PaimonBatchSourceSplitEnumerator extends AbstractSplitEnumerator {
+
+    public PaimonBatchSourceSplitEnumerator(
+            Context<PaimonSourceSplit> context,
+            Deque<PaimonSourceSplit> pendingSplits,
+            @Nullable Long nextSnapshotId,
+            TableScan tableScan,
+            int splitMaxPerTask) {
+        super(context, pendingSplits, nextSnapshotId, tableScan, splitMaxPerTask);
+    }
+
+    /** Split requests are ignored; all assignment happens in {@link #run()}. */
+    @Override
+    public void handleSplitRequest(int subtaskId) {
+        // do nothing
+    }
+
+    /** The checkpoint state carries the pending splits and never a snapshot id. */
+    @Override
+    public PaimonSourceState snapshotState(long checkpointId) throws Exception {
+        return new PaimonSourceState(pendingSplits, null);
+    }
+
+    /** Scans synchronously, processes the result, then notifies the registered readers. */
+    @Override
+    public void run() throws Exception {
+        this.processDiscoveredSplits(this.scanNextSnapshot(), null);
+
+        Set<Integer> registered = context.registeredReaders();
+        log.debug(
+                "No more splits to assign." + " Sending NoMoreSplitsEvent to reader {}.",
+                registered);
+        for (Integer readerId : registered) {
+            context.signalNoMoreSplits(readerId);
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/PaimonStreamSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/PaimonStreamSourceSplitEnumerator.java
new file mode 100644
index 0000000000..2cce57be93
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/PaimonStreamSourceSplitEnumerator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.source.enumerator;
+
+import 
org.apache.seatunnel.connectors.seatunnel.paimon.source.PaimonSourceSplit;
+
+import org.apache.paimon.table.source.TableScan;
+
+import lombok.extern.slf4j.Slf4j;
+
+import javax.annotation.Nullable;
+
+import java.util.Deque;
+
+/**
+ * Split enumerator for streaming reads: every split request triggers an assignment pass and, while
+ * the requesting reader is still in the waiting set, another asynchronous scan.
+ */
+@Slf4j
+public class PaimonStreamSourceSplitEnumerator extends AbstractSplitEnumerator {
+
+    public PaimonStreamSourceSplitEnumerator(
+            Context<PaimonSourceSplit> context,
+            Deque<PaimonSourceSplit> pendingSplits,
+            @Nullable Long nextSnapshotId,
+            TableScan tableScan,
+            int splitMaxPerTask) {
+        super(context, pendingSplits, nextSnapshotId, tableScan, splitMaxPerTask);
+    }
+
+    /**
+     * Marks the reader as waiting, assigns what is pending and loads new splits if the reader is
+     * still in the waiting set afterwards.
+     *
+     * @param subtaskId id of the reader asking for splits
+     */
+    @Override
+    public void handleSplitRequest(int subtaskId) {
+        readersAwaitingSplit.add(subtaskId);
+        assignSplits();
+
+        final boolean stillAwaiting = readersAwaitingSplit.contains(subtaskId);
+        if (!stillAwaiting) {
+            return;
+        }
+        loadNewSplits();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowKindConverter.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowKindConverter.java
index adb77c637d..4a3833e6a0 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowKindConverter.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowKindConverter.java
@@ -19,12 +19,11 @@ package 
org.apache.seatunnel.connectors.seatunnel.paimon.utils;
 
 import org.apache.seatunnel.api.table.type.RowKind;
 
-import org.apache.paimon.data.InternalRow;
-
 public class RowKindConverter {
 
     /**
-     * Convert SeaTunnel RowKind {@link RowKind} to Paimon RowKind {@link 
InternalRow}
+     * Convert SeaTunnel RowKind {@link RowKind} to Paimon RowKind {@link
+     * org.apache.paimon.types.RowKind}
      *
      * @param seaTunnelRowKind The kind of change that a row describes in a 
changelog.
      * @return
@@ -44,4 +43,27 @@ public class RowKindConverter {
                 return null;
         }
     }
+
+    /**
+     * Convert Paimon RowKind {@link org.apache.paimon.types.RowKind} to SeaTunnel RowKind {@link
+     * RowKind}
+     *
+     * @param paimonRowKind the Paimon change kind to translate
+     * @return the SeaTunnel kind of the same name, or {@code null} for a kind not handled here
+     */
+    public static RowKind convertPaimonRowKind2SeatunnelRowkind(
+            org.apache.paimon.types.RowKind paimonRowKind) {
+        final RowKind seaTunnelRowKind;
+        switch (paimonRowKind) {
+            case INSERT:
+                seaTunnelRowKind = RowKind.INSERT;
+                break;
+            case UPDATE_BEFORE:
+                seaTunnelRowKind = RowKind.UPDATE_BEFORE;
+                break;
+            case UPDATE_AFTER:
+                seaTunnelRowKind = RowKind.UPDATE_AFTER;
+                break;
+            case DELETE:
+                seaTunnelRowKind = RowKind.DELETE;
+                break;
+            default:
+                seaTunnelRowKind = null;
+                break;
+        }
+        return seaTunnelRowKind;
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonRecordWithFullType.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonRecordWithFullType.java
new file mode 100644
index 0000000000..7da7afaa0b
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonRecordWithFullType.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.seatunnel.e2e.connector.paimon;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.Timestamp;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.util.Map;
+
+/**
+ * Test-side holder for one row of the e2e "full type" Paimon table.
+ *
+ * <p>The fields are declared in the order in which PaimonStreamReadIT reads the columns by
+ * position. Lombok's {@code @AllArgsConstructor} takes its parameters in field declaration order,
+ * so the fields must not be reordered.
+ */
+@Data
+@AllArgsConstructor
+public class PaimonRecordWithFullType {
+    // raw Map; PaimonStreamReadIT fills it with the string keys and values of column 0
+    public Map c_map;
+    public int[] c_array;
+    public BinaryString c_string;
+    public boolean c_boolean;
+    // held as short; PaimonStreamReadIT reads column 4 with InternalRow#getShort
+    public short c_tinyint;
+    public short c_smallint;
+    public int c_int;
+    public long c_bigint;
+    public float c_float;
+    public double c_double;
+    public Decimal c_decimal;
+    // held as BinaryString; PaimonStreamReadIT reads column 11 with InternalRow#getString
+    public BinaryString c_bytes;
+    // held as int; PaimonStreamReadIT reads column 12 with InternalRow#getInt
+    public int c_date;
+    public Timestamp c_timestamp;
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
index 0168cc8f53..dc6bfc9eba 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
@@ -637,7 +637,7 @@ public class PaimonSinkCDCIT extends TestSuiteBase 
implements TestResource {
         return result;
     }
 
-    private Table getTable(String dbName, String tbName) {
+    protected Table getTable(String dbName, String tbName) {
         try {
             return getCatalog().getTable(getIdentifier(dbName, tbName));
         } catch (Catalog.TableNotExistException e) {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonStreamReadIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonStreamReadIT.java
new file mode 100644
index 0000000000..ede9f7c3b3
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonStreamReadIT.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.paimon;
+
+import org.apache.seatunnel.common.utils.SeaTunnelException;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.table.source.TableScan;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.awaitility.Awaitility.given;
+
+@DisabledOnContainer(
+        value = {},
+        type = {EngineType.SPARK, EngineType.FLINK},
+        disabledReason =
+                "Spark and Flink engine can not auto create paimon table on worker node in local file(e.g flink tm) by savemode feature which can lead error")
+@Slf4j
+public class PaimonStreamReadIT extends PaimonSinkCDCIT {
+
+    /**
+     * Runs a write job, starts the streaming paimon-to-paimon job in the background and waits
+     * until the sink table equals the source table; then writes CDC data and waits again.
+     */
+    @TestTemplate
+    public void testStreamReadPaimon(TestContainer container) throws Exception {
+        Container.ExecResult initialWrite =
+                container.executeJob("/fake_to_paimon_with_full_type.conf");
+        Assertions.assertEquals(0, initialWrite.getExitCode());
+
+        // The streaming job is started in the background and its future is not awaited.
+        CompletableFuture.runAsync(
+                () -> {
+                    try {
+                        container.executeJob("/paimon_to_paimon.conf");
+                    } catch (Exception e) {
+                        throw new SeaTunnelException(e);
+                    }
+                });
+
+        awaitSinkMatchesSource(container);
+
+        // write cdc data
+        Container.ExecResult cdcWrite =
+                container.executeJob("/fake_to_paimon_with_full_type_cdc_data.conf");
+        Assertions.assertEquals(0, cdcWrite.getExitCode());
+
+        awaitSinkMatchesSource(container);
+    }
+
+    /**
+     * Polls for at most 400 seconds until full_type.st_test_sink holds the same records, in the
+     * same order, as full_type.st_test. Exceptions raised while polling are ignored.
+     */
+    private void awaitSinkMatchesSource(TestContainer container) {
+        given().ignoreExceptions()
+                .await()
+                .atLeast(100L, TimeUnit.MILLISECONDS)
+                .atMost(400L, TimeUnit.SECONDS)
+                .untilAsserted(
+                        () -> {
+                            container.executeExtraCommands(containerExtendedFactory);
+                            List<PaimonRecordWithFullType> sourceRows =
+                                    loadPaimonDataWithFullType("full_type", "st_test");
+                            List<PaimonRecordWithFullType> sinkRows =
+                                    loadPaimonDataWithFullType("full_type", "st_test_sink");
+                            Assertions.assertEquals(sourceRows.size(), sinkRows.size());
+                            Assertions.assertIterableEquals(sourceRows, sinkRows);
+                        });
+    }
+
+    /**
+     * Reads every row of the given table into {@link PaimonRecordWithFullType} objects.
+     *
+     * @param dbName database of the table
+     * @param tbName table name
+     * @return the rows in the order the reader returns them
+     */
+    protected List<PaimonRecordWithFullType> loadPaimonDataWithFullType(
+            String dbName, String tbName) {
+        FileStoreTable table = (FileStoreTable) getTable(dbName, tbName);
+        ReadBuilder readBuilder = table.newReadBuilder();
+        TableScan.Plan plan = readBuilder.newScan().plan();
+        TableRead tableRead = readBuilder.newRead();
+        List<PaimonRecordWithFullType> records = new ArrayList<>();
+        try (RecordReader<InternalRow> reader = tableRead.createReader(plan)) {
+            reader.forEachRemaining(row -> records.add(toRecord(row)));
+        } catch (IOException e) {
+            throw new SeaTunnelException(e);
+        }
+        return records;
+    }
+
+    /** Copies the 14 columns of one row, by position, into a record object. */
+    private PaimonRecordWithFullType toRecord(InternalRow row) {
+        InternalMap internalMap = row.getMap(0);
+        InternalArray keys = internalMap.keyArray();
+        InternalArray values = internalMap.valueArray();
+        HashMap<Object, Object> entries = new HashMap<>(internalMap.size());
+        for (int idx = 0; idx < internalMap.size(); idx++) {
+            entries.put(keys.getString(idx), values.getString(idx));
+        }
+        int[] ints = row.getArray(1).toIntArray();
+        return new PaimonRecordWithFullType(
+                entries,
+                ints,
+                row.getString(2),
+                row.getBoolean(3),
+                row.getShort(4),
+                row.getShort(5),
+                row.getInt(6),
+                row.getLong(7),
+                row.getFloat(8),
+                row.getDouble(9),
+                row.getDecimal(10, 30, 8),
+                row.getString(11),
+                row.getInt(12),
+                row.getTimestamp(13, 6));
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_full_type_cdc_data.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_full_type_cdc_data.conf
new file mode 100644
index 0000000000..c5b881c2ce
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_full_type_cdc_data.conf
@@ -0,0 +1,79 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Batch job that writes a fixed set of changelog rows into the Paimon table full_type.st_test.
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(30, 8)"
+        c_bytes = bytes
+        c_date = date
+        c_timestamp = timestamp
+      }
+      primaryKey {
+        name = "c_tinyint"
+        columnNames = [c_tinyint]
+      }
+    }
+    # Rows by c_tinyint: inserts for 121 and 122, an update pair for 117 and a delete for 119.
+    rows = [
+      {
+        kind = INSERT
+        fields = [{"a": "b"}, [101], "c_string", true, 121, 15987, 563873951, 
7084913402530365001, 1.21, 1.231, "2924137191386439303744.39292211", 
"bWlJWmo=", "2023-04-21", "2023-04-21T23:20:58"]
+      }
+      {
+        kind = INSERT
+        fields = [{"a": "b"}, [101], "c_string1", true, 122, 15987, 563873952, 
7084913402530365001, 1.21, 1.231, "2924137191386439303744.39292211", 
"bWlJWmo=", "2023-04-25", "2023-04-25T23:20:58"]
+      }
+      {
+        kind = UPDATE_BEFORE
+        fields = [{"a": "c"}, [102], "c_string2", true, 117, 15987, 563873953, 
7084913402530365002, 1.22, 1.232, "2924137191386439303744.39292212", 
"bWlJWmo=", "2023-04-26", "2023-04-26T23:20:58"]
+      }
+      {
+        kind = UPDATE_AFTER
+        fields = [{"a": "e"}, [103], "c_string3", false, 117, 15989, 
563873951, 7084913402530365003, 1.23, 1.233, "2924137191386439303744.39292213", 
"bWlJWmo=", "2023-04-27", "2023-04-27T23:20:58"]
+      }
+      {
+        kind = DELETE
+        fields = [{"a": "e"}, [103], "c_string2", true, 119, 15987, 563873953, 
7084913402530365003, 1.23, 1.233, "2924137191386439303744.39292213", 
"bWlJWmo=", "2023-04-23", "2023-04-23T23:20:58"]
+      }
+    ]
+    result_table_name = "fake"
+  }
+}
+
+sink {
+  Paimon {
+    warehouse = "/tmp/paimon"
+    database = "full_type"
+    table = "st_test"
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_paimon.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_paimon.conf
new file mode 100644
index 0000000000..50728871af
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_paimon.conf
@@ -0,0 +1,38 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Streaming job that copies the Paimon table full_type.st_test into full_type.st_test_sink.
+env {
+  parallelism = 1
+  job.mode = "Streaming"
+}
+
+source {
+  Paimon {
+    warehouse = "/tmp/paimon"
+    database = "full_type"
+    table = "st_test"
+  }
+}
+
+sink {
+  Paimon {
+    warehouse = "/tmp/paimon"
+    database = "full_type"
+    table = "st_test_sink"
+    # the sink table is keyed on c_tinyint, the primary key column of the FakeSource schema
+    paimon.table.primary-keys = "c_tinyint"
+  }
+}

Reply via email to