This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 65f5a1653 [HotFix][Zeta] Fix NullPointerException when registering Reader with the enumerator (#4048)
65f5a1653 is described below

commit 65f5a1653bb7d559f76ca85cbfdc6dd485c95e45
Author: ic4y <[email protected]>
AuthorDate: Wed Feb 8 11:02:35 2023 +0800

    [HotFix][Zeta] Fix NullPointerException when registering Reader with the enumerator (#4048)
    
    * [HotFix][Zeta] Fix NullPointerException when registering Reader with the enumerator
---
 .../apache/seatunnel/engine/common/Constant.java   |  2 +-
 .../engine/server/TaskExecutionService.java        | 15 ++++---
 .../operation/NotifyTaskStartOperation.java        |  8 ++--
 .../operation/TaskReportStatusOperation.java       | 20 ++++++---
 .../seatunnel/engine/server/task/AbstractTask.java |  5 ++-
 .../engine/server/task/SeaTunnelTask.java          |  4 +-
 .../server/task/SinkAggregatedCommitterTask.java   |  4 +-
 .../server/task/SourceSplitEnumeratorTask.java     | 51 ++++++++++++++--------
 .../checkpoint/CloseRequestOperation.java          |  3 +-
 .../operation/source/AssignSplitOperation.java     |  3 +-
 .../operation/source/RequestSplitOperation.java    |  3 +-
 .../operation/source/RestoredSplitOperation.java   |  3 +-
 .../source/SourceNoMoreElementOperation.java       |  3 +-
 .../source/SourceReaderEventOperation.java         |  3 +-
 .../operation/source/SourceRegisterOperation.java  |  5 ++-
 15 files changed, 86 insertions(+), 46 deletions(-)

diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
index 0fb820cba..d413938c2 100644
--- 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
@@ -36,7 +36,7 @@ public class Constant {
 
     public static final String HAZELCAST_SEATUNNEL_DEFAULT_YAML = 
"seatunnel.yaml";
 
-    public static final int OPERATION_RETRY_TIME = 10;
+    public static final int OPERATION_RETRY_TIME = 30;
 
     public static final int OPERATION_RETRY_SLEEP = 2000;
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index 688a38c69..195415c4b 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -32,6 +32,7 @@ import static java.util.stream.Collectors.toList;
 
 import org.apache.seatunnel.api.common.metrics.MetricTags;
 import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
 import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.config.ConfigProvider;
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
@@ -62,7 +63,6 @@ import com.hazelcast.internal.serialization.Data;
 import com.hazelcast.jet.impl.execution.init.CustomClassLoadedObject;
 import com.hazelcast.logging.ILogger;
 import com.hazelcast.map.IMap;
-import com.hazelcast.spi.exception.WrongTargetException;
 import com.hazelcast.spi.impl.NodeEngineImpl;
 import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
 import com.hazelcast.spi.properties.HazelcastProperties;
@@ -188,9 +188,14 @@ public class TaskExecutionService implements 
DynamicMetricsProvider {
         return deployTask(taskImmutableInfo);
     }
 
-    public <T extends Task> T getTask(TaskLocation taskLocation) {
-        return 
this.getExecutionContext(taskLocation.getTaskGroupLocation()).getTaskGroup()
-            .getTask(taskLocation.getTaskID());
+    public <T extends Task> T getTask(@NonNull TaskLocation taskLocation) {
+        TaskGroupContext executionContext = 
this.getExecutionContext(taskLocation.getTaskGroupLocation());
+        if (null == executionContext) {
+            throw new SeaTunnelException(
+                String.format("Failed to get Task, TaskLocation{%s} does not 
exist in TaskExecutionServer",
+                    taskLocation));
+        }
+        return 
executionContext.getTaskGroup().getTask(taskLocation.getTaskID());
     }
 
     public PassiveCompletableFuture<TaskExecutionState> deployTask(
@@ -370,7 +375,7 @@ public class TaskExecutionService implements 
DynamicMetricsProvider {
                     }
                 });
             });
-        } catch (WrongTargetException e){
+        } catch (Exception e){
             logger.warning("The Imap acquisition failed due to the hazelcast 
node being offline or restarted, and will be retried next time", e);
         }
     }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskStartOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskStartOperation.java
