gaoyunhaii commented on a change in pull request #18955: URL: https://github.com/apache/flink/pull/18955#discussion_r820245603
########## File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperatorStateHandler.java ########## @@ -64,28 +69,15 @@ private final FileCompactor fileCompactor; Review comment: This class variable is not needed. ########## File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperatorStateHandler.java ########## @@ -222,102 +264,64 @@ public void close() throws Exception { } } - private void submit(CompactorRequest request) { - CompletableFuture<Iterable<FileSinkCommittable>> resultFuture = new CompletableFuture<>(); - compactService.submit(request, resultFuture); - compactingRequests.add(new Tuple2<>(request, resultFuture)); - } + @Override + public void snapshotState(StateSnapshotContext context) throws Exception { + super.snapshotState(context); + if (compactingRequests.isEmpty()) { + return; + } - private void drain() throws ExecutionException, InterruptedException { - checkState(holdingSummary != null); - checkState( - holdingSummary.getNumberOfPendingCommittables() - == holdingSummary.getNumberOfCommittables() - && holdingSummary.getNumberOfCommittables() - == holdingMessages.size() + compactingMessages.size()); - - Long checkpointId = - holdingSummary.getCheckpointId().isPresent() - ? 
holdingSummary.getCheckpointId().getAsLong() - : null; - int subtaskId = holdingSummary.getSubtaskId(); - - if (!compactingRequests.isEmpty()) { - CompletableFuture.allOf( - compactingRequests.stream() - .map(r -> r.f1) - .toArray(CompletableFuture[]::new)) - .join(); - - for (Tuple2<CompactorRequest, CompletableFuture<Iterable<FileSinkCommittable>>> - compacting : compactingRequests) { - CompletableFuture<Iterable<FileSinkCommittable>> future = compacting.f1; - checkState(future.isDone()); - // Exception is thrown if it's completed exceptionally - for (FileSinkCommittable c : future.get()) { - holdingMessages.add(new CommittableWithLineage<>(c, checkpointId, subtaskId)); + // Results of some requests are not drained by a summary. They should be reserved in the + // state and wait for the next summary. + + List<CompactorRequest> remainingRequests = new ArrayList<>(); + for (Tuple2<CompactorRequest, CompletableFuture<Iterable<FileSinkCommittable>>> t : + compactingRequests) { + if (t.f1.isDone()) { + // We can add the results as a pass-through request if the compaction is done + Iterable<FileSinkCommittable> result = t.f1.get(); + if (result.iterator().hasNext()) { + String bucketId = result.iterator().next().getBucketId(); + CompactorRequest passThroughRequest = new CompactorRequest(bucketId); + result.forEach(passThroughRequest::addToPassthrough); + remainingRequests.add(passThroughRequest); } + } else { + // Or we add the original request in the state + remainingRequests.add(t.f0); } } + Map<Long, List<CompactorRequest>> requestsMap = new HashMap<>(); + requestsMap.put(-1L, remainingRequests); + remainingRequestsState.update(Collections.singletonList(requestsMap)); + } - // Appending the compacted committable to the holding summary - CommittableSummary<FileSinkCommittable> summary = - new CommittableSummary<>( - holdingSummary.getSubtaskId(), - holdingSummary.getNumberOfSubtasks(), - holdingSummary.getCheckpointId().isPresent() - ? 
holdingSummary.getCheckpointId().getAsLong() - : null, - holdingMessages.size(), - holdingMessages.size(), - holdingSummary.getNumberOfFailedCommittables()); - output.collect(new StreamRecord<>(summary)); - for (CommittableMessage<FileSinkCommittable> committable : holdingMessages) { - output.collect(new StreamRecord<>(committable)); - } - - // Remaining requests should be all done and their results are all emitted. - // From now on the operator is stateless. - remainingRequestsState.clear(); - - compactingRequests.clear(); - compactingMessages.clear(); - holdingSummary = null; - holdingMessages = null; - - if (writerStateDrained) { - // We can pass through everything if the writer state is also drained. - stateDrained = true; - compactService.close(); - compactService = null; - } + private CompletableFuture<Iterable<FileSinkCommittable>> submit(CompactorRequest request) { + CompletableFuture<Iterable<FileSinkCommittable>> resultFuture = new CompletableFuture<>(); + compactService.submit(request, resultFuture); + return resultFuture; } @Override public void initializeState(StateInitializationContext context) throws Exception { Review comment: Perhaps we could move the `initializeState` to be before the `open` function so that the functions follow the order of execution. ########## File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperatorStateHandler.java ########## @@ -141,26 +135,68 @@ public void processElement( throws Exception { Either<CommittableMessage<FileSinkCommittable>, CompactorRequest> record = element.getValue(); - if (stateDrained) { + if (passingThroughAll) { Review comment: The implementation of this method looks a bit complex to me. 
From my view it would be better if we could keep a style of state machine: ``` Either<CommittableMessage<FileSinkCommittable>, CompactorRequest> record = element.getValue(); if (passingThroughAll) { // all input should be committable messages to pass through output.collect(new StreamRecord<>(record.left())); return; } else if (passingThrough) { // 1. if it is a committable, emit it directly // 2. else if it is summary // 2.a if compacting result is empty // 2.b if not } else { // 1. if it is compacting request, submit it // 2. else if it is summary // 2.a if compacting result is empty // 2.b if not // 3. else if it is a committable // 3.a if it is not normal, deal with it // 3.b if it is normal, passing through = true } ``` We may change the `passingThrough` and `passingThroughAll` back to a state enumeration. Besides, we could extract the sub-process to handle the `CommittableSummary` and `normal / hidden` Committables to private functions to further reduce the complexity of this method. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org