This is an automated email from the ASF dual-hosted git repository. gaoyunhaii pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 13b203f [FLINK-26440][connector/filesystem] Make CompactorOperatorStateHandler supporting unaligned checkpoint. 13b203f is described below commit 13b203fef748bdbe9b1d14ba01f23ca6c6b24b7e Author: Gen Luo <luogen...@gmail.com> AuthorDate: Fri Mar 4 11:24:44 2022 +0800 [FLINK-26440][connector/filesystem] Make CompactorOperatorStateHandler supporting unaligned checkpoint. This closes #18955. --- .../operator/CompactorOperatorStateHandler.java | 354 +++++++++++---------- .../file/sink/compactor/CompactorOperatorTest.java | 140 +++++++- 2 files changed, 314 insertions(+), 180 deletions(-) diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperatorStateHandler.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperatorStateHandler.java index 70e6458..37d83ec 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperatorStateHandler.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperatorStateHandler.java @@ -23,16 +23,15 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.state.CheckpointListener; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.connector.file.sink.FileSinkCommittable; -import org.apache.flink.connector.file.sink.compactor.FileCompactor; import org.apache.flink.connector.file.sink.compactor.IdenticalFileCompactor; import org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator.RemainingRequestsSerializer; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; import 
org.apache.flink.streaming.api.connector.sink2.CommittableMessage; import org.apache.flink.streaming.api.connector.sink2.CommittableSummary; import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter; -import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.PendingFileRecoverable; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.BoundedOneInput; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; @@ -41,6 +40,9 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.types.Either; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -61,31 +63,12 @@ public class CompactorOperatorStateHandler private final SimpleVersionedSerializer<FileSinkCommittable> committableSerializer; private final BucketWriter<?, String> bucketWriter; - - private final FileCompactor fileCompactor; + private State state = State.HANDLING_STATE; private transient CompactService compactService; - - // Flag indicating the in-progress file of the previous run from the writer has been received - // and processed. - private boolean writerStateDrained = false; - - // Flag indicating all compaction related states are drained, the operator can now pass through - // everything. - private boolean stateDrained = false; - - // There may be a in-progress file of the previous run that we have to process as a compaction - // request first, or the file is invisible after committing. - // We have to hold the summary and committables (of this run), and send them along with the - // result of this compaction request, as well as the results of the remaining requests of this - // operator, if there are. 
- private CommittableSummary<FileSinkCommittable> holdingSummary; - private List<CommittableMessage<FileSinkCommittable>> holdingMessages; - private final List<CommittableMessage<FileSinkCommittable>> compactingMessages = - new ArrayList<>(); - private final List<Tuple2<CompactorRequest, CompletableFuture<Iterable<FileSinkCommittable>>>> - compactingRequests = new ArrayList<>(); + compactingRequests = new LinkedList<>(); + private SimpleVersionedListState<Map<Long, List<CompactorRequest>>> remainingRequestsState; private Iterable<Map<Long, List<CompactorRequest>>> stateRemaining; @@ -94,14 +77,25 @@ public class CompactorOperatorStateHandler BucketWriter<?, String> bucketWriter) { this.committableSerializer = committableSerializer; this.bucketWriter = bucketWriter; + } - this.fileCompactor = new IdenticalFileCompactor(); + @Override + public void initializeState(StateInitializationContext context) throws Exception { + super.initializeState(context); + // CompactService is not initialized yet, we can not submit requests here. 
+ remainingRequestsState = + new SimpleVersionedListState<>( + context.getOperatorStateStore() + .getListState(REMAINING_REQUESTS_RAW_STATES_DESC), + new RemainingRequestsSerializer( + new CompactorRequestSerializer(committableSerializer))); + stateRemaining = remainingRequestsState.get(); } @Override public void open() throws Exception { super.open(); - this.compactService = new CompactService(1, fileCompactor, bucketWriter); + this.compactService = new CompactService(1, new IdenticalFileCompactor(), bucketWriter); compactService.open(); if (stateRemaining != null) { @@ -122,12 +116,14 @@ public class CompactorOperatorStateHandler for (FileSinkCommittable toCompact : toCompactList) { CompactorRequest compactRequest = new CompactorRequest(bucketId); compactRequest.addToCompact(toCompact); - submit(compactRequest); + compactingRequests.add( + new Tuple2<>(compactRequest, submit(compactRequest))); } CompactorRequest passThroughRequest = new CompactorRequest(bucketId); toPassThrough.forEach(passThroughRequest::addToPassthrough); - submit(passThroughRequest); + compactingRequests.add( + new Tuple2<>(passThroughRequest, submit(passThroughRequest))); } } } @@ -141,77 +137,137 @@ public class CompactorOperatorStateHandler throws Exception { Either<CommittableMessage<FileSinkCommittable>, CompactorRequest> record = element.getValue(); - if (stateDrained) { + if (state == State.PASSING_THROUGH_ALL) { // all input should be committable messages to pass through output.collect(new StreamRecord<>(record.left())); return; } + if (state == State.PASSING_THROUGH_COMMITTABLE) { + checkState( + record.isLeft(), + "Compacting requests is not expected once a normal committable is received."); + CommittableMessage<FileSinkCommittable> message = record.left(); + if (message instanceof CommittableWithLineage) { + checkState( + !isHiddenCommittable((CommittableWithLineage<FileSinkCommittable>) message), + "Hidden committable is not expected once a normal committable is received."); + 
output.collect(new StreamRecord<>(message)); + } else { + // Message is summary + if (compactingRequests.isEmpty()) { + // No compacting requests remained + state = State.PASSING_THROUGH_ALL; + output.collect(new StreamRecord<>(message)); + return; + } + appendCompactingResultsToSummary((CommittableSummary<FileSinkCommittable>) message); + } + return; + } + if (record.isRight()) { - submit(element.getValue().right()); + CompactorRequest request = element.getValue().right(); + compactingRequests.add(new Tuple2<>(request, submit(request))); return; } CommittableMessage<FileSinkCommittable> message = record.left(); - if (message instanceof CommittableSummary) { - checkState(holdingSummary == null, "Duplicate summary before the first checkpoint."); - holdingSummary = (CommittableSummary<FileSinkCommittable>) message; - holdingMessages = new ArrayList<>(holdingSummary.getNumberOfCommittables()); - } else { - boolean compacting = false; - CommittableWithLineage<FileSinkCommittable> committableWithLineage = - (CommittableWithLineage<FileSinkCommittable>) message; - if (committableWithLineage.getCommittable().hasPendingFile()) { - FileSinkCommittable committable = committableWithLineage.getCommittable(); - PendingFileRecoverable pendingFile = committable.getPendingFile(); - if (pendingFile.getPath() != null - && pendingFile.getPath().getName().startsWith(".")) { - // The pending file is the in-progress file of the previous run, which - // should be committed and compacted before sending to the committer. - CompactorRequest request = new CompactorRequest(committable.getBucketId()); - request.addToCompact(committable); - submit(request); - - compacting = true; - compactingMessages.add(message); - } else { - // A normal file is received, indicating the writer state is drained. 
- writerStateDrained = true; - if (compactingMessages.isEmpty() && compactingRequests.isEmpty()) { - // No state needs to be handled, the holding summary and all committable - // messages can be sent eagerly - checkState(holdingSummary != null); - output.collect(new StreamRecord<>(holdingSummary)); - holdingSummary = null; - - this.stateDrained = true; - output.collect(new StreamRecord<>(committableWithLineage)); - } - } + if (message instanceof CommittableWithLineage) { + if (isHiddenCommittable((CommittableWithLineage<FileSinkCommittable>) message)) { + handleHiddenCommittable((CommittableWithLineage<FileSinkCommittable>) message); + } else { + // No more hidden committable is expected + state = State.PASSING_THROUGH_COMMITTABLE; + output.collect(new StreamRecord<>(message)); } - if (!compacting && !stateDrained) { - // Compacting messages should not be added - // If the state is drained, no further messages need to be added - holdingMessages.add(message); + } else { + if (compactingRequests.isEmpty()) { + output.collect(new StreamRecord<>(message)); + return; } + appendCompactingResultsToSummary((CommittableSummary<FileSinkCommittable>) message); } } - @Override - public void prepareSnapshotPreBarrier(long checkpointId) throws Exception { - super.prepareSnapshotPreBarrier(checkpointId); - if (stateDrained) { - return; + private void appendCompactingResultsToSummary(CommittableSummary<FileSinkCommittable> summary) + throws ExecutionException, InterruptedException { + // To guarantee the order, we have to wait for all results here. 
+ List<FileSinkCommittable> results = new ArrayList<>(); + for (Tuple2<CompactorRequest, CompletableFuture<Iterable<FileSinkCommittable>>> t : + compactingRequests) { + t.f1.get().forEach(results::add); + } + compactingRequests.clear(); + + // Append the results to the summary and send them following it + output.collect( + new StreamRecord<>( + new CommittableSummary<>( + summary.getSubtaskId(), + summary.getNumberOfSubtasks(), + getCheckpointId(summary), + summary.getNumberOfCommittables() + results.size(), + summary.getNumberOfPendingCommittables() + results.size(), + summary.getNumberOfFailedCommittables()))); + for (FileSinkCommittable committable : results) { + output.collect( + new StreamRecord<>( + new CommittableWithLineage<>( + committable, + getCheckpointId(summary), + summary.getSubtaskId()))); + } + } + + private boolean isHiddenCommittable(CommittableWithLineage<FileSinkCommittable> message) { + return message.getCommittable().hasPendingFile() + && message.getCommittable().getPendingFile().getPath() != null + && message.getCommittable().getPendingFile().getPath().getName().startsWith("."); + } + + private void handleHiddenCommittable(CommittableWithLineage<FileSinkCommittable> message) + throws ExecutionException, InterruptedException { + FileSinkCommittable committable = message.getCommittable(); + + // The pending file is the in-progress file of the previous run, which + // should be committed and compacted before sending to the committer. 
+ CompactorRequest request = new CompactorRequest(committable.getBucketId()); + request.addToCompact(committable); + + // Wait for the result synchronously, and pass through the result, but append + // cleanup request to the next summary, since the count of pending committables + // for this checkpoint is immutable now + Iterable<FileSinkCommittable> result = submit(request).get(); + Long checkpointId = getCheckpointId(message); + boolean pendingFileSent = false; + for (FileSinkCommittable c : result) { + if (c.hasPendingFile()) { + checkState( + !pendingFileSent, + "A in-progress file should not be converted to multiple pending files"); + pendingFileSent = true; + output.collect( + new StreamRecord<>( + new CommittableWithLineage<>( + c, checkpointId, message.getSubtaskId()))); + } else { + // Wrap cleanup request as pass through request and reserve it in the + // compacting requests. + // These requests will be appended to the next summary, if there is. + CompactorRequest passThroughRequest = new CompactorRequest(c.getBucketId()); + passThroughRequest.addToPassthrough(c); + compactingRequests.add( + new Tuple2<>(passThroughRequest, submit(passThroughRequest))); + } } - drain(); - // The operator is stateless once drain is called. snapshotState is not necessary. } @Override public void endInput() throws Exception { - if (!stateDrained) { - drain(); - } + // Although there may be remaining cleanup requests in compactingRequests, there is no way + // to let the Committer accept them since the eoi summary has been sent. + // For now we can do nothing but leave them there. 
} @Override @@ -222,102 +278,74 @@ public class CompactorOperatorStateHandler } } - private void submit(CompactorRequest request) { - CompletableFuture<Iterable<FileSinkCommittable>> resultFuture = new CompletableFuture<>(); - compactService.submit(request, resultFuture); - compactingRequests.add(new Tuple2<>(request, resultFuture)); - } + @Override + public void snapshotState(StateSnapshotContext context) throws Exception { + super.snapshotState(context); + if (compactingRequests.isEmpty()) { + return; + } - private void drain() throws ExecutionException, InterruptedException { - checkState(holdingSummary != null); - checkState( - holdingSummary.getNumberOfPendingCommittables() - == holdingSummary.getNumberOfCommittables() - && holdingSummary.getNumberOfCommittables() - == holdingMessages.size() + compactingMessages.size()); - - Long checkpointId = - holdingSummary.getCheckpointId().isPresent() - ? holdingSummary.getCheckpointId().getAsLong() - : null; - int subtaskId = holdingSummary.getSubtaskId(); - - if (!compactingRequests.isEmpty()) { - CompletableFuture.allOf( - compactingRequests.stream() - .map(r -> r.f1) - .toArray(CompletableFuture[]::new)) - .join(); - - for (Tuple2<CompactorRequest, CompletableFuture<Iterable<FileSinkCommittable>>> - compacting : compactingRequests) { - CompletableFuture<Iterable<FileSinkCommittable>> future = compacting.f1; - checkState(future.isDone()); - // Exception is thrown if it's completed exceptionally - for (FileSinkCommittable c : future.get()) { - holdingMessages.add(new CommittableWithLineage<>(c, checkpointId, subtaskId)); + // Results of some requests are not drained by a summary. They should be reserved in the + // state and wait for the next summary. 
+ + List<CompactorRequest> remainingRequests = new ArrayList<>(); + for (Tuple2<CompactorRequest, CompletableFuture<Iterable<FileSinkCommittable>>> t : + compactingRequests) { + if (t.f1.isDone()) { + // We can add the results as a pass-through request if the compaction is done + Iterable<FileSinkCommittable> result = t.f1.get(); + if (result.iterator().hasNext()) { + String bucketId = result.iterator().next().getBucketId(); + CompactorRequest passThroughRequest = new CompactorRequest(bucketId); + result.forEach(passThroughRequest::addToPassthrough); + remainingRequests.add(passThroughRequest); } + } else { + // Or we add the original request in the state + remainingRequests.add(t.f0); } } - - // Appending the compacted committable to the holding summary - CommittableSummary<FileSinkCommittable> summary = - new CommittableSummary<>( - holdingSummary.getSubtaskId(), - holdingSummary.getNumberOfSubtasks(), - holdingSummary.getCheckpointId().isPresent() - ? holdingSummary.getCheckpointId().getAsLong() - : null, - holdingMessages.size(), - holdingMessages.size(), - holdingSummary.getNumberOfFailedCommittables()); - output.collect(new StreamRecord<>(summary)); - for (CommittableMessage<FileSinkCommittable> committable : holdingMessages) { - output.collect(new StreamRecord<>(committable)); - } - - // Remaining requests should be all done and their results are all emitted. - // From now on the operator is stateless. - remainingRequestsState.clear(); - - compactingRequests.clear(); - compactingMessages.clear(); - holdingSummary = null; - holdingMessages = null; - - if (writerStateDrained) { - // We can pass through everything if the writer state is also drained. 
- stateDrained = true; - compactService.close(); - compactService = null; - } + Map<Long, List<CompactorRequest>> requestsMap = new HashMap<>(); + requestsMap.put(-1L, remainingRequests); + remainingRequestsState.update(Collections.singletonList(requestsMap)); } - @Override - public void initializeState(StateInitializationContext context) throws Exception { - super.initializeState(context); - - remainingRequestsState = - new SimpleVersionedListState<>( - context.getOperatorStateStore() - .getListState(REMAINING_REQUESTS_RAW_STATES_DESC), - new RemainingRequestsSerializer( - new CompactorRequestSerializer(committableSerializer))); - - stateRemaining = remainingRequestsState.get(); + private Long getCheckpointId(CommittableMessage<FileSinkCommittable> message) { + return message.getCheckpointId().isPresent() ? message.getCheckpointId().getAsLong() : null; + } - // stateDrained can not be determined here, since even if the stateRemaining is empty, - // there may still be some requests from the coordinator and a in-progress file in the file - // writer + private CompletableFuture<Iterable<FileSinkCommittable>> submit(CompactorRequest request) { + CompletableFuture<Iterable<FileSinkCommittable>> resultFuture = new CompletableFuture<>(); + compactService.submit(request, resultFuture); + return resultFuture; } @VisibleForTesting - public boolean isWriterStateDrained() { - return writerStateDrained; + public boolean isPassingThroughCommittable() { + return state == State.PASSING_THROUGH_COMMITTABLE; } @VisibleForTesting - public boolean isStateDrained() { - return stateDrained; + public boolean isPassingThroughAll() { + return state == State.PASSING_THROUGH_ALL; + } + + /** The handling state of this operator. */ + private enum State { + /** + * Handling states of the previous run, including compaction requests from the coordinator + * and the hidden committable from the writer. 
+ */ + HANDLING_STATE, + /** + * All states of the previous run are handled, while some results need to be appended to the + * next summary. All committable can be passed through now. + */ + PASSING_THROUGH_COMMITTABLE, + /** + * All states of the previous run are handled and all results are sent. The job of this + * operator is done and everything can be passed through directly. + */ + PASSING_THROUGH_ALL } } diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/compactor/CompactorOperatorTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/compactor/CompactorOperatorTest.java index 6b7f397..09af83e 100644 --- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/compactor/CompactorOperatorTest.java +++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/compactor/CompactorOperatorTest.java @@ -308,28 +308,26 @@ public class CompactorOperatorTest extends AbstractCompactTestBase { new CommittableWithLineage<>( committable("0", "7", 8), 3L, 0)))); - Assert.assertTrue(handler.isWriterStateDrained()); - Assert.assertFalse(handler.isStateDrained()); + Assert.assertTrue(handler.isPassingThroughCommittable()); + Assert.assertFalse(handler.isPassingThroughAll()); - // the result should not be emitted yet, but all requests should already be submitted - Assert.assertEquals(0, harness.extractOutputValues().size()); + harness.processElement( + new StreamRecord<>(Either.Left(new CommittableSummary<>(0, 1, 4L, 0, 0, 0)))); - compactor.getAllTasksFuture().join(); - // state should be drained, and all results and holding messages should be emitted - harness.prepareSnapshotPreBarrier(3); + harness.processElement( + new StreamRecord<>(Either.Left(new CommittableSummary<>(0, 1, 5L, 3, 3, 0)))); - Assert.assertTrue(handler.isStateDrained()); + Assert.assertTrue(handler.isPassingThroughAll()); - // summary 
should be merged into one - // 1 summary+ 1 compacted + (1 compacted committable + 1 compacted cleanup) * 7 + // 1 summary + (1 compacted committable + 1 compacted cleanup) * 6 + 1 hidden + 1 normal + // + 1 summary + 1 cleanup + 1 summary List<CommittableMessage<FileSinkCommittable>> results = harness.extractOutputValues(); - Assert.assertEquals(16, results.size()); + Assert.assertEquals(18, results.size()); SinkV2Assertions.assertThat((CommittableSummary<?>) results.get(0)) - .hasPendingCommittables(15); + .hasPendingCommittables(14); List<FileSinkCommittable> expectedResult = Arrays.asList( - committable("0", "7", 8), committable("0", "compacted-0", 1), cleanupPath("0", ".0"), committable("0", "compacted-1", 2), @@ -343,12 +341,120 @@ public class CompactorOperatorTest extends AbstractCompactTestBase { committable("0", "compacted-5", 6), cleanupPath("0", ".5"), committable("0", "compacted-6", 7), - cleanupPath("0", ".6")); + committable("0", "7", 8)); - for (int i = 1; i < results.size(); ++i) { - SinkV2Assertions.assertThat((CommittableWithLineage<?>) results.get(i)) - .hasCommittable(expectedResult.get(i - 1)); + for (int i = 0; i < expectedResult.size(); ++i) { + SinkV2Assertions.assertThat((CommittableWithLineage<?>) results.get(i + 1)) + .hasCommittable(expectedResult.get(i)); } + + SinkV2Assertions.assertThat((CommittableSummary<?>) results.get(15)) + .hasPendingCommittables(1); + SinkV2Assertions.assertThat((CommittableWithLineage<?>) results.get(16)) + .hasCommittable(cleanupPath("0", ".6")); + + SinkV2Assertions.assertThat((CommittableSummary<?>) results.get(17)) + .hasPendingCommittables(3); + } + } + + @Test + public void testStateHandlerRestore() throws Exception { + OperatorSubtaskState state; + try (OneInputStreamOperatorTestHarness< + Either<CommittableMessage<FileSinkCommittable>, CompactorRequest>, + CommittableMessage<FileSinkCommittable>> + harness = + new OneInputStreamOperatorTestHarness<>( + new CompactorOperatorStateHandler( + 
getTestCommittableSerializer(), + createTestBucketWriter()))) { + harness.setup(); + harness.open(); + + // remaining request from coordinator + harness.processElement( + new StreamRecord<>( + Either.Right( + request( + "0", + Collections.singletonList( + committable("0", ".1", 1)), + null) + .getValue()))); + + // process only summary during cp1, unaligned barrier may be processed ahead of the + // elements + harness.processElement( + new StreamRecord<>(Either.Left(new CommittableSummary<>(0, 1, 1L, 2, 2, 0)))); + + state = harness.snapshot(1, 1L); + + List<CommittableMessage<FileSinkCommittable>> results = harness.extractOutputValues(); + Assert.assertEquals(3, results.size()); + SinkV2Assertions.assertThat((CommittableSummary<?>) results.get(0)) + .hasPendingCommittables(4); + SinkV2Assertions.assertThat((CommittableWithLineage<?>) results.get(1)) + .hasCommittable(committable("0", "compacted-1", 1)); + SinkV2Assertions.assertThat((CommittableWithLineage<?>) results.get(2)) + .hasCommittable(cleanupPath("0", ".1")); + } + + try (OneInputStreamOperatorTestHarness< + Either<CommittableMessage<FileSinkCommittable>, CompactorRequest>, + CommittableMessage<FileSinkCommittable>> + harness = + new OneInputStreamOperatorTestHarness<>( + new CompactorOperatorStateHandler( + getTestCommittableSerializer(), + createTestBucketWriter()))) { + harness.setup(); + harness.initializeState(state); + harness.open(); + + harness.processElement( + new StreamRecord<>( + Either.Left( + new CommittableWithLineage<>( + committable("0", ".2", 2), 1L, 0)))); + + harness.processElement( + new StreamRecord<>( + Either.Left( + new CommittableWithLineage<>( + committable("0", "3", 3), 1L, 0)))); + + state = harness.snapshot(2, 2L); + + List<CommittableMessage<FileSinkCommittable>> results = harness.extractOutputValues(); + Assert.assertEquals(2, results.size()); + SinkV2Assertions.assertThat((CommittableWithLineage<?>) results.get(0)) + .hasCommittable(committable("0", "2", 2)); + 
SinkV2Assertions.assertThat((CommittableWithLineage<?>) results.get(1)) + .hasCommittable(committable("0", "3", 3)); + } + + try (OneInputStreamOperatorTestHarness< + Either<CommittableMessage<FileSinkCommittable>, CompactorRequest>, + CommittableMessage<FileSinkCommittable>> + harness = + new OneInputStreamOperatorTestHarness<>( + new CompactorOperatorStateHandler( + getTestCommittableSerializer(), + createTestBucketWriter()))) { + harness.setup(); + harness.initializeState(state); + harness.open(); + + harness.processElement( + new StreamRecord<>(Either.Left(new CommittableSummary<>(0, 1, 2L, 0, 0, 0)))); + + List<CommittableMessage<FileSinkCommittable>> results = harness.extractOutputValues(); + Assert.assertEquals(2, results.size()); + SinkV2Assertions.assertThat((CommittableSummary<?>) results.get(0)) + .hasPendingCommittables(1); + SinkV2Assertions.assertThat((CommittableWithLineage<?>) results.get(1)) + .hasCommittable(cleanupPath("0", ".2")); } }