index ace26fb92..6106de015 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskStartOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskStartOperation.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.engine.server.checkpoint.operation;
 
 import org.apache.seatunnel.common.utils.RetryUtils;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
 import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
@@ -48,12 +49,13 @@ public class NotifyTaskStartOperation extends TaskOperation 
{
     public void run() throws Exception {
         SeaTunnelServer server = getService();
         RetryUtils.retryWithException(() -> {
-            AbstractTask task = 
server.getTaskExecutionService().getExecutionContext(taskLocation.getTaskGroupLocation())
-                .getTaskGroup().getTask(taskLocation.getTaskID());
+            AbstractTask task = server
+                .getTaskExecutionService()
+                .getTask(taskLocation);
             task.startCall();
             return null;
         }, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
-            exception -> exception instanceof NullPointerException &&
+            exception -> exception instanceof SeaTunnelException &&
                 !server.taskIsEnded(taskLocation.getTaskGroupLocation()), 
Constant.OPERATION_RETRY_SLEEP));
     }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/TaskReportStatusOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/TaskReportStatusOperation.java
index 171353e6b..a00689810 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/TaskReportStatusOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/TaskReportStatusOperation.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.engine.server.checkpoint.operation;
 
+import org.apache.seatunnel.common.utils.RetryUtils;
+import org.apache.seatunnel.engine.server.CoordinatorService;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
 import 
org.apache.seatunnel.engine.server.serializable.CheckpointDataSerializerHook;
@@ -37,6 +39,10 @@ import java.io.IOException;
 @AllArgsConstructor
 public class TaskReportStatusOperation extends Operation implements 
IdentifiedDataSerializable {
 
+    private static final int RETRY_NUMBER = 20;
+
+    private static final int RETRY_INTERVAL = 2000;
+
     private TaskLocation location;
     private SeaTunnelTaskState status;
 
@@ -63,10 +69,14 @@ public class TaskReportStatusOperation extends Operation 
implements IdentifiedDa
     }
 
     @Override
