gaoyunhaii commented on a change in pull request #18955:
URL: https://github.com/apache/flink/pull/18955#discussion_r820245603



##########
File path: 
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperatorStateHandler.java
##########
@@ -64,28 +69,15 @@
 
     private final FileCompactor fileCompactor;

Review comment:
       This class variable is not needed. 

##########
File path: 
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperatorStateHandler.java
##########
@@ -222,102 +264,64 @@ public void close() throws Exception {
         }
     }
 
-    private void submit(CompactorRequest request) {
-        CompletableFuture<Iterable<FileSinkCommittable>> resultFuture = new 
CompletableFuture<>();
-        compactService.submit(request, resultFuture);
-        compactingRequests.add(new Tuple2<>(request, resultFuture));
-    }
+    @Override
+    public void snapshotState(StateSnapshotContext context) throws Exception {
+        super.snapshotState(context);
+        if (compactingRequests.isEmpty()) {
+            return;
+        }
 
-    private void drain() throws ExecutionException, InterruptedException {
-        checkState(holdingSummary != null);
-        checkState(
-                holdingSummary.getNumberOfPendingCommittables()
-                                == holdingSummary.getNumberOfCommittables()
-                        && holdingSummary.getNumberOfCommittables()
-                                == holdingMessages.size() + 
compactingMessages.size());
-
-        Long checkpointId =
-                holdingSummary.getCheckpointId().isPresent()
-                        ? holdingSummary.getCheckpointId().getAsLong()
-                        : null;
-        int subtaskId = holdingSummary.getSubtaskId();
-
-        if (!compactingRequests.isEmpty()) {
-            CompletableFuture.allOf(
-                            compactingRequests.stream()
-                                    .map(r -> r.f1)
-                                    .toArray(CompletableFuture[]::new))
-                    .join();
-
-            for (Tuple2<CompactorRequest, 
CompletableFuture<Iterable<FileSinkCommittable>>>
-                    compacting : compactingRequests) {
-                CompletableFuture<Iterable<FileSinkCommittable>> future = 
compacting.f1;
-                checkState(future.isDone());
-                // Exception is thrown if it's completed exceptionally
-                for (FileSinkCommittable c : future.get()) {
-                    holdingMessages.add(new CommittableWithLineage<>(c, 
checkpointId, subtaskId));
+        // Results of some requests are not drained by a summary. They should 
be reserved in the
+        // state and wait for the next summary.
+
+        List<CompactorRequest> remainingRequests = new ArrayList<>();
+        for (Tuple2<CompactorRequest, 
CompletableFuture<Iterable<FileSinkCommittable>>> t :
+                compactingRequests) {
+            if (t.f1.isDone()) {
+                // We can add the results as a pass-through request if the 
compaction is done
+                Iterable<FileSinkCommittable> result = t.f1.get();
+                if (result.iterator().hasNext()) {
+                    String bucketId = result.iterator().next().getBucketId();
+                    CompactorRequest passThroughRequest = new 
CompactorRequest(bucketId);
+                    result.forEach(passThroughRequest::addToPassthrough);
+                    remainingRequests.add(passThroughRequest);
                 }
+            } else {
+                // Or we add the original request in the state
+                remainingRequests.add(t.f0);
             }
         }
+        Map<Long, List<CompactorRequest>> requestsMap = new HashMap<>();
+        requestsMap.put(-1L, remainingRequests);
+        remainingRequestsState.update(Collections.singletonList(requestsMap));
+    }
 
-        // Appending the compacted committable to the holding summary
-        CommittableSummary<FileSinkCommittable> summary =
-                new CommittableSummary<>(
-                        holdingSummary.getSubtaskId(),
-                        holdingSummary.getNumberOfSubtasks(),
-                        holdingSummary.getCheckpointId().isPresent()
-                                ? holdingSummary.getCheckpointId().getAsLong()
-                                : null,
-                        holdingMessages.size(),
-                        holdingMessages.size(),
-                        holdingSummary.getNumberOfFailedCommittables());
-        output.collect(new StreamRecord<>(summary));
-        for (CommittableMessage<FileSinkCommittable> committable : 
holdingMessages) {
-            output.collect(new StreamRecord<>(committable));
-        }
-
-        // Remaining requests should be all done and their results are all 
emitted.
-        // From now on the operator is stateless.
-        remainingRequestsState.clear();
-
-        compactingRequests.clear();
-        compactingMessages.clear();
-        holdingSummary = null;
-        holdingMessages = null;
-
-        if (writerStateDrained) {
-            // We can pass through everything if the writer state is also 
drained.
-            stateDrained = true;
-            compactService.close();
-            compactService = null;
-        }
+    private CompletableFuture<Iterable<FileSinkCommittable>> 
submit(CompactorRequest request) {
+        CompletableFuture<Iterable<FileSinkCommittable>> resultFuture = new 
CompletableFuture<>();
+        compactService.submit(request, resultFuture);
+        return resultFuture;
     }
 
     @Override
     public void initializeState(StateInitializationContext context) throws 
Exception {

Review comment:
      Perhaps we could move `initializeState` before the `open` 
function so that the functions follow the order of execution. 

##########
File path: 
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperatorStateHandler.java
##########
@@ -141,26 +135,68 @@ public void processElement(
             throws Exception {
         Either<CommittableMessage<FileSinkCommittable>, CompactorRequest> 
record =
                 element.getValue();
-        if (stateDrained) {
+        if (passingThroughAll) {

Review comment:
       The implementation of this method looks a bit complex to me. From my 
view it would be better if we could keep a style of state machine: 
   
   ```
     Either<CommittableMessage<FileSinkCommittable>, CompactorRequest> record =
             element.getValue();
     if (passingThroughAll) {
         // all input should be committable messages to pass through
         output.collect(new StreamRecord<>(record.left()));
         return;
     } else if (passingThrough) {
         // 1. if it is a committable, emit it directly
         // 2. else if it is summary
         //  2.a if compacting result is empty
         //  2.b if not
     } else {
         // 1. if it is compacting request, submit it
         // 2. else if it is summary
         //  2.a if compacting result is empty
         //  2.b if not
         // 3. else if it is a committable
         //  3.a if it is not normal, deal with it
         //  3.b if it is normal, passing through = true
     }
   ```
   
   We may change the `passingThrough` and `passingThroughAll` back to a state 
enumeration. 
   
   Besides, we could extract the sub-processes that handle the `CommittableSummary` 
and the `normal / hidden` Committables into private functions to further reduce the 
complexity of this method.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to