This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a18cd7f7 [FLINK-38478][pipeline-connector][paimon] Generate a random 
commit user and store it in state. (#4149)
6a18cd7f7 is described below

commit 6a18cd7f7fe5de9c1a79587f9f0af923f3146fd3
Author: Kunni <[email protected]>
AuthorDate: Fri Oct 31 14:26:56 2025 +0800

    [FLINK-38478][pipeline-connector][paimon] Generate a random commit user and 
store it in state. (#4149)
---
 .../docs/connectors/pipeline-connectors/paimon.md  |   2 +-
 .../docs/connectors/pipeline-connectors/paimon.md  |   2 +-
 .../paimon/sink/PaimonDataSinkOptions.java         |   3 +-
 .../cdc/connectors/paimon/sink/v2/PaimonSink.java  |  40 +-
 .../connectors/paimon/sink/v2/PaimonWriter.java    |  17 +-
 .../paimon/sink/v2/PaimonWriterState.java          |  49 ++
 .../sink/v2/PaimonWriterStateSerializer.java       |  58 +++
 .../migration/MySqlToPaimonMigrationITCase.java    | 503 +++++++++++++++++++++
 .../tests/migration/YamlJobMigrationITCase.java    |   2 +-
 .../tests/utils/PipelineTestEnvironment.java       |   5 +-
 .../cdc/pipeline/tests/utils/TarballFetcher.java   |   2 +
 11 files changed, 672 insertions(+), 11 deletions(-)

diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/paimon.md 
b/docs/content.zh/docs/connectors/pipeline-connectors/paimon.md
index 912f96749..6657cbf54 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/paimon.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/paimon.md
@@ -110,7 +110,7 @@ Pipeline 连接器配置项
       <td>Hive metastore 的 uri,在 metastore 设置为 hive 的时候需要。</td>
     </tr>
     <tr>
-      <td>commit.user</td>
+      <td>commit.user-prefix</td>
       <td>optional</td>
       <td style="word-wrap: break-word;"><code>admin</code></td>
       <td>String</td>
diff --git a/docs/content/docs/connectors/pipeline-connectors/paimon.md 
b/docs/content/docs/connectors/pipeline-connectors/paimon.md
index bd9aedb9d..19efb8501 100644
--- a/docs/content/docs/connectors/pipeline-connectors/paimon.md
+++ b/docs/content/docs/connectors/pipeline-connectors/paimon.md
@@ -110,7 +110,7 @@ Pipeline Connector Options
       <td>Uri of metastore server.</td>
     </tr>
     <tr>
-      <td>commit.user</td>
+      <td>commit.user-prefix</td>
       <td>optional</td>
       <td style="word-wrap: break-word;"><code>"admin"</code></td>
       <td>String</td>
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkOptions.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkOptions.java
index 5f2712a93..bcd834d2f 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkOptions.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkOptions.java
@@ -33,9 +33,10 @@ public class PaimonDataSinkOptions {
     public static final String PREFIX_CATALOG_PROPERTIES = 
"catalog.properties.";
 
     public static final ConfigOption<String> COMMIT_USER =
-            key("commit.user")
+            key("commit.user-prefix")
                     .stringType()
                     .defaultValue("admin")
+                    .withFallbackKeys("commit.user")
                     .withDescription("User name for committing data files.");
 
     public static final ConfigOption<String> WAREHOUSE =
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSink.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSink.java
index 86622f611..ca249c20c 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSink.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSink.java
@@ -20,6 +20,10 @@ package org.apache.flink.cdc.connectors.paimon.sink.v2;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.sink2.Committer;
 import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.StatefulSinkWriter;
+import org.apache.flink.api.connector.sink2.SupportsWriterState;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
+import org.apache.flink.cdc.common.utils.Preconditions;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
@@ -33,30 +37,35 @@ import 
org.apache.paimon.flink.sink.MultiTableCommittableSerializer;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.sink.CommitMessageSerializer;
 
+import java.util.Collection;
+import java.util.UUID;
+
 /**
  * A {@link Sink} for Paimon. Maintain this package until Paimon has it own 
sinkV2 implementation.
  */
-public class PaimonSink<InputT> implements WithPreCommitTopology<InputT, 
MultiTableCommittable> {
+public class PaimonSink<InputT>
+        implements WithPreCommitTopology<InputT, MultiTableCommittable>,
+                SupportsWriterState<InputT, PaimonWriterState> {
 
     // provided a default commit user.
     public static final String DEFAULT_COMMIT_USER = "admin";
 
     protected final Options catalogOptions;
 
+    /** The commitUser should be restored in state and restore it in writer. */
     protected final String commitUser;
 
     private final PaimonRecordSerializer<InputT> serializer;
 
     public PaimonSink(Options catalogOptions, PaimonRecordSerializer<InputT> 
serializer) {
-        this.catalogOptions = catalogOptions;
-        this.serializer = serializer;
-        commitUser = DEFAULT_COMMIT_USER;
+        this(catalogOptions, DEFAULT_COMMIT_USER, serializer);
     }
 
     public PaimonSink(
             Options catalogOptions, String commitUser, 
PaimonRecordSerializer<InputT> serializer) {
         this.catalogOptions = catalogOptions;
-        this.commitUser = commitUser;
+        // generate a random commit user to avoid conflict.
+        this.commitUser = commitUser + UUID.randomUUID();
         this.serializer = serializer;
     }
 
@@ -69,6 +78,22 @@ public class PaimonSink<InputT> implements 
WithPreCommitTopology<InputT, MultiTa
                 catalogOptions, context.metricGroup(), commitUser, serializer, 
lastCheckpointId);
     }
 
+    @Override
+    public StatefulSinkWriter<InputT, PaimonWriterState> restoreWriter(
+            WriterInitContext context, Collection<PaimonWriterState> 
paimonWriterStates) {
+        long lastCheckpointId =
+                context.getRestoredCheckpointId()
+                        .orElse(CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1);
+        Preconditions.checkNotNull(paimonWriterStates);
+        String storedCommitUser = 
paimonWriterStates.iterator().next().getCommitUser();
+        return new PaimonWriter<>(
+                catalogOptions,
+                context.metricGroup(),
+                storedCommitUser,
+                serializer,
+                lastCheckpointId);
+    }
+
     @Override
     public Committer<MultiTableCommittable> createCommitter() {
         return new PaimonCommitter(catalogOptions, commitUser);
@@ -100,4 +125,9 @@ public class PaimonSink<InputT> implements 
WithPreCommitTopology<InputT, MultiTa
                         new PreCommitOperator(catalogOptions, commitUser))
                 .setParallelism(committables.getParallelism());
     }
+
+    @Override
+    public SimpleVersionedSerializer<PaimonWriterState> 
getWriterStateSerializer() {
+        return PaimonWriterStateSerializer.INSTANCE;
+    }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java
index bf660454e..e5821135d 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java
@@ -18,6 +18,7 @@
 package org.apache.flink.cdc.connectors.paimon.sink.v2;
 
 import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.StatefulSinkWriter;
 import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
 import org.apache.flink.cdc.common.event.DataChangeEvent;
 import org.apache.flink.metrics.MetricGroup;
@@ -42,6 +43,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -51,7 +53,8 @@ import java.util.stream.Collectors;
 
 /** A {@link Sink} to write {@link DataChangeEvent} to Paimon storage. */
 public class PaimonWriter<InputT>
-        implements TwoPhaseCommittingSink.PrecommittingSinkWriter<InputT, 
MultiTableCommittable> {
+        implements TwoPhaseCommittingSink.PrecommittingSinkWriter<InputT, 
MultiTableCommittable>,
+                StatefulSinkWriter<InputT, PaimonWriterState> {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(PaimonWriter.class);
 
@@ -75,6 +78,8 @@ public class PaimonWriter<InputT>
     /** A workaround variable trace the checkpointId in {@link 
StreamOperator#snapshotState}. */
     private long lastCheckpointId;
 
+    private final PaimonWriterState stateCache;
+
     public PaimonWriter(
             Options catalogOptions,
             MetricGroup metricGroup,
@@ -93,6 +98,11 @@ public class PaimonWriter<InputT>
                                 Thread.currentThread().getName() + 
"-CdcMultiWrite-Compaction"));
         this.serializer = serializer;
         this.lastCheckpointId = lastCheckpointId;
+        this.stateCache = new PaimonWriterState(commitUser);
+        LOG.info(
+                "Created PaimonWriter with commit user {} and identifier {}",
+                commitUser,
+                lastCheckpointId);
     }
 
     @Override
@@ -214,4 +224,9 @@ public class PaimonWriter<InputT>
             compactExecutor.shutdownNow();
         }
     }