-    public void run() {
-        ((SeaTunnelServer) getService())
-            .getCoordinatorService().getJobMaster(location.getJobId())
-            .getCheckpointManager()
-            .reportedTask(this);
+    public void run() throws Exception {
+        CoordinatorService coordinatorService = ((SeaTunnelServer) 
getService())
+            .getCoordinatorService();
+        RetryUtils.retryWithException(() -> {
+            coordinatorService.getJobMaster(location.getJobId())
+                .getCheckpointManager()
+                .reportedTask(this);
+            return null;
+        }, new RetryUtils.RetryMaterial(RETRY_NUMBER, true, e -> true, 
RETRY_INTERVAL));
     }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java
index 3daf0512b..643009b0c 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java
@@ -32,6 +32,7 @@ import lombok.NonNull;
 import java.net.URL;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
@@ -41,7 +42,7 @@ public abstract class AbstractTask implements Task {
     protected TaskExecutionContext executionContext;
     protected final long jobID;
     protected final TaskLocation taskLocation;
-    protected volatile boolean restoreComplete;
+    protected volatile CompletableFuture<Void> restoreComplete;
     protected volatile boolean startCalled;
     protected volatile boolean closeCalled;
     protected volatile boolean prepareCloseStatus;
@@ -54,7 +55,6 @@ public abstract class AbstractTask implements Task {
         this.taskLocation = taskLocation;
         this.jobID = jobID;
         this.progress = new Progress();
-        this.restoreComplete = false;
         this.startCalled = false;
         this.closeCalled = false;
         this.prepareCloseStatus = false;
@@ -74,6 +74,7 @@ public abstract class AbstractTask implements Task {
 
     @Override
     public void init() throws Exception {
+        this.restoreComplete = new CompletableFuture<>();
         progress.start();
     }
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
index 5c434acbb..9d9a8bd17 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -137,7 +137,7 @@ public abstract class SeaTunnelTask extends AbstractTask {
                 reportTaskStatus(WAITING_RESTORE);
                 break;
             case WAITING_RESTORE:
-                if (restoreComplete) {
+                if (restoreComplete.isDone()) {
                     for (FlowLifeCycle cycle : allCycles) {
                         cycle.open();
                     }
@@ -325,7 +325,7 @@ public abstract class SeaTunnelTask extends AbstractTask {
                     sneakyThrow(e);
                 }
             });
-        restoreComplete = true;
+        restoreComplete.complete(null);
     }
 
     @Override
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
index dc42be5fd..fcd4a431d 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
@@ -125,7 +125,7 @@ public class SinkAggregatedCommitterTask<CommandInfoT, 
AggregatedCommitInfoT> ex
                 reportTaskStatus(WAITING_RESTORE);
                 break;
             case WAITING_RESTORE:
-                if (restoreComplete) {
+                if (restoreComplete.isDone()) {
                     currState = READY_START;
                     reportTaskStatus(READY_START);
                 }
@@ -203,7 +203,7 @@ public class SinkAggregatedCommitterTask<CommandInfoT, 
AggregatedCommitInfoT> ex
             .map(bytes -> sneaky(() -> 
aggregatedCommitInfoSerializer.deserialize(bytes)))
             .collect(Collectors.toList());
         aggregatedCommitter.commit(aggregatedCommitInfos);
-        restoreComplete = true;
+        restoreComplete.complete(null);
     }
 
     public void receivedWriterCommitInfo(long checkpointID,
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
index 81b9df55f..8da627dd5 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
@@ -43,11 +43,10 @@ import 
org.apache.seatunnel.engine.server.task.record.Barrier;
 import org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState;
 
 import com.hazelcast.cluster.Address;
-import com.hazelcast.logging.ILogger;
-import com.hazelcast.logging.Logger;
 import com.hazelcast.spi.impl.operationservice.Operation;
 import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
 import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -63,13 +62,13 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutionException;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+@Slf4j
 public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends 
CoordinatorTask {
 
-    private static final ILogger LOGGER = 
Logger.getLogger(SourceSplitEnumeratorTask.class);
-
     private static final long serialVersionUID = -3713701594297977775L;
 
     private final SourceAction<?, SplitT, Serializable> source;
@@ -93,7 +92,7 @@ public class SourceSplitEnumeratorTask<SplitT extends 
SourceSplit> extends Coord
         currState = SeaTunnelTaskState.INIT;
         super.init();
         readerRegisterComplete = false;
-        LOGGER.info("starting seatunnel source split enumerator task, source 
name: " + source.getName());
+        log.info("starting seatunnel source split enumerator task, source 
name: " + source.getName());
         enumeratorContext = new 
SeaTunnelSplitEnumeratorContext<>(this.source.getParallelism(), this);
         enumeratorStateSerializer = 
this.source.getSource().getEnumeratorStateSerializer();
         taskMemberMapping = new ConcurrentHashMap<>();
@@ -160,15 +159,19 @@ public class SourceSplitEnumeratorTask<SplitT extends 
SourceSplit> extends Coord
             this.enumerator = this.source.getSource()
                 .createEnumerator(enumeratorContext);
         }
-        restoreComplete = true;
+        restoreComplete.complete(null);
     }
 
-    public void addSplitsBack(List<SplitT> splits, int subtaskId) {
-        enumerator.addSplitsBack(splits, subtaskId);
+    public void addSplitsBack(List<SplitT> splits, int subtaskId) throws 
ExecutionException, InterruptedException {
+        getEnumerator().addSplitsBack(splits, subtaskId);
     }
 
-    public void receivedReader(TaskLocation readerId, Address memberAddr) {
-        LOGGER.info("received reader register, readerID: " + readerId);
+    @SuppressWarnings("checkstyle:MagicNumber")
+    public void receivedReader(TaskLocation readerId, Address memberAddr)
+        throws InterruptedException, ExecutionException {
+        log.info("received reader register, readerID: " + readerId);
+
+        SourceSplitEnumerator<SplitT, Serializable> enumerator = 
getEnumerator();
         this.addTaskMemberMapping(readerId, memberAddr);
         enumerator.registerReader(readerId.getTaskIndex());
         if (maxReaderSize == taskMemberMapping.size()) {
@@ -176,12 +179,13 @@ public class SourceSplitEnumeratorTask<SplitT extends 
SourceSplit> extends Coord
         }
     }
 
-    public void requestSplit(long taskIndex) {
-        enumerator.handleSplitRequest((int) taskIndex);
+    public void requestSplit(long taskIndex) throws ExecutionException, 
InterruptedException {
+        getEnumerator().handleSplitRequest((int) taskIndex);
     }
 
-    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
-        enumerator.handleSourceEvent(subtaskId, sourceEvent);
+    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent)
+        throws ExecutionException, InterruptedException {
+        getEnumerator().handleSourceEvent(subtaskId, sourceEvent);
     }
 
     public void addTaskMemberMapping(TaskLocation taskID, Address memberAdder) 
{
@@ -207,6 +211,17 @@ public class SourceSplitEnumeratorTask<SplitT extends 
SourceSplit> extends Coord
         return taskIndexToTaskLocationMapping.get(taskIndex);
     }
 
+    @SuppressWarnings("checkstyle:MagicNumber")
+    private SourceSplitEnumerator<SplitT, Serializable> getEnumerator() throws 
InterruptedException, ExecutionException {
+        //(restoreComplete == null) means that the Task has not yet executed 
Init, so we need to wait.
+        while (null == restoreComplete){
+            log.warn("Task init is not complete, try to get it again after 200 
ms");
+            Thread.sleep(200);
+        }
+        restoreComplete.get();
+        return enumerator;
+    }
+
     public void readerFinished(long taskID) {
         unfinishedReaders.remove(taskID);
         if (unfinishedReaders.isEmpty()) {
@@ -222,7 +237,7 @@ public class SourceSplitEnumeratorTask<SplitT extends 
SourceSplit> extends Coord
                 reportTaskStatus(WAITING_RESTORE);
                 break;
             case WAITING_RESTORE:
-                if (restoreComplete) {
+                if (restoreComplete.isDone()) {
                     currState = READY_START;
                     reportTaskStatus(READY_START);
                 }
@@ -235,7 +250,7 @@ public class SourceSplitEnumeratorTask<SplitT extends 
SourceSplit> extends Coord
                 break;
             case STARTING:
                 currState = RUNNING;
-                LOGGER.info("received enough reader, starting enumerator...");
+                log.info("received enough reader, starting enumerator...");
                 enumerator.run();
                 break;
             case RUNNING:
@@ -285,7 +300,7 @@ public class SourceSplitEnumeratorTask<SplitT extends 
SourceSplit> extends Coord
 
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
-        enumerator.notifyCheckpointComplete(checkpointId);
+        getEnumerator().notifyCheckpointComplete(checkpointId);
         if (currState == PREPARE_CLOSE && prepareCloseBarrierId.get() == 
checkpointId) {
             closeCall();
         }
@@ -293,7 +308,7 @@ public class SourceSplitEnumeratorTask<SplitT extends 
SourceSplit> extends Coord
 
     @Override
     public void notifyCheckpointAborted(long checkpointId) throws Exception {
-        enumerator.notifyCheckpointAborted(checkpointId);
+        getEnumerator().notifyCheckpointAborted(checkpointId);
         if (currState == PREPARE_CLOSE && prepareCloseBarrierId.get() == 
checkpointId) {
             closeCall();
         }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/checkpoint/CloseRequestOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/checkpoint/CloseRequestOperation.java
index fd46082ab..b1a08e76c 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/checkpoint/CloseRequestOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/checkpoint/CloseRequestOperation.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.engine.server.task.operation.checkpoint;
 
 import org.apache.seatunnel.common.utils.RetryUtils;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
 import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
@@ -50,7 +51,7 @@ public class CloseRequestOperation extends Operation 
implements IdentifiedDataSe
             task.close();
             return null;
         }, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
-            exception -> exception instanceof NullPointerException &&
+            exception -> exception instanceof SeaTunnelException &&
                 !server.taskIsEnded(readerLocation.getTaskGroupLocation()), 
Constant.OPERATION_RETRY_SLEEP));
     }
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
index 7f42e4aed..23b0afd3b 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
@@ -19,6 +19,7 @@ package 
org.apache.seatunnel.engine.server.task.operation.source;
 
 import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.common.utils.RetryUtils;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
 import org.apache.seatunnel.common.utils.SerializationUtils;
 import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
@@ -58,7 +59,7 @@ public class AssignSplitOperation<SplitT extends SourceSplit> 
extends Operation
             task.receivedSourceSplit(Arrays.stream(o).map(i -> (SplitT) 
i).collect(Collectors.toList()));
             return null;
         }, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
-            exception -> exception instanceof NullPointerException &&
+            exception -> exception instanceof SeaTunnelException &&
                 !server.taskIsEnded(taskID.getTaskGroupLocation()), 
Constant.OPERATION_RETRY_SLEEP));
     }
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
index dcad06c96..6fe517abd 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.engine.server.task.operation.source;
 
 import org.apache.seatunnel.common.utils.RetryUtils;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
 import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
@@ -54,7 +55,7 @@ public class RequestSplitOperation extends Operation 
implements IdentifiedDataSe
             task.requestSplit(taskID.getTaskIndex());
             return null;
         }, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
-            exception -> exception instanceof NullPointerException &&
+            exception -> exception instanceof SeaTunnelException &&
                 !server.taskIsEnded(enumeratorTaskID.getTaskGroupLocation()), 
Constant.OPERATION_RETRY_SLEEP));
     }
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java
index 4da0c2896..095efbe42 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java
@@ -19,6 +19,7 @@ package 
org.apache.seatunnel.engine.server.task.operation.source;
 
 import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.common.utils.RetryUtils;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
 import org.apache.seatunnel.common.utils.SerializationUtils;
 import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
@@ -88,7 +89,7 @@ public class RestoredSplitOperation extends TaskOperation {
             task.addSplitsBack(deserialize, subtaskIndex);
             return null;
         }, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
-            exception -> exception instanceof NullPointerException &&
+            exception -> exception instanceof SeaTunnelException &&
                 !server.taskIsEnded(taskLocation.getTaskGroupLocation()), 
Constant.OPERATION_RETRY_SLEEP));
     }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
