This is an automated email from the ASF dual-hosted git repository.
ic4y pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 7393c4732 [Improve] [Engine] Improve Engine performance. (#3216)
7393c4732 is described below
commit 7393c47327f5a81de4aca4306ee975b588f94e36
Author: Hisoka <[email protected]>
AuthorDate: Thu Nov 3 18:01:10 2022 +0800
[Improve] [Engine] Improve Engine performance. (#3216)
* [Engine] [Test] improve engine performance
* [Engine] [Improve] engine performance improve
* [Core] [Improve] Add test timeout
* [Bug] [FakeSource] Add log for FakeSourceSplitEnumerator
* [Bug] [Engine] Fix TaskLocation Serializable problem
---
.../fake/source/FakeSourceSplitEnumerator.java | 2 +-
.../seatunnel/jdbc/internal/JdbcInputFormat.java | 15 ++++++++-------
.../internal/converter/AbstractJdbcRowConverter.java | 9 ++++-----
.../seatunnel/jdbc/internal/dialect/JdbcDialect.java | 18 ++++++++++++++++++
.../jdbc/internal/dialect/mysql/MysqlDialect.java | 12 ++++++++++++
.../connectors/seatunnel/jdbc/source/JdbcSource.java | 4 ++--
.../seatunnel/engine/e2e/ClusterFaultToleranceIT.java | 2 +-
.../common/loader/SeatunnelChildFirstClassLoader.java | 4 ++++
.../seatunnel/engine/core/parse/JobConfigParser.java | 1 +
.../seatunnel/engine/server/TaskExecutionService.java | 10 ++++++----
.../engine/server/checkpoint/CheckpointManager.java | 3 ---
.../operation/CheckpointFinishedOperation.java | 4 ++--
.../engine/server/execution/TaskLocation.java | 3 +++
.../server/task/SinkAggregatedCommitterTask.java | 3 +++
.../engine/server/task/SourceSplitEnumeratorTask.java | 4 ++++
.../task/flow/IntermediateQueueFlowLifeCycle.java | 4 +++-
.../engine/server/task/flow/SinkFlowLifeCycle.java | 2 +-
.../task/operation/GetTaskGroupAddressOperation.java | 3 ++-
.../seatunnel/format/text/TextSerializationSchema.java | 12 ++++++------
19 files changed, 81 insertions(+), 34 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java
index 38b4f6b8e..04144834b 100644
---
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java
@@ -139,7 +139,7 @@ public class FakeSourceSplitEnumerator implements
SourceSplitEnumerator<FakeSour
// Mark pending splits as already assigned
assignedSplits.addAll(pendingAssignmentForReader);
// Assign pending splits to reader
- LOG.info("Assigning splits to readers {}",
pendingAssignmentForReader);
+ LOG.info("Assigning splits to readers {} {}", pendingReader,
pendingAssignmentForReader);
enumeratorContext.assignSplit(pendingReader, new
ArrayList<>(pendingAssignmentForReader));
enumeratorContext.signalNoMoreSplits(pendingReader);
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcInputFormat.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcInputFormat.java
index 1baf17256..f9fe6fdfb 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcInputFormat.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcInputFormat.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceSplit;
import org.slf4j.Logger;
@@ -47,7 +48,7 @@ import java.sql.Timestamp;
public class JdbcInputFormat implements Serializable {
- protected static final long serialVersionUID = 2L;
+ private static final long serialVersionUID = 2L;
protected static final Logger LOG =
LoggerFactory.getLogger(JdbcInputFormat.class);
protected JdbcConnectionProvider connectionProvider;
@@ -63,19 +64,22 @@ public class JdbcInputFormat implements Serializable {
protected boolean hasNext;
+ protected JdbcDialect jdbcDialect;
+
public JdbcInputFormat(JdbcConnectionProvider connectionProvider,
- JdbcRowConverter jdbcRowConverter,
+ JdbcDialect jdbcDialect,
SeaTunnelRowType typeInfo,
String queryTemplate,
int fetchSize,
Boolean autoCommit
) {
this.connectionProvider = connectionProvider;
- this.jdbcRowConverter = jdbcRowConverter;
+ this.jdbcRowConverter = jdbcDialect.getRowConverter();
this.typeInfo = typeInfo;
this.queryTemplate = queryTemplate;
this.fetchSize = fetchSize;
this.autoCommit = autoCommit;
+ this.jdbcDialect = jdbcDialect;
}
public void openInputFormat() {
@@ -89,10 +93,7 @@ public class JdbcInputFormat implements Serializable {
dbConn.setAutoCommit(autoCommit);
}
- statement = dbConn.prepareStatement(queryTemplate);
- if (fetchSize == Integer.MIN_VALUE || fetchSize > 0) {
- statement.setFetchSize(fetchSize);
- }
+ statement = jdbcDialect.creatPreparedStatement(dbConn,
queryTemplate, fetchSize);
} catch (SQLException se) {
throw new IllegalArgumentException("open() failed." +
se.getMessage(), se);
} catch (ClassNotFoundException cnfe) {
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
index c1091293c..5f5c557f6 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
@@ -53,10 +53,7 @@ public abstract class AbstractJdbcRowConverter implements
JdbcRowConverter {
for (int i = 1; i <= seaTunnelDataTypes.length; i++) {
Object seatunnelField;
SeaTunnelDataType<?> seaTunnelDataType = seaTunnelDataTypes[i - 1];
- if (null == rs.getObject(i)) {
- seatunnelField = null;
- }
- else if (BasicType.BOOLEAN_TYPE.equals(seaTunnelDataType)) {
+ if (BasicType.BOOLEAN_TYPE.equals(seaTunnelDataType)) {
seatunnelField = rs.getBoolean(i);
} else if (BasicType.BYTE_TYPE.equals(seaTunnelDataType)) {
seatunnelField = rs.getByte(i);
@@ -88,7 +85,9 @@ public abstract class AbstractJdbcRowConverter implements
JdbcRowConverter {
} else {
throw new IllegalStateException("Unexpected value: " +
seaTunnelDataType);
}
-
+ if (rs.wasNull()) {
+ seatunnelField = null;
+ }
fields.add(seatunnelField);
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
index 155d8d4c2..12fc2cd8c 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
@@ -20,6 +20,10 @@ package
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
/**
* Represents a dialect of SQL implemented by a particular JDBC system.
Dialects should be immutable
@@ -45,8 +49,22 @@ public interface JdbcDialect extends Serializable {
/**
* get jdbc meta-information type to seatunnel data type mapper.
+ *
* @return a type mapper for the database
*/
JdbcDialectTypeMapper getJdbcDialectTypeMapper();
+ /**
+ * Different dialects optimize their PreparedStatement
+ *
+ * @return The logic about optimize PreparedStatement
+ */
+ default PreparedStatement creatPreparedStatement(Connection connection,
String queryTemplate, int fetchSize) throws SQLException {
+ PreparedStatement statement =
connection.prepareStatement(queryTemplate, ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_READ_ONLY);
+ if (fetchSize == Integer.MIN_VALUE || fetchSize > 0) {
+ statement.setFetchSize(fetchSize);
+ }
+ return statement;
+ }
+
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
index 3e4d77158..7d38fc325 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
@@ -21,6 +21,11 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRow
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
public class MysqlDialect implements JdbcDialect {
@Override
public String dialectName() {
@@ -36,4 +41,11 @@ public class MysqlDialect implements JdbcDialect {
public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
return new MySqlTypeMapper();
}
+
+ @Override
+ public PreparedStatement creatPreparedStatement(Connection connection,
String queryTemplate, int fetchSize) throws SQLException {
+ PreparedStatement statement =
connection.prepareStatement(queryTemplate, ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_READ_ONLY);
+ statement.setFetchSize(Integer.MIN_VALUE);
+ return statement;
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
index 196b4dcac..976598099 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
@@ -86,7 +86,7 @@ public class JdbcSource implements
SeaTunnelSource<SeaTunnelRow, JdbcSourceSplit
inputFormat = new JdbcInputFormat(
jdbcConnectionProvider,
- jdbcDialect.getRowConverter(),
+ jdbcDialect,
typeInfo,
query,
0,
@@ -147,7 +147,7 @@ public class JdbcSource implements
SeaTunnelSource<SeaTunnelRow, JdbcSourceSplit
} catch (Exception e) {
LOG.warn("get row type info exception", e);
}
- return new SeaTunnelRowType(fieldNames.toArray(new
String[fieldNames.size()]), seaTunnelDataTypes.toArray(new
SeaTunnelDataType<?>[seaTunnelDataTypes.size()]));
+ return new SeaTunnelRowType(fieldNames.toArray(new String[0]),
seaTunnelDataTypes.toArray(new SeaTunnelDataType<?>[0]));
}
private PartitionParameter initPartitionParameter(String columnName,
Connection connection) throws SQLException {
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
index a09864e54..5cdb22770 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
@@ -215,7 +215,7 @@ public class ClusterFaultToleranceIT {
return clientJobProxy.waitForJobComplete();
});
- Awaitility.await().atMost(60000, TimeUnit.MILLISECONDS)
+ Awaitility.await().atMost(3, TimeUnit.MINUTES)
.untilAsserted(() -> {
Thread.sleep(2000);
System.out.println(FileUtils.getFileLineNumberFromDir(testResources.getLeft()));
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/loader/SeatunnelChildFirstClassLoader.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/loader/SeatunnelChildFirstClassLoader.java
index 1dc0cceb7..22eac3687 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/loader/SeatunnelChildFirstClassLoader.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/loader/SeatunnelChildFirstClassLoader.java
@@ -29,6 +29,10 @@ public class SeatunnelChildFirstClassLoader extends
SeatunnelBaseClassLoader {
private final String[] alwaysParentFirstPatterns;
private static final String[] DEFAULT_PARENT_FIRST_PATTERNS = new String[]{
"java.",
+ "javax.xml",
+ "org.xml",
+ "org.w3c",
+ "org.apache.hadoop",
"scala.",
"org.apache.seatunnel.",
"javax.annotation.",
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
index 847399b5d..6859171e8 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
@@ -123,6 +123,7 @@ public class JobConfigParser {
complexAnalyze(sourceConfigs, transformConfigs, sinkConfigs);
}
actions.forEach(this::addCommonPluginJarsToAction);
+ jarUrlsSet.addAll(commonPluginJars);
return new ImmutablePair<>(actions, jarUrlsSet);
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index 3b44634b9..cf0867160 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -177,6 +177,7 @@ public class TaskExecutionService {
return new PassiveCompletableFuture<>(resultFuture);
}
+ @Deprecated
public PassiveCompletableFuture<TaskExecutionState> deployLocalTask(
@NonNull TaskGroup taskGroup,
@NonNull CompletableFuture<TaskExecutionState> resultFuture) {
@@ -272,7 +273,8 @@ public class TaskExecutionService {
@Override
public void run() {
- ClassLoader classLoader =
executionContexts.get(tracker.taskGroupExecutionTracker.taskGroup.getTaskGroupLocation()).getClassLoader();
+ TaskExecutionService.TaskGroupExecutionTracker
taskGroupExecutionTracker = tracker.taskGroupExecutionTracker;
+ ClassLoader classLoader =
executionContexts.get(taskGroupExecutionTracker.taskGroup.getTaskGroupLocation()).getClassLoader();
ClassLoader oldClassLoader =
Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(classLoader);
final Task t = tracker.task;
@@ -283,12 +285,12 @@ public class TaskExecutionService {
do {
result = t.call();
} while (!result.isDone() && isRunning &&
-
!tracker.taskGroupExecutionTracker.executionCompletedExceptionally());
+
!taskGroupExecutionTracker.executionCompletedExceptionally());
} catch (Throwable e) {
logger.warning("Exception in " + t, e);
- tracker.taskGroupExecutionTracker.exception(e);
+ taskGroupExecutionTracker.exception(e);
} finally {
- tracker.taskGroupExecutionTracker.taskDone();
+ taskGroupExecutionTracker.taskDone();
}
Thread.currentThread().setContextClassLoader(oldClassLoader);
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index e7ff7ae20..78829e4dc 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -41,7 +41,6 @@ import
org.apache.seatunnel.engine.server.task.operation.TaskOperation;
import org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState;
import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
-import com.hazelcast.cluster.Address;
import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
@@ -218,8 +217,6 @@ public class CheckpointManager {
}
protected InvocationFuture<?> sendOperationToMemberNode(TaskOperation
operation) {
- Address address =
-
jobMaster.queryTaskGroupAddress(operation.getTaskLocation().getTaskGroupLocation().getTaskGroupId());
return NodeEngineUtil.sendOperationToMemberNode(nodeEngine, operation,
jobMaster.queryTaskGroupAddress(operation.getTaskLocation().getTaskGroupLocation().getTaskGroupId()));
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointFinishedOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointFinishedOperation.java
index 422190a7d..ff5c92300 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointFinishedOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointFinishedOperation.java
@@ -76,9 +76,9 @@ public class CheckpointFinishedOperation extends
TaskOperation {
public void run() throws Exception {
SeaTunnelServer server = getService();
RetryUtils.retryWithException(() -> {
- Task task =
server.getTaskExecutionService().getExecutionContext(taskLocation.getTaskGroupLocation())
- .getTaskGroup().getTask(taskLocation.getTaskID());
try {
+ Task task =
server.getTaskExecutionService().getExecutionContext(taskLocation.getTaskGroupLocation())
+ .getTaskGroup().getTask(taskLocation.getTaskID());
if (successful) {
task.notifyCheckpointComplete(checkpointId);
} else {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskLocation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskLocation.java
index 64737254c..03d68f955 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskLocation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskLocation.java
@@ -95,12 +95,14 @@ public class TaskLocation implements
IdentifiedDataSerializable, Serializable {
public void writeData(ObjectDataOutput out) throws IOException {
out.writeObject(taskGroupLocation);
out.writeLong(taskID);
+ out.writeInt(index);
}
@Override
public void readData(ObjectDataInput in) throws IOException {
taskGroupLocation = in.readObject();
taskID = in.readLong();
+ index = in.readInt();
}
@Override
@@ -108,6 +110,7 @@ public class TaskLocation implements
IdentifiedDataSerializable, Serializable {
return "TaskLocation{" +
"taskGroupLocation=" + taskGroupLocation +
", taskID=" + taskID +
+ ", index=" + index +
'}';
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
index 750a79690..6fa251b99 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
@@ -117,6 +117,7 @@ public class SinkAggregatedCommitterTask<CommandInfoT,
AggregatedCommitInfoT> ex
return progress.toState();
}
+ @SuppressWarnings("checkstyle:MagicNumber")
protected void stateProcess() throws Exception {
switch (currState) {
case INIT:
@@ -142,6 +143,8 @@ public class SinkAggregatedCommitterTask<CommandInfoT,
AggregatedCommitInfoT> ex
case RUNNING:
if (prepareCloseStatus) {
currState = PREPARE_CLOSE;
+ } else {
+ Thread.sleep(100);
}
break;
case PREPARE_CLOSE:
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
index be5927f10..2b2d2befb 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
@@ -207,6 +207,7 @@ public class SourceSplitEnumeratorTask<SplitT extends
SourceSplit> extends Coord
}
}
+ @SuppressWarnings("checkstyle:MagicNumber")
private void stateProcess() throws Exception {
switch (currState) {
case INIT:
@@ -233,8 +234,11 @@ public class SourceSplitEnumeratorTask<SplitT extends
SourceSplit> extends Coord
case RUNNING:
// The reader closes automatically after reading
if (prepareCloseStatus) {
+ // TODO we should trigger this after CheckpointCoordinator
done
triggerBarrier(Barrier.completedBarrier());
currState = PREPARE_CLOSE;
+ } else {
+ Thread.sleep(100);
}
break;
case PREPARE_CLOSE:
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/IntermediateQueueFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/IntermediateQueueFlowLifeCycle.java
index bd885916b..ae9b55e90 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/IntermediateQueueFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/IntermediateQueueFlowLifeCycle.java
@@ -26,6 +26,7 @@ import org.apache.seatunnel.engine.server.task.record.Barrier;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
public class IntermediateQueueFlowLifeCycle extends AbstractFlowLifeCycle
implements OneInputFlowLifeCycle<Record<?>>,
OneOutputFlowLifeCycle<Record<?>> {
@@ -48,10 +49,11 @@ public class IntermediateQueueFlowLifeCycle extends
AbstractFlowLifeCycle implem
}
}
+ @SuppressWarnings("checkstyle:MagicNumber")
@Override
public void collect(Collector<Record<?>> collector) throws Exception {
while (true) {
- Record<?> record = queue.poll();
+ Record<?> record = queue.poll(100, TimeUnit.MILLISECONDS);
if (record != null) {
handleRecord(record, collector::collect);
} else {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
index eeba5588b..3a548105d 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
@@ -130,10 +130,10 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends
Serializable, AggregatedCo
writer.abortPrepare();
throw e;
}
+ List<StateT> states =
writer.snapshotState(barrier.getId());
if (!writerStateSerializer.isPresent()) {
runningTask.addState(barrier, sinkAction.getId(),
Collections.emptyList());
} else {
- List<StateT> states =
writer.snapshotState(barrier.getId());
runningTask.addState(barrier, sinkAction.getId(),
serializeStates(writerStateSerializer.get(), states));
}
if (containAggCommitter) {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetTaskGroupAddressOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetTaskGroupAddressOperation.java
index 7f69773b1..088fabd0b 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetTaskGroupAddressOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetTaskGroupAddressOperation.java
@@ -30,6 +30,7 @@ import
com.hazelcast.nio.serialization.IdentifiedDataSerializable;
import com.hazelcast.spi.impl.operationservice.Operation;
import java.io.IOException;
+import java.util.Objects;
public class GetTaskGroupAddressOperation extends Operation implements
IdentifiedDataSerializable {
@@ -50,7 +51,7 @@ public class GetTaskGroupAddressOperation extends Operation
implements Identifie
response = RetryUtils.retryWithException(() ->
server.getCoordinatorService().getJobMaster(taskLocation.getJobId())
.queryTaskGroupAddress(taskLocation.getTaskGroupLocation().getTaskGroupId()),
new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
- exception -> exception instanceof Exception,
Constant.OPERATION_RETRY_SLEEP));
+ Objects::nonNull, Constant.OPERATION_RETRY_SLEEP));
}
@Override
diff --git
a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextSerializationSchema.java
b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextSerializationSchema.java
index d23d5c2cc..b55f5e6af 100644
---
a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextSerializationSchema.java
+++
b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextSerializationSchema.java
@@ -64,17 +64,14 @@ public class TextSerializationSchema implements
SerializationSchema {
return "";
}
switch (fieldType.getSqlType()) {
- case ARRAY:
- case MAP:
- return JsonUtils.toJsonString(field);
+ case DOUBLE:
+ case FLOAT:
+ case INT:
case STRING:
case BOOLEAN:
case TINYINT:
case SMALLINT:
- case INT:
case BIGINT:
- case FLOAT:
- case DOUBLE:
case DECIMAL:
return field.toString();
case DATE:
@@ -87,6 +84,9 @@ public class TextSerializationSchema implements
SerializationSchema {
return "";
case BYTES:
return new String((byte[]) field);
+ case ARRAY:
+ case MAP:
+ return JsonUtils.toJsonString(field);
case ROW:
Object[] fields = ((SeaTunnelRow) field).getFields();
String[] strings = new String[fields.length];