yzeng1618 commented on code in PR #9576:
URL: https://github.com/apache/seatunnel/pull/9576#discussion_r2300762811


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/doris_source_and_sink_with_custom_sql.conf:
##########
@@ -26,6 +26,10 @@ source{
     row.num = 100
     split.num = 10
     string.length = 1
+    auto.increment.enabled = true
+    auto.increment.start = 2
+    bigint.min = 2
+    bigint.max = 1000

Review Comment:
   These modifications are intended to prevent errors caused by configuration 
issues while ensuring the functionality of the original end-to-end (e2e) tests.



##########
seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java:
##########
@@ -108,8 +108,8 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
             } else {
                 context.sendSplitRequest();
                 if (sourceSplits.isEmpty()) {
-                    log.debug("Waiting for table source split, sleeping 1s");
-                    Thread.sleep(1000L);
+                    log.debug("Waiting for table source split, sleeping 100ms");
+                    Thread.sleep(100L);

Review Comment:
   These changes were made because of issues in the CI process at the time, which led to some optimizations; however, no retries were actually performed. I will now restore some of the original content.
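   
   For clarity, the restored wait would look like the original removed lines above:
   
   ```java
   if (sourceSplits.isEmpty()) {
       log.debug("Waiting for table source split, sleeping 1s");
       Thread.sleep(1000L);
   }
   ```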



##########
seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-20/src/main/java/org/apache/seatunnel/translation/flink/source/SplitWrapper.java:
##########
@@ -0,0 +1,50 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.source;
+
+import org.apache.flink.api.connector.source.SourceSplit;
+
+/**
+ * The {@link org.apache.seatunnel.api.source.SourceSplit} wrapper, used for proxy all seatunnel
+ * user-defined source split in flink engine.
+ *
+ * @param <T> The generic type of source split
+ */
+public class SplitWrapper<T extends org.apache.seatunnel.api.source.SourceSplit>

Review Comment:
   The content in the source section has been removed.



##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/text/fake_to_sftp_file_text.conf:
##########
@@ -82,6 +82,6 @@ sink {
     file_format_type = "text"
     filename_time_format = "yyyy.MM.dd"
     is_enable_transaction = true
-    compress_codec = "lzo"
+    compress_codec = "none"
   }

Review Comment:
   These modifications are intended to prevent errors caused by configuration 
issues while ensuring the functionality of the original end-to-end (e2e) tests.



##########
seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractSinkExecuteProcessor.java:
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.starter.flink.execution;
+
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.common.PluginIdentifier;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.options.EnvCommonOptions;
+import org.apache.seatunnel.api.sink.SaveModeExecuteWrapper;
+import org.apache.seatunnel.api.sink.SaveModeHandler;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SupportMultiTableSink;
+import org.apache.seatunnel.api.sink.SupportSaveMode;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.FactoryUtil;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.constants.EngineType;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
+import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
+import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
+
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.net.URL;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;
+import static org.apache.seatunnel.api.options.ConnectorCommonOptions.PLUGIN_NAME;
+import static org.apache.seatunnel.api.table.factory.FactoryUtil.discoverOptionalFactory;
+
+/** Abstract base class for Sink execute processors. */
+@Slf4j

Review Comment:
   > as above
   The logging style throughout the /seatunnel-core/seatunnel-flink-starter directory will be unified to prefer LoggerFactory, and a LOGGER will only be declared in classes that actually emit log output.
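   
   A minimal sketch of the target style (the class and log call below are only illustrative):
   
   ```java
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;
   
   public class AbstractSinkExecuteProcessor {
       // Declare a LOGGER only in classes that actually emit log output.
       private static final Logger LOGGER =
               LoggerFactory.getLogger(AbstractSinkExecuteProcessor.class);
   
       protected void reportPluginRegistered(String pluginName) {
           LOGGER.info("Registered sink plugin: {}", pluginName);
       }
   }
   ```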
   



##########
seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/Flink20Container.java:
##########
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.common.container.flink;
+
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.container.TestContainerId;
+
+import com.google.auto.service.AutoService;
+import lombok.NoArgsConstructor;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This class is the base class of FlinkEnvironment test for new seatunnel connector API. The before
+ * method will create a Flink cluster, and after method will close the Flink cluster. You can use
+ * {@link Flink20Container#executeJob} to submit a seatunnel config and run a seatunnel job.
+ */
+@NoArgsConstructor
+@AutoService(TestContainer.class)
+public class Flink20Container extends AbstractTestFlinkContainer {
+
+    @Override
+    public TestContainerId identifier() {
+        return TestContainerId.FLINK_1_20;
+    }
+
+    @Override
+    protected String getDockerImage() {
+        return "tyrantlucifer/flink:1.20.1-scala_2.12_hadoop27";
+    }
+
+    @Override
+    protected String getStartModuleName() {
+        return "seatunnel-flink-starter" + File.separator + "seatunnel-flink-20-starter";
+    }
+
+    @Override
+    protected String getStartShellName() {
+        return "start-seatunnel-flink-20-connector-v2.sh";
+    }
+
+    @Override
+    protected String getConnectorType() {
+        return "seatunnel";
+    }
+
+    @Override
+    protected String getConnectorModulePath() {
+        return "seatunnel-connectors-v2";
+    }
+
+    @Override
+    protected String getConnectorNamePrefix() {
+        return "connector-";
+    }
+
+    @Override
+    protected List<String> getFlinkProperties() {
+        // CRITICAL: For Flink 1.20.1, we need to completely replace the config file
+        // instead of appending to it, because SnakeYAML requires the entire file
+        // to start with a YAML document marker.
+        //
+        // We use a special marker that will be processed by our custom startup script
+
+        List<String> properties =
+                Arrays.asList(
+                        "# SEATUNNEL_FLINK20_CONFIG_REPLACE_START",
+                        "---", // YAML document start required by SnakeYAML engine
+                        "# SeaTunnel Flink 1.20.1 Complete Configuration",
+                        "# Generated to ensure YAML compliance with SnakeYAML engine",
+                        "",
+                        "# Memory Configuration",
+                        "jobmanager.memory.process.size: 1600m",
+                        "taskmanager.memory.process.size: 1728m",
+                        "taskmanager.memory.flink.size: 1280m",
+                        "",
+                        "# Network Buffer Configuration - Fix for insufficient network buffers",
+                        "taskmanager.memory.network.fraction: 0.2",
+                        "taskmanager.memory.network.min: 128mb",
+                        "taskmanager.memory.network.max: 512mb",
+                        "",
+                        "# Network Configuration",
+                        "jobmanager.rpc.address: jobmanager",
+                        "taskmanager.numberOfTaskSlots: 10",
+                        "",
+                        "# Execution Configuration",
+                        "parallelism.default: 4",
+                        "",
+                        "# JVM Configuration",
+                        "env.java.opts: -Doracle.jdbc.timezoneAsRegion=false",
+                        "# SEATUNNEL_FLINK20_CONFIG_REPLACE_END");
+
+        // Debug logging
+        System.out.println("=== Flink20Container Debug Information ===");
+        System.out.println("Docker Image: " + getDockerImage());
+        System.out.println(
+                "Using config replacement mode for Flink 1.20.1 SnakeYAML compatibility");
+        String joinedProperties = String.join("\n", properties);
+        System.out.println("Final FLINK_PROPERTIES environment variable content:");
+        System.out.println("--- START FLINK_PROPERTIES ---");
+        System.out.println(joinedProperties);
+        System.out.println("--- END FLINK_PROPERTIES ---");
+        System.out.println("=== End Debug Information ===");
+
+        return properties;
+    }
+
+    @Override
+    public void startUp() throws Exception {
+        // Override startup to handle Flink 1.20.1 specific YAML configuration requirements
+        final String dockerImage = getDockerImage();
+        final String properties = String.join("\n", getFlinkProperties());
+
+        System.out.println("=== Flink20Container Custom Startup ===");
+        System.out.println("Starting Flink 1.20.1 with custom configuration handling");
+
+        jobManager =
+                new org.testcontainers.containers.GenericContainer<>(dockerImage)
+                        .withCommand("sh", "-c", createJobManagerStartupCommand())
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases("jobmanager")
+                        .withExposedPorts()
+                        .withEnv("FLINK_PROPERTIES", properties)
+                        .withLogConsumer(
+                                new org.testcontainers.containers.output.Slf4jLogConsumer(
+                                        org.testcontainers.utility.DockerLoggerFactory.getLogger(
+                                                dockerImage + ":jobmanager")))
+                        .waitingFor(
+                                new org.testcontainers.containers.wait.strategy
+                                                .LogMessageWaitStrategy()
+                                        .withRegEx(".*Starting the resource manager.*")
+                                        .withStartupTimeout(java.time.Duration.ofMinutes(2)))
+                        .withFileSystemBind(
+                                HOST_VOLUME_MOUNT_PATH,
+                                CONTAINER_VOLUME_MOUNT_PATH,
+                                org.testcontainers.containers.BindMode.READ_WRITE);
+
+        copySeaTunnelStarterToContainer(jobManager);
+        copySeaTunnelStarterLoggingToContainer(jobManager);
+
+        jobManager.setPortBindings(java.util.Arrays.asList(String.format("%s:%s", 8081, 8081)));
+
+        taskManager =
+                new org.testcontainers.containers.GenericContainer<>(dockerImage)
+                        .withCommand("sh", "-c", createTaskManagerStartupCommand())
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases("taskmanager")
+                        .withEnv("FLINK_PROPERTIES", properties)
+                        .dependsOn(jobManager)
+                        .withLogConsumer(
+                                new org.testcontainers.containers.output.Slf4jLogConsumer(
+                                        org.testcontainers.utility.DockerLoggerFactory.getLogger(
+                                                dockerImage + ":taskmanager")))
+                        .waitingFor(
+                                new org.testcontainers.containers.wait.strategy
+                                                .LogMessageWaitStrategy()
+                                        .withRegEx(
+                                                ".*Successful registration at resource manager.*")
+                                        .withStartupTimeout(java.time.Duration.ofMinutes(2)))
+                        .withFileSystemBind(
+                                HOST_VOLUME_MOUNT_PATH,
+                                CONTAINER_VOLUME_MOUNT_PATH,
+                                org.testcontainers.containers.BindMode.READ_WRITE);
+
+        org.testcontainers.lifecycle.Startables.deepStart(java.util.stream.Stream.of(jobManager))
+                .join();
+
+        org.testcontainers.lifecycle.Startables.deepStart(java.util.stream.Stream.of(taskManager))
+                .join();
+
+        // execute extra commands
+        executeExtraCommands(jobManager);
+
+        System.out.println("=== Flink20Container Startup Complete ===");
+    }
+
+    private String createJobManagerStartupCommand() {
+        // Create a complete startup command for JobManager that avoids shell operator issues
+        return createFlink20StartupScript()
+                + "\n"
+                + "echo 'Starting Flink JobManager...'\n"
+                + "exec /docker-entrypoint.sh jobmanager\n";
+    }
+
+    private String createTaskManagerStartupCommand() {
+        // Create a complete startup command for TaskManager that avoids shell operator issues
+        return createFlink20StartupScript()
+                + "\n"
+                + "echo 'Starting Flink TaskManager...'\n"
+                + "exec /docker-entrypoint.sh taskmanager\n";
+    }
+
+    private String createFlink20StartupScript() {
+        // Create a script that properly handles YAML configuration replacement
+        return "#!/bin/bash\n"
+                + "set -e\n"
+                + "echo 'SeaTunnel Flink 1.20.1 custom startup script'\n"
+                + "echo 'Handling YAML configuration for SnakeYAML compatibility'\n"
+                + "\n"
+                + "CONF_DIR=\"${FLINK_HOME}/conf\"\n"
+                + "CONF_FILE=\"${CONF_DIR}/flink-conf.yaml\"\n"
+                + "CONFIG_FILE=\"${CONF_DIR}/config.yaml\"\n"
+                + "\n"
+                + "echo 'Original configuration directory:'\n"
+                + "ls -la \"${CONF_DIR}\"\n"
+                + "\n"
+                + "if [ -n \"${FLINK_PROPERTIES}\" ]; then\n"
+                + "  if echo \"${FLINK_PROPERTIES}\" | grep -q 'SEATUNNEL_FLINK20_CONFIG_REPLACE_START'; then\n"
+                + "    echo 'Replacing configuration files with YAML-compliant content'\n"
+                + "    \n"
+                + "    # Extract the actual config content (between markers)\n"
+                + "    # Use printf to handle special characters and quotes properly\n"
+                + "    printf '%s\\n' \"${FLINK_PROPERTIES}\" | sed -n '/SEATUNNEL_FLINK20_CONFIG_REPLACE_START/,/SEATUNNEL_FLINK20_CONFIG_REPLACE_END/p' | sed '1d;$d' > \"${CONF_FILE}\"\n"
+                + "    \n"
+                + "    # Copy to config.yaml as well\n"
+                + "    cp \"${CONF_FILE}\" \"${CONFIG_FILE}\"\n"
+                + "    \n"
+                + "    echo 'Configuration files replaced successfully'\n"
+                + "  else\n"
+                + "    echo 'Using standard append mode'\n"
+                + "    echo \"${FLINK_PROPERTIES}\" >> \"${CONF_FILE}\"\n"
+                + "    [ -f \"${CONFIG_FILE}\" ] && echo \"${FLINK_PROPERTIES}\" >> \"${CONFIG_FILE}\"\n"
+                + "  fi\n"
+                + "else\n"
+                + "  echo 'No FLINK_PROPERTIES provided'\n"
+                + "fi\n"
+                + "\n"
+                + "echo 'Final configuration files:'\n"
+                + "echo '=== flink-conf.yaml ==='\n"
+                + "cat \"${CONF_FILE}\" 2>/dev/null || echo 'flink-conf.yaml not found'\n"
+                + "echo '=== config.yaml ==='\n"
+                + "cat \"${CONFIG_FILE}\" 2>/dev/null || echo 'config.yaml not found'\n"
+                + "echo '=== End configuration files ==='\n";
+    }

Review Comment:
   Flink 1.20 and later require the configuration file to strictly follow YAML 1.2 syntax, so this file was added to keep the e2e setup compatible with the Flink 1.20 container.



##########
seatunnel-core/seatunnel-flink-starter/seatunnel-flink-20-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.starter.flink;
+
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.common.constants.EngineType;
+import org.apache.seatunnel.core.starter.Starter;
+import org.apache.seatunnel.core.starter.enums.MasterType;
+import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
+import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/** The SeaTunnel flink starter, used to generate the final flink job execute command. */
+public class FlinkStarter implements Starter {

Review Comment:
   The content here has now been optimized and abstracted.



##########
seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/AbstractTestContainer.java:
##########
@@ -99,7 +99,31 @@ public AbstractTestContainer() {
      */
     protected void executeExtraCommands(GenericContainer<?> container)
             throws IOException, InterruptedException {
-        // do nothing
+        // Set execute permissions for scripts to prevent "Permission denied" errors
+        setScriptExecutePermissions(container);
+    }
+
+    /** Set execute permissions for SeaTunnel scripts in the container. */
+    protected void setScriptExecutePermissions(GenericContainer<?> container) {

Review Comment:
   After the new Flink 1.20 container was added, significant changes were made around its behavior, which resulted in the scripts lacking execute permissions. This code was added to fix that.
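   
   A minimal sketch of such a permission fix, assuming Testcontainers' execInContainer and an illustrative script location (the path below is not taken from the PR):
   
   ```java
   import org.testcontainers.containers.Container;
   import org.testcontainers.containers.GenericContainer;
   
   final class ScriptPermissionHelper {
       private ScriptPermissionHelper() {}
   
       // Best-effort chmod of the starter scripts inside the container.
       static void setScriptExecutePermissions(GenericContainer<?> container) {
           try {
               Container.ExecResult result =
                       container.execInContainer("sh", "-c", "chmod +x /tmp/seatunnel/bin/*.sh");
               if (result.getExitCode() != 0) {
                   System.err.println("chmod failed: " + result.getStderr());
               }
           } catch (Exception e) {
               // Permission fixes are best-effort; do not fail test setup here.
               e.printStackTrace();
           }
       }
   }
   ```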



##########
seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestEmbeddingIT.java:
##########
@@ -54,7 +54,7 @@ public class TestEmbeddingIT extends TestSuiteBase implements TestResource {
     @Override
     public void startUp() {
         Optional<URL> resource =
-                Optional.ofNullable(TestLLMIT.class.getResource("/mock-embedding.json"));
+                Optional.ofNullable(TestEmbeddingIT.class.getResource("/mock-embedding.json"));

Review Comment:
   Okay, I will change this back to the original.



##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/doris_source_and_sink_with_custom_sql.conf:
##########
@@ -26,6 +26,10 @@ source{
     row.num = 100
     split.num = 10
     string.length = 1
+    auto.increment.enabled = true
+    auto.increment.start = 2
+    bigint.min = 2
+    bigint.max = 1000

Review Comment:
   The starter module has now been refactored, and all previously made unnecessary modifications to the e2e tests have been reverted.



##########
seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestEmbeddingIT.java:
##########
@@ -54,7 +54,7 @@ public class TestEmbeddingIT extends TestSuiteBase implements TestResource {
     @Override
     public void startUp() {
         Optional<URL> resource =
-                Optional.ofNullable(TestLLMIT.class.getResource("/mock-embedding.json"));
+                Optional.ofNullable(TestEmbeddingIT.class.getResource("/mock-embedding.json"));

Review Comment:
   Does this need to be modified, or should it remain as is? Is this a special implementation, that is, should TestLLMIT intentionally be used in the TestEmbeddingIT file?



##########
seatunnel-core/seatunnel-flink-starter/seatunnel-flink-20-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.starter.flink.execution;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.translation.flink.sink.FlinkSink;
+
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.net.URL;
+import java.util.List;
+
+/** Sink execute processor for Flink 1.20. */
+@Slf4j

Review Comment:
   > I suggest that the implementation method of logs should be unified into one
   
   The logging style throughout the /seatunnel-core/seatunnel-flink-starter directory will be unified to prefer LoggerFactory, and a LOGGER will only be declared in classes that actually emit log output.



##########
seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestEmbeddingIT.java:
##########
@@ -54,7 +54,7 @@ public class TestEmbeddingIT extends TestSuiteBase implements TestResource {
     @Override
     public void startUp() {
         Optional<URL> resource =
-                Optional.ofNullable(TestLLMIT.class.getResource("/mock-embedding.json"));
+                Optional.ofNullable(TestEmbeddingIT.class.getResource("/mock-embedding.json"));

Review Comment:
   The WriterInitContext in Flink 1.20 does not expose the complete RuntimeContext; it only provides metricGroup(), taskInfo(), and jobInfo().
   
   The common logic/metric adaptation layer of the existing SeaTunnel Sink still requires RuntimeContext in some scenarios, such as parallelism consistency verification, TaskName retrieval, access to user-defined accumulators, and compatibility with certain legacy logic.
   
   The reflection here is therefore a compatibility supplement and fallback, rather than the primary approach.
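   
   For reference, a minimal sketch of what the reduced surface still allows, assuming the getTaskInfo()/getJobInfo() accessors named above; getFlinkJobId is referenced in the diff but its body is not shown here, so this is only an illustration:
   
   ```java
   import org.apache.flink.api.connector.sink2.WriterInitContext;
   
   final class WriterInitContextIds {
       private WriterInitContextIds() {}
   
       // Derive job/subtask identifiers from the limited WriterInitContext surface.
       // Only metricGroup()/getTaskInfo()/getJobInfo() are available, which is why
       // the full RuntimeContext still has to be probed via reflection elsewhere.
       static String describe(WriterInitContext initContext) {
           String jobId = initContext.getJobInfo().getJobId().toString();
           int subtask = initContext.getTaskInfo().getIndexOfThisSubtask();
           return jobId + "-" + subtask;
       }
   }
   ```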



##########
seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-20/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriterContext.java:
##########
@@ -0,0 +1,200 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.sink;
+
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
+import org.apache.seatunnel.api.event.DefaultEventProcessor;
+import org.apache.seatunnel.api.event.EventListener;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.translation.flink.metric.FlinkMetricContext;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+
+@Slf4j
+public class FlinkSinkWriterContext implements SinkWriter.Context {
+
+    private final WriterInitContext initContext;
+    private final int parallelism;
+    private final EventListener eventListener;
+
+    public FlinkSinkWriterContext(WriterInitContext initContext, int parallelism) {
+        this.initContext = initContext;
+        this.parallelism = parallelism;
+        this.eventListener = new DefaultEventProcessor(getFlinkJobId(initContext));
+    }
+
+    @Override
+    public int getIndexOfSubtask() {
+        return initContext.getTaskInfo().getIndexOfThisSubtask();
+    }
+
+    @Override
+    public int getNumberOfParallelSubtasks() {
+        return parallelism;
+    }
+
+    @Override
+    public MetricsContext getMetricsContext() {
+        try {
+            RuntimeContext runtimeContext = getRuntimeContext();
+            MetricGroup metricGroup = initContext.metricGroup();
+
+            if (runtimeContext != null && metricGroup != null) {
+                return new FlinkMetricContext(runtimeContext, metricGroup);
+            } else {
+                return new FlinkMetricContext(metricGroup);
+            }
+        } catch (Exception e) {
+            return new FlinkMetricContext((MetricGroup) null);
+        }
+    }
+
+    @Override
+    public EventListener getEventListener() {
+        return eventListener;
+    }
+
+    public RuntimeContext getRuntimeContext() {
+        try {
+            RuntimeContext runtimeContext = tryGetFromFields(initContext);
+            if (runtimeContext != null) {
+                return runtimeContext;
+            }
+
+            runtimeContext = tryGetFromInitContextBase(initContext);
+            if (runtimeContext != null) {
+                return runtimeContext;
+            }
+
+            runtimeContext = tryGetFromWrapper(initContext);
+            if (runtimeContext != null) {
+                return runtimeContext;
+            }
+
+            return null;
+
+        } catch (Exception e) {
+            return null;
+        }
+    }
+
+    private RuntimeContext tryGetFromInitContextBase(Object context) {
+        try {
+            Class<?> initContextBaseClass =
+                    Class.forName(
+                            "org.apache.flink.streaming.runtime.operators.sink.InitContextBase");
+            if (initContextBaseClass.isInstance(context)) {
+                Method getRuntimeContextMethod =
+                        initContextBaseClass.getDeclaredMethod("getRuntimeContext");
+                getRuntimeContextMethod.setAccessible(true);
+                RuntimeContext runtimeContext =
+                        (RuntimeContext) getRuntimeContextMethod.invoke(context);
+                log.info(
+                        "Successfully obtained RuntimeContext from InitContextBase: {}",
+                        runtimeContext.getClass().getName());
+                return runtimeContext;
+            }
+        } catch (Exception e) {
+            log.debug("Failed to get RuntimeContext from InitContextBase", e);
+        }
+        return null;
+    }
+
+    private RuntimeContext tryGetFromWrapper(Object context) {
+        try {
+            String[] possibleFieldNames = {
+                "delegate", "wrapped", "context", "initContext", "writerInitContext"

Review Comment:
   The tryGetFromWrapper method can be removed.



##########
seatunnel-core/seatunnel-flink-starter/seatunnel-flink-20-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.starter.flink.execution;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigUtil;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
+
+import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.options.EnvCommonOptions;
+import org.apache.seatunnel.common.Constants;
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.common.constants.JobMode;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
+import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
+import org.apache.seatunnel.core.starter.execution.PluginExecuteProcessor;
+import org.apache.seatunnel.core.starter.execution.RuntimeEnvironment;
+import org.apache.seatunnel.core.starter.execution.TaskExecution;
+import org.apache.seatunnel.core.starter.flink.FlinkStarter;
+import org.apache.seatunnel.translation.flink.metric.FlinkJobMetricsSummary;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class FlinkExecution implements TaskExecution {

Review Comment:
   Because FlinkExecution contains additional logic for the createFlink20JobMetricsSummary method.



##########
seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-20/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriterContext.java:
##########
@@ -0,0 +1,200 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.sink;
+
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
+import org.apache.seatunnel.api.event.DefaultEventProcessor;
+import org.apache.seatunnel.api.event.EventListener;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.translation.flink.metric.FlinkMetricContext;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+
+@Slf4j
+public class FlinkSinkWriterContext implements SinkWriter.Context {
+
+    private final WriterInitContext initContext;
+    private final int parallelism;
+    private final EventListener eventListener;
+
+    public FlinkSinkWriterContext(WriterInitContext initContext, int parallelism) {
+        this.initContext = initContext;
+        this.parallelism = parallelism;
+        this.eventListener = new DefaultEventProcessor(getFlinkJobId(initContext));
+    }
+
+    @Override
+    public int getIndexOfSubtask() {
+        return initContext.getTaskInfo().getIndexOfThisSubtask();
+    }
+
+    @Override
+    public int getNumberOfParallelSubtasks() {
+        return parallelism;
+    }
+
+    @Override
+    public MetricsContext getMetricsContext() {
+        try {
+            RuntimeContext runtimeContext = getRuntimeContext();
+            MetricGroup metricGroup = initContext.metricGroup();
+
+            if (runtimeContext != null && metricGroup != null) {
+                return new FlinkMetricContext(runtimeContext, metricGroup);
+            } else {
+                return new FlinkMetricContext(metricGroup);
+            }
+        } catch (Exception e) {
+            return new FlinkMetricContext((MetricGroup) null);
+        }
+    }
+
+    @Override
+    public EventListener getEventListener() {
+        return eventListener;
+    }
+
+    public RuntimeContext getRuntimeContext() {
+        try {
+            RuntimeContext runtimeContext = tryGetFromFields(initContext);
+            if (runtimeContext != null) {
+                return runtimeContext;
+            }
+
+            runtimeContext = tryGetFromInitContextBase(initContext);
+            if (runtimeContext != null) {
+                return runtimeContext;
+            }
+
+            runtimeContext = tryGetFromWrapper(initContext);
+            if (runtimeContext != null) {
+                return runtimeContext;
+            }
+
+            return null;
+
+        } catch (Exception e) {
+            return null;
+        }
+    }
+
+    private RuntimeContext tryGetFromInitContextBase(Object context) {
+        try {
+            Class<?> initContextBaseClass =
+                    Class.forName(
+                            "org.apache.flink.streaming.runtime.operators.sink.InitContextBase");
+            if (initContextBaseClass.isInstance(context)) {
+                Method getRuntimeContextMethod =
+                        initContextBaseClass.getDeclaredMethod("getRuntimeContext");
+                getRuntimeContextMethod.setAccessible(true);
+                RuntimeContext runtimeContext =
+                        (RuntimeContext) getRuntimeContextMethod.invoke(context);
+                log.info(
+                        "Successfully obtained RuntimeContext from InitContextBase: {}",
+                        runtimeContext.getClass().getName());
+                return runtimeContext;
+            }
+        } catch (Exception e) {
+            log.debug("Failed to get RuntimeContext from InitContextBase", e);
+        }
+        return null;
+    }
+
+    private RuntimeContext tryGetFromWrapper(Object context) {
+        try {
+            String[] possibleFieldNames = {
+                "delegate", "wrapped", "context", "initContext", "writerInitContext"

Review Comment:
   > I'm not quite clear about this. Could you explain it to me?
   
   The way Flink exposes RuntimeContext through WriterInitContext is inconsistent across versions. In some versions, RuntimeContext can be obtained directly from a field (e.g., a field of type StreamingRuntimeContext); in others, it is exposed via the getRuntimeContext method of the internal class InitContextBase; and some implementations wrap the actual init context (the field name may be delegate, wrapped, context, etc.).
   
   To stay compatible with different versions and implementations, we probe safely in the following order (a sketch of step 1 is shown after this list):
   
   1. Scan the fields of the initContext itself. If a field is of type RuntimeContext or a subclass (e.g., StreamingRuntimeContext), return it directly.
   2. If Flink's internal InitContextBase is present, invoke its getRuntimeContext method via reflection.
   3. If the object is a wrapper, attempt to unwrap it by checking known field names (delegate/wrapped/context/initContext/writerInitContext), then repeat step 2 on the unwrapped object.
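   
   For reference, a minimal sketch of step 1, assuming the file's existing imports (Field, RuntimeContext) and the @Slf4j log are available; the real tryGetFromFields body is not shown in this hunk, so this is only an illustration:
   
   ```java
   // Step 1: scan the initContext's declared fields for a RuntimeContext instance.
   private RuntimeContext tryGetFromFields(Object context) {
       for (Field field : context.getClass().getDeclaredFields()) {
           if (RuntimeContext.class.isAssignableFrom(field.getType())) {
               try {
                   field.setAccessible(true);
                   Object value = field.get(context);
                   if (value instanceof RuntimeContext) {
                       return (RuntimeContext) value;
                   }
               } catch (IllegalAccessException e) {
                   log.debug("Failed to read candidate RuntimeContext field {}", field.getName(), e);
               }
           }
       }
       return null;
   }
   ```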