index e1dfc432f..420db4fb9 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.engine.server.task.operation.source;
 
 import org.apache.seatunnel.common.utils.RetryUtils;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
 import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
@@ -53,7 +54,7 @@ public class SourceNoMoreElementOperation extends Operation 
implements Identifie
             task.readerFinished(currentTaskID.getTaskID());
             return null;
         }, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
-            exception -> exception instanceof NullPointerException &&
+            exception -> exception instanceof SeaTunnelException &&
                 !server.taskIsEnded(enumeratorTaskID.getTaskGroupLocation()), 
Constant.OPERATION_RETRY_SLEEP));
     }
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceReaderEventOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceReaderEventOperation.java
index efb13aeb7..39dc1c81c 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceReaderEventOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceReaderEventOperation.java
@@ -19,6 +19,7 @@ package 
org.apache.seatunnel.engine.server.task.operation.source;
 
 import org.apache.seatunnel.api.source.SourceEvent;
 import org.apache.seatunnel.common.utils.RetryUtils;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
 import org.apache.seatunnel.common.utils.SerializationUtils;
 import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
@@ -57,7 +58,7 @@ public class SourceReaderEventOperation extends 
SourceEventOperation {
             task.handleSourceEvent(currentTaskLocation.getTaskIndex(), 
SerializationUtils.deserialize(sourceEvent, classLoader));
             return null;
         }, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
