This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/st-engine by this push:
     new d13a12901 [Engine] [Task] Add sink committer logic (#2721)
d13a12901 is described below

commit d13a12901140d6f7fa7813310ff3d69f34359915
Author: Hisoka <[email protected]>
AuthorDate: Tue Sep 13 17:32:18 2022 +0800

    [Engine] [Task] Add sink committer logic (#2721)
---
 .../CheckpointBarrierTriggerOperation.java         |  2 +-
 .../server/dag/physical/PhysicalPlanGenerator.java |  2 +-
 .../serializable/TaskDataSerializerHook.java       | 10 +--
 .../server/task/SinkAggregatedCommitterTask.java   | 82 +++++++++-------------
 .../engine/server/task/flow/SinkFlowLifeCycle.java | 45 ++++++++----
 .../operation/sink/SinkPrepareCommitOperation.java | 15 +++-
 .../task/operation/sink/SinkRegisterOperation.java | 22 ++----
 .../operation/sink/SinkUnregisterOperation.java    | 74 -------------------
 8 files changed, 85 insertions(+), 167 deletions(-)

diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointBarrierTriggerOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointBarrierTriggerOperation.java
index 9a7b22656..52fefe1cd 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointBarrierTriggerOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointBarrierTriggerOperation.java
@@ -37,7 +37,7 @@ import java.io.IOException;
 
 @NoArgsConstructor
 public class CheckpointBarrierTriggerOperation extends TaskOperation {
-    private Barrier barrier;
+    protected Barrier barrier;
 
     public CheckpointBarrierTriggerOperation(Barrier barrier, TaskLocation 
taskLocation) {
         super(taskLocation);
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index 50fa26b7f..25c4c8c7e 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -208,7 +208,7 @@ public class PhysicalPlanGenerator {
                     TaskGroupLocation taskGroupLocation =
                         new 
TaskGroupLocation(jobImmutableInformation.getJobId(), pipelineIndex, 
taskGroupID);
                     TaskLocation taskLocation = new 
TaskLocation(taskGroupLocation, taskTypeId, 0);
-                    SinkAggregatedCommitterTask<?> t =
+                    SinkAggregatedCommitterTask<?, ?> t =
                         new 
SinkAggregatedCommitterTask(jobImmutableInformation.getJobId(), taskLocation, s,
                             sinkAggregatedCommitter.get());
                     committerTaskIDMap.put(s, taskLocation);
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
index 39dcd3ccd..c10edc1d2 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
@@ -27,7 +27,6 @@ import 
org.apache.seatunnel.engine.server.task.operation.GetTaskGroupAddressOper
 import 
org.apache.seatunnel.engine.server.task.operation.checkpoint.CloseRequestOperation;
 import 
org.apache.seatunnel.engine.server.task.operation.sink.SinkPrepareCommitOperation;
 import 
org.apache.seatunnel.engine.server.task.operation.sink.SinkRegisterOperation;
-import 
org.apache.seatunnel.engine.server.task.operation.sink.SinkUnregisterOperation;
 import 
org.apache.seatunnel.engine.server.task.operation.source.AssignSplitOperation;
 import 
org.apache.seatunnel.engine.server.task.operation.source.RequestSplitOperation;
 import 
org.apache.seatunnel.engine.server.task.operation.source.SourceNoMoreElementOperation;
@@ -49,9 +48,7 @@ public class TaskDataSerializerHook implements 
DataSerializerHook {
     public static final int TASK_GROUP_INFO_TYPE = 4;
 
     public static final int SOURCE_UNREGISTER_TYPE = 5;
-
-    public static final int SINK_UNREGISTER_TYPE = 6;
-
+    public static final int GET_TASKGROUP_ADDRESS_TYPE = 6;
     public static final int SINK_REGISTER_TYPE = 7;
 
     public static final int SINK_PREPARE_COMMIT_TYPE = 8;
@@ -66,9 +63,6 @@ public class TaskDataSerializerHook implements 
DataSerializerHook {
 
     public static final int CANCEL_TASK_OPERATOR = 13;
 
-    public static final int GET_TASKGROUP_ADDRESS_TYPE = 14;
-
-
     public static final int FACTORY_ID = FactoryIdHelper.getFactoryId(
         SeaTunnelFactoryIdConstant.SEATUNNEL_TASK_DATA_SERIALIZER_FACTORY,
         SeaTunnelFactoryIdConstant.SEATUNNEL_TASK_DATA_SERIALIZER_FACTORY_ID
@@ -101,8 +95,6 @@ public class TaskDataSerializerHook implements 
DataSerializerHook {
                     return new SourceNoMoreElementOperation();
                 case SINK_REGISTER_TYPE:
                     return new SinkRegisterOperation();
-                case SINK_UNREGISTER_TYPE:
-                    return new SinkUnregisterOperation();
                 case SINK_PREPARE_COMMIT_TYPE:
                     return new SinkPrepareCommitOperation();
                 case TASK_LOCATION_TYPE:
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
index 143ce43f4..5d8eef25b 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
@@ -45,7 +45,6 @@ import lombok.NonNull;
 
 import java.io.IOException;
 import java.net.URL;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -55,45 +54,48 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.stream.Collectors;
 
-public class SinkAggregatedCommitterTask<AggregatedCommitInfoT> extends 
CoordinatorTask {
+public class SinkAggregatedCommitterTask<CommandInfoT, AggregatedCommitInfoT> 
extends CoordinatorTask {
 
     private static final ILogger LOGGER = 
Logger.getLogger(SinkAggregatedCommitterTask.class);
     private static final long serialVersionUID = 5906594537520393503L;
 
     private SeaTunnelTaskState currState;
-    private final SinkAction<?, ?, ?, AggregatedCommitInfoT> sink;
+    private final SinkAction<?, ?, CommandInfoT, AggregatedCommitInfoT> sink;
     private final int maxWriterSize;
 
-    private final SinkAggregatedCommitter<?, AggregatedCommitInfoT> 
aggregatedCommitter;
+    private final SinkAggregatedCommitter<CommandInfoT, AggregatedCommitInfoT> 
aggregatedCommitter;
 
     private transient Serializer<AggregatedCommitInfoT> 
aggregatedCommitInfoSerializer;
     private Map<Long, Address> writerAddressMap;
 
-    private Map<Long, List<AggregatedCommitInfoT>> checkpointCommitInfoMap;
+    private ConcurrentMap<Long, List<CommandInfoT>> commitInfoCache;
+
+    private ConcurrentMap<Long, List<AggregatedCommitInfoT>> 
checkpointCommitInfoMap;
 
     private Map<Long, Integer> checkpointBarrierCounter;
-    private Map<Long, Map<Long, Long>> alreadyReceivedCommitInfo;
-    private Object closeLock;
     private CompletableFuture<Void> completableFuture;
 
-    public SinkAggregatedCommitterTask(long jobID, TaskLocation taskID, 
SinkAction<?, ?, ?, AggregatedCommitInfoT> sink,
-                                       SinkAggregatedCommitter<?, 
AggregatedCommitInfoT> aggregatedCommitter) {
+    private volatile boolean receivedSinkWriter;
+
+    public SinkAggregatedCommitterTask(long jobID, TaskLocation taskID, 
SinkAction<?, ?, CommandInfoT, AggregatedCommitInfoT> sink,
+                                       SinkAggregatedCommitter<CommandInfoT, 
AggregatedCommitInfoT> aggregatedCommitter) {
         super(jobID, taskID);
         this.sink = sink;
         this.aggregatedCommitter = aggregatedCommitter;
         this.maxWriterSize = sink.getParallelism();
+        this.receivedSinkWriter = false;
     }
 
     @Override
     public void init() throws Exception {
         super.init();
         currState = INIT;
-        this.closeLock = new Object();
         this.checkpointBarrierCounter = new HashMap<>();
-        this.alreadyReceivedCommitInfo = new ConcurrentHashMap<>();
+        this.commitInfoCache = new ConcurrentHashMap<>();
         this.writerAddressMap = new ConcurrentHashMap<>();
         this.checkpointCommitInfoMap = new ConcurrentHashMap<>();
         this.completableFuture = new CompletableFuture<>();
@@ -103,17 +105,8 @@ public class 
SinkAggregatedCommitterTask<AggregatedCommitInfoT> extends Coordina
 
     public void receivedWriterRegister(TaskLocation writerID, Address address) 
{
         this.writerAddressMap.put(writerID.getTaskID(), address);
-    }
-
-    public void receivedWriterUnregister(TaskLocation writerID) {
-        this.writerAddressMap.remove(writerID.getTaskID());
-        if (writerAddressMap.isEmpty()) {
-            try {
-                this.close();
-            } catch (IOException e) {
-                LOGGER.severe("aggregated committer close failed", e);
-                throw new TaskRuntimeException(e);
-            }
+        if (maxWriterSize <= writerAddressMap.size()) {
+            receivedSinkWriter = true;
         }
     }
 
@@ -142,7 +135,9 @@ public class 
SinkAggregatedCommitterTask<AggregatedCommitInfoT> extends Coordina
                 }
                 break;
             case STARTING:
-                currState = RUNNING;
+                if (receivedSinkWriter) {
+                    currState = RUNNING;
+                }
                 break;
             case RUNNING:
                 if (prepareCloseStatus) {
@@ -169,11 +164,9 @@ public class 
SinkAggregatedCommitterTask<AggregatedCommitInfoT> extends Coordina
 
     @Override
     public void close() throws IOException {
-        synchronized (closeLock) {
-            aggregatedCommitter.close();
-            progress.done();
-            completableFuture.complete(null);
-        }
+        aggregatedCommitter.close();
+        progress.done();
+        completableFuture.complete(null);
     }
 
     @Override
@@ -186,6 +179,10 @@ public class 
SinkAggregatedCommitterTask<AggregatedCommitInfoT> extends Coordina
             prepareCloseStatus = true;
         }
         if (barrier.snapshot()) {
+            if (commitInfoCache.containsKey(barrier.getId())) {
+                AggregatedCommitInfoT aggregatedCommitInfoT = 
aggregatedCommitter.combine(commitInfoCache.get(barrier.getId()));
+                checkpointCommitInfoMap.put(barrier.getId(), 
Collections.singletonList(aggregatedCommitInfoT));
+            }
             List<byte[]> states = 
serializeStates(aggregatedCommitInfoSerializer, 
checkpointCommitInfoMap.getOrDefault(barrier.getId(), Collections.emptyList()));
             this.getExecutionContext().sendToMaster(new 
TaskAcknowledgeOperation(this.taskLocation, (CheckpointBarrier) barrier,
                 Collections.singletonList(new ActionSubtaskState(sink.getId(), 
-1, states))));
@@ -199,31 +196,14 @@ public class 
SinkAggregatedCommitterTask<AggregatedCommitInfoT> extends Coordina
             .flatMap(Collection::stream)
             .map(bytes -> sneaky(() -> 
aggregatedCommitInfoSerializer.deserialize(bytes)))
             .collect(Collectors.toList());
-        // TODO: commit?
+        aggregatedCommitter.commit(aggregatedCommitInfos);
         restoreComplete = true;
     }
 
-    public void receivedWriterCommitInfo(long checkpointID, long subTaskId,
-                                         AggregatedCommitInfoT[] commitInfos) {
-        checkpointCommitInfoMap.computeIfAbsent(checkpointID, id -> new 
CopyOnWriteArrayList<>());
-        alreadyReceivedCommitInfo.computeIfAbsent(checkpointID, id -> new 
ConcurrentHashMap<>());
-
-        
checkpointCommitInfoMap.get(checkpointID).addAll(Arrays.asList(commitInfos));
-        Map<Long, Long> alreadyReceived = 
alreadyReceivedCommitInfo.get(checkpointID);
-        alreadyReceived.put(subTaskId, subTaskId);
-        if (alreadyReceived.size() == maxWriterSize) {
-            try {
-                synchronized (closeLock) {
-                    
aggregatedCommitter.commit(checkpointCommitInfoMap.get(checkpointID));
-                }
-                checkpointCommitInfoMap.remove(checkpointID);
-                alreadyReceivedCommitInfo.remove(checkpointID);
-            } catch (IOException e) {
-                LOGGER.severe("aggregated committer commit failed, 
checkpointID: " + checkpointID, e);
-                throw new TaskRuntimeException(e);
-            }
-        }
-
+    public void receivedWriterCommitInfo(long checkpointID,
+                                         CommandInfoT commitInfos) {
+        commitInfoCache.computeIfAbsent(checkpointID, id -> new 
CopyOnWriteArrayList<>());
+        commitInfoCache.get(checkpointID).add(commitInfos);
     }
 
     @Override
@@ -234,6 +214,7 @@ public class 
SinkAggregatedCommitterTask<AggregatedCommitInfoT> extends Coordina
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
         aggregatedCommitter.commit(checkpointCommitInfoMap.get(checkpointId));
+        checkpointCommitInfoMap.remove(checkpointId);
         if (prepareCloseStatus) {
             closeCall();
         }
@@ -242,6 +223,7 @@ public class 
SinkAggregatedCommitterTask<AggregatedCommitInfoT> extends Coordina
     @Override
     public void notifyCheckpointAborted(long checkpointId) throws Exception {
         aggregatedCommitter.abort(checkpointCommitInfoMap.get(checkpointId));
+        checkpointCommitInfoMap.remove(checkpointId);
         if (prepareCloseStatus) {
             closeCall();
         }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
index a3aa29fc2..f5eeb8e80 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
@@ -21,8 +21,10 @@ import static 
org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneaky;
 import static 
org.apache.seatunnel.engine.server.task.AbstractTask.serializeStates;
 
 import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.sink.SinkCommitter;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.Record;
+import org.apache.seatunnel.common.utils.SerializationUtils;
 import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener;
 import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
 import org.apache.seatunnel.engine.server.checkpoint.ActionSubtaskState;
@@ -38,6 +40,7 @@ import org.apache.seatunnel.engine.server.task.record.Barrier;
 import com.hazelcast.cluster.Address;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -47,10 +50,10 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
-public class SinkFlowLifeCycle<T, StateT> extends ActionFlowLifeCycle 
implements OneInputFlowLifeCycle<Record<?>>, InternalCheckpointListener {
+public class SinkFlowLifeCycle<T, CommitInfoT extends Serializable, 
AggregatedCommitInfoT, StateT> extends ActionFlowLifeCycle implements 
OneInputFlowLifeCycle<Record<?>>, InternalCheckpointListener {
 
-    private final SinkAction<T, StateT, ?, ?> sinkAction;
-    private SinkWriter<T, ?, StateT> writer;
+    private final SinkAction<T, StateT, CommitInfoT, AggregatedCommitInfoT> 
sinkAction;
+    private SinkWriter<T, CommitInfoT, StateT> writer;
 
     private transient Optional<Serializer<StateT>> writerStateSerializer;
 
@@ -62,28 +65,33 @@ public class SinkFlowLifeCycle<T, StateT> extends 
ActionFlowLifeCycle implements
 
     private final TaskLocation committerTaskLocation;
 
-    private final boolean containCommitter;
+    private Optional<SinkCommitter<CommitInfoT>> committer;
 
-    public SinkFlowLifeCycle(SinkAction<T, StateT, ?, ?> sinkAction, 
TaskLocation taskLocation, int indexID,
+    private Optional<CommitInfoT> lastCommitInfo;
+
+    private final boolean containAggCommitter;
+
+    public SinkFlowLifeCycle(SinkAction<T, StateT, CommitInfoT, 
AggregatedCommitInfoT> sinkAction, TaskLocation taskLocation, int indexID,
                              SeaTunnelTask runningTask, TaskLocation 
committerTaskLocation,
-                             boolean containCommitter, CompletableFuture<Void> 
completableFuture) {
+                             boolean containAggCommitter, 
CompletableFuture<Void> completableFuture) {
         super(sinkAction, runningTask, completableFuture);
         this.sinkAction = sinkAction;
         this.indexID = indexID;
         this.taskLocation = taskLocation;
         this.committerTaskLocation = committerTaskLocation;
-        this.containCommitter = containCommitter;
+        this.containAggCommitter = containAggCommitter;
     }
 
     @Override
     public void init() throws Exception {
         this.writerStateSerializer = 
sinkAction.getSink().getWriterStateSerializer();
+        this.committer = sinkAction.getSink().createCommitter();
     }
 
     @Override
     public void open() throws Exception {
         super.open();
-        if (containCommitter) {
+        if (containAggCommitter) {
             committerTaskAddress = getCommitterTaskAddress();
         }
         registerCommitter();
@@ -101,7 +109,7 @@ public class SinkFlowLifeCycle<T, StateT> extends 
ActionFlowLifeCycle implements
     }
 
     private void registerCommitter() {
-        if (containCommitter) {
+        if (containAggCommitter) {
             runningTask.getExecutionContext().sendToMember(new 
SinkRegisterOperation(taskLocation,
                 committerTaskLocation), committerTaskAddress).join();
         }
@@ -122,9 +130,14 @@ public class SinkFlowLifeCycle<T, StateT> extends 
ActionFlowLifeCycle implements
                         List<StateT> states = 
writer.snapshotState(barrier.getId());
                         runningTask.addState(barrier, sinkAction.getId(), 
serializeStates(writerStateSerializer.get(), states));
                     }
-                    // TODO: prepare commit
-                    runningTask.getExecutionContext().sendToMember(new 
SinkPrepareCommitOperation(barrier, committerTaskLocation,
-                        new byte[0]), committerTaskAddress);
+                    try {
+                        lastCommitInfo = writer.prepareCommit();
+                    } catch (Exception e) {
+                        writer.abortPrepare();
+                        throw e;
+                    }
+                    lastCommitInfo.ifPresent(commitInfoT -> 
runningTask.getExecutionContext().sendToMember(new 
SinkPrepareCommitOperation(barrier, committerTaskLocation,
+                        SerializationUtils.serialize(commitInfoT)), 
committerTaskAddress).join());
                 } else {
                     runningTask.getExecutionContext().sendToMember(new 
CheckpointBarrierTriggerOperation(barrier, committerTaskLocation), 
committerTaskAddress);
                 }
@@ -142,12 +155,16 @@ public class SinkFlowLifeCycle<T, StateT> extends 
ActionFlowLifeCycle implements
 
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
-        // TODO: committer commit
+        if (committer.isPresent() && lastCommitInfo.isPresent()) {
+            
committer.get().commit(Collections.singletonList(lastCommitInfo.get()));
+        }
     }
 
     @Override
     public void notifyCheckpointAborted(long checkpointId) throws Exception {
-        // TODO: committer abort
+        if (committer.isPresent() && lastCommitInfo.isPresent()) {
+            
committer.get().abort(Collections.singletonList(lastCommitInfo.get()));
+        }
     }
 
     @Override
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java
index 3522d8d8e..6d970ed53 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java
@@ -17,9 +17,12 @@
 
 package org.apache.seatunnel.engine.server.task.operation.sink;
 
+import org.apache.seatunnel.common.utils.SerializationUtils;
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import 
org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointBarrierTriggerOperation;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
 import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
+import org.apache.seatunnel.engine.server.task.SinkAggregatedCommitterTask;
 import org.apache.seatunnel.engine.server.task.record.Barrier;
 
 import com.hazelcast.nio.ObjectDataInput;
@@ -49,6 +52,11 @@ public class SinkPrepareCommitOperation extends 
CheckpointBarrierTriggerOperatio
         commitInfos = in.readByteArray();
     }
 
+    @Override
+    public String getServiceName() {
+        return SeaTunnelServer.SERVICE_NAME;
+    }
+
     @Override
     public int getFactoryId() {
         return TaskDataSerializerHook.FACTORY_ID;
@@ -61,7 +69,10 @@ public class SinkPrepareCommitOperation extends 
CheckpointBarrierTriggerOperatio
 
     @Override
     public void run() throws Exception {
-        super.run();
-        // TODO: commit info to AggregatedCommitter
+        SeaTunnelServer server = getService();
+        SinkAggregatedCommitterTask<?, ?> committerTask = 
server.getTaskExecutionService().getTask(taskLocation);
+        // TODO add classloader support with #2704
+        committerTask.receivedWriterCommitInfo(barrier.getId(), 
SerializationUtils.deserialize(commitInfos));
+        committerTask.triggerBarrier(barrier);
     }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkRegisterOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkRegisterOperation.java
index c11ccc858..1a326571d 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkRegisterOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkRegisterOperation.java
@@ -17,11 +17,11 @@
 
 package org.apache.seatunnel.engine.server.task.operation.sink;
 
+import org.apache.seatunnel.common.utils.RetryUtils;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
 import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
 import org.apache.seatunnel.engine.server.task.SinkAggregatedCommitterTask;
-import org.apache.seatunnel.engine.server.task.TaskRuntimeException;
 
 import com.hazelcast.cluster.Address;
 import com.hazelcast.logging.ILogger;
@@ -53,21 +53,11 @@ public class SinkRegisterOperation extends Operation 
implements IdentifiedDataSe
     public void run() throws Exception {
         SeaTunnelServer server = getService();
         Address readerAddress = getCallerAddress();
-        SinkAggregatedCommitterTask<?> task = null;
-        for (int i = 0; i < RETRY_NUMBER; i++) {
-            try {
-                task = 
server.getTaskExecutionService().getTask(committerTaskID);
-                break;
-            } catch (NullPointerException e) {
-                LOGGER.warning("can't get committer task , waiting task 
started, retry " + i);
-                Thread.sleep(RETRY_INTERVAL);
-            }
-        }
-        if (task == null) {
-            LOGGER.severe("can't connect with committer task");
-            throw new TaskRuntimeException("can't connect with committer 
task");
-        }
-        task.receivedWriterRegister(writerTaskID, readerAddress);
+        RetryUtils.retryWithException(() -> {
+            SinkAggregatedCommitterTask<?, ?> task = 
server.getTaskExecutionService().getTask(committerTaskID);
+            task.receivedWriterRegister(writerTaskID, readerAddress);
+            return null;
+        }, new RetryUtils.RetryMaterial(RETRY_NUMBER, true, e -> true, 
RETRY_INTERVAL));
     }
 
     @Override
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkUnregisterOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkUnregisterOperation.java
deleted file mode 100644
index 84d523b80..000000000
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkUnregisterOperation.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.engine.server.task.operation.sink;
-
-import org.apache.seatunnel.engine.server.SeaTunnelServer;
-import org.apache.seatunnel.engine.server.execution.TaskLocation;
-import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
-import org.apache.seatunnel.engine.server.task.SinkAggregatedCommitterTask;
-
-import com.hazelcast.nio.ObjectDataInput;
-import com.hazelcast.nio.ObjectDataOutput;
-import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
-import com.hazelcast.spi.impl.operationservice.Operation;
-
-import java.io.IOException;
-
-public class SinkUnregisterOperation extends Operation implements 
IdentifiedDataSerializable {
-
-    private TaskLocation currentTaskID;
-    private TaskLocation committerTaskID;
-
-    public SinkUnregisterOperation() {
-    }
-
-    public SinkUnregisterOperation(TaskLocation currentTaskID, TaskLocation 
committerTaskID) {
-        this.currentTaskID = currentTaskID;
-        this.committerTaskID = committerTaskID;
-    }
-
-    @Override
-    public void run() throws Exception {
-        SeaTunnelServer server = getService();
-        SinkAggregatedCommitterTask<?> task =
-                server.getTaskExecutionService().getTask(committerTaskID);
-        task.receivedWriterUnregister(currentTaskID);
-    }
-
-    @Override
-    protected void writeInternal(ObjectDataOutput out) throws IOException {
-        currentTaskID.writeData(out);
-        committerTaskID.writeData(out);
-    }
-
-    @Override
-    protected void readInternal(ObjectDataInput in) throws IOException {
-        currentTaskID.readData(in);
-        committerTaskID.readData(in);
-    }
-
-    @Override
-    public int getFactoryId() {
-        return TaskDataSerializerHook.FACTORY_ID;
-    }
-
-    @Override
-    public int getClassId() {
-        return TaskDataSerializerHook.SINK_UNREGISTER_TYPE;
-    }
-}

Reply via email to