This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 65f5a1653 [HotFix][Zeta] Fix NullPointerException when registering Reader with the enumerator (#4048)
65f5a1653 is described below
commit 65f5a1653bb7d559f76ca85cbfdc6dd485c95e45
Author: ic4y <[email protected]>
AuthorDate: Wed Feb 8 11:02:35 2023 +0800
[HotFix][Zeta] Fix NullPointerException when registering Reader with the enumerator (#4048)
* [HotFix][Zeta] Fix NullPointerException when registering Reader with the enumerator
---
.../apache/seatunnel/engine/common/Constant.java | 2 +-
.../engine/server/TaskExecutionService.java | 15 ++++---
.../operation/NotifyTaskStartOperation.java | 8 ++--
.../operation/TaskReportStatusOperation.java | 20 ++++++---
.../seatunnel/engine/server/task/AbstractTask.java | 5 ++-
.../engine/server/task/SeaTunnelTask.java | 4 +-
.../server/task/SinkAggregatedCommitterTask.java | 4 +-
.../server/task/SourceSplitEnumeratorTask.java | 51 ++++++++++++++--------
.../checkpoint/CloseRequestOperation.java | 3 +-
.../operation/source/AssignSplitOperation.java | 3 +-
.../operation/source/RequestSplitOperation.java | 3 +-
.../operation/source/RestoredSplitOperation.java | 3 +-
.../source/SourceNoMoreElementOperation.java | 3 +-
.../source/SourceReaderEventOperation.java | 3 +-
.../operation/source/SourceRegisterOperation.java | 5 ++-
15 files changed, 86 insertions(+), 46 deletions(-)
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
index 0fb820cba..d413938c2 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
@@ -36,7 +36,7 @@ public class Constant {
public static final String HAZELCAST_SEATUNNEL_DEFAULT_YAML =
"seatunnel.yaml";
- public static final int OPERATION_RETRY_TIME = 10;
+ public static final int OPERATION_RETRY_TIME = 30;
public static final int OPERATION_RETRY_SLEEP = 2000;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index 688a38c69..195415c4b 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -32,6 +32,7 @@ import static java.util.stream.Collectors.toList;
import org.apache.seatunnel.api.common.metrics.MetricTags;
import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
@@ -62,7 +63,6 @@ import com.hazelcast.internal.serialization.Data;
import com.hazelcast.jet.impl.execution.init.CustomClassLoadedObject;
import com.hazelcast.logging.ILogger;
import com.hazelcast.map.IMap;
-import com.hazelcast.spi.exception.WrongTargetException;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
import com.hazelcast.spi.properties.HazelcastProperties;
@@ -188,9 +188,14 @@ public class TaskExecutionService implements
DynamicMetricsProvider {
return deployTask(taskImmutableInfo);
}
- public <T extends Task> T getTask(TaskLocation taskLocation) {
- return
this.getExecutionContext(taskLocation.getTaskGroupLocation()).getTaskGroup()
- .getTask(taskLocation.getTaskID());
+ public <T extends Task> T getTask(@NonNull TaskLocation taskLocation) {
+ TaskGroupContext executionContext =
this.getExecutionContext(taskLocation.getTaskGroupLocation());
+ if (null == executionContext) {
+ throw new SeaTunnelException(
+ String.format("Failed to get Task, TaskLocation{%s} does not
exist in TaskExecutionServer",
+ taskLocation));
+ }
+ return
executionContext.getTaskGroup().getTask(taskLocation.getTaskID());
}
public PassiveCompletableFuture<TaskExecutionState> deployTask(
@@ -370,7 +375,7 @@ public class TaskExecutionService implements
DynamicMetricsProvider {
}
});
});
- } catch (WrongTargetException e){
+ } catch (Exception e){
logger.warning("The Imap acquisition failed due to the hazelcast
node being offline or restarted, and will be retried next time", e);
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskStartOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskStartOperation.java
index ace26fb92..6106de015 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskStartOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskStartOperation.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.engine.server.checkpoint.operation;
import org.apache.seatunnel.common.utils.RetryUtils;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
@@ -48,12 +49,13 @@ public class NotifyTaskStartOperation extends TaskOperation
{
public void run() throws Exception {
SeaTunnelServer server = getService();
RetryUtils.retryWithException(() -> {
- AbstractTask task =
server.getTaskExecutionService().getExecutionContext(taskLocation.getTaskGroupLocation())
- .getTaskGroup().getTask(taskLocation.getTaskID());
+ AbstractTask task = server
+ .getTaskExecutionService()
+ .getTask(taskLocation);
task.startCall();
return null;
}, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
- exception -> exception instanceof NullPointerException &&
+ exception -> exception instanceof SeaTunnelException &&
!server.taskIsEnded(taskLocation.getTaskGroupLocation()),
Constant.OPERATION_RETRY_SLEEP));
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/TaskReportStatusOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/TaskReportStatusOperation.java
index 171353e6b..a00689810 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/TaskReportStatusOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/TaskReportStatusOperation.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.engine.server.checkpoint.operation;
+import org.apache.seatunnel.common.utils.RetryUtils;
+import org.apache.seatunnel.engine.server.CoordinatorService;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import
org.apache.seatunnel.engine.server.serializable.CheckpointDataSerializerHook;
@@ -37,6 +39,10 @@ import java.io.IOException;
@AllArgsConstructor
public class TaskReportStatusOperation extends Operation implements
IdentifiedDataSerializable {
+ private static final int RETRY_NUMBER = 20;
+
+ private static final int RETRY_INTERVAL = 2000;
+
private TaskLocation location;
private SeaTunnelTaskState status;
@@ -63,10 +69,14 @@ public class TaskReportStatusOperation extends Operation
implements IdentifiedDa
}
@Override
- public void run() {
- ((SeaTunnelServer) getService())
- .getCoordinatorService().getJobMaster(location.getJobId())
- .getCheckpointManager()
- .reportedTask(this);
+ public void run() throws Exception {
+ CoordinatorService coordinatorService = ((SeaTunnelServer)
getService())
+ .getCoordinatorService();
+ RetryUtils.retryWithException(() -> {
+ coordinatorService.getJobMaster(location.getJobId())
+ .getCheckpointManager()
+ .reportedTask(this);
+ return null;
+ }, new RetryUtils.RetryMaterial(RETRY_NUMBER, true, e -> true,
RETRY_INTERVAL));
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java
index 3daf0512b..643009b0c 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java
@@ -32,6 +32,7 @@ import lombok.NonNull;
import java.net.URL;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
@@ -41,7 +42,7 @@ public abstract class AbstractTask implements Task {
protected TaskExecutionContext executionContext;
protected final long jobID;
protected final TaskLocation taskLocation;
- protected volatile boolean restoreComplete;
+ protected volatile CompletableFuture<Void> restoreComplete;
protected volatile boolean startCalled;
protected volatile boolean closeCalled;
protected volatile boolean prepareCloseStatus;
@@ -54,7 +55,6 @@ public abstract class AbstractTask implements Task {
this.taskLocation = taskLocation;
this.jobID = jobID;
this.progress = new Progress();
- this.restoreComplete = false;
this.startCalled = false;
this.closeCalled = false;
this.prepareCloseStatus = false;
@@ -74,6 +74,7 @@ public abstract class AbstractTask implements Task {
@Override
public void init() throws Exception {
+ this.restoreComplete = new CompletableFuture<>();
progress.start();
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
index 5c434acbb..9d9a8bd17 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -137,7 +137,7 @@ public abstract class SeaTunnelTask extends AbstractTask {
reportTaskStatus(WAITING_RESTORE);
break;
case WAITING_RESTORE:
- if (restoreComplete) {
+ if (restoreComplete.isDone()) {
for (FlowLifeCycle cycle : allCycles) {
cycle.open();
}
@@ -325,7 +325,7 @@ public abstract class SeaTunnelTask extends AbstractTask {
sneakyThrow(e);
}
});
- restoreComplete = true;
+ restoreComplete.complete(null);
}
@Override
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
index dc42be5fd..fcd4a431d 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
@@ -125,7 +125,7 @@ public class SinkAggregatedCommitterTask<CommandInfoT,
AggregatedCommitInfoT> ex
reportTaskStatus(WAITING_RESTORE);
break;
case WAITING_RESTORE:
- if (restoreComplete) {
+ if (restoreComplete.isDone()) {
currState = READY_START;
reportTaskStatus(READY_START);
}
@@ -203,7 +203,7 @@ public class SinkAggregatedCommitterTask<CommandInfoT,
AggregatedCommitInfoT> ex
.map(bytes -> sneaky(() ->
aggregatedCommitInfoSerializer.deserialize(bytes)))
.collect(Collectors.toList());
aggregatedCommitter.commit(aggregatedCommitInfos);
- restoreComplete = true;
+ restoreComplete.complete(null);
}
public void receivedWriterCommitInfo(long checkpointID,
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
index 81b9df55f..8da627dd5 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
@@ -43,11 +43,10 @@ import
org.apache.seatunnel.engine.server.task.record.Barrier;
import org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState;
import com.hazelcast.cluster.Address;
-import com.hazelcast.logging.ILogger;
-import com.hazelcast.logging.Logger;
import com.hazelcast.spi.impl.operationservice.Operation;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.io.Serializable;
@@ -63,13 +62,13 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
+@Slf4j
public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends
CoordinatorTask {
- private static final ILogger LOGGER =
Logger.getLogger(SourceSplitEnumeratorTask.class);
-
private static final long serialVersionUID = -3713701594297977775L;
private final SourceAction<?, SplitT, Serializable> source;
@@ -93,7 +92,7 @@ public class SourceSplitEnumeratorTask<SplitT extends
SourceSplit> extends Coord
currState = SeaTunnelTaskState.INIT;
super.init();
readerRegisterComplete = false;
- LOGGER.info("starting seatunnel source split enumerator task, source
name: " + source.getName());
+ log.info("starting seatunnel source split enumerator task, source
name: " + source.getName());
enumeratorContext = new
SeaTunnelSplitEnumeratorContext<>(this.source.getParallelism(), this);
enumeratorStateSerializer =
this.source.getSource().getEnumeratorStateSerializer();
taskMemberMapping = new ConcurrentHashMap<>();
@@ -160,15 +159,19 @@ public class SourceSplitEnumeratorTask<SplitT extends
SourceSplit> extends Coord
this.enumerator = this.source.getSource()
.createEnumerator(enumeratorContext);
}
- restoreComplete = true;
+ restoreComplete.complete(null);
}
- public void addSplitsBack(List<SplitT> splits, int subtaskId) {
- enumerator.addSplitsBack(splits, subtaskId);
+ public void addSplitsBack(List<SplitT> splits, int subtaskId) throws
ExecutionException, InterruptedException {
+ getEnumerator().addSplitsBack(splits, subtaskId);
}
- public void receivedReader(TaskLocation readerId, Address memberAddr) {
- LOGGER.info("received reader register, readerID: " + readerId);
+ @SuppressWarnings("checkstyle:MagicNumber")
+ public void receivedReader(TaskLocation readerId, Address memberAddr)
+ throws InterruptedException, ExecutionException {
+ log.info("received reader register, readerID: " + readerId);
+
+ SourceSplitEnumerator<SplitT, Serializable> enumerator =
getEnumerator();
this.addTaskMemberMapping(readerId, memberAddr);
enumerator.registerReader(readerId.getTaskIndex());
if (maxReaderSize == taskMemberMapping.size()) {
@@ -176,12 +179,13 @@ public class SourceSplitEnumeratorTask<SplitT extends
SourceSplit> extends Coord
}
}
- public void requestSplit(long taskIndex) {
- enumerator.handleSplitRequest((int) taskIndex);
+ public void requestSplit(long taskIndex) throws ExecutionException,
InterruptedException {
+ getEnumerator().handleSplitRequest((int) taskIndex);
}
- public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
- enumerator.handleSourceEvent(subtaskId, sourceEvent);
+ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent)
+ throws ExecutionException, InterruptedException {
+ getEnumerator().handleSourceEvent(subtaskId, sourceEvent);
}
public void addTaskMemberMapping(TaskLocation taskID, Address memberAdder)
{
@@ -207,6 +211,17 @@ public class SourceSplitEnumeratorTask<SplitT extends
SourceSplit> extends Coord
return taskIndexToTaskLocationMapping.get(taskIndex);
}
+ @SuppressWarnings("checkstyle:MagicNumber")
+ private SourceSplitEnumerator<SplitT, Serializable> getEnumerator() throws
InterruptedException, ExecutionException {
+ //(restoreComplete == null) means that the Task has not yet executed
Init, so we need to wait.
+ while (null == restoreComplete){
+ log.warn("Task init is not complete, try to get it again after 200
ms");
+ Thread.sleep(200);
+ }
+ restoreComplete.get();
+ return enumerator;
+ }
+
public void readerFinished(long taskID) {
unfinishedReaders.remove(taskID);
if (unfinishedReaders.isEmpty()) {
@@ -222,7 +237,7 @@ public class SourceSplitEnumeratorTask<SplitT extends
SourceSplit> extends Coord
reportTaskStatus(WAITING_RESTORE);
break;
case WAITING_RESTORE:
- if (restoreComplete) {
+ if (restoreComplete.isDone()) {
currState = READY_START;
reportTaskStatus(READY_START);
}
@@ -235,7 +250,7 @@ public class SourceSplitEnumeratorTask<SplitT extends
SourceSplit> extends Coord
break;
case STARTING:
currState = RUNNING;
- LOGGER.info("received enough reader, starting enumerator...");
+ log.info("received enough reader, starting enumerator...");
enumerator.run();
break;
case RUNNING:
@@ -285,7 +300,7 @@ public class SourceSplitEnumeratorTask<SplitT extends
SourceSplit> extends Coord
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
- enumerator.notifyCheckpointComplete(checkpointId);
+ getEnumerator().notifyCheckpointComplete(checkpointId);
if (currState == PREPARE_CLOSE && prepareCloseBarrierId.get() ==
checkpointId) {
closeCall();
}
@@ -293,7 +308,7 @@ public class SourceSplitEnumeratorTask<SplitT extends
SourceSplit> extends Coord
@Override
public void notifyCheckpointAborted(long checkpointId) throws Exception {
- enumerator.notifyCheckpointAborted(checkpointId);
+ getEnumerator().notifyCheckpointAborted(checkpointId);
if (currState == PREPARE_CLOSE && prepareCloseBarrierId.get() ==
checkpointId) {
closeCall();
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/checkpoint/CloseRequestOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/checkpoint/CloseRequestOperation.java
index fd46082ab..b1a08e76c 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/checkpoint/CloseRequestOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/checkpoint/CloseRequestOperation.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.engine.server.task.operation.checkpoint;
import org.apache.seatunnel.common.utils.RetryUtils;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
@@ -50,7 +51,7 @@ public class CloseRequestOperation extends Operation
implements IdentifiedDataSe
task.close();
return null;
}, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
- exception -> exception instanceof NullPointerException &&
+ exception -> exception instanceof SeaTunnelException &&
!server.taskIsEnded(readerLocation.getTaskGroupLocation()),
Constant.OPERATION_RETRY_SLEEP));
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
index 7f42e4aed..23b0afd3b 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
@@ -19,6 +19,7 @@ package
org.apache.seatunnel.engine.server.task.operation.source;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.common.utils.RetryUtils;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.common.utils.SerializationUtils;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
@@ -58,7 +59,7 @@ public class AssignSplitOperation<SplitT extends SourceSplit>
extends Operation
task.receivedSourceSplit(Arrays.stream(o).map(i -> (SplitT)
i).collect(Collectors.toList()));
return null;
}, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
- exception -> exception instanceof NullPointerException &&
+ exception -> exception instanceof SeaTunnelException &&
!server.taskIsEnded(taskID.getTaskGroupLocation()),
Constant.OPERATION_RETRY_SLEEP));
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
index dcad06c96..6fe517abd 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.engine.server.task.operation.source;
import org.apache.seatunnel.common.utils.RetryUtils;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
@@ -54,7 +55,7 @@ public class RequestSplitOperation extends Operation
implements IdentifiedDataSe
task.requestSplit(taskID.getTaskIndex());
return null;
}, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
- exception -> exception instanceof NullPointerException &&
+ exception -> exception instanceof SeaTunnelException &&
!server.taskIsEnded(enumeratorTaskID.getTaskGroupLocation()),
Constant.OPERATION_RETRY_SLEEP));
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java
index 4da0c2896..095efbe42 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java
@@ -19,6 +19,7 @@ package
org.apache.seatunnel.engine.server.task.operation.source;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.common.utils.RetryUtils;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.common.utils.SerializationUtils;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
@@ -88,7 +89,7 @@ public class RestoredSplitOperation extends TaskOperation {
task.addSplitsBack(deserialize, subtaskIndex);
return null;
}, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
- exception -> exception instanceof NullPointerException &&
+ exception -> exception instanceof SeaTunnelException &&
!server.taskIsEnded(taskLocation.getTaskGroupLocation()),
Constant.OPERATION_RETRY_SLEEP));
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
index e1dfc432f..420db4fb9 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.engine.server.task.operation.source;
import org.apache.seatunnel.common.utils.RetryUtils;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
@@ -53,7 +54,7 @@ public class SourceNoMoreElementOperation extends Operation
implements Identifie
task.readerFinished(currentTaskID.getTaskID());
return null;
}, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
- exception -> exception instanceof NullPointerException &&
+ exception -> exception instanceof SeaTunnelException &&
!server.taskIsEnded(enumeratorTaskID.getTaskGroupLocation()),
Constant.OPERATION_RETRY_SLEEP));
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceReaderEventOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceReaderEventOperation.java
index efb13aeb7..39dc1c81c 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceReaderEventOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceReaderEventOperation.java
@@ -19,6 +19,7 @@ package
org.apache.seatunnel.engine.server.task.operation.source;
import org.apache.seatunnel.api.source.SourceEvent;
import org.apache.seatunnel.common.utils.RetryUtils;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.common.utils.SerializationUtils;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
@@ -57,7 +58,7 @@ public class SourceReaderEventOperation extends
SourceEventOperation {
task.handleSourceEvent(currentTaskLocation.getTaskIndex(),
SerializationUtils.deserialize(sourceEvent, classLoader));
return null;
}, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
- exception -> exception instanceof NullPointerException &&
+ exception -> exception instanceof SeaTunnelException &&
!server.taskIsEnded(taskLocation.getTaskGroupLocation()),
Constant.OPERATION_RETRY_SLEEP));
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
index df003c277..12d83ba08 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.engine.server.task.operation.source;
import org.apache.seatunnel.common.utils.RetryUtils;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
@@ -36,7 +37,7 @@ import java.io.IOException;
* the {@link org.apache.seatunnel.api.source.SourceSplitEnumerator}
*/
public class SourceRegisterOperation extends Operation implements
IdentifiedDataSerializable {
- private static final int RETRY_TIME = 5;
+ private static final int RETRY_TIME = 20;
private static final int RETRY_TIME_OUT = 2000;
@@ -61,7 +62,7 @@ public class SourceRegisterOperation extends Operation
implements IdentifiedData
task.receivedReader(readerTaskID, readerAddress);
return null;
}, new RetryUtils.RetryMaterial(RETRY_TIME, true,
- exception -> exception instanceof NullPointerException &&
+ exception -> exception instanceof SeaTunnelException &&
!server.taskIsEnded(enumeratorTaskID.getTaskGroupLocation()),
RETRY_TIME_OUT));
}