-            exception -> exception instanceof NullPointerException &&
+            exception -> exception instanceof SeaTunnelException &&
                 !server.taskIsEnded(taskLocation.getTaskGroupLocation()), 
Constant.OPERATION_RETRY_SLEEP));
     }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
index df003c277..12d83ba08 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.engine.server.task.operation.source;
 
 import org.apache.seatunnel.common.utils.RetryUtils;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
 import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
@@ -36,7 +37,7 @@ import java.io.IOException;
  * the {@link org.apache.seatunnel.api.source.SourceSplitEnumerator}
  */
 public class SourceRegisterOperation extends Operation implements 
IdentifiedDataSerializable {
-    private static final int RETRY_TIME = 5;
+    private static final int RETRY_TIME = 20;
 
     private static final int RETRY_TIME_OUT = 2000;
 
@@ -61,7 +62,7 @@ public class SourceRegisterOperation extends Operation 
implements IdentifiedData
             task.receivedReader(readerTaskID, readerAddress);
             return null;
         }, new RetryUtils.RetryMaterial(RETRY_TIME, true,
-            exception -> exception instanceof NullPointerException &&
+            exception -> exception instanceof SeaTunnelException &&
                 !server.taskIsEnded(enumeratorTaskID.getTaskGroupLocation()), 
RETRY_TIME_OUT));
     }
 

Reply via email to