This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/st-engine by this push:
new d13a12901 [Engine] [Task] Add sink committer logic (#2721)
d13a12901 is described below
commit d13a12901140d6f7fa7813310ff3d69f34359915
Author: Hisoka <[email protected]>
AuthorDate: Tue Sep 13 17:32:18 2022 +0800
[Engine] [Task] Add sink committer logic (#2721)
---
.../CheckpointBarrierTriggerOperation.java | 2 +-
.../server/dag/physical/PhysicalPlanGenerator.java | 2 +-
.../serializable/TaskDataSerializerHook.java | 10 +--
.../server/task/SinkAggregatedCommitterTask.java | 82 +++++++++-------------
.../engine/server/task/flow/SinkFlowLifeCycle.java | 45 ++++++++----
.../operation/sink/SinkPrepareCommitOperation.java | 15 +++-
.../task/operation/sink/SinkRegisterOperation.java | 22 ++----
.../operation/sink/SinkUnregisterOperation.java | 74 -------------------
8 files changed, 85 insertions(+), 167 deletions(-)
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointBarrierTriggerOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointBarrierTriggerOperation.java
index 9a7b22656..52fefe1cd 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointBarrierTriggerOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointBarrierTriggerOperation.java
@@ -37,7 +37,7 @@ import java.io.IOException;
@NoArgsConstructor
public class CheckpointBarrierTriggerOperation extends TaskOperation {
- private Barrier barrier;
+ protected Barrier barrier;
public CheckpointBarrierTriggerOperation(Barrier barrier, TaskLocation
taskLocation) {
super(taskLocation);
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index 50fa26b7f..25c4c8c7e 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -208,7 +208,7 @@ public class PhysicalPlanGenerator {
TaskGroupLocation taskGroupLocation =
new
TaskGroupLocation(jobImmutableInformation.getJobId(), pipelineIndex,
taskGroupID);
TaskLocation taskLocation = new
TaskLocation(taskGroupLocation, taskTypeId, 0);
- SinkAggregatedCommitterTask<?> t =
+ SinkAggregatedCommitterTask<?, ?> t =
new
SinkAggregatedCommitterTask(jobImmutableInformation.getJobId(), taskLocation, s,
sinkAggregatedCommitter.get());
committerTaskIDMap.put(s, taskLocation);
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
index 39dcd3ccd..c10edc1d2 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
@@ -27,7 +27,6 @@ import
org.apache.seatunnel.engine.server.task.operation.GetTaskGroupAddressOper
import
org.apache.seatunnel.engine.server.task.operation.checkpoint.CloseRequestOperation;
import
org.apache.seatunnel.engine.server.task.operation.sink.SinkPrepareCommitOperation;
import
org.apache.seatunnel.engine.server.task.operation.sink.SinkRegisterOperation;
-import
org.apache.seatunnel.engine.server.task.operation.sink.SinkUnregisterOperation;
import
org.apache.seatunnel.engine.server.task.operation.source.AssignSplitOperation;
import
org.apache.seatunnel.engine.server.task.operation.source.RequestSplitOperation;
import
org.apache.seatunnel.engine.server.task.operation.source.SourceNoMoreElementOperation;
@@ -49,9 +48,7 @@ public class TaskDataSerializerHook implements
DataSerializerHook {
public static final int TASK_GROUP_INFO_TYPE = 4;
public static final int SOURCE_UNREGISTER_TYPE = 5;
-
- public static final int SINK_UNREGISTER_TYPE = 6;
-
+ public static final int GET_TASKGROUP_ADDRESS_TYPE = 6;
public static final int SINK_REGISTER_TYPE = 7;
public static final int SINK_PREPARE_COMMIT_TYPE = 8;
@@ -66,9 +63,6 @@ public class TaskDataSerializerHook implements
DataSerializerHook {
public static final int CANCEL_TASK_OPERATOR = 13;
- public static final int GET_TASKGROUP_ADDRESS_TYPE = 14;
-
-
public static final int FACTORY_ID = FactoryIdHelper.getFactoryId(
SeaTunnelFactoryIdConstant.SEATUNNEL_TASK_DATA_SERIALIZER_FACTORY,
SeaTunnelFactoryIdConstant.SEATUNNEL_TASK_DATA_SERIALIZER_FACTORY_ID
@@ -101,8 +95,6 @@ public class TaskDataSerializerHook implements
DataSerializerHook {
return new SourceNoMoreElementOperation();
case SINK_REGISTER_TYPE:
return new SinkRegisterOperation();
- case SINK_UNREGISTER_TYPE:
- return new SinkUnregisterOperation();
case SINK_PREPARE_COMMIT_TYPE:
return new SinkPrepareCommitOperation();
case TASK_LOCATION_TYPE:
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
index 143ce43f4..5d8eef25b 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
@@ -45,7 +45,6 @@ import lombok.NonNull;
import java.io.IOException;
import java.net.URL;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -55,45 +54,48 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
-public class SinkAggregatedCommitterTask<AggregatedCommitInfoT> extends
CoordinatorTask {
+public class SinkAggregatedCommitterTask<CommandInfoT, AggregatedCommitInfoT>
extends CoordinatorTask {
private static final ILogger LOGGER =
Logger.getLogger(SinkAggregatedCommitterTask.class);
private static final long serialVersionUID = 5906594537520393503L;
private SeaTunnelTaskState currState;
- private final SinkAction<?, ?, ?, AggregatedCommitInfoT> sink;
+ private final SinkAction<?, ?, CommandInfoT, AggregatedCommitInfoT> sink;
private final int maxWriterSize;
- private final SinkAggregatedCommitter<?, AggregatedCommitInfoT>
aggregatedCommitter;
+ private final SinkAggregatedCommitter<CommandInfoT, AggregatedCommitInfoT>
aggregatedCommitter;
private transient Serializer<AggregatedCommitInfoT>
aggregatedCommitInfoSerializer;
private Map<Long, Address> writerAddressMap;
- private Map<Long, List<AggregatedCommitInfoT>> checkpointCommitInfoMap;
+ private ConcurrentMap<Long, List<CommandInfoT>> commitInfoCache;
+
+ private ConcurrentMap<Long, List<AggregatedCommitInfoT>>
checkpointCommitInfoMap;
private Map<Long, Integer> checkpointBarrierCounter;
- private Map<Long, Map<Long, Long>> alreadyReceivedCommitInfo;
- private Object closeLock;
private CompletableFuture<Void> completableFuture;
- public SinkAggregatedCommitterTask(long jobID, TaskLocation taskID,
SinkAction<?, ?, ?, AggregatedCommitInfoT> sink,
- SinkAggregatedCommitter<?,
AggregatedCommitInfoT> aggregatedCommitter) {
+ private volatile boolean receivedSinkWriter;
+
+ public SinkAggregatedCommitterTask(long jobID, TaskLocation taskID,
SinkAction<?, ?, CommandInfoT, AggregatedCommitInfoT> sink,
+ SinkAggregatedCommitter<CommandInfoT,
AggregatedCommitInfoT> aggregatedCommitter) {
super(jobID, taskID);
this.sink = sink;
this.aggregatedCommitter = aggregatedCommitter;
this.maxWriterSize = sink.getParallelism();
+ this.receivedSinkWriter = false;
}
@Override
public void init() throws Exception {
super.init();
currState = INIT;
- this.closeLock = new Object();
this.checkpointBarrierCounter = new HashMap<>();
- this.alreadyReceivedCommitInfo = new ConcurrentHashMap<>();
+ this.commitInfoCache = new ConcurrentHashMap<>();
this.writerAddressMap = new ConcurrentHashMap<>();
this.checkpointCommitInfoMap = new ConcurrentHashMap<>();
this.completableFuture = new CompletableFuture<>();
@@ -103,17 +105,8 @@ public class
SinkAggregatedCommitterTask<AggregatedCommitInfoT> extends Coordina
public void receivedWriterRegister(TaskLocation writerID, Address address)
{
this.writerAddressMap.put(writerID.getTaskID(), address);
- }
-
- public void receivedWriterUnregister(TaskLocation writerID) {
- this.writerAddressMap.remove(writerID.getTaskID());
- if (writerAddressMap.isEmpty()) {
- try {
- this.close();
- } catch (IOException e) {
- LOGGER.severe("aggregated committer close failed", e);
- throw new TaskRuntimeException(e);
- }
+ if (maxWriterSize <= writerAddressMap.size()) {
+ receivedSinkWriter = true;
}
}
@@ -142,7 +135,9 @@ public class
SinkAggregatedCommitterTask<AggregatedCommitInfoT> extends Coordina
}
break;
case STARTING:
- currState = RUNNING;
+ if (receivedSinkWriter) {
+ currState = RUNNING;
+ }
break;
case RUNNING:
if (prepareCloseStatus) {
@@ -169,11 +164,9 @@ public class
SinkAggregatedCommitterTask<AggregatedCommitInfoT> extends Coordina
@Override
public void close() throws IOException {
- synchronized (closeLock) {
- aggregatedCommitter.close();
- progress.done();
- completableFuture.complete(null);
- }
+ aggregatedCommitter.close();
+ progress.done();
+ completableFuture.complete(null);
}
@Override
@@ -186,6 +179,10 @@ public class
SinkAggregatedCommitterTask<AggregatedCommitInfoT> extends Coordina
prepareCloseStatus = true;
}
if (barrier.snapshot()) {
+ if (commitInfoCache.containsKey(barrier.getId())) {
+ AggregatedCommitInfoT aggregatedCommitInfoT =
aggregatedCommitter.combine(commitInfoCache.get(barrier.getId()));
+ checkpointCommitInfoMap.put(barrier.getId(),
Collections.singletonList(aggregatedCommitInfoT));
+ }
List<byte[]> states =
serializeStates(aggregatedCommitInfoSerializer,
checkpointCommitInfoMap.getOrDefault(barrier.getId(), Collections.emptyList()));
this.getExecutionContext().sendToMaster(new
TaskAcknowledgeOperation(this.taskLocation, (CheckpointBarrier) barrier,
Collections.singletonList(new ActionSubtaskState(sink.getId(),
-1, states))));
@@ -199,31 +196,14 @@ public class
SinkAggregatedCommitterTask<AggregatedCommitInfoT> extends Coordina
.flatMap(Collection::stream)
.map(bytes -> sneaky(() ->
aggregatedCommitInfoSerializer.deserialize(bytes)))
.collect(Collectors.toList());
- // TODO: commit?
+ aggregatedCommitter.commit(aggregatedCommitInfos);
restoreComplete = true;
}
- public void receivedWriterCommitInfo(long checkpointID, long subTaskId,
- AggregatedCommitInfoT[] commitInfos) {
- checkpointCommitInfoMap.computeIfAbsent(checkpointID, id -> new
CopyOnWriteArrayList<>());
- alreadyReceivedCommitInfo.computeIfAbsent(checkpointID, id -> new
ConcurrentHashMap<>());
-
-
checkpointCommitInfoMap.get(checkpointID).addAll(Arrays.asList(commitInfos));
- Map<Long, Long> alreadyReceived =
alreadyReceivedCommitInfo.get(checkpointID);
- alreadyReceived.put(subTaskId, subTaskId);
- if (alreadyReceived.size() == maxWriterSize) {
- try {
- synchronized (closeLock) {
-
aggregatedCommitter.commit(checkpointCommitInfoMap.get(checkpointID));
- }
- checkpointCommitInfoMap.remove(checkpointID);
- alreadyReceivedCommitInfo.remove(checkpointID);
- } catch (IOException e) {
- LOGGER.severe("aggregated committer commit failed,
checkpointID: " + checkpointID, e);
- throw new TaskRuntimeException(e);
- }
- }
-
+ public void receivedWriterCommitInfo(long checkpointID,
+ CommandInfoT commitInfos) {
+ commitInfoCache.computeIfAbsent(checkpointID, id -> new
CopyOnWriteArrayList<>());
+ commitInfoCache.get(checkpointID).add(commitInfos);
}
@Override
@@ -234,6 +214,7 @@ public class
SinkAggregatedCommitterTask<AggregatedCommitInfoT> extends Coordina
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
aggregatedCommitter.commit(checkpointCommitInfoMap.get(checkpointId));
+ checkpointCommitInfoMap.remove(checkpointId);
if (prepareCloseStatus) {
closeCall();
}
@@ -242,6 +223,7 @@ public class
SinkAggregatedCommitterTask<AggregatedCommitInfoT> extends Coordina
@Override
public void notifyCheckpointAborted(long checkpointId) throws Exception {
aggregatedCommitter.abort(checkpointCommitInfoMap.get(checkpointId));
+ checkpointCommitInfoMap.remove(checkpointId);
if (prepareCloseStatus) {
closeCall();
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
index a3aa29fc2..f5eeb8e80 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
@@ -21,8 +21,10 @@ import static
org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneaky;
import static
org.apache.seatunnel.engine.server.task.AbstractTask.serializeStates;
import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.sink.SinkCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.Record;
+import org.apache.seatunnel.common.utils.SerializationUtils;
import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener;
import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
import org.apache.seatunnel.engine.server.checkpoint.ActionSubtaskState;
@@ -38,6 +40,7 @@ import org.apache.seatunnel.engine.server.task.record.Barrier;
import com.hazelcast.cluster.Address;
import java.io.IOException;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -47,10 +50,10 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
-public class SinkFlowLifeCycle<T, StateT> extends ActionFlowLifeCycle
implements OneInputFlowLifeCycle<Record<?>>, InternalCheckpointListener {
+public class SinkFlowLifeCycle<T, CommitInfoT extends Serializable,
AggregatedCommitInfoT, StateT> extends ActionFlowLifeCycle implements
OneInputFlowLifeCycle<Record<?>>, InternalCheckpointListener {
- private final SinkAction<T, StateT, ?, ?> sinkAction;
- private SinkWriter<T, ?, StateT> writer;
+ private final SinkAction<T, StateT, CommitInfoT, AggregatedCommitInfoT>
sinkAction;
+ private SinkWriter<T, CommitInfoT, StateT> writer;
private transient Optional<Serializer<StateT>> writerStateSerializer;
@@ -62,28 +65,33 @@ public class SinkFlowLifeCycle<T, StateT> extends
ActionFlowLifeCycle implements
private final TaskLocation committerTaskLocation;
- private final boolean containCommitter;
+ private Optional<SinkCommitter<CommitInfoT>> committer;
- public SinkFlowLifeCycle(SinkAction<T, StateT, ?, ?> sinkAction,
TaskLocation taskLocation, int indexID,
+ private Optional<CommitInfoT> lastCommitInfo;
+
+ private final boolean containAggCommitter;
+
+ public SinkFlowLifeCycle(SinkAction<T, StateT, CommitInfoT,
AggregatedCommitInfoT> sinkAction, TaskLocation taskLocation, int indexID,
SeaTunnelTask runningTask, TaskLocation
committerTaskLocation,
- boolean containCommitter, CompletableFuture<Void>
completableFuture) {
+ boolean containAggCommitter,
CompletableFuture<Void> completableFuture) {
super(sinkAction, runningTask, completableFuture);
this.sinkAction = sinkAction;
this.indexID = indexID;
this.taskLocation = taskLocation;
this.committerTaskLocation = committerTaskLocation;
- this.containCommitter = containCommitter;
+ this.containAggCommitter = containAggCommitter;
}
@Override
public void init() throws Exception {
this.writerStateSerializer =
sinkAction.getSink().getWriterStateSerializer();
+ this.committer = sinkAction.getSink().createCommitter();
}
@Override
public void open() throws Exception {
super.open();
- if (containCommitter) {
+ if (containAggCommitter) {
committerTaskAddress = getCommitterTaskAddress();
}
registerCommitter();
@@ -101,7 +109,7 @@ public class SinkFlowLifeCycle<T, StateT> extends
ActionFlowLifeCycle implements
}
private void registerCommitter() {
- if (containCommitter) {
+ if (containAggCommitter) {
runningTask.getExecutionContext().sendToMember(new
SinkRegisterOperation(taskLocation,
committerTaskLocation), committerTaskAddress).join();
}
@@ -122,9 +130,14 @@ public class SinkFlowLifeCycle<T, StateT> extends
ActionFlowLifeCycle implements
List<StateT> states =
writer.snapshotState(barrier.getId());
runningTask.addState(barrier, sinkAction.getId(),
serializeStates(writerStateSerializer.get(), states));
}
- // TODO: prepare commit
- runningTask.getExecutionContext().sendToMember(new
SinkPrepareCommitOperation(barrier, committerTaskLocation,
- new byte[0]), committerTaskAddress);
+ try {
+ lastCommitInfo = writer.prepareCommit();
+ } catch (Exception e) {
+ writer.abortPrepare();
+ throw e;
+ }
+ lastCommitInfo.ifPresent(commitInfoT ->
runningTask.getExecutionContext().sendToMember(new
SinkPrepareCommitOperation(barrier, committerTaskLocation,
+ SerializationUtils.serialize(commitInfoT)),
committerTaskAddress).join());
} else {
runningTask.getExecutionContext().sendToMember(new
CheckpointBarrierTriggerOperation(barrier, committerTaskLocation),
committerTaskAddress);
}
@@ -142,12 +155,16 @@ public class SinkFlowLifeCycle<T, StateT> extends
ActionFlowLifeCycle implements
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
- // TODO: committer commit
+ if (committer.isPresent() && lastCommitInfo.isPresent()) {
+
committer.get().commit(Collections.singletonList(lastCommitInfo.get()));
+ }
}
@Override
public void notifyCheckpointAborted(long checkpointId) throws Exception {
- // TODO: committer abort
+ if (committer.isPresent() && lastCommitInfo.isPresent()) {
+
committer.get().abort(Collections.singletonList(lastCommitInfo.get()));
+ }
}
@Override
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java
index 3522d8d8e..6d970ed53 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java
@@ -17,9 +17,12 @@
package org.apache.seatunnel.engine.server.task.operation.sink;
+import org.apache.seatunnel.common.utils.SerializationUtils;
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
import
org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointBarrierTriggerOperation;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
+import org.apache.seatunnel.engine.server.task.SinkAggregatedCommitterTask;
import org.apache.seatunnel.engine.server.task.record.Barrier;
import com.hazelcast.nio.ObjectDataInput;
@@ -49,6 +52,11 @@ public class SinkPrepareCommitOperation extends
CheckpointBarrierTriggerOperatio
commitInfos = in.readByteArray();
}
+ @Override
+ public String getServiceName() {
+ return SeaTunnelServer.SERVICE_NAME;
+ }
+
@Override
public int getFactoryId() {
return TaskDataSerializerHook.FACTORY_ID;
@@ -61,7 +69,10 @@ public class SinkPrepareCommitOperation extends
CheckpointBarrierTriggerOperatio
@Override
public void run() throws Exception {
- super.run();
- // TODO: commit info to AggregatedCommitter
+ SeaTunnelServer server = getService();
+ SinkAggregatedCommitterTask<?, ?> committerTask =
server.getTaskExecutionService().getTask(taskLocation);
+ // TODO add classloader support with #2704
+ committerTask.receivedWriterCommitInfo(barrier.getId(),
SerializationUtils.deserialize(commitInfos));
+ committerTask.triggerBarrier(barrier);
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkRegisterOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkRegisterOperation.java
index c11ccc858..1a326571d 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkRegisterOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkRegisterOperation.java
@@ -17,11 +17,11 @@
package org.apache.seatunnel.engine.server.task.operation.sink;
+import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
import org.apache.seatunnel.engine.server.task.SinkAggregatedCommitterTask;
-import org.apache.seatunnel.engine.server.task.TaskRuntimeException;
import com.hazelcast.cluster.Address;
import com.hazelcast.logging.ILogger;
@@ -53,21 +53,11 @@ public class SinkRegisterOperation extends Operation
implements IdentifiedDataSe
public void run() throws Exception {
SeaTunnelServer server = getService();
Address readerAddress = getCallerAddress();
- SinkAggregatedCommitterTask<?> task = null;
- for (int i = 0; i < RETRY_NUMBER; i++) {
- try {
- task =
server.getTaskExecutionService().getTask(committerTaskID);
- break;
- } catch (NullPointerException e) {
- LOGGER.warning("can't get committer task , waiting task
started, retry " + i);
- Thread.sleep(RETRY_INTERVAL);
- }
- }
- if (task == null) {
- LOGGER.severe("can't connect with committer task");
- throw new TaskRuntimeException("can't connect with committer
task");
- }
- task.receivedWriterRegister(writerTaskID, readerAddress);
+ RetryUtils.retryWithException(() -> {
+ SinkAggregatedCommitterTask<?, ?> task =
server.getTaskExecutionService().getTask(committerTaskID);
+ task.receivedWriterRegister(writerTaskID, readerAddress);
+ return null;
+ }, new RetryUtils.RetryMaterial(RETRY_NUMBER, true, e -> true,
RETRY_INTERVAL));
}
@Override
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkUnregisterOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkUnregisterOperation.java
deleted file mode 100644
index 84d523b80..000000000
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkUnregisterOperation.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.engine.server.task.operation.sink;
-
-import org.apache.seatunnel.engine.server.SeaTunnelServer;
-import org.apache.seatunnel.engine.server.execution.TaskLocation;
-import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
-import org.apache.seatunnel.engine.server.task.SinkAggregatedCommitterTask;
-
-import com.hazelcast.nio.ObjectDataInput;
-import com.hazelcast.nio.ObjectDataOutput;
-import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
-import com.hazelcast.spi.impl.operationservice.Operation;
-
-import java.io.IOException;
-
-public class SinkUnregisterOperation extends Operation implements
IdentifiedDataSerializable {
-
- private TaskLocation currentTaskID;
- private TaskLocation committerTaskID;
-
- public SinkUnregisterOperation() {
- }
-
- public SinkUnregisterOperation(TaskLocation currentTaskID, TaskLocation
committerTaskID) {
- this.currentTaskID = currentTaskID;
- this.committerTaskID = committerTaskID;
- }
-
- @Override
- public void run() throws Exception {
- SeaTunnelServer server = getService();
- SinkAggregatedCommitterTask<?> task =
- server.getTaskExecutionService().getTask(committerTaskID);
- task.receivedWriterUnregister(currentTaskID);
- }
-
- @Override
- protected void writeInternal(ObjectDataOutput out) throws IOException {
- currentTaskID.writeData(out);
- committerTaskID.writeData(out);
- }
-
- @Override
- protected void readInternal(ObjectDataInput in) throws IOException {
- currentTaskID.readData(in);
- committerTaskID.readData(in);
- }
-
- @Override
- public int getFactoryId() {
- return TaskDataSerializerHook.FACTORY_ID;
- }
-
- @Override
- public int getClassId() {
- return TaskDataSerializerHook.SINK_UNREGISTER_TYPE;
- }
-}