This is an automated email from the ASF dual-hosted git repository.

ic4y pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7393c4732 [Improve] [Engine] Improve Engine performance. (#3216)
7393c4732 is described below

commit 7393c47327f5a81de4aca4306ee975b588f94e36
Author: Hisoka <[email protected]>
AuthorDate: Thu Nov 3 18:01:10 2022 +0800

    [Improve] [Engine] Improve Engine performance. (#3216)
    
    * [Engine] [Test] improve engine performance
    
    * [Engine] [Improve] engine performance improve
    
    * [Core] [Improve] Add test timeout
    
    * [Bug] [FakeSource] Add log for FakeSourceSplitEnumerator
    
    * [Bug] [Engine] Fix TaskLocation Serializable problem
---
 .../fake/source/FakeSourceSplitEnumerator.java         |  2 +-
 .../seatunnel/jdbc/internal/JdbcInputFormat.java       | 15 ++++++++-------
 .../internal/converter/AbstractJdbcRowConverter.java   |  9 ++++-----
 .../seatunnel/jdbc/internal/dialect/JdbcDialect.java   | 18 ++++++++++++++++++
 .../jdbc/internal/dialect/mysql/MysqlDialect.java      | 12 ++++++++++++
 .../connectors/seatunnel/jdbc/source/JdbcSource.java   |  4 ++--
 .../seatunnel/engine/e2e/ClusterFaultToleranceIT.java  |  2 +-
 .../common/loader/SeatunnelChildFirstClassLoader.java  |  4 ++++
 .../seatunnel/engine/core/parse/JobConfigParser.java   |  1 +
 .../seatunnel/engine/server/TaskExecutionService.java  | 10 ++++++----
 .../engine/server/checkpoint/CheckpointManager.java    |  3 ---
 .../operation/CheckpointFinishedOperation.java         |  4 ++--
 .../engine/server/execution/TaskLocation.java          |  3 +++
 .../server/task/SinkAggregatedCommitterTask.java       |  3 +++
 .../engine/server/task/SourceSplitEnumeratorTask.java  |  4 ++++
 .../task/flow/IntermediateQueueFlowLifeCycle.java      |  4 +++-
 .../engine/server/task/flow/SinkFlowLifeCycle.java     |  2 +-
 .../task/operation/GetTaskGroupAddressOperation.java   |  3 ++-
 .../seatunnel/format/text/TextSerializationSchema.java | 12 ++++++------
 19 files changed, 81 insertions(+), 34 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java
index 38b4f6b8e..04144834b 100644
--- 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java
@@ -139,7 +139,7 @@ public class FakeSourceSplitEnumerator implements 
SourceSplitEnumerator<FakeSour
                 // Mark pending splits as already assigned
                 assignedSplits.addAll(pendingAssignmentForReader);
                 // Assign pending splits to reader
-                LOG.info("Assigning splits to readers {}", 
pendingAssignmentForReader);
+                LOG.info("Assigning splits to readers {} {}", pendingReader, 
pendingAssignmentForReader);
                 enumeratorContext.assignSplit(pendingReader, new 
ArrayList<>(pendingAssignmentForReader));
                 enumeratorContext.signalNoMoreSplits(pendingReader);
             }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcInputFormat.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcInputFormat.java
index 1baf17256..f9fe6fdfb 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcInputFormat.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcInputFormat.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceSplit;
 
 import org.slf4j.Logger;
@@ -47,7 +48,7 @@ import java.sql.Timestamp;
 
 public class JdbcInputFormat implements Serializable {
 
-    protected static final long serialVersionUID = 2L;
+    private static final long serialVersionUID = 2L;
     protected static final Logger LOG = 
LoggerFactory.getLogger(JdbcInputFormat.class);
 
     protected JdbcConnectionProvider connectionProvider;
@@ -63,19 +64,22 @@ public class JdbcInputFormat implements Serializable {
 
     protected boolean hasNext;
 
+    protected JdbcDialect jdbcDialect;
+
     public JdbcInputFormat(JdbcConnectionProvider connectionProvider,
-                           JdbcRowConverter jdbcRowConverter,
+                           JdbcDialect jdbcDialect,
                            SeaTunnelRowType typeInfo,
                            String queryTemplate,
                            int fetchSize,
                            Boolean autoCommit
     ) {
         this.connectionProvider = connectionProvider;
-        this.jdbcRowConverter = jdbcRowConverter;
+        this.jdbcRowConverter = jdbcDialect.getRowConverter();
         this.typeInfo = typeInfo;
         this.queryTemplate = queryTemplate;
         this.fetchSize = fetchSize;
         this.autoCommit = autoCommit;
+        this.jdbcDialect = jdbcDialect;
     }
 
     public void openInputFormat() {
@@ -89,10 +93,7 @@ public class JdbcInputFormat implements Serializable {
                 dbConn.setAutoCommit(autoCommit);
             }
 
-            statement = dbConn.prepareStatement(queryTemplate);
-            if (fetchSize == Integer.MIN_VALUE || fetchSize > 0) {
-                statement.setFetchSize(fetchSize);
-            }
+            statement = jdbcDialect.creatPreparedStatement(dbConn, 
queryTemplate, fetchSize);
         } catch (SQLException se) {
             throw new IllegalArgumentException("open() failed." + 
se.getMessage(), se);
         } catch (ClassNotFoundException cnfe) {
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
index c1091293c..5f5c557f6 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
@@ -53,10 +53,7 @@ public abstract class AbstractJdbcRowConverter implements 
JdbcRowConverter {
         for (int i = 1; i <= seaTunnelDataTypes.length; i++) {
             Object seatunnelField;
             SeaTunnelDataType<?> seaTunnelDataType = seaTunnelDataTypes[i - 1];
-            if (null == rs.getObject(i)) {
-                seatunnelField = null;
-            }
-            else if (BasicType.BOOLEAN_TYPE.equals(seaTunnelDataType)) {
+            if (BasicType.BOOLEAN_TYPE.equals(seaTunnelDataType)) {
                 seatunnelField = rs.getBoolean(i);
             } else if (BasicType.BYTE_TYPE.equals(seaTunnelDataType)) {
                 seatunnelField = rs.getByte(i);
@@ -88,7 +85,9 @@ public abstract class AbstractJdbcRowConverter implements 
JdbcRowConverter {
             } else {
                 throw new IllegalStateException("Unexpected value: " + 
seaTunnelDataType);
             }
-
+            if (rs.wasNull()) {
+                seatunnelField = null;
+            }
             fields.add(seatunnelField);
         }
 
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
index 155d8d4c2..12fc2cd8c 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
@@ -20,6 +20,10 @@ package 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
 
 import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
 
 /**
  * Represents a dialect of SQL implemented by a particular JDBC system. 
Dialects should be immutable
@@ -45,8 +49,22 @@ public interface JdbcDialect extends Serializable {
 
     /**
      * get jdbc meta-information type to seatunnel data type mapper.
+     *
      * @return a type mapper for the database
      */
     JdbcDialectTypeMapper getJdbcDialectTypeMapper();
 
+    /**
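+     * Creates the PreparedStatement for a query. Dialects may override this to apply driver-specific optimizations.
+     *
+     * @return the created PreparedStatement; by default the fetch size is set only if it is Integer.MIN_VALUE or positive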
+     */
+    default PreparedStatement creatPreparedStatement(Connection connection, 
String queryTemplate, int fetchSize) throws SQLException {
+        PreparedStatement statement = 
connection.prepareStatement(queryTemplate, ResultSet.TYPE_FORWARD_ONLY, 
ResultSet.CONCUR_READ_ONLY);
+        if (fetchSize == Integer.MIN_VALUE || fetchSize > 0) {
+            statement.setFetchSize(fetchSize);
+        }
+        return statement;
+    }
+
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
index 3e4d77158..7d38fc325 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
@@ -21,6 +21,11 @@ import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRow
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
 
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
 public class MysqlDialect implements JdbcDialect {
     @Override
     public String dialectName() {
@@ -36,4 +41,11 @@ public class MysqlDialect implements JdbcDialect {
     public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
         return new MySqlTypeMapper();
     }
+
+    @Override
+    public PreparedStatement creatPreparedStatement(Connection connection, 
String queryTemplate, int fetchSize) throws SQLException {
+        PreparedStatement statement = 
connection.prepareStatement(queryTemplate, ResultSet.TYPE_FORWARD_ONLY, 
ResultSet.CONCUR_READ_ONLY);
+        statement.setFetchSize(Integer.MIN_VALUE);
+        return statement;
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
index 196b4dcac..976598099 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
@@ -86,7 +86,7 @@ public class JdbcSource implements 
SeaTunnelSource<SeaTunnelRow, JdbcSourceSplit
 
         inputFormat = new JdbcInputFormat(
             jdbcConnectionProvider,
-            jdbcDialect.getRowConverter(),
+            jdbcDialect,
             typeInfo,
             query,
             0,
@@ -147,7 +147,7 @@ public class JdbcSource implements 
SeaTunnelSource<SeaTunnelRow, JdbcSourceSplit
         } catch (Exception e) {
             LOG.warn("get row type info exception", e);
         }
-        return new SeaTunnelRowType(fieldNames.toArray(new 
String[fieldNames.size()]), seaTunnelDataTypes.toArray(new 
SeaTunnelDataType<?>[seaTunnelDataTypes.size()]));
+        return new SeaTunnelRowType(fieldNames.toArray(new String[0]), 
seaTunnelDataTypes.toArray(new SeaTunnelDataType<?>[0]));
     }
 
     private PartitionParameter initPartitionParameter(String columnName, 
Connection connection) throws SQLException {
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
index a09864e54..5cdb22770 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
@@ -215,7 +215,7 @@ public class ClusterFaultToleranceIT {
                 return clientJobProxy.waitForJobComplete();
             });
 
-            Awaitility.await().atMost(60000, TimeUnit.MILLISECONDS)
+            Awaitility.await().atMost(3, TimeUnit.MINUTES)
                 .untilAsserted(() -> {
                     Thread.sleep(2000);
                     
System.out.println(FileUtils.getFileLineNumberFromDir(testResources.getLeft()));
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/loader/SeatunnelChildFirstClassLoader.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/loader/SeatunnelChildFirstClassLoader.java
index 1dc0cceb7..22eac3687 100644
--- 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/loader/SeatunnelChildFirstClassLoader.java
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/loader/SeatunnelChildFirstClassLoader.java
@@ -29,6 +29,10 @@ public class SeatunnelChildFirstClassLoader extends 
SeatunnelBaseClassLoader {
     private final String[] alwaysParentFirstPatterns;
     private static final String[] DEFAULT_PARENT_FIRST_PATTERNS = new String[]{
         "java.",
+        "javax.xml",
+        "org.xml",
+        "org.w3c",
+        "org.apache.hadoop",
         "scala.",
         "org.apache.seatunnel.",
         "javax.annotation.",
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
index 847399b5d..6859171e8 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
@@ -123,6 +123,7 @@ public class JobConfigParser {
             complexAnalyze(sourceConfigs, transformConfigs, sinkConfigs);
         }
         actions.forEach(this::addCommonPluginJarsToAction);
+        jarUrlsSet.addAll(commonPluginJars);
         return new ImmutablePair<>(actions, jarUrlsSet);
     }
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index 3b44634b9..cf0867160 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -177,6 +177,7 @@ public class TaskExecutionService {
         return new PassiveCompletableFuture<>(resultFuture);
     }
 
+    @Deprecated
     public PassiveCompletableFuture<TaskExecutionState> deployLocalTask(
         @NonNull TaskGroup taskGroup,
         @NonNull CompletableFuture<TaskExecutionState> resultFuture) {
@@ -272,7 +273,8 @@ public class TaskExecutionService {
 
         @Override
         public void run() {
-            ClassLoader classLoader = 
executionContexts.get(tracker.taskGroupExecutionTracker.taskGroup.getTaskGroupLocation()).getClassLoader();
+            TaskExecutionService.TaskGroupExecutionTracker 
taskGroupExecutionTracker = tracker.taskGroupExecutionTracker;
+            ClassLoader classLoader = 
executionContexts.get(taskGroupExecutionTracker.taskGroup.getTaskGroupLocation()).getClassLoader();
             ClassLoader oldClassLoader = 
Thread.currentThread().getContextClassLoader();
             Thread.currentThread().setContextClassLoader(classLoader);
             final Task t = tracker.task;
@@ -283,12 +285,12 @@ public class TaskExecutionService {
                 do {
                     result = t.call();
                 } while (!result.isDone() && isRunning &&
-                    
!tracker.taskGroupExecutionTracker.executionCompletedExceptionally());
+                    
!taskGroupExecutionTracker.executionCompletedExceptionally());
             } catch (Throwable e) {
                 logger.warning("Exception in " + t, e);
-                tracker.taskGroupExecutionTracker.exception(e);
+                taskGroupExecutionTracker.exception(e);
             } finally {
-                tracker.taskGroupExecutionTracker.taskDone();
+                taskGroupExecutionTracker.taskDone();
             }
             Thread.currentThread().setContextClassLoader(oldClassLoader);
         }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index e7ff7ae20..78829e4dc 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -41,7 +41,6 @@ import 
org.apache.seatunnel.engine.server.task.operation.TaskOperation;
 import org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState;
 import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
 
-import com.hazelcast.cluster.Address;
 import com.hazelcast.map.IMap;
 import com.hazelcast.spi.impl.NodeEngine;
 import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
@@ -218,8 +217,6 @@ public class CheckpointManager {
     }
 
     protected InvocationFuture<?> sendOperationToMemberNode(TaskOperation 
operation) {
-        Address address =
-            
jobMaster.queryTaskGroupAddress(operation.getTaskLocation().getTaskGroupLocation().getTaskGroupId());
         return NodeEngineUtil.sendOperationToMemberNode(nodeEngine, operation,
             
jobMaster.queryTaskGroupAddress(operation.getTaskLocation().getTaskGroupLocation().getTaskGroupId()));
     }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointFinishedOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointFinishedOperation.java
index 422190a7d..ff5c92300 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointFinishedOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointFinishedOperation.java
@@ -76,9 +76,9 @@ public class CheckpointFinishedOperation extends 
TaskOperation {
     public void run() throws Exception {
         SeaTunnelServer server = getService();
         RetryUtils.retryWithException(() -> {
-            Task task = 
server.getTaskExecutionService().getExecutionContext(taskLocation.getTaskGroupLocation())
-                .getTaskGroup().getTask(taskLocation.getTaskID());
             try {
+                Task task = 
server.getTaskExecutionService().getExecutionContext(taskLocation.getTaskGroupLocation())
+                    .getTaskGroup().getTask(taskLocation.getTaskID());
                 if (successful) {
                     task.notifyCheckpointComplete(checkpointId);
                 } else {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskLocation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskLocation.java
index 64737254c..03d68f955 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskLocation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskLocation.java
@@ -95,12 +95,14 @@ public class TaskLocation implements 
IdentifiedDataSerializable, Serializable {
     public void writeData(ObjectDataOutput out) throws IOException {
         out.writeObject(taskGroupLocation);
         out.writeLong(taskID);
+        out.writeInt(index);
     }
 
     @Override
     public void readData(ObjectDataInput in) throws IOException {
         taskGroupLocation = in.readObject();
         taskID = in.readLong();
+        index = in.readInt();
     }
 
     @Override
@@ -108,6 +110,7 @@ public class TaskLocation implements 
IdentifiedDataSerializable, Serializable {
         return "TaskLocation{" +
             "taskGroupLocation=" + taskGroupLocation +
             ", taskID=" + taskID +
+            ", index=" + index +
             '}';
     }
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
index 750a79690..6fa251b99 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
@@ -117,6 +117,7 @@ public class SinkAggregatedCommitterTask<CommandInfoT, 
AggregatedCommitInfoT> ex
         return progress.toState();
     }
 
+    @SuppressWarnings("checkstyle:MagicNumber")
     protected void stateProcess() throws Exception {
         switch (currState) {
             case INIT:
@@ -142,6 +143,8 @@ public class SinkAggregatedCommitterTask<CommandInfoT, 
AggregatedCommitInfoT> ex
             case RUNNING:
                 if (prepareCloseStatus) {
                     currState = PREPARE_CLOSE;
+                } else {
+                    Thread.sleep(100);
                 }
                 break;
             case PREPARE_CLOSE:
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
index be5927f10..2b2d2befb 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
@@ -207,6 +207,7 @@ public class SourceSplitEnumeratorTask<SplitT extends 
SourceSplit> extends Coord
         }
     }
 
+    @SuppressWarnings("checkstyle:MagicNumber")
     private void stateProcess() throws Exception {
         switch (currState) {
             case INIT:
@@ -233,8 +234,11 @@ public class SourceSplitEnumeratorTask<SplitT extends 
SourceSplit> extends Coord
             case RUNNING:
                 // The reader closes automatically after reading
                 if (prepareCloseStatus) {
+                    // TODO we should trigger this after CheckpointCoordinator 
done
                     triggerBarrier(Barrier.completedBarrier());
                     currState = PREPARE_CLOSE;
+                } else {
+                    Thread.sleep(100);
                 }
                 break;
             case PREPARE_CLOSE:
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/IntermediateQueueFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/IntermediateQueueFlowLifeCycle.java
index bd885916b..ae9b55e90 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/IntermediateQueueFlowLifeCycle.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/IntermediateQueueFlowLifeCycle.java
@@ -26,6 +26,7 @@ import org.apache.seatunnel.engine.server.task.record.Barrier;
 
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
 public class IntermediateQueueFlowLifeCycle extends AbstractFlowLifeCycle 
implements OneInputFlowLifeCycle<Record<?>>,
         OneOutputFlowLifeCycle<Record<?>> {
@@ -48,10 +49,11 @@ public class IntermediateQueueFlowLifeCycle extends 
AbstractFlowLifeCycle implem
         }
     }
 
+    @SuppressWarnings("checkstyle:MagicNumber")
     @Override
     public void collect(Collector<Record<?>> collector) throws Exception {
         while (true) {
-            Record<?> record = queue.poll();
+            Record<?> record = queue.poll(100, TimeUnit.MILLISECONDS);
             if (record != null) {
                 handleRecord(record, collector::collect);
             } else {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
index eeba5588b..3a548105d 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
@@ -130,10 +130,10 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends 
Serializable, AggregatedCo
                         writer.abortPrepare();
                         throw e;
                     }
+                    List<StateT> states = 
writer.snapshotState(barrier.getId());
                     if (!writerStateSerializer.isPresent()) {
                         runningTask.addState(barrier, sinkAction.getId(), 
Collections.emptyList());
                     } else {
-                        List<StateT> states = 
writer.snapshotState(barrier.getId());
                         runningTask.addState(barrier, sinkAction.getId(), 
serializeStates(writerStateSerializer.get(), states));
                     }
                     if (containAggCommitter) {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetTaskGroupAddressOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetTaskGroupAddressOperation.java
index 7f69773b1..088fabd0b 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetTaskGroupAddressOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetTaskGroupAddressOperation.java
@@ -30,6 +30,7 @@ import 
com.hazelcast.nio.serialization.IdentifiedDataSerializable;
 import com.hazelcast.spi.impl.operationservice.Operation;
 
 import java.io.IOException;
+import java.util.Objects;
 
 public class GetTaskGroupAddressOperation extends Operation implements 
IdentifiedDataSerializable {
 
@@ -50,7 +51,7 @@ public class GetTaskGroupAddressOperation extends Operation 
implements Identifie
         response = RetryUtils.retryWithException(() -> 
server.getCoordinatorService().getJobMaster(taskLocation.getJobId())
                 
.queryTaskGroupAddress(taskLocation.getTaskGroupLocation().getTaskGroupId()),
             new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
-                exception -> exception instanceof Exception, 
Constant.OPERATION_RETRY_SLEEP));
+                Objects::nonNull, Constant.OPERATION_RETRY_SLEEP));
     }
 
     @Override
diff --git 
a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextSerializationSchema.java
 
b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextSerializationSchema.java
index d23d5c2cc..b55f5e6af 100644
--- 
a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextSerializationSchema.java
+++ 
b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextSerializationSchema.java
@@ -64,17 +64,14 @@ public class TextSerializationSchema implements 
SerializationSchema {
             return "";
         }
         switch (fieldType.getSqlType()) {
-            case ARRAY:
-            case MAP:
-                return JsonUtils.toJsonString(field);
+            case DOUBLE:
+            case FLOAT:
+            case INT:
             case STRING:
             case BOOLEAN:
             case TINYINT:
             case SMALLINT:
-            case INT:
             case BIGINT:
-            case FLOAT:
-            case DOUBLE:
             case DECIMAL:
                 return field.toString();
             case DATE:
@@ -87,6 +84,9 @@ public class TextSerializationSchema implements 
SerializationSchema {
                 return "";
             case BYTES:
                 return new String((byte[]) field);
+            case ARRAY:
+            case MAP:
+                return JsonUtils.toJsonString(field);
             case ROW:
                 Object[] fields = ((SeaTunnelRow) field).getFields();
                 String[] strings = new String[fields.length];

Reply via email to