+
+    @Override
+    public List<PaimonWriterState> snapshotState(long checkpointId) {
+        return Collections.singletonList(stateCache);
+    }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterState.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterState.java
new file mode 100644
index 000000000..54468ec14
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterState.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink.v2;
+
+/** The state of {@link PaimonWriter}. */
+public class PaimonWriterState {
+
+    public static final int VERSION = 0;
+
+    /**
+     * The commit user of {@link PaimonWriter}.
+     *
+     * <p>Note: Introduced from version 0.
+     */
+    private final String commitUser;
+
+    private transient byte[] serializedBytesCache;
+
+    public PaimonWriterState(String commitUser) {
+        this.commitUser = commitUser;
+    }
+
+    public String getCommitUser() {
+        return commitUser;
+    }
+
+    public byte[] getSerializedBytesCache() {
+        return serializedBytesCache;
+    }
+
+    public void setSerializedBytesCache(byte[] serializedBytesCache) {
+        this.serializedBytesCache = serializedBytesCache;
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterStateSerializer.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterStateSerializer.java
new file mode 100644
index 000000000..317d7d651
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterStateSerializer.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink.v2;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+
+import java.io.IOException;
+
+/** A {@link SimpleVersionedSerializer} for {@link PaimonWriterState}. */
+public class PaimonWriterStateSerializer implements 
SimpleVersionedSerializer<PaimonWriterState> {
+
+    public static final PaimonWriterStateSerializer INSTANCE = new 
PaimonWriterStateSerializer();
+
+    @Override
+    public int getVersion() {
+        return PaimonWriterState.VERSION;
+    }
+
+    @Override
+    public byte[] serialize(PaimonWriterState paimonWriterState) throws 
IOException {
+        if (paimonWriterState.getSerializedBytesCache() != null) {
+            return paimonWriterState.getSerializedBytesCache();
+        } else {
+            final DataOutputSerializer out = new DataOutputSerializer(64);
+            out.writeUTF(paimonWriterState.getCommitUser());
+            byte[] serializedBytesCache = out.getCopyOfBuffer();
+            paimonWriterState.setSerializedBytesCache(serializedBytesCache);
+            return serializedBytesCache;
+        }
+    }
+
+    @Override
+    public PaimonWriterState deserialize(int version, byte[] serialized) 
throws IOException {
+        final DataInputDeserializer in = new DataInputDeserializer(serialized);
+        if (version == 0) {
+            String commitUser = in.readUTF();
+            return new PaimonWriterState(commitUser);
+        }
+        throw new IOException("Unknown version: " + version);
+    }
+}
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/migration/MySqlToPaimonMigrationITCase.java
 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/migration/MySqlToPaimonMigrationITCase.java
new file mode 100644
index 000000000..bcedcf7b0
--- /dev/null
+++ 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/migration/MySqlToPaimonMigrationITCase.java
@@ -0,0 +1,503 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.pipeline.tests.migration;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.cdc.common.test.utils.TestUtils;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
+import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
+import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;
+import org.apache.flink.cdc.pipeline.tests.utils.TarballFetcher;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.images.builder.Transferable;
+import org.testcontainers.utility.MountableFile;
+
+import java.nio.file.Path;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * E2e cases for stopping & restarting jobs of `MySQL source to Paimon sink` 
from previous state.
+ */
+class MySqlToPaimonMigrationITCase extends PipelineTestEnvironment {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(MySqlToPaimonMigrationITCase.class);
+
+    private static final Duration PAIMON_TESTCASE_TIMEOUT = 
Duration.ofMinutes(3);
+
+    protected UniqueDatabase mysqlInventoryDatabase;
+    private final Function<String, String> dbNameFormatter =
+            (s) -> String.format(s, mysqlInventoryDatabase.getDatabaseName());
+
+    @BeforeEach
+    public void before() throws Exception {
+        super.before();
+        mysqlInventoryDatabase =
+                new UniqueDatabase(MYSQL, "mysql_inventory", MYSQL_TEST_USER, 
MYSQL_TEST_PASSWORD);
+        mysqlInventoryDatabase.createAndInitialize();
+        jobManager.copyFileToContainer(
+                MountableFile.forHostPath(
+                        
TestUtils.getResource(getPaimonSQLConnectorResourceName())),
+                sharedVolume.toString() + "/" + 
getPaimonSQLConnectorResourceName());
+        jobManager.copyFileToContainer(
+                
MountableFile.forHostPath(TestUtils.getResource("flink-shade-hadoop.jar")),
+                sharedVolume.toString() + "/flink-shade-hadoop.jar");
+    }
+
+    @AfterEach
+    public void after() {
+        super.after();
+        if (mysqlInventoryDatabase != null) {
+            mysqlInventoryDatabase.dropDatabase();
+        }
+    }
+
+    @Test
+    void testBasicJobSubmitting() throws Exception {
+        String warehouse = sharedVolume.toString() + "/" + "paimon_" + 
UUID.randomUUID();
+        String content =
+                String.format(
+                        "source:\n"
+                                + "  type: mysql\n"
+                                + "  hostname: %s\n"
+                                + "  port: %d\n"
+                                + "  username: %s\n"
+                                + "  password: %s\n"
+                                + "  tables: %s.products\n"
+                                + "  server-id: 5400-5404\n"
+                                + "  server-time-zone: UTC\n"
+                                + "\n"
+                                + "sink:\n"
+                                + "  type: paimon\n"
+                                + "  catalog.properties.warehouse: %s\n"
+                                + "  catalog.properties.metastore: 
filesystem\n"
+                                + "  catalog.properties.cache-enabled: false\n"
+                                + "\n"
+                                + "pipeline:\n"
+                                + "  parallelism: %d\n",
+                        INTER_CONTAINER_MYSQL_ALIAS,
+                        MySqlContainer.MYSQL_PORT,
+                        MYSQL_TEST_USER,
+                        MYSQL_TEST_PASSWORD,
+                        mysqlInventoryDatabase.getDatabaseName(),
+                        warehouse,
+                        4);
+        Path paimonCdcConnector = 
TestUtils.getResource("paimon-cdc-pipeline-connector.jar");
+        Path hadoopJar = TestUtils.getResource("flink-shade-hadoop.jar");
+        JobID jobID = submitPipelineJob(content, paimonCdcConnector, 
hadoopJar);
+        Assertions.assertThat(jobID).isNotNull();
+        LOG.info("Submitted Job ID is {} ", jobID);
+        validateSinkResult(
+                warehouse,
+                mysqlInventoryDatabase.getDatabaseName(),
+                "products",
+                Arrays.asList(
+                        "101, scooter, Small 2-wheel scooter, 3.14, red, 
{\"key1\": \"value1\"}, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}",
+                        "102, car battery, 12V car battery, 8.1, white, 
{\"key2\": \"value2\"}, {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}",
+                        "103, 12-pack drill bits, 12-pack of drill bits with 
sizes ranging from #40 to #3, 0.8, red, {\"key3\": \"value3\"}, 
{\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}",
+                        "104, hammer, 12oz carpenter's hammer, 0.75, white, 
{\"key4\": \"value4\"}, {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}",
+                        "105, hammer, 14oz carpenter's hammer, 0.875, red, 
{\"k1\": \"v1\", \"k2\": \"v2\"}, 
{\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}",
+                        "106, hammer, 16oz carpenter's hammer, 1.0, null, 
null, null",
+                        "107, rocks, box of assorted rocks, 5.3, null, null, 
null",
+                        "108, jacket, water resistent black wind breaker, 0.1, 
null, null, null",
+                        "109, spare tire, 24 inch spare tire, 22.2, null, 
null, null"));
+        LOG.info("Snapshot phase successfully finished.");
+
+        waitUntilJobFinished(Duration.ofSeconds(30));
+        LOG.info("Job gracefully stopped.");
+    }
+
+    @ParameterizedTest(name = "{0} -> SNAPSHOT")
+    @EnumSource(names = {"V3_2_1", "V3_3_0", "V3_4_0", "V3_5_0", "SNAPSHOT"})
+    void testStartingJobFromSavepoint(TarballFetcher.CdcVersion 
migrateFromVersion)
+            throws Exception {
+        TarballFetcher.fetch(jobManager, migrateFromVersion);
+        LOG.info("Successfully fetched CDC {}.", migrateFromVersion);
+
+        String warehouse = sharedVolume.toString() + "/" + "paimon_" + 
UUID.randomUUID();
+        String content =
+                String.format(
+                        "source:\n"
+                                + "  type: mysql\n"
+                                + "  hostname: %s\n"
+                                + "  port: %d\n"
+                                + "  username: %s\n"
+                                + "  password: %s\n"
+                                + "  tables: %s.products\n"
+                                + "  server-id: 5400-5404\n"
+                                + "  server-time-zone: UTC\n"
+                                + "\n"
+                                + "sink:\n"
+                                + "  type: paimon\n"
+                                + "  catalog.properties.warehouse: %s\n"
+                                + "  catalog.properties.metastore: 
filesystem\n"
+                                + "  catalog.properties.cache-enabled: false\n"
+                                + "\n"
+                                + "pipeline:\n"
+                                + "  parallelism: %d\n",
+                        INTER_CONTAINER_MYSQL_ALIAS,
+                        MySqlContainer.MYSQL_PORT,
+                        MYSQL_TEST_USER,
+                        MYSQL_TEST_PASSWORD,
+                        mysqlInventoryDatabase.getDatabaseName(),
+                        warehouse,
+                        4);
+        Path paimonCdcConnector = 
TestUtils.getResource("paimon-cdc-pipeline-connector.jar");
+        Path hadoopJar = TestUtils.getResource("flink-shade-hadoop.jar");
+        JobID jobID = submitPipelineJob(content, paimonCdcConnector, 
hadoopJar);
+        Assertions.assertThat(jobID).isNotNull();
+
+        validateSinkResult(
+                warehouse,
+                mysqlInventoryDatabase.getDatabaseName(),
+                "products",
+                Arrays.asList(
+                        "101, scooter, Small 2-wheel scooter, 3.14, red, 
{\"key1\": \"value1\"}, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}",
+                        "102, car battery, 12V car battery, 8.1, white, 
{\"key2\": \"value2\"}, {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}",
+                        "103, 12-pack drill bits, 12-pack of drill bits with 
sizes ranging from #40 to #3, 0.8, red, {\"key3\": \"value3\"}, 
{\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}",
+                        "104, hammer, 12oz carpenter's hammer, 0.75, white, 
{\"key4\": \"value4\"}, {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}",
+                        "105, hammer, 14oz carpenter's hammer, 0.875, red, 
{\"k1\": \"v1\", \"k2\": \"v2\"}, 
{\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}",
+                        "106, hammer, 16oz carpenter's hammer, 1.0, null, 
null, null",
+                        "107, rocks, box of assorted rocks, 5.3, null, null, 
null",
+                        "108, jacket, water resistent black wind breaker, 0.1, 
null, null, null",
+                        "109, spare tire, 24 inch spare tire, 22.2, null, 
null, null"));
+        LOG.info("Snapshot stage finished successfully.");
+
+        generateIncrementalEventsPhaseOne();
+        validateSinkResult(
+                warehouse,
+                mysqlInventoryDatabase.getDatabaseName(),
+                "products",
+                Arrays.asList(
+                        "101, scooter, Small 2-wheel scooter, 3.14, red, 
{\"key1\": \"value1\"}, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}",
+                        "102, car battery, 12V car battery, 8.1, white, 
{\"key2\": \"value2\"}, {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}",
+                        "103, 12-pack drill bits, 12-pack of drill bits with 
sizes ranging from #40 to #3, 0.8, red, {\"key3\": \"value3\"}, 
{\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}",
+                        "104, hammer, 12oz carpenter's hammer, 0.75, white, 
{\"key4\": \"value4\"}, {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}",
+                        "105, hammer, 14oz carpenter's hammer, 0.875, red, 
{\"k1\": \"v1\", \"k2\": \"v2\"}, 
{\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}",
+                        "106, hammer, 18oz carpenter hammer, 1.0, null, null, 
null",
+                        "107, rocks, box of assorted rocks, 5.1, null, null, 
null",
+                        "108, jacket, water resistent black wind breaker, 0.1, 
null, null, null",
+                        "109, spare tire, 24 inch spare tire, 22.2, null, 
null, null"));
+        LOG.info("Incremental stage 1 finished successfully.");
+
+        generateIncrementalEventsPhaseTwo();
+        validateSinkResult(
+                warehouse,
+                mysqlInventoryDatabase.getDatabaseName(),
+                "products",
+                Arrays.asList(
+                        "101, scooter, Small 2-wheel scooter, 3.14, red, 
{\"key1\": \"value1\"}, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}, 
null",
+                        "102, car battery, 12V car battery, 8.1, white, 
{\"key2\": \"value2\"}, {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}, 
null",
+                        "103, 12-pack drill bits, 12-pack of drill bits with 
sizes ranging from #40 to #3, 0.8, red, {\"key3\": \"value3\"}, 
{\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}, null",
+                        "104, hammer, 12oz carpenter's hammer, 0.75, white, 
{\"key4\": \"value4\"}, {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}, 
null",
+                        "105, hammer, 14oz carpenter's hammer, 0.875, red, 
{\"k1\": \"v1\", \"k2\": \"v2\"}, 
{\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}, null",
+                        "106, hammer, 18oz carpenter hammer, 1.0, null, null, 
null, null",
+                        "107, rocks, box of assorted rocks, 5.1, null, null, 
null, null",
+                        "108, jacket, water resistent black wind breaker, 0.1, 
null, null, null, null",
+                        "109, spare tire, 24 inch spare tire, 22.2, null, 
null, null, null",
+                        "110, jacket, water resistent white wind breaker, 0.2, 
null, null, null, 1",
+                        "111, scooter, Big 2-wheel scooter, 5.18, null, null, 
null, 1"));
+        LOG.info("Incremental stage 2 finished successfully.");
+
+        String savepointPath = stopJobWithSavepoint(jobID);
+        LOG.info("Stopped Job {} and created a savepoint at {}.", jobID, 
savepointPath);
+
+        JobID newJobID =
+                submitPipelineJob(content, savepointPath, true, 
paimonCdcConnector, hadoopJar);
+        LOG.info("Reincarnated Job {} has been submitted successfully.", 
newJobID);
+
+        generateIncrementalEventsPhaseThree();
+        validateSinkResult(
+                warehouse,
+                mysqlInventoryDatabase.getDatabaseName(),
+                "products",
+                Arrays.asList(
+                        "101, scooter, Small 2-wheel scooter, 3.14, red, 
{\"key1\": \"value1\"}, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}, 
null",
+                        "102, car battery, 12V car battery, 8.1, white, 
{\"key2\": \"value2\"}, {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}, 
null",
+                        "103, 12-pack drill bits, 12-pack of drill bits with 
sizes ranging from #40 to #3, 0.8, red, {\"key3\": \"value3\"}, 
{\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}, null",
+                        "104, hammer, 12oz carpenter's hammer, 0.75, white, 
{\"key4\": \"value4\"}, {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}, 
null",
+                        "105, hammer, 14oz carpenter's hammer, 0.875, red, 
{\"k1\": \"v1\", \"k2\": \"v2\"}, 
{\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}, null",
+                        "106, hammer, 18oz carpenter hammer, 1.0, null, null, 
null, null",
+                        "107, rocks, box of assorted rocks, 5.1, null, null, 
null, null",
+                        "108, jacket, water resistent black wind breaker, 0.1, 
null, null, null, null",
+                        "109, spare tire, 24 inch spare tire, 22.2, null, 
null, null, null",
+                        "110, jacket, new water resistent white wind breaker, 
0.5, null, null, null, 1"));
+        cancelJob(newJobID);
+    }
+
+    @ParameterizedTest(name = "{0} -> SNAPSHOT")
+    @EnumSource(names = {"SNAPSHOT"})
+    void 
testStartingJobFromSavepointWithSchemaChange(TarballFetcher.CdcVersion 
migrateFromVersion)
+            throws Exception {
+        TarballFetcher.fetch(jobManager, migrateFromVersion);
+        runInContainerAsRoot(jobManager, "chmod", "0777", "-R", "/tmp/cdc/");
+
+        LOG.info("Successfully fetched CDC {}.", migrateFromVersion);
+
+        String warehouse = sharedVolume.toString() + "/" + "paimon_" + 
UUID.randomUUID();
+        String content =
+                String.format(
+                        "source:\n"
+                                + "  type: mysql\n"
+                                + "  hostname: %s\n"
+                                + "  port: %d\n"
+                                + "  username: %s\n"
+                                + "  password: %s\n"
+                                + "  tables: %s.products\n"
+                                + "  server-id: 5400-5404\n"
+                                + "  server-time-zone: UTC\n"
+                                + "\n"
+                                + "sink:\n"
+                                + "  type: paimon\n"
+                                + "  catalog.properties.warehouse: %s\n"
+                                + "  catalog.properties.metastore: 
filesystem\n"
+                                + "  catalog.properties.cache-enabled: false\n"
+                                + "\n"
+                                + "pipeline:\n"
+                                + "  parallelism: %d\n",
+                        INTER_CONTAINER_MYSQL_ALIAS,
+                        MySqlContainer.MYSQL_PORT,
+                        MYSQL_TEST_USER,
+                        MYSQL_TEST_PASSWORD,
+                        mysqlInventoryDatabase.getDatabaseName(),
+                        warehouse,
+                        4);
+        Path paimonCdcConnector = 
TestUtils.getResource("paimon-cdc-pipeline-connector.jar");
+        Path hadoopJar = TestUtils.getResource("flink-shade-hadoop.jar");
+        JobID jobID = submitPipelineJob(migrateFromVersion, content, 
paimonCdcConnector, hadoopJar);
+        Assertions.assertThat(jobID).isNotNull();
+        LOG.info("Submitted Job ID is {} ", jobID);
+
+        validateSinkResult(
+                warehouse,
+                mysqlInventoryDatabase.getDatabaseName(),
+                "products",
+                Arrays.asList(
+                        "101, scooter, Small 2-wheel scooter, 3.14, red, 
{\"key1\": \"value1\"}, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}",
+                        "102, car battery, 12V car battery, 8.1, white, 
{\"key2\": \"value2\"}, {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}",
+                        "103, 12-pack drill bits, 12-pack of drill bits with 
sizes ranging from #40 to #3, 0.8, red, {\"key3\": \"value3\"}, 
{\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}",
+                        "104, hammer, 12oz carpenter's hammer, 0.75, white, 
{\"key4\": \"value4\"}, {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}",
+                        "105, hammer, 14oz carpenter's hammer, 0.875, red, 
{\"k1\": \"v1\", \"k2\": \"v2\"}, 
{\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}",
+                        "106, hammer, 16oz carpenter's hammer, 1.0, null, 
null, null",
+                        "107, rocks, box of assorted rocks, 5.3, null, null, 
null",
+                        "108, jacket, water resistent black wind breaker, 0.1, 
null, null, null",
+                        "109, spare tire, 24 inch spare tire, 22.2, null, 
null, null"));
+        LOG.info("Snapshot stage finished successfully.");
+
+        generateIncrementalEventsPhaseOne();
+        validateSinkResult(
+                warehouse,
+                mysqlInventoryDatabase.getDatabaseName(),
+                "products",
+                Arrays.asList(
+                        "101, scooter, Small 2-wheel scooter, 3.14, red, 
{\"key1\": \"value1\"}, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}",
+                        "102, car battery, 12V car battery, 8.1, white, 
{\"key2\": \"value2\"}, {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}",
+                        "103, 12-pack drill bits, 12-pack of drill bits with 
sizes ranging from #40 to #3, 0.8, red, {\"key3\": \"value3\"}, 
{\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}",
+                        "104, hammer, 12oz carpenter's hammer, 0.75, white, 
{\"key4\": \"value4\"}, {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}",
+                        "105, hammer, 14oz carpenter's hammer, 0.875, red, 
{\"k1\": \"v1\", \"k2\": \"v2\"}, 
{\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}",
+                        "106, hammer, 18oz carpenter hammer, 1.0, null, null, 
null",
+                        "107, rocks, box of assorted rocks, 5.1, null, null, 
null",
+                        "108, jacket, water resistent black wind breaker, 0.1, 
null, null, null",
+                        "109, spare tire, 24 inch spare tire, 22.2, null, 
null, null"));
+        LOG.info("Incremental stage 1 finished successfully.");
+
+        String savepointPath = stopJobWithSavepoint(jobID);
+        LOG.info("Stopped Job {} and created a savepoint at {}.", jobID, 
savepointPath);
+        // Modify schema and make some data changes.
+        generateIncrementalEventsPhaseTwo();
+        JobID newJobID =
+                submitPipelineJob(content, savepointPath, true, 
paimonCdcConnector, hadoopJar);
+        LOG.info("Reincarnated Job {} has been submitted successfully.", 
newJobID);
+        validateSinkResult(
+                warehouse,
+                mysqlInventoryDatabase.getDatabaseName(),
+                "products",
+                Arrays.asList(
+                        "101, scooter, Small 2-wheel scooter, 3.14, red, 
{\"key1\": \"value1\"}, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}, 
null",
+                        "102, car battery, 12V car battery, 8.1, white, 
{\"key2\": \"value2\"}, {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}, 
null",
+                        "103, 12-pack drill bits, 12-pack of drill bits with 
sizes ranging from #40 to #3, 0.8, red, {\"key3\": \"value3\"}, 
{\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}, null",
+                        "104, hammer, 12oz carpenter's hammer, 0.75, white, 
{\"key4\": \"value4\"}, {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}, 
null",
+                        "105, hammer, 14oz carpenter's hammer, 0.875, red, 
{\"k1\": \"v1\", \"k2\": \"v2\"}, 
{\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}, null",
+                        "106, hammer, 18oz carpenter hammer, 1.0, null, null, 
null, null",
+                        "107, rocks, box of assorted rocks, 5.1, null, null, 
null, null",
+                        "108, jacket, water resistent black wind breaker, 0.1, 
null, null, null, null",
+                        "109, spare tire, 24 inch spare tire, 22.2, null, 
null, null, null",
+                        "110, jacket, water resistent white wind breaker, 0.2, 
null, null, null, 1",
+                        "111, scooter, Big 2-wheel scooter, 5.18, null, null, 
null, 1"));
+        LOG.info("Incremental stage 2 finished successfully.");
+
+        generateIncrementalEventsPhaseThree();
+        validateSinkResult(
+                warehouse,
+                mysqlInventoryDatabase.getDatabaseName(),
+                "products",
+                Arrays.asList(
+                        "101, scooter, Small 2-wheel scooter, 3.14, red, 
{\"key1\": \"value1\"}, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}, 
null",
+                        "102, car battery, 12V car battery, 8.1, white, 
{\"key2\": \"value2\"}, {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}, 
null",
+                        "103, 12-pack drill bits, 12-pack of drill bits with 
sizes ranging from #40 to #3, 0.8, red, {\"key3\": \"value3\"}, 
{\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}, null",
+                        "104, hammer, 12oz carpenter's hammer, 0.75, white, 
{\"key4\": \"value4\"}, {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}, 
null",
+                        "105, hammer, 14oz carpenter's hammer, 0.875, red, 
{\"k1\": \"v1\", \"k2\": \"v2\"}, 
{\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}, null",
+                        "106, hammer, 18oz carpenter hammer, 1.0, null, null, 
null, null",
+                        "107, rocks, box of assorted rocks, 5.1, null, null, 
null, null",
+                        "108, jacket, water resistent black wind breaker, 0.1, 
null, null, null, null",
+                        "109, spare tire, 24 inch spare tire, 22.2, null, 
null, null, null",
+                        "110, jacket, new water resistent white wind breaker, 
0.5, null, null, null, 1"));
+        cancelJob(newJobID);
+    }
+
+    private void generateIncrementalEventsPhaseOne() {
+        executeMySqlStatements(
+                mysqlInventoryDatabase,
+                "UPDATE products SET description='18oz carpenter hammer' WHERE 
id=106;",
+                "UPDATE products SET weight='5.1' WHERE id=107;");
+    }
+
+    private void generateIncrementalEventsPhaseTwo() {
+        executeMySqlStatements(
+                mysqlInventoryDatabase,
+                "UPDATE products SET description='18oz carpenter hammer' WHERE 
id=106;",
+                "UPDATE products SET weight='5.1' WHERE id=107;",
+                "ALTER TABLE products ADD COLUMN new_col INT;",
+                "INSERT INTO products VALUES (default,'jacket','water 
resistent white wind breaker',0.2, null, null, null, 1);",
+                "INSERT INTO products VALUES (default,'scooter','Big 2-wheel 
scooter ',5.18, null, null, null, 1);");
+    }
+
+    private void generateIncrementalEventsPhaseThree() {
+        executeMySqlStatements(
+                mysqlInventoryDatabase,
+                "UPDATE products SET description='new water resistent white 
wind breaker', weight='0.5' WHERE id=110;",
+                "UPDATE products SET weight='5.17' WHERE id=111;",
+                "DELETE FROM products WHERE id=111;");
+    }
+
+    private void executeMySqlStatements(UniqueDatabase database, String... 
statements) {
+        String mysqlJdbcUrl =
+                String.format(
+                        "jdbc:mysql://%s:%s/%s",
+                        MYSQL.getHost(), MYSQL.getDatabasePort(), 
database.getDatabaseName());
+        try (Connection conn =
+                        DriverManager.getConnection(
+                                mysqlJdbcUrl, MYSQL_TEST_USER, 
MYSQL_TEST_PASSWORD);
+                Statement stat = conn.createStatement()) {
+            for (String sql : statements) {
+                try {
+                    stat.execute(sql);
+                } catch (SQLException e) {
+                    throw new RuntimeException("Failed to execute SQL 
statement " + sql, e);
+                }
+            }
+        } catch (SQLException e) {
+            throw new RuntimeException("Failed to execute MySQL statements.", 
e);
+        }
+    }
+
+    private void validateSinkResult(
+            String warehouse, String database, String table, List<String> 
expected)
+            throws InterruptedException {
+        LOG.info("Verifying Paimon {}::{}::{} results...", warehouse, 
database, table);
+        long deadline = System.currentTimeMillis() + 
PAIMON_TESTCASE_TIMEOUT.toMillis();
+        List<String> results = Collections.emptyList();
+        while (System.currentTimeMillis() < deadline) {
+            try {
+                results = fetchPaimonTableRows(warehouse, database, table);
+                
Assertions.assertThat(results).containsExactlyInAnyOrderElementsOf(expected);
+                LOG.info(
+                        "Successfully verified {} records in {} seconds.",
+                        expected.size(),
+                        (System.currentTimeMillis() - deadline + 
PAIMON_TESTCASE_TIMEOUT.toMillis())
+                                / 1000);
+                return;
+            } catch (Exception e) {
+                LOG.warn("Validate failed, waiting for the next loop...", e);
+            } catch (AssertionError ignored) {
+                // AssertionError contains way too much records and might 
flood the log output.
+                LOG.warn(
+                        "Results mismatch, expected {} records, but got {} 
actually. Waiting for the next loop...",
+                        expected.size(),
+                        results.size());
+            }
+            Thread.sleep(1000L);
+        }
+        
Assertions.assertThat(results).containsExactlyInAnyOrderElementsOf(expected);
+    }
+
+    private List<String> fetchPaimonTableRows(String warehouse, String 
database, String table)
+            throws Exception {
+        String template =
+                readLines("docker/peek-paimon.sql").stream()
+                        .filter(line -> !line.startsWith("--"))
+                        .collect(Collectors.joining("\n"));
+        String sql = String.format(template, warehouse, database, table);
+        String containerSqlPath = sharedVolume.toString() + "/peek.sql";
+        jobManager.copyFileToContainer(Transferable.of(sql), containerSqlPath);
+
+        Container.ExecResult result =
+                jobManager.execInContainer(
+                        "/opt/flink/bin/sql-client.sh",
+                        "--jar",
+                        sharedVolume.toString() + "/" + 
getPaimonSQLConnectorResourceName(),
+                        "--jar",
+                        sharedVolume.toString() + "/flink-shade-hadoop.jar",
+                        "-f",
+                        containerSqlPath);
+        if (result.getExitCode() != 0) {
+            throw new RuntimeException(
+                    "Failed to execute peek script. Stdout: "
+                            + result.getStdout()
+                            + "; Stderr: "
+                            + result.getStderr());
+        }
+
+        return Arrays.stream(result.getStdout().split("\n"))
+                .filter(line -> line.startsWith("|"))
+                .skip(1)
+                .map(MySqlToPaimonMigrationITCase::extractRow)
+                .map(row -> String.format("%s", String.join(", ", row)))
+                .collect(Collectors.toList());
+    }
+
+    private static String[] extractRow(String row) {
+        return Arrays.stream(row.split("\\|"))
+                .map(String::trim)
+                .filter(col -> !col.isEmpty())
+                .map(col -> col.equals("<NULL>") ? "null" : col)
+                .toArray(String[]::new);
+    }
+
+    protected String getPaimonSQLConnectorResourceName() {
+        return String.format("paimon-sql-connector-%s.jar", flinkVersion);
+    }
+}
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/migration/YamlJobMigrationITCase.java
 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/migration/YamlJobMigrationITCase.java
index 6c929ee28..2a875d9f7 100644
--- 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/migration/YamlJobMigrationITCase.java
+++ 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/migration/YamlJobMigrationITCase.java
@@ -99,7 +99,7 @@ class YamlJobMigrationITCase extends PipelineTestEnvironment {
     }
 
     @ParameterizedTest(name = "{0} -> SNAPSHOT")
-    @EnumSource(names = {"V3_2_1", "V3_3_0", "SNAPSHOT"})
+    @EnumSource(names = {"V3_2_1", "V3_3_0", "V3_4_0", "V3_5_0", "SNAPSHOT"})
     void testStartingJobFromSavepoint(TarballFetcher.CdcVersion 
migrateFromVersion)
             throws Exception {
         TarballFetcher.fetch(jobManager, migrateFromVersion);
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
index 6e8bac917..914278bcb 100644
--- 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
+++ 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
@@ -142,7 +142,10 @@ public abstract class PipelineTestEnvironment extends 
TestLogger {
                     "restart-strategy.type: off",
                     // Set off-heap memory explicitly to avoid 
"java.lang.OutOfMemoryError: Direct
                     // buffer memory" error.
-                    "taskmanager.memory.task.off-heap.size: 128mb");
+                    "taskmanager.memory.task.off-heap.size: 128mb",
+                    // Fix `java.lang.OutOfMemoryError: Metaspace. The 
metaspace out-of-memory error
+                    // has occurred` error.
+                    "taskmanager.memory.jvm-metaspace.size: 512mb");
     public static final String FLINK_PROPERTIES = String.join("\n", 
EXTERNAL_PROPS);
 
     @Nullable protected RestClusterClient<StandaloneClusterId> 
restClusterClient;
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/TarballFetcher.java
 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/TarballFetcher.java
index 573a4797c..1d5788918 100644
--- 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/TarballFetcher.java
+++ 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/TarballFetcher.java
@@ -115,6 +115,8 @@ public abstract class TarballFetcher {
         V3_2_0("3.2.0"),
         V3_2_1("3.2.1"),
         V3_3_0("3.3.0"),
+        V3_4_0("3.4.0"),
+        V3_5_0("3.5.0"),
         SNAPSHOT("SNAPSHOT");
 
         private final String version;

Reply via email to