This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f05cddbc374b30e3fae95ada5380c3f121ec26bb Author: Arvid Heise <[email protected]> AuthorDate: Sun Apr 13 10:10:58 2025 +0200 [FLINK-37605][runtime] Cleanup writer test Remove factory methods and InspectableSink because we don't need the abstraction anymore. Make test setup and assertions more explicit by using sink builder directly in tests. Remove unused methods. (cherry picked from commit 98284c4d24e61d12425c8d5f5fa17cfe40e816f9) --- .../SinkV2TransformationTranslatorITCase.java | 11 +- .../sink/SinkV2CommitterOperatorTest.java | 65 ++- .../sink/SinkV2SinkWriterOperatorTest.java | 574 ++++++++++----------- .../runtime/operators/sink/TestSinkV2.java | 368 +++++++------ .../flink/test/streaming/runtime/SinkV2ITCase.java | 218 +++++--- .../streaming/runtime/SinkV2MetricsITCase.java | 32 +- 6 files changed, 679 insertions(+), 589 deletions(-) diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV2TransformationTranslatorITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV2TransformationTranslatorITCase.java index 9e3753031c0..8912281a080 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV2TransformationTranslatorITCase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV2TransformationTranslatorITCase.java @@ -66,12 +66,17 @@ class SinkV2TransformationTranslatorITCase { } Sink<Integer> sinkWithCommitter() { - return TestSinkV2.<Integer>newBuilder().setDefaultCommitter().build(); + return TestSinkV2.<Integer>newBuilder() + .setCommitter(new TestSinkV2.DefaultCommitter<>(), TestSinkV2.RecordSerializer::new) + .build(); } Sink<Integer> sinkWithCommitterAndGlobalCommitter() { - return TestSinkV2.<Integer>newBuilder() - .setDefaultCommitter() + return ((TestSinkV2.Builder<Integer, TestSinkV2.Record<Integer>>) + TestSinkV2.<Integer>newBuilder() + .setCommitter( + new TestSinkV2.DefaultCommitter<>(), + TestSinkV2.RecordSerializer::new)) .setWithPostCommitTopology(true) .build(); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java index 4c7291dd44c..950f76b75d5 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java @@ -18,8 +18,11 @@ package org.apache.flink.streaming.runtime.operators.sink; +import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.connector.sink2.SupportsCommitter; import org.apache.flink.configuration.SinkOptions; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.core.io.SimpleVersionedSerializerAdapter; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; import org.apache.flink.streaming.api.connector.sink2.CommittableSummary; @@ -27,6 +30,7 @@ import org.apache.flink.streaming.api.connector.sink2.CommittableSummaryAssert; import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import 
org.apache.flink.util.function.SerializableSupplier; import org.assertj.core.api.AbstractThrowableAssert; import org.assertj.core.api.ListAssert; @@ -45,35 +49,39 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; class SinkV2CommitterOperatorTest { + + public static final SerializableSupplier<SimpleVersionedSerializer<String>> STRING_SERIALIZER = + () -> new SimpleVersionedSerializerAdapter<>(StringSerializer.INSTANCE); + SinkAndCounters sinkWithPostCommit() { - ForwardingCommitter committer = new ForwardingCommitter(); + ForwardingCommitter<String> committer = new ForwardingCommitter<>(); return new SinkAndCounters( - (SupportsCommitter<String>) - TestSinkV2.newBuilder() - .setCommitter(committer) - .setWithPostCommitTopology(true) - .build(), + TestSinkV2.<Integer>newBuilder() + .setWriter(new TestSinkV2.ForwardCommittingSinkWriter<String>()) + .setCommitter(committer, STRING_SERIALIZER) + .setWithPostCommitTopology(true) + .build(), () -> committer.successfulCommits); } SinkAndCounters sinkWithPostCommitWithRetry() { return new SinkAndCounters( - (SupportsCommitter<String>) - TestSinkV2.newBuilder() - .setCommitter(new TestSinkV2.RetryOnceCommitter()) - .setWithPostCommitTopology(true) - .build(), + TestSinkV2.newBuilder() + .setWriter(new TestSinkV2.ForwardCommittingSinkWriter<String>()) + .setCommitter(new TestSinkV2.RetryOnceCommitter<>(), STRING_SERIALIZER) + .setWithPostCommitTopology(true) + .build(), () -> 0); } SinkAndCounters sinkWithoutPostCommit() { - ForwardingCommitter committer = new ForwardingCommitter(); + ForwardingCommitter<String> committer = new ForwardingCommitter<>(); return new SinkAndCounters( - TestSinkV2.newBuilder() - .setCommitter(committer) + TestSinkV2.<Integer>newBuilder() + .setWriter(new TestSinkV2.ForwardCommittingSinkWriter<String>()) + .setCommitter(committer, STRING_SERIALIZER) .setWithPostCommitTopology(false) - .build() - .asSupportsCommitter(), + .build(), () -> committer.successfulCommits); } @@ -239,20 +247,20 @@ class SinkV2CommitterOperatorTest { // create new testHarness but with different parallelism level and subtaskId that original // one. // we will make sure that new subtaskId was used during committable recovery. 
- SinkAndCounters sinkAndCounters = sinkWithPostCommit(); + SinkAndCounters restored = sinkWithPostCommit(); final OneInputStreamOperatorTestHarness< CommittableMessage<String>, CommittableMessage<String>> - restored = + restoredHarness = createTestHarness( - sinkAndCounters.sink, false, true, 10, 10, subtaskIdAfterRecovery); + restored.sink, false, true, 10, 10, subtaskIdAfterRecovery); - restored.initializeState(snapshot); - restored.open(); + restoredHarness.initializeState(snapshot); + restoredHarness.open(); // Previous committables are immediately committed if possible - assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(2); + assertThat(restored.commitCounter.getAsInt()).isEqualTo(2); ListAssert<CommittableMessage<String>> records = - assertThat(restored.extractOutputValues()).hasSize(3); + assertThat(restoredHarness.extractOutputValues()).hasSize(3); CommittableSummaryAssert<Object> objectCommittableSummaryAssert = records.element(0, as(committableSummary())) .hasCheckpointId(checkpointId) @@ -269,7 +277,7 @@ class SinkV2CommitterOperatorTest { .hasCheckpointId(checkpointId) .hasSubtaskId(subtaskIdAfterRecovery) .hasCommittable(second.getCommittable()); - restored.close(); + restoredHarness.close(); } @ParameterizedTest @@ -373,11 +381,11 @@ class SinkV2CommitterOperatorTest { subtaskId); } - private static class ForwardingCommitter extends TestSinkV2.DefaultCommitter { + private static class ForwardingCommitter<CommT> extends TestSinkV2.DefaultCommitter<CommT> { private int successfulCommits = 0; @Override - public void commit(Collection<CommitRequest<String>> committables) { + public void commit(Collection<CommitRequest<CommT>> committables) { successfulCommits += committables.size(); } @@ -389,8 +397,9 @@ class SinkV2CommitterOperatorTest { SupportsCommitter<String> sink; IntSupplier commitCounter; - public SinkAndCounters(SupportsCommitter<String> sink, IntSupplier commitCounter) { - this.sink = sink; + @SuppressWarnings("unchecked") + public SinkAndCounters(TestSinkV2<?> sink, IntSupplier commitCounter) { + this.sink = (SupportsCommitter<String>) sink; this.commitCounter = commitCounter; } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest.java index 6f2c7f48de5..55fe2cbfae9 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest.java @@ -29,7 +29,6 @@ import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; import org.apache.flink.api.connector.sink2.Sink; import org.apache.flink.api.connector.sink2.WriterInitContext; -import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.core.io.SimpleVersionedSerialization; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.core.memory.DataOutputSerializer; @@ -45,13 +44,17 @@ import org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState; +import 
org.apache.flink.streaming.runtime.operators.sink.TestSinkV2.DefaultCommitter; +import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2.DefaultCommittingSinkWriter; +import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2.DefaultSinkWriter; +import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2.DefaultStatefulSinkWriter; +import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2.Record; +import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2.RecordSerializer; import org.apache.flink.streaming.runtime.operators.sink.committables.SinkV1CommittableDeserializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.TestHarnessUtil; -import org.apache.flink.shaded.guava32.com.google.common.collect.ImmutableList; - import org.assertj.core.api.ListAssert; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -60,7 +63,6 @@ import org.junit.jupiter.params.provider.ValueSource; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.concurrent.atomic.AtomicReference; @@ -83,195 +85,187 @@ class SinkV2SinkWriterOperatorTest { "bit", "mention", "thick", "stick", "stir", "easy", "sleep", "forth", "cost", "prompt"); - InspectableSink sink = sinkWithState(stateful, CompatibleStateSinkOperator.SINK_STATE_NAME); + DefaultStatefulSinkWriter<Integer> writer = new DefaultStatefulSinkWriter<>(); + TestSinkV2<Integer> sink = + TestSinkV2.newBuilder() + .setCommitter(new DefaultCommitter<>(), RecordSerializer::new) + .setWithPostCommitTopology(true) + .setWriter(writer) + .setWriterState(stateful) + .setCompatibleStateNames(CompatibleStateSinkOperator.SINK_STATE_NAME) + .build(); int expectedState = 5; - final OneInputStreamOperatorTestHarness<String, String> previousSink = + OperatorSubtaskState previousSinkState; + try (OneInputStreamOperatorTestHarness<String, String> previousSink = new OneInputStreamOperatorTestHarness<>( new CompatibleStateSinkOperator<>( TestSinkV2.WRITER_SERIALIZER, expectedState), - StringSerializer.INSTANCE); + StringSerializer.INSTANCE)) { - OperatorSubtaskState previousSinkState = - TestHarnessUtil.buildSubtaskState(previousSink, previousSinkInputs); + previousSinkState = TestHarnessUtil.buildSubtaskState(previousSink, previousSinkInputs); + } // 2. Load previous sink state and verify state - Sink<Integer> sink3 = sink.getSink(); - final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> + OperatorSubtaskState snapshot; + try (OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> compatibleWriterOperator = new OneInputStreamOperatorTestHarness<>( - new SinkWriterOperatorFactory<>(sink3)); - - // load the state from previous sink - compatibleWriterOperator.initializeState(previousSinkState); - assertThat(sink.getRecordCountFromState()).isEqualTo(stateful ? expectedState : 0); + new SinkWriterOperatorFactory<>(sink))) { - // 3. 
do another snapshot and check if this also can be restored without compabitible state - // name - compatibleWriterOperator.prepareSnapshotPreBarrier(1L); - OperatorSubtaskState snapshot = compatibleWriterOperator.snapshot(1L, 1L); + // load the state from previous sink + compatibleWriterOperator.initializeState(previousSinkState); + assertThat(writer.getRecordCount()).isEqualTo(stateful ? expectedState : 0); - compatibleWriterOperator.close(); + // 3. do another snapshot and check if this also can be restored without compabitible + // state + // name + compatibleWriterOperator.prepareSnapshotPreBarrier(1L); + snapshot = compatibleWriterOperator.snapshot(1L, 1L); + } // 4. Restore the sink without previous sink's state - InspectableSink sink2 = - sinkWithState(stateful, CompatibleStateSinkOperator.SINK_STATE_NAME); - final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> + DefaultStatefulSinkWriter<Integer> restoredWriter = new DefaultStatefulSinkWriter<>(); + TestSinkV2<Integer> restoredSink = + TestSinkV2.newBuilder() + .setCommitter(new DefaultCommitter<>(), RecordSerializer::new) + .setWithPostCommitTopology(true) + .setWriter(restoredWriter) + .setWriterState(stateful) + .setCompatibleStateNames(CompatibleStateSinkOperator.SINK_STATE_NAME) + .build(); + try (OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> restoredSinkOperator = new OneInputStreamOperatorTestHarness<>( - new SinkWriterOperatorFactory<>(sink2.getSink())); - - restoredSinkOperator.initializeState(snapshot); - assertThat(sink.getRecordCountFromState()).isEqualTo(stateful ? expectedState : 0); - - restoredSinkOperator.close(); - } + new SinkWriterOperatorFactory<>(restoredSink))) { - InspectableSink sinkWithoutCommitter() { - TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = new TestSinkV2.DefaultSinkWriter<>(); - return new InspectableSink(TestSinkV2.<Integer>newBuilder().setWriter(sinkWriter).build()); - } - - InspectableSink sinkWithCommitter() { - TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = - new TestSinkV2.DefaultCommittingSinkWriter<>(); - return new InspectableSink( - TestSinkV2.<Integer>newBuilder() - .setWriter(sinkWriter) - .setDefaultCommitter() - .build()); - } - - InspectableSink sinkWithTimeBasedWriter() { - TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = new TimeBasedBufferingSinkWriter(); - return new InspectableSink( - TestSinkV2.<Integer>newBuilder() - .setWriter(sinkWriter) - .setDefaultCommitter() - .build()); - } - - InspectableSink sinkWithState(boolean withState, String stateName) { - TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = - new TestSinkV2.DefaultStatefulSinkWriter<>(); - TestSinkV2.Builder<Integer> builder = - TestSinkV2.<Integer>newBuilder() - .setDefaultCommitter() - .setWithPostCommitTopology(true) - .setWriter(sinkWriter); - if (withState) { - builder.setWriterState(true); - } - if (stateName != null) { - builder.setCompatibleStateNames(stateName); + restoredSinkOperator.initializeState(snapshot); + assertThat(restoredWriter.getRecordCount()).isEqualTo(stateful ? 
expectedState : 0); } - return new InspectableSink(builder.build()); } @Test void testNotEmitCommittablesWithoutCommitter() throws Exception { - InspectableSink sink = sinkWithoutCommitter(); - final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness = - new OneInputStreamOperatorTestHarness<>( - new SinkWriterOperatorFactory<>(sink.getSink())); - testHarness.open(); - testHarness.processElement(1, 1); - - assertThat(testHarness.extractOutputValues()).isEmpty(); - assertThat(sink.getRecordsOfCurrentCheckpoint()) - .containsOnly("(1,1," + Long.MIN_VALUE + ")"); - - testHarness.prepareSnapshotPreBarrier(1); - assertThat(testHarness.extractOutputValues()).isEmpty(); - // Elements are flushed - assertThat(sink.getRecordsOfCurrentCheckpoint()).isEmpty(); - testHarness.close(); + DefaultSinkWriter<Integer> writer = new DefaultSinkWriter<>(); + TestSinkV2<Integer> sink = TestSinkV2.newBuilder().setWriter(writer).build(); + try (OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness = + new OneInputStreamOperatorTestHarness<>(new SinkWriterOperatorFactory<>(sink))) { + testHarness.open(); + testHarness.processElement(1, 1); + + assertThat(testHarness.extractOutputValues()).isEmpty(); + assertThat(writer.getRecordsOfCurrentCheckpoint()) + .containsOnly(new Record<>(1, 1L, Long.MIN_VALUE)); + + testHarness.prepareSnapshotPreBarrier(1); + assertThat(testHarness.extractOutputValues()).isEmpty(); + // Elements are flushed + assertThat(writer.getRecordsOfCurrentCheckpoint()).isEmpty(); + } } @Test void testWatermarkPropagatedToSinkWriter() throws Exception { final long initialTime = 0; - InspectableSink sink = sinkWithoutCommitter(); - final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness = - new OneInputStreamOperatorTestHarness<>( - new SinkWriterOperatorFactory<>(sink.getSink())); - testHarness.open(); - - testHarness.processWatermark(initialTime); - testHarness.processWatermark(initialTime + 1); - - assertThat(testHarness.getOutput()) - .containsExactly( - new org.apache.flink.streaming.api.watermark.Watermark(initialTime), - new org.apache.flink.streaming.api.watermark.Watermark(initialTime + 1)); - assertThat(sink.getWatermarks()) - .containsExactly(new Watermark(initialTime), new Watermark(initialTime + 1)); - testHarness.close(); + DefaultSinkWriter<Integer> writer = new DefaultSinkWriter<>(); + TestSinkV2<Integer> sink = TestSinkV2.newBuilder().setWriter(writer).build(); + try (OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness = + new OneInputStreamOperatorTestHarness<>(new SinkWriterOperatorFactory<>(sink))) { + testHarness.open(); + + testHarness.processWatermark(initialTime); + testHarness.processWatermark(initialTime + 1); + + assertThat(testHarness.getOutput()) + .containsExactly( + new org.apache.flink.streaming.api.watermark.Watermark(initialTime), + new org.apache.flink.streaming.api.watermark.Watermark( + initialTime + 1)); + assertThat(writer.getWatermarks()) + .containsExactly(new Watermark(initialTime), new Watermark(initialTime + 1)); + } } @Test void testTimeBasedBufferingSinkWriter() throws Exception { final long initialTime = 0; - final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness = - new OneInputStreamOperatorTestHarness<>( - new SinkWriterOperatorFactory<>(sinkWithTimeBasedWriter().getSink())); + DefaultSinkWriter<Integer> writer = new TimeBasedBufferingSinkWriter(); + TestSinkV2<Integer> sink = + 
TestSinkV2.newBuilder() + .setWriter(writer) + .setCommitter(new DefaultCommitter<>(), RecordSerializer::new) + .build(); + try (OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness = + new OneInputStreamOperatorTestHarness<>(new SinkWriterOperatorFactory<>(sink))) { - testHarness.open(); + testHarness.open(); - testHarness.setProcessingTime(0L); + testHarness.setProcessingTime(0L); - testHarness.processElement(1, initialTime + 1); - testHarness.processElement(2, initialTime + 2); + testHarness.processElement(1, initialTime + 1); + testHarness.processElement(2, initialTime + 2); - testHarness.prepareSnapshotPreBarrier(1L); + testHarness.prepareSnapshotPreBarrier(1L); - // Expect empty committableSummary - assertBasicOutput(testHarness.extractOutputValues(), 0, 1L); + // Expect empty committableSummary + assertBasicOutput(testHarness.extractOutputValues(), 0, 1L); - testHarness.getProcessingTimeService().setCurrentTime(2001); + testHarness.getProcessingTimeService().setCurrentTime(2001); - testHarness.prepareSnapshotPreBarrier(2L); + testHarness.prepareSnapshotPreBarrier(2L); - assertBasicOutput( - testHarness.extractOutputValues().stream().skip(1).collect(Collectors.toList()), - 2, - 2L); - testHarness.close(); + assertBasicOutput( + testHarness.extractOutputValues().stream().skip(1).collect(Collectors.toList()), + 2, + 2L); + } } @Test void testEmitOnFlushWithCommitter() throws Exception { - final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness = - new OneInputStreamOperatorTestHarness<>( - new SinkWriterOperatorFactory<>(sinkWithCommitter().getSink())); + DefaultSinkWriter<Integer> writer = new DefaultCommittingSinkWriter<>(); + TestSinkV2<Integer> sink = + TestSinkV2.newBuilder() + .setWriter(writer) + .setCommitter(new DefaultCommitter<>(), RecordSerializer::new) + .build(); + try (OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness = + new OneInputStreamOperatorTestHarness<>(new SinkWriterOperatorFactory<>(sink))) { - testHarness.open(); - assertThat(testHarness.extractOutputValues()).isEmpty(); + testHarness.open(); + assertThat(testHarness.extractOutputValues()).isEmpty(); - testHarness.processElement(1, 1); - testHarness.processElement(2, 2); + testHarness.processElement(1, 1); + testHarness.processElement(2, 2); - // flush - testHarness.prepareSnapshotPreBarrier(1); + // flush + testHarness.prepareSnapshotPreBarrier(1); - assertBasicOutput(testHarness.extractOutputValues(), 2, 1L); - testHarness.close(); + assertBasicOutput(testHarness.extractOutputValues(), 2, 1L); + } } @Test void testEmitOnEndOfInputInBatchMode() throws Exception { + DefaultSinkWriter<Integer> writer = new DefaultCommittingSinkWriter<>(); + TestSinkV2<Integer> sink = + TestSinkV2.newBuilder() + .setWriter(writer) + .setCommitter(new DefaultCommitter<>(), RecordSerializer::new) + .build(); final SinkWriterOperatorFactory<Integer, Integer> writerOperatorFactory = - new SinkWriterOperatorFactory<>(sinkWithCommitter().getSink()); - final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness = - new OneInputStreamOperatorTestHarness<>(writerOperatorFactory); + new SinkWriterOperatorFactory<>(sink); + try (OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness = + new OneInputStreamOperatorTestHarness<>(writerOperatorFactory)) { - testHarness.open(); - assertThat(testHarness.extractOutputValues()).isEmpty(); + testHarness.open(); + 
assertThat(testHarness.extractOutputValues()).isEmpty(); - testHarness.processElement(1, 1); - testHarness.endInput(); - assertBasicOutput(testHarness.extractOutputValues(), 1, EOI); + testHarness.processElement(1, 1); + testHarness.endInput(); + assertBasicOutput(testHarness.extractOutputValues(), 1, EOI); + } } @ParameterizedTest @@ -280,74 +274,94 @@ class SinkV2SinkWriterOperatorTest { final long initialTime = 0; - final InspectableSink sink = sinkWithState(stateful, null); - Sink<Integer> sink2 = sink.getSink(); - final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness = - new OneInputStreamOperatorTestHarness<>(new SinkWriterOperatorFactory<>(sink2)); - - testHarness.open(); + DefaultStatefulSinkWriter<Integer> writer = new DefaultStatefulSinkWriter<>(); + final TestSinkV2<Integer> sink = + TestSinkV2.newBuilder() + .setCommitter(new DefaultCommitter<>(), RecordSerializer::new) + .setWithPostCommitTopology(true) + .setWriter(writer) + .setWriterState(stateful) + .build(); + OperatorSubtaskState snapshot; + try (OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness = + new OneInputStreamOperatorTestHarness<>(new SinkWriterOperatorFactory<>(sink))) { - testHarness.processWatermark(initialTime); - testHarness.processElement(1, initialTime + 1); - testHarness.processElement(2, initialTime + 2); + testHarness.open(); - testHarness.prepareSnapshotPreBarrier(1L); - OperatorSubtaskState snapshot = testHarness.snapshot(1L, 1L); + testHarness.processWatermark(initialTime); + testHarness.processElement(1, initialTime + 1); + testHarness.processElement(2, initialTime + 2); - assertThat(sink.getRecordCountFromState()).isEqualTo(2); - assertThat(sink.getLastCheckpointId()).isEqualTo(stateful ? 1L : -1L); + testHarness.prepareSnapshotPreBarrier(1L); + snapshot = testHarness.snapshot(1L, 1L); - testHarness.close(); + assertThat(writer.getRecordCount()).isEqualTo(2); + assertThat(writer.getLastCheckpointId()).isEqualTo(stateful ? 1L : -1L); + } - final InspectableSink restoredSink = sinkWithState(stateful, null); - final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> + DefaultStatefulSinkWriter<Integer> restoredWriter = new DefaultStatefulSinkWriter<>(); + final TestSinkV2<Integer> restoredSink = + TestSinkV2.newBuilder() + .setCommitter(new DefaultCommitter<>(), RecordSerializer::new) + .setWithPostCommitTopology(true) + .setWriter(restoredWriter) + .setWriterState(stateful) + .build(); + try (OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> restoredTestHarness = new OneInputStreamOperatorTestHarness<>( - new SinkWriterOperatorFactory<>(restoredSink.getSink())); - - restoredTestHarness.initializeState(snapshot); - restoredTestHarness.open(); + new SinkWriterOperatorFactory<>(restoredSink))) { - // check that the previous state is correctly restored - assertThat(restoredSink.getRecordCountFromState()).isEqualTo(stateful ? 2 : 0); + restoredTestHarness.initializeState(snapshot); + restoredTestHarness.open(); - restoredTestHarness.close(); + // check that the previous state is correctly restored + assertThat(restoredWriter.getRecordCount()).isEqualTo(stateful ? 
2 : 0); + } } @Test void testRestoreCommitterState() throws Exception { - final List<String> committables = Arrays.asList("state1", "state2"); - - InspectableSink sink = sinkWithCommitter(); - final OneInputStreamOperatorTestHarness<String, String> committer = + final List<Record<Integer>> committables = + Arrays.asList(new Record<>(1, 1L, 1), new Record<>(2, 2L, 2)); + + DefaultSinkWriter<Integer> writer = new DefaultCommittingSinkWriter<>(); + TestSinkV2<Integer> sink = + TestSinkV2.newBuilder() + .setWriter(writer) + .setCommitter(new DefaultCommitter<>(), RecordSerializer::new) + .build(); + final OperatorSubtaskState committerState; + try (OneInputStreamOperatorTestHarness<Record<Integer>, Record<Integer>> committer = new OneInputStreamOperatorTestHarness<>( - new TestCommitterOperator(TestSinkV2.COMMITTABLE_SERIALIZER), - StringSerializer.INSTANCE); + new TestCommitterOperator(new RecordSerializer<>()))) { - final OperatorSubtaskState committerState = - TestHarnessUtil.buildSubtaskState(committer, committables); + committerState = TestHarnessUtil.buildSubtaskState(committer, committables); + } - final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness = - new OneInputStreamOperatorTestHarness<>( - new SinkWriterOperatorFactory<>(sink.getSink())); + final ListAssert<CommittableMessage<Integer>> records; + try (OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness = + new OneInputStreamOperatorTestHarness<>(new SinkWriterOperatorFactory<>(sink))) { - testHarness.initializeState(committerState); + testHarness.initializeState(committerState); - testHarness.open(); + testHarness.open(); - testHarness.prepareSnapshotPreBarrier(2); + testHarness.prepareSnapshotPreBarrier(2); - final ListAssert<CommittableMessage<Integer>> records = - assertThat(testHarness.extractOutputValues()).hasSize(4); + records = assertThat(testHarness.extractOutputValues()).hasSize(4); + } records.element(0, as(committableSummary())) .hasCheckpointId(INITIAL_CHECKPOINT_ID) .hasOverallCommittables(committables.size()); - records.<CommittableWithLineageAssert<String>>element(1, as(committableWithLineage())) + records.<CommittableWithLineageAssert<Record<Integer>>>element( + 1, as(committableWithLineage())) .hasCommittable(committables.get(0)) .hasCheckpointId(INITIAL_CHECKPOINT_ID) .hasSubtaskId(0); - records.<CommittableWithLineageAssert<String>>element(2, as(committableWithLineage())) + records.<CommittableWithLineageAssert<Record<Integer>>>element( + 2, as(committableWithLineage())) .hasCommittable(committables.get(1)) .hasCheckpointId(INITIAL_CHECKPOINT_ID) .hasSubtaskId(0); @@ -357,35 +371,44 @@ class SinkV2SinkWriterOperatorTest { @ParameterizedTest @ValueSource(booleans = {true, false}) void testHandleEndInputInStreamingMode(boolean isCheckpointingEnabled) throws Exception { - InspectableSink sink = sinkWithCommitter(); - final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<String>> testHarness = - new OneInputStreamOperatorTestHarness<>( - new SinkWriterOperatorFactory<>(sink.getSink())); - testHarness.open(); - testHarness.processElement(1, 1); - - assertThat(testHarness.extractOutputValues()).isEmpty(); - final String record = "(1,1," + Long.MIN_VALUE + ")"; - assertThat(sink.getRecordsOfCurrentCheckpoint()).containsOnly(record); + DefaultSinkWriter<Integer> writer = new DefaultCommittingSinkWriter<>(); + TestSinkV2<Integer> sink = + TestSinkV2.newBuilder() + .setWriter(writer) + .setCommitter(new DefaultCommitter<>(), 
RecordSerializer::new) + .build(); + try (OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Record<Integer>>> + testHarness = + new OneInputStreamOperatorTestHarness<>( + new SinkWriterOperatorFactory<>(sink))) { + testHarness.open(); + testHarness.processElement(1, 1); - testHarness.endInput(); + assertThat(testHarness.extractOutputValues()).isEmpty(); + Record<Integer> record = new Record<>(1, 1L, Long.MIN_VALUE); + assertThat(writer.getRecordsOfCurrentCheckpoint()).containsOnly(record); - if (isCheckpointingEnabled) { - testHarness.prepareSnapshotPreBarrier(1); - } + testHarness.endInput(); - List<String> committables = Collections.singletonList(record); + if (isCheckpointingEnabled) { + testHarness.prepareSnapshotPreBarrier(1); + } - ListAssert<CommittableMessage<String>> records = - assertThat(testHarness.extractOutputValues()).hasSize(committables.size() + 1); - records.element(0, as(committableSummary())).hasOverallCommittables(committables.size()); + List<Record<Integer>> committables = Collections.singletonList(record); - records.filteredOn(message -> message instanceof CommittableWithLineage) - .map(message -> ((CommittableWithLineage<String>) message).getCommittable()) - .containsExactlyInAnyOrderElementsOf(committables); - assertThat(sink.getRecordsOfCurrentCheckpoint()).isEmpty(); + ListAssert<CommittableMessage<Record<Integer>>> records = + assertThat(testHarness.extractOutputValues()).hasSize(committables.size() + 1); + records.element(0, as(committableSummary())) + .hasOverallCommittables(committables.size()); - testHarness.close(); + records.filteredOn(message -> message instanceof CommittableWithLineage) + .map( + message -> + ((CommittableWithLineage<Record<Integer>>) message) + .getCommittable()) + .containsExactlyInAnyOrderElementsOf(committables); + assertThat(writer.getRecordsOfCurrentCheckpoint()).isEmpty(); + } } @Test @@ -411,47 +434,28 @@ class SinkV2SinkWriterOperatorTest { .setExecutionConfig(new ExecutionConfig().enableObjectReuse()) .build(); - final OneInputStreamOperatorTestHarness<String, CommittableMessage<String>> testHarness = - new OneInputStreamOperatorTestHarness<>( - new SinkWriterOperatorFactory<>(sink), typeSerializer, environment); - testHarness.open(); - - assertThat(initContext.get().getUserCodeClassLoader()).isNotNull(); - assertThat(initContext.get().getMailboxExecutor()).isNotNull(); - assertThat(initContext.get().getProcessingTimeService()).isNotNull(); - assertThat(initContext.get().getTaskInfo().getIndexOfThisSubtask()).isEqualTo(subtaskId); - assertThat(initContext.get().getTaskInfo().getNumberOfParallelSubtasks()) - .isEqualTo(parallelism); - assertThat(initContext.get().getTaskInfo().getAttemptNumber()).isZero(); - assertThat(initContext.get().metricGroup()).isNotNull(); - assertThat(initContext.get().getRestoredCheckpointId()).isNotPresent(); - assertThat(initContext.get().isObjectReuseEnabled()).isTrue(); - assertThat(initContext.get().createInputSerializer()).isEqualTo(typeSerializer); - assertThat(initContext.get().getJobInfo().getJobId()).isEqualTo(jobID); - - testHarness.close(); - } - - private static void assertContextsEqual( - WriterInitContext initContext, WriterInitContext original) { - assertThat(initContext.getUserCodeClassLoader().asClassLoader()) - .isEqualTo(original.getUserCodeClassLoader().asClassLoader()); - assertThat(initContext.getMailboxExecutor()).isEqualTo(original.getMailboxExecutor()); - assertThat(initContext.getProcessingTimeService()) - .isEqualTo(original.getProcessingTimeService()); - 
assertThat(initContext.getTaskInfo().getIndexOfThisSubtask()) - .isEqualTo(original.getTaskInfo().getIndexOfThisSubtask()); - assertThat(initContext.getTaskInfo().getNumberOfParallelSubtasks()) - .isEqualTo(original.getTaskInfo().getNumberOfParallelSubtasks()); - assertThat(initContext.getTaskInfo().getAttemptNumber()) - .isEqualTo(original.getTaskInfo().getAttemptNumber()); - assertThat(initContext.metricGroup()).isEqualTo(original.metricGroup()); - assertThat(initContext.getRestoredCheckpointId()) - .isEqualTo(original.getRestoredCheckpointId()); - assertThat(initContext.isObjectReuseEnabled()).isEqualTo(original.isObjectReuseEnabled()); - assertThat(initContext.createInputSerializer()).isEqualTo(original.createInputSerializer()); - assertThat(initContext.getJobInfo().getJobId()).isEqualTo(original.getJobInfo().getJobId()); - assertThat(initContext.metadataConsumer()).isEqualTo(original.metadataConsumer()); + try (OneInputStreamOperatorTestHarness<String, CommittableMessage<Record<Integer>>> + testHarness = + new OneInputStreamOperatorTestHarness<>( + new SinkWriterOperatorFactory<>(sink), + typeSerializer, + environment)) { + testHarness.open(); + + assertThat(initContext.get().getUserCodeClassLoader()).isNotNull(); + assertThat(initContext.get().getMailboxExecutor()).isNotNull(); + assertThat(initContext.get().getProcessingTimeService()).isNotNull(); + assertThat(initContext.get().getTaskInfo().getIndexOfThisSubtask()) + .isEqualTo(subtaskId); + assertThat(initContext.get().getTaskInfo().getNumberOfParallelSubtasks()) + .isEqualTo(parallelism); + assertThat(initContext.get().getTaskInfo().getAttemptNumber()).isZero(); + assertThat(initContext.get().metricGroup()).isNotNull(); + assertThat(initContext.get().getRestoredCheckpointId()).isNotPresent(); + assertThat(initContext.get().isObjectReuseEnabled()).isTrue(); + assertThat(initContext.get().createInputSerializer()).isEqualTo(typeSerializer); + assertThat(initContext.get().getJobInfo().getJobId()).isEqualTo(jobID); + } } private static void assertBasicOutput( @@ -469,17 +473,16 @@ class SinkV2SinkWriterOperatorTest { .hasSubtaskId(0)); } - private static class TimeBasedBufferingSinkWriter - extends TestSinkV2.DefaultCommittingSinkWriter<Integer> + private static class TimeBasedBufferingSinkWriter extends DefaultCommittingSinkWriter<Integer> implements ProcessingTimeService.ProcessingTimeCallback { - private final List<String> cachedCommittables = new ArrayList<>(); + private final List<Record<Integer>> cachedCommittables = new ArrayList<>(); private ProcessingTimeService processingTimeService; @Override public void write(Integer element, Context context) { cachedCommittables.add( - Tuple3.of(element, context.timestamp(), context.currentWatermark()).toString()); + new Record<>(element, context.timestamp(), context.currentWatermark())); } @Override @@ -496,74 +499,17 @@ class SinkV2SinkWriterOperatorTest { } } - private static class SnapshottingBufferingSinkWriter - extends TestSinkV2.DefaultStatefulSinkWriter { - public static final int NOT_SNAPSHOTTED = -1; - long lastCheckpointId = NOT_SNAPSHOTTED; - boolean endOfInput = false; - - @Override - public void flush(boolean endOfInput) throws IOException, InterruptedException { - this.endOfInput = endOfInput; - } - - @Override - public List<String> snapshotState(long checkpointId) throws IOException { - lastCheckpointId = checkpointId; - return super.snapshotState(checkpointId); - } - - @Override - public Collection<String> prepareCommit() { - if (!endOfInput) { - return 
ImmutableList.of(); - } - List<String> result = elements; - elements = new ArrayList<>(); - return result; - } - } - - static class InspectableSink { - private final TestSinkV2<Integer> sink; - - InspectableSink(TestSinkV2<Integer> sink) { - this.sink = sink; - } - - public TestSinkV2<Integer> getSink() { - return sink; - } - - public long getLastCheckpointId() { - return getSink().getWriter().lastCheckpointId; - } - - public List<String> getRecordsOfCurrentCheckpoint() { - return getSink().getWriter().elements; - } - - public List<Watermark> getWatermarks() { - return getSink().getWriter().watermarks; - } - - public int getRecordCountFromState() { - return ((TestSinkV2.DefaultStatefulSinkWriter<?>) getSink().getWriter()) - .getRecordCount(); - } - } - - private static class TestCommitterOperator extends AbstractStreamOperator<String> - implements OneInputStreamOperator<String, String> { + private static class TestCommitterOperator extends AbstractStreamOperator<Record<Integer>> + implements OneInputStreamOperator<Record<Integer>, Record<Integer>> { private static final ListStateDescriptor<byte[]> STREAMING_COMMITTER_RAW_STATES_DESC = new ListStateDescriptor<>( "streaming_committer_raw_states", BytePrimitiveArraySerializer.INSTANCE); - private ListState<List<String>> committerState; - private final List<String> buffer = new ArrayList<>(); - private final SimpleVersionedSerializer<String> serializer; + private ListState<List<Record<Integer>>> committerState; + private final List<Record<Integer>> buffer = new ArrayList<>(); + private final SimpleVersionedSerializer<Record<Integer>> serializer; - public TestCommitterOperator(SimpleVersionedSerializer<String> serializer) { + public TestCommitterOperator(SimpleVersionedSerializer<Record<Integer>> serializer) { this.serializer = serializer; } @@ -578,7 +524,7 @@ class SinkV2SinkWriterOperatorTest { } @Override - public void processElement(StreamRecord<String> element) throws Exception { + public void processElement(StreamRecord<Record<Integer>> element) { buffer.add(element.getValue()); } @@ -625,18 +571,18 @@ class SinkV2SinkWriterOperatorTest { } private static class TestingCommittableSerializer - extends SinkV1WriterCommittableSerializer<String> { + extends SinkV1WriterCommittableSerializer<Record<Integer>> { - private final SimpleVersionedSerializer<String> committableSerializer; + private final SimpleVersionedSerializer<Record<Integer>> committableSerializer; public TestingCommittableSerializer( - SimpleVersionedSerializer<String> committableSerializer) { + SimpleVersionedSerializer<Record<Integer>> committableSerializer) { super(committableSerializer); this.committableSerializer = committableSerializer; } @Override - public byte[] serialize(List<String> obj) throws IOException { + public byte[] serialize(List<Record<Integer>> obj) throws IOException { final DataOutputSerializer out = new DataOutputSerializer(256); out.writeInt(SinkV1CommittableDeserializer.MAGIC_NUMBER); SimpleVersionedSerialization.writeVersionAndSerializeList( diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java index 18f934752a3..396af961830 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java @@ -20,7 +20,6 @@ package 
org.apache.flink.streaming.runtime.operators.sink; import org.apache.flink.api.common.eventtime.Watermark; import org.apache.flink.api.common.typeutils.base.IntSerializer; -import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.connector.sink2.Committer; import org.apache.flink.api.connector.sink2.CommitterInitContext; import org.apache.flink.api.connector.sink2.CommittingSinkWriter; @@ -30,7 +29,6 @@ import org.apache.flink.api.connector.sink2.StatefulSinkWriter; import org.apache.flink.api.connector.sink2.SupportsCommitter; import org.apache.flink.api.connector.sink2.SupportsWriterState; import org.apache.flink.api.connector.sink2.WriterInitContext; -import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.core.io.SimpleVersionedSerializerAdapter; import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; @@ -41,12 +39,12 @@ import org.apache.flink.streaming.api.connector.sink2.StandardSinkTopologies; import org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology; import org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.util.Preconditions; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.function.SerializableFunction; +import org.apache.flink.util.function.SerializableSupplier; import org.apache.flink.shaded.guava32.com.google.common.collect.ImmutableSet; -import javax.annotation.Nullable; - import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; @@ -54,94 +52,93 @@ import java.util.Collection; import java.util.Collections; import java.util.LinkedHashSet; import java.util.List; -import java.util.Queue; +import java.util.Objects; import java.util.Set; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.function.Supplier; +import java.util.stream.Collectors; import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.assertj.core.api.Assertions.assertThat; /** A {@link Sink} for all the sink related tests. 
*/ public class TestSinkV2<InputT> implements Sink<InputT> { - public static final SimpleVersionedSerializerAdapter<String> COMMITTABLE_SERIALIZER = - new SimpleVersionedSerializerAdapter<>(StringSerializer.INSTANCE); - public static final SimpleVersionedSerializerAdapter<Integer> WRITER_SERIALIZER = + public static final SimpleVersionedSerializer<Integer> WRITER_SERIALIZER = new SimpleVersionedSerializerAdapter<>(IntSerializer.INSTANCE); - private final DefaultSinkWriter<InputT> writer; + private final SinkWriter<InputT> writer; - private TestSinkV2(DefaultSinkWriter<InputT> writer) { + private TestSinkV2(SinkWriter<InputT> writer) { this.writer = writer; } public SinkWriter<InputT> createWriter(WriterInitContext context) { - writer.init(context); + if (writer instanceof DefaultSinkWriter) { + ((DefaultSinkWriter<InputT>) writer).init(context); + } return writer; } - DefaultSinkWriter<InputT> getWriter() { + SinkWriter<InputT> getWriter() { return writer; } - public static <InputT> Builder<InputT> newBuilder() { + public static <InputT> Builder<InputT, Record<InputT>> newBuilder() { return new Builder<>(); } - public static <InputT> Builder<InputT> newBuilder(DefaultSinkWriter<InputT> writer) { - return new Builder<InputT>().setWriter(writer); - } - - public SupportsCommitter<String> asSupportsCommitter() { - throw new UnsupportedOperationException("No committter"); + public static <InputT> Builder<InputT, Record<InputT>> newBuilder( + DefaultSinkWriter<InputT> writer) { + return new Builder<InputT, Record<InputT>>().setWriter(writer); } /** A builder class for {@link TestSinkV2}. */ - public static class Builder<InputT> { - private DefaultSinkWriter<InputT> writer = null; - private DefaultCommitter committer; + public static class Builder<InputT, CommT> { + private SinkWriter<InputT> writer = null; + private Committer<CommT> committer; private boolean withPostCommitTopology = false; - private boolean withPreCommitTopology = false; + private SerializableFunction<CommT, CommT> preCommitTopology = null; private boolean withWriterState = false; private String compatibleStateNames; + private SerializableSupplier<SimpleVersionedSerializer<CommT>> commSerializerFactory; - public Builder<InputT> setWriter(DefaultSinkWriter<InputT> writer) { - this.writer = checkNotNull(writer); - return this; + @SuppressWarnings("unchecked") + public <NewInputT, NewCommT> Builder<NewInputT, NewCommT> setWriter( + CommittingSinkWriter<NewInputT, NewCommT> writer) { + Builder<NewInputT, NewCommT> self = (Builder<NewInputT, NewCommT>) this; + self.writer = checkNotNull(writer); + return self; } - public Builder<InputT> setCommitter(DefaultCommitter committer) { - this.committer = committer; - return this; + @SuppressWarnings("unchecked") + public <NewInputT> Builder<NewInputT, CommT> setWriter(SinkWriter<NewInputT> writer) { + Builder<NewInputT, CommT> self = (Builder<NewInputT, CommT>) this; + self.writer = checkNotNull(writer); + return self; } - public Builder<InputT> setDefaultCommitter() { - this.committer = new DefaultCommitter(); - return this; - } - - public Builder<InputT> setDefaultCommitter( - Supplier<Queue<Committer.CommitRequest<String>>> queueSupplier) { - this.committer = new DefaultCommitter(queueSupplier); + public Builder<InputT, CommT> setCommitter( + Committer<CommT> committer, + SerializableSupplier<SimpleVersionedSerializer<CommT>> commSerializerFactory) { + this.committer = committer; + this.commSerializerFactory = commSerializerFactory; return this; } - public Builder<InputT> 
setWithPostCommitTopology(boolean withPostCommitTopology) { + public Builder<InputT, CommT> setWithPostCommitTopology(boolean withPostCommitTopology) { this.withPostCommitTopology = withPostCommitTopology; return this; } - public Builder<InputT> setWithPreCommitTopology(boolean withPreCommitTopology) { - this.withPreCommitTopology = withPreCommitTopology; + public Builder<InputT, CommT> setWithPreCommitTopology( + SerializableFunction<CommT, CommT> preCommitTopology) { + this.preCommitTopology = preCommitTopology; return this; } - public Builder<InputT> setWriterState(boolean withWriterState) { + public Builder<InputT, CommT> setWriterState(boolean withWriterState) { this.withWriterState = withWriterState; return this; } - public Builder<InputT> setCompatibleStateNames(String compatibleStateNames) { + public Builder<InputT, CommT> setCompatibleStateNames(String compatibleStateNames) { this.compatibleStateNames = compatibleStateNames; return this; } @@ -158,139 +155,130 @@ public class TestSinkV2<InputT> implements Sink<InputT> { writer = new DefaultCommittingSinkWriter<>(); } if (!withPostCommitTopology) { - if (!withPreCommitTopology) { + if (preCommitTopology == null) { // TwoPhaseCommittingSink with a stateless writer and a committer return new TestSinkV2TwoPhaseCommittingSink<>( - writer, COMMITTABLE_SERIALIZER, committer); + writer, commSerializerFactory, committer); } else { // TwoPhaseCommittingSink with a stateless writer, pre commit topology, // committer - Preconditions.checkArgument( - writer instanceof DefaultCommittingSinkWriter, - "Please provide a DefaultCommittingSinkWriter instance"); return new TestSinkV2WithPreCommitTopology<>( - writer, COMMITTABLE_SERIALIZER, committer); + writer, commSerializerFactory, committer, preCommitTopology); } } else { if (withWriterState) { // TwoPhaseCommittingSink with a stateful writer and a committer and post // commit topology - Preconditions.checkArgument( - writer instanceof DefaultStatefulSinkWriter, - "Please provide a DefaultStatefulSinkWriter instance"); return new TestStatefulSinkV2<>( - (DefaultStatefulSinkWriter<InputT>) writer, - COMMITTABLE_SERIALIZER, - committer, - compatibleStateNames); + writer, commSerializerFactory, committer, compatibleStateNames); } else { // TwoPhaseCommittingSink with a stateless writer and a committer and post // commit topology - Preconditions.checkArgument( - writer instanceof DefaultCommittingSinkWriter, - "Please provide a DefaultCommittingSinkWriter instance"); return new TestSinkV2WithPostCommitTopology<>( - writer, COMMITTABLE_SERIALIZER, committer); + writer, commSerializerFactory, committer); } } } } } - private static class TestSinkV2TwoPhaseCommittingSink<InputT> extends TestSinkV2<InputT> - implements SupportsCommitter<String> { - private final DefaultCommitter committer; - private final SimpleVersionedSerializer<String> committableSerializer; + private static class TestSinkV2TwoPhaseCommittingSink<InputT, CommT> extends TestSinkV2<InputT> + implements SupportsCommitter<CommT> { + private final Committer<CommT> committer; + protected final SerializableSupplier<SimpleVersionedSerializer<CommT>> + commSerializerFactory; public TestSinkV2TwoPhaseCommittingSink( - DefaultSinkWriter<InputT> writer, - SimpleVersionedSerializer<String> committableSerializer, - DefaultCommitter committer) { + SinkWriter<InputT> writer, + SerializableSupplier<SimpleVersionedSerializer<CommT>> commSerializerFactory, + Committer<CommT> committer) { super(writer); this.committer = committer; - 
this.committableSerializer = committableSerializer; + this.commSerializerFactory = commSerializerFactory; } @Override - public Committer<String> createCommitter(CommitterInitContext context) { - committer.init(); + public Committer<CommT> createCommitter(CommitterInitContext context) { + if (committer instanceof DefaultCommitter) { + ((DefaultCommitter<CommT>) committer).init(); + } return committer; } @Override - public SupportsCommitter<String> asSupportsCommitter() { - return this; - } - - @Override - public SimpleVersionedSerializer<String> getCommittableSerializer() { - return committableSerializer; + public SimpleVersionedSerializer<CommT> getCommittableSerializer() { + return commSerializerFactory.get(); } } // -------------------------------------- Sink With PostCommitTopology ------------------------- - private static class TestSinkV2WithPostCommitTopology<InputT> - extends TestSinkV2TwoPhaseCommittingSink<InputT> - implements SupportsPostCommitTopology<String> { + private static class TestSinkV2WithPostCommitTopology<InputT, CommT> + extends TestSinkV2TwoPhaseCommittingSink<InputT, CommT> + implements SupportsPostCommitTopology<CommT> { public TestSinkV2WithPostCommitTopology( - DefaultSinkWriter<InputT> writer, - SimpleVersionedSerializer<String> committableSerializer, - DefaultCommitter committer) { - super(writer, committableSerializer, committer); + SinkWriter<InputT> writer, + SerializableSupplier<SimpleVersionedSerializer<CommT>> commSerializerFactory, + Committer<CommT> committer) { + super(writer, commSerializerFactory, committer); } @Override - public void addPostCommitTopology(DataStream<CommittableMessage<String>> committables) { + public void addPostCommitTopology(DataStream<CommittableMessage<CommT>> committables) { StandardSinkTopologies.addGlobalCommitter( committables, this::createCommitter, this::getCommittableSerializer); } } - private static class TestSinkV2WithPreCommitTopology<InputT> - extends TestSinkV2TwoPhaseCommittingSink<InputT> - implements SupportsPreCommitTopology<String, String> { + private static class TestSinkV2WithPreCommitTopology<InputT, CommT> + extends TestSinkV2TwoPhaseCommittingSink<InputT, CommT> + implements SupportsPreCommitTopology<CommT, CommT> { + private final SerializableFunction<CommT, CommT> preCommitTopology; + public TestSinkV2WithPreCommitTopology( - DefaultSinkWriter<InputT> writer, - SimpleVersionedSerializer<String> committableSerializer, - DefaultCommitter committer) { - super(writer, committableSerializer, committer); + SinkWriter<InputT> writer, + SerializableSupplier<SimpleVersionedSerializer<CommT>> commSerializerFactory, + Committer<CommT> committer, + SerializableFunction<CommT, CommT> preCommitTopology) { + super(writer, commSerializerFactory, committer); + this.preCommitTopology = preCommitTopology; } @Override - public DataStream<CommittableMessage<String>> addPreCommitTopology( - DataStream<CommittableMessage<String>> committables) { + public DataStream<CommittableMessage<CommT>> addPreCommitTopology( + DataStream<CommittableMessage<CommT>> committables) { return committables .map( m -> { if (m instanceof CommittableSummary) { return m; } else { - CommittableWithLineage<String> withLineage = - (CommittableWithLineage<String>) m; - return withLineage.map(old -> old + "Transformed"); + CommittableWithLineage<CommT> withLineage = + (CommittableWithLineage<CommT>) m; + return withLineage.map(preCommitTopology); } }) - .returns(CommittableMessageTypeInfo.of(() -> COMMITTABLE_SERIALIZER)); + 
.returns(CommittableMessageTypeInfo.of(commSerializerFactory)); } @Override - public SimpleVersionedSerializer<String> getWriteResultSerializer() { - return new SimpleVersionedSerializerAdapter<>(StringSerializer.INSTANCE); + public SimpleVersionedSerializer<CommT> getWriteResultSerializer() { + return commSerializerFactory.get(); } } - private static class TestStatefulSinkV2<InputT> extends TestSinkV2WithPostCommitTopology<InputT> + private static class TestStatefulSinkV2<InputT, CommT> + extends TestSinkV2WithPostCommitTopology<InputT, CommT> implements SupportsWriterState<InputT, Integer>, SupportsWriterState.WithCompatibleState { private final String compatibleState; public TestStatefulSinkV2( - DefaultStatefulSinkWriter<InputT> writer, - SimpleVersionedSerializer<String> committableSerializer, - DefaultCommitter committer, + SinkWriter<InputT> writer, + SerializableSupplier<SimpleVersionedSerializer<CommT>> commSerializerFactory, + Committer<CommT> committer, String compatibleState) { - super(writer, committableSerializer, committer); + super(writer, commSerializerFactory, committer); this.compatibleState = compatibleState; } @@ -303,7 +291,7 @@ public class TestSinkV2<InputT> implements Sink<InputT> { public StatefulSinkWriter<InputT, Integer> restoreWriter( WriterInitContext context, Collection<Integer> recoveredState) { DefaultStatefulSinkWriter<InputT> statefulWriter = - (DefaultStatefulSinkWriter) getWriter(); + (DefaultStatefulSinkWriter<InputT>) getWriter(); statefulWriter.restore(recoveredState); return statefulWriter; @@ -322,10 +310,90 @@ public class TestSinkV2<InputT> implements Sink<InputT> { // -------------------------------------- Sink Writer ------------------------------------------ + public static class Record<T> implements Serializable { + private final T value; + private final Long timestamp; + private final long watermark; + + public Record(T value, Long timestamp, long watermark) { + this.value = value; + this.timestamp = timestamp; + this.watermark = watermark; + } + + public T getValue() { + return value; + } + + public Long getTimestamp() { + return timestamp; + } + + public long getWatermark() { + return watermark; + } + + @Override + public boolean equals(Object object) { + if (this == object) { + return true; + } + if (object == null || getClass() != object.getClass()) { + return false; + } + Record<?> record = (Record<?>) object; + return Objects.equals(timestamp, record.timestamp) + && watermark == record.watermark + && Objects.equals(value, record.value); + } + + @Override + public int hashCode() { + return Objects.hash(value, timestamp, watermark); + } + + @Override + public String toString() { + return "Record{" + + "value=" + + value + + ", timestamp=" + + timestamp + + ", watermark=" + + watermark + + '}'; + } + + public Record<T> withValue(T value) { + return new Record<>(value, timestamp, watermark); + } + } + + public static class RecordSerializer<T> implements SimpleVersionedSerializer<Record<T>> { + @Override + public int getVersion() { + return 0; + } + + @Override + public byte[] serialize(Record<T> record) throws IOException { + return InstantiationUtil.serializeObject(record); + } + + @Override + public Record<T> deserialize(int version, byte[] serialized) throws IOException { + try { + return InstantiationUtil.deserializeObject(serialized, getClass().getClassLoader()); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + } + } + /** Base class for out testing {@link SinkWriter}. 
*/ public static class DefaultSinkWriter<InputT> implements SinkWriter<InputT>, Serializable { - protected List<String> elements; + protected List<Record<InputT>> elements; protected List<Watermark> watermarks; @@ -338,8 +406,7 @@ public class TestSinkV2<InputT> implements Sink<InputT> { @Override public void write(InputT element, Context context) { - elements.add( - Tuple3.of(element, context.timestamp(), context.currentWatermark()).toString()); + elements.add(new Record<>(element, context.timestamp(), context.currentWatermark())); } @Override @@ -347,6 +414,18 @@ public class TestSinkV2<InputT> implements Sink<InputT> { elements = new ArrayList<>(); } + public List<Record<InputT>> getRecordsOfCurrentCheckpoint() { + return elements; + } + + public List<Watermark> getWatermarks() { + return watermarks; + } + + public long getLastCheckpointId() { + return lastCheckpointId; + } + @Override public void writeWatermark(Watermark watermark) { watermarks.add(watermark); @@ -364,7 +443,23 @@ public class TestSinkV2<InputT> implements Sink<InputT> { /** Base class for out testing {@link CommittingSinkWriter}. */ protected static class DefaultCommittingSinkWriter<InputT> extends DefaultSinkWriter<InputT> - implements CommittingSinkWriter<InputT, String>, Serializable { + implements CommittingSinkWriter<InputT, Record<InputT>>, Serializable { + + @Override + public void flush(boolean endOfInput) throws IOException, InterruptedException { + // We empty the elements on prepareCommit + } + + @Override + public Collection<Record<InputT>> prepareCommit() { + List<Record<InputT>> result = elements; + elements = new ArrayList<>(); + return result; + } + } + + protected static class ForwardCommittingSinkWriter<InputT> extends DefaultSinkWriter<InputT> + implements CommittingSinkWriter<InputT, InputT>, Serializable { @Override public void flush(boolean endOfInput) throws IOException, InterruptedException { @@ -372,8 +467,9 @@ public class TestSinkV2<InputT> implements Sink<InputT> { } @Override - public Collection<String> prepareCommit() { - List<String> result = elements; + public Collection<InputT> prepareCommit() { + List<InputT> result = + elements.stream().map(Record::getValue).collect(Collectors.toList()); elements = new ArrayList<>(); return result; } @@ -412,42 +508,15 @@ public class TestSinkV2<InputT> implements Sink<InputT> { // -------------------------------------- Sink Committer --------------------------------------- /** Base class for testing {@link Committer}. 
*/ - public static class DefaultCommitter implements Committer<String>, Serializable { - - @Nullable protected Queue<CommitRequest<String>> committedData; - + public static class DefaultCommitter<CommT> implements Committer<CommT>, Serializable { private boolean isClosed; - @Nullable private final Supplier<Queue<CommitRequest<String>>> queueSupplier; - public DefaultCommitter() { - this.committedData = new ConcurrentLinkedQueue<>(); this.isClosed = false; - this.queueSupplier = null; - } - - public DefaultCommitter(@Nullable Supplier<Queue<CommitRequest<String>>> queueSupplier) { - this.queueSupplier = queueSupplier; - this.isClosed = false; - this.committedData = null; - } - - public List<CommitRequest<String>> getCommittedData() { - if (committedData != null) { - return new ArrayList<>(committedData); - } else { - return Collections.emptyList(); - } } @Override - public void commit(Collection<CommitRequest<String>> committables) { - if (committedData == null) { - assertThat(queueSupplier).isNotNull(); - committedData = queueSupplier.get(); - } - committedData.addAll(committables); - } + public void commit(Collection<CommitRequest<CommT>> committables) {} public void close() throws Exception { isClosed = true; @@ -463,18 +532,15 @@ public class TestSinkV2<InputT> implements Sink<InputT> { } /** A {@link Committer} that always re-commits the committables data it received. */ - static class RetryOnceCommitter extends DefaultCommitter { + static class RetryOnceCommitter<CommT> extends DefaultCommitter<CommT> { - private final Set<String> seen = new LinkedHashSet<>(); + private final Set<CommT> seen = new LinkedHashSet<>(); @Override - public void commit(Collection<CommitRequest<String>> committables) { + public void commit(Collection<CommitRequest<CommT>> committables) { committables.forEach( c -> { - if (seen.remove(c.getCommittable())) { - checkNotNull(committedData); - committedData.add(c); - } else { + if (!seen.remove(c.getCommittable())) { seen.add(c.getCommittable()); c.retryLater(); } diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java index 15e00e70b0f..88e89e3185e 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java @@ -18,159 +18,163 @@ package org.apache.flink.test.streaming.runtime; import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.typeinfo.IntegerTypeInfo; import org.apache.flink.api.connector.sink2.Committer; -import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.util.ratelimit.GatedRateLimiter; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; +import org.apache.flink.connector.datagen.source.DataGeneratorSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2; -import org.apache.flink.streaming.util.FiniteTestSource; -import org.apache.flink.test.util.AbstractTestBaseJUnit4; +import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2.Record; +import 
org.apache.flink.streaming.runtime.operators.sink.TestSinkV2.RecordSerializer; +import org.apache.flink.test.util.AbstractTestBase; +import org.apache.flink.testutils.junit.SharedObjectsExtension; +import org.apache.flink.testutils.junit.SharedReference; -import org.junit.Before; -import org.junit.Test; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.Serializable; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Queue; +import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.function.BooleanSupplier; -import java.util.function.Supplier; import java.util.stream.Collectors; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.assertj.core.api.Assertions.assertThat; /** * Integration test for {@link org.apache.flink.api.connector.sink2.Sink} run time implementation. */ -public class SinkV2ITCase extends AbstractTestBaseJUnit4 { +public class SinkV2ITCase extends AbstractTestBase { + private static final Logger LOG = LoggerFactory.getLogger(SinkV2ITCase.class); + static final List<Integer> SOURCE_DATA = Arrays.asList( 895, 127, 148, 161, 148, 662, 822, 491, 275, 122, 850, 630, 682, 765, 434, 970, 714, 795, 288, 422); - // source send data two times - static final int STREAMING_SOURCE_SEND_ELEMENTS_NUM = SOURCE_DATA.size() * 2; - - static final List<String> EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE = + static final List<Record<Integer>> EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE = SOURCE_DATA.stream() // source send data two times .flatMap( x -> - Collections.nCopies( - 2, Tuple3.of(x, null, Long.MIN_VALUE).toString()) + Collections.nCopies(2, new Record<>(x, null, Long.MIN_VALUE)) .stream()) .collect(Collectors.toList()); - static final List<String> EXPECTED_COMMITTED_DATA_IN_BATCH_MODE = + static final List<Record<Integer>> EXPECTED_COMMITTED_DATA_IN_BATCH_MODE = SOURCE_DATA.stream() - .map(x -> Tuple3.of(x, null, Long.MIN_VALUE).toString()) + .map(x -> new Record<>(x, null, Long.MIN_VALUE)) .collect(Collectors.toList()); - static final Queue<Committer.CommitRequest<String>> COMMIT_QUEUE = - new ConcurrentLinkedQueue<>(); - - static final BooleanSupplier COMMIT_QUEUE_RECEIVE_ALL_DATA = - (BooleanSupplier & Serializable) - () -> COMMIT_QUEUE.size() == STREAMING_SOURCE_SEND_ELEMENTS_NUM; - - @Before - public void init() { - COMMIT_QUEUE.clear(); - } + @RegisterExtension + static final SharedObjectsExtension SHARED_OBJECTS = SharedObjectsExtension.create(); @Test public void writerAndCommitterExecuteInStreamingMode() throws Exception { final StreamExecutionEnvironment env = buildStreamEnv(); - final FiniteTestSource<Integer> source = - new FiniteTestSource<>(COMMIT_QUEUE_RECEIVE_ALL_DATA, SOURCE_DATA); + SharedReference<Queue<Committer.CommitRequest<Record<Integer>>>> committed = + SHARED_OBJECTS.add(new ConcurrentLinkedQueue<>()); + final Source<Integer, ?, ?> source = createStreamingSource(); - env.addSource(source, IntegerTypeInfo.INT_TYPE_INFO) + env.fromSource(source, WatermarkStrategy.noWatermarks(), "source") .sinkTo( TestSinkV2.<Integer>newBuilder() - .setDefaultCommitter( - (Supplier<Queue<Committer.CommitRequest<String>>> - & Serializable) - () -> COMMIT_QUEUE) + .setCommitter( + new TrackingCommitter(committed), RecordSerializer::new) .build()); env.execute(); 
- assertThat( - COMMIT_QUEUE.stream() - .map(Committer.CommitRequest::getCommittable) - .collect(Collectors.toList()), - containsInAnyOrder(EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE.toArray())); + assertThat(committed.get()) + .extracting(Committer.CommitRequest::getCommittable) + .containsExactlyInAnyOrderElementsOf(EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE); } @Test public void writerAndPrecommitToplogyAndCommitterExecuteInStreamingMode() throws Exception { final StreamExecutionEnvironment env = buildStreamEnv(); - final FiniteTestSource<Integer> source = - new FiniteTestSource<>(COMMIT_QUEUE_RECEIVE_ALL_DATA, SOURCE_DATA); + SharedReference<Queue<Committer.CommitRequest<Record<Integer>>>> committed = + SHARED_OBJECTS.add(new ConcurrentLinkedQueue<>()); + final Source<Integer, ?, ?> source = createStreamingSource(); - env.addSource(source, IntegerTypeInfo.INT_TYPE_INFO) + env.fromSource(source, WatermarkStrategy.noWatermarks(), "source") .sinkTo( TestSinkV2.<Integer>newBuilder() - .setDefaultCommitter( - (Supplier<Queue<Committer.CommitRequest<String>>> - & Serializable) - () -> COMMIT_QUEUE) - .setWithPreCommitTopology(true) + .setCommitter( + new TrackingCommitter(committed), RecordSerializer::new) + .setWithPreCommitTopology(SinkV2ITCase::flipValue) .build()); env.execute(); - assertThat( - COMMIT_QUEUE.stream() - .map(Committer.CommitRequest::getCommittable) - .collect(Collectors.toList()), - containsInAnyOrder( + assertThat(committed.get()) + .extracting(Committer.CommitRequest::getCommittable) + .containsExactlyInAnyOrderElementsOf( EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE.stream() - .map(s -> s + "Transformed") - .toArray())); + .map(SinkV2ITCase::flipValue) + .collect(Collectors.toList())); + } + + private static Record<Integer> flipValue(Record<Integer> r) { + return r.withValue(-r.getValue()); } @Test public void writerAndCommitterExecuteInBatchMode() throws Exception { final StreamExecutionEnvironment env = buildBatchEnv(); + SharedReference<Queue<Committer.CommitRequest<Record<Integer>>>> committed = + SHARED_OBJECTS.add(new ConcurrentLinkedQueue<>()); - env.fromData(SOURCE_DATA) + final DataGeneratorSource<Integer> source = + new DataGeneratorSource<>( + l -> SOURCE_DATA.get(l.intValue()), + SOURCE_DATA.size(), + IntegerTypeInfo.INT_TYPE_INFO); + + env.fromSource(source, WatermarkStrategy.noWatermarks(), "source") .sinkTo( TestSinkV2.<Integer>newBuilder() - .setDefaultCommitter( - (Supplier<Queue<Committer.CommitRequest<String>>> - & Serializable) - () -> COMMIT_QUEUE) + .setCommitter( + new TrackingCommitter(committed), RecordSerializer::new) .build()); env.execute(); - assertThat( - COMMIT_QUEUE.stream() - .map(Committer.CommitRequest::getCommittable) - .collect(Collectors.toList()), - containsInAnyOrder(EXPECTED_COMMITTED_DATA_IN_BATCH_MODE.toArray())); + assertThat(committed.get()) + .extracting(Committer.CommitRequest::getCommittable) + .containsExactlyInAnyOrderElementsOf(EXPECTED_COMMITTED_DATA_IN_BATCH_MODE); } @Test public void writerAndPrecommitToplogyAndCommitterExecuteInBatchMode() throws Exception { final StreamExecutionEnvironment env = buildBatchEnv(); + SharedReference<Queue<Committer.CommitRequest<Record<Integer>>>> committed = + SHARED_OBJECTS.add(new ConcurrentLinkedQueue<>()); + + final DataGeneratorSource<Integer> source = + new DataGeneratorSource<>( + l -> SOURCE_DATA.get(l.intValue()), + SOURCE_DATA.size(), + IntegerTypeInfo.INT_TYPE_INFO); - env.fromData(SOURCE_DATA) + env.fromSource(source, WatermarkStrategy.noWatermarks(), "source") .sinkTo( 
TestSinkV2.<Integer>newBuilder() - .setDefaultCommitter( - (Supplier<Queue<Committer.CommitRequest<String>>> - & Serializable) - () -> COMMIT_QUEUE) - .setWithPreCommitTopology(true) + .setCommitter( + new TrackingCommitter(committed), RecordSerializer::new) + .setWithPreCommitTopology(SinkV2ITCase::flipValue) .build()); env.execute(); - assertThat( - COMMIT_QUEUE.stream() - .map(Committer.CommitRequest::getCommittable) - .collect(Collectors.toList()), - containsInAnyOrder( + assertThat(committed.get()) + .extracting(Committer.CommitRequest::getCommittable) + .containsExactlyInAnyOrderElementsOf( EXPECTED_COMMITTED_DATA_IN_BATCH_MODE.stream() - .map(s -> s + "Transformed") - .toArray())); + .map(SinkV2ITCase::flipValue) + .collect(Collectors.toList())); } private StreamExecutionEnvironment buildStreamEnv() { @@ -185,4 +189,60 @@ public class SinkV2ITCase extends AbstractTestBaseJUnit4 { env.setRuntimeMode(RuntimeExecutionMode.BATCH); return env; } + + /** + * A stream source that: 1) emits a list of elements without allowing checkpoints, 2) then waits + * for two more checkpoints to complete, 3) then re-emits the same elements before 4) waiting + * for another two checkpoints and 5) exiting. + */ + private Source<Integer, ?, ?> createStreamingSource() { + RateLimiterStrategy rateLimiterStrategy = + parallelism -> new BurstingRateLimiter(SOURCE_DATA.size() / 4, 2); + return new DataGeneratorSource<>( + l -> SOURCE_DATA.get(l.intValue() % SOURCE_DATA.size()), + SOURCE_DATA.size() * 2L, + rateLimiterStrategy, + IntegerTypeInfo.INT_TYPE_INFO); + } + + private static class BurstingRateLimiter implements RateLimiter { + private final RateLimiter rateLimiter; + private final int numCheckpointCooldown; + private int cooldown; + + public BurstingRateLimiter(int recordPerCycle, int numCheckpointCooldown) { + rateLimiter = new GatedRateLimiter(recordPerCycle); + this.numCheckpointCooldown = numCheckpointCooldown; + } + + @Override + public CompletionStage<Void> acquire() { + CompletionStage<Void> stage = rateLimiter.acquire(); + cooldown = numCheckpointCooldown; + return stage; + } + + @Override + public void notifyCheckpointComplete(long checkpointId) { + if (cooldown-- <= 0) { + rateLimiter.notifyCheckpointComplete(checkpointId); + } + } + } + + private static class TrackingCommitter implements Committer<Record<Integer>>, Serializable { + private final SharedReference<Queue<CommitRequest<Record<Integer>>>> committed; + + public TrackingCommitter(SharedReference<Queue<CommitRequest<Record<Integer>>>> committed) { + this.committed = committed; + } + + @Override + public void commit(Collection<CommitRequest<Record<Integer>>> committables) { + committed.get().addAll(committables); + } + + @Override + public void close() {} + } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsITCase.java index a7f9340df08..09388809377 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsITCase.java @@ -34,6 +34,7 @@ import org.apache.flink.runtime.testutils.InMemoryReporter; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2; +import 
org.apache.flink.streaming.runtime.operators.sink.TestSinkV2.Record; import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.testutils.junit.SharedObjects; import org.apache.flink.testutils.junit.SharedReference; @@ -138,12 +139,15 @@ public class SinkV2MetricsITCase extends TestLogger { sharedObjects.add(new CountDownLatch(numCommittables)); SharedReference<CountDownLatch> afterLatch = sharedObjects.add(new CountDownLatch(1)); + TestSinkV2<Long> sink = + TestSinkV2.<Long>newBuilder() + .setCommitter( + new MetricCommitter(beforeLatch, afterLatch), + TestSinkV2.RecordSerializer::new) + .build(); env.fromSequence(0, numCommittables - 1) .returns(BasicTypeInfo.LONG_TYPE_INFO) - .sinkTo( - (TestSinkV2.<Long>newBuilder() - .setCommitter(new MetricCommitter(beforeLatch, afterLatch))) - .build()) + .sinkTo(sink) .name(TEST_SINK_NAME); JobClient jobClient = env.executeAsync(); final JobID jobId = jobClient.getJobID(); @@ -295,7 +299,7 @@ public class SinkV2MetricsITCase extends TestLogger { } } - private static class MetricCommitter extends TestSinkV2.DefaultCommitter { + private static class MetricCommitter extends TestSinkV2.DefaultCommitter<Record<Long>> { private int counter = 0; private SharedReference<CountDownLatch> beforeLatch; private SharedReference<CountDownLatch> afterLatch; @@ -309,7 +313,7 @@ public class SinkV2MetricsITCase extends TestLogger { } @Override - public void commit(Collection<CommitRequest<String>> committables) { + public void commit(Collection<CommitRequest<Record<Long>>> committables) { if (counter == 0) { System.err.println( "Committables arrived " @@ -333,26 +337,26 @@ public class SinkV2MetricsITCase extends TestLogger { committables.forEach( c -> { - switch (c.getCommittable().charAt(1)) { - case '0': + switch (c.getCommittable().getValue().intValue()) { + case 0: c.signalAlreadyCommitted(); // 1 already committed break; - case '1': - case '2': + case 1: + case 2: // 2 failed c.signalFailedWithKnownReason(new RuntimeException()); break; - case '3': + case 3: // Retry without change if (counter == 1) { c.retryLater(); } break; - case '4': - case '5': + case 4: + case 5: // Retry with change - c.updateAndRetryLater("Retry-" + c.getCommittable()); + c.updateAndRetryLater(c.getCommittable().withValue(6L)); } }); }
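For reference, a minimal usage sketch of the builder API exercised above: the committer is now typed on the committable (here TestSinkV2.Record) and is registered together with a serializer factory via setCommitter(...). Everything in this sketch that is not in the diff (the CollectingCommitter, the static COMMITTED queue, the main method) is illustrative only; it assumes the TestSinkV2 test classes are on the classpath and that the job runs locally in a single JVM, which is exactly why the ITCases above use SharedObjectsExtension/SharedReference instead of a static field.

import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2;
import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2.Record;

import java.io.Serializable;
import java.util.Collection;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class CommitterUsageSketch {
    // Collects committables on the committer side; only valid for local (single-JVM) execution.
    static final Queue<Record<Integer>> COMMITTED = new ConcurrentLinkedQueue<>();

    // Hypothetical committer, analogous to TrackingCommitter in SinkV2ITCase.
    static class CollectingCommitter implements Committer<Record<Integer>>, Serializable {
        @Override
        public void commit(Collection<CommitRequest<Record<Integer>>> requests) {
            requests.forEach(request -> COMMITTED.add(request.getCommittable()));
        }

        @Override
        public void close() {}
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromData(1, 2, 3)
                .sinkTo(
                        TestSinkV2.<Integer>newBuilder()
                                .setCommitter(
                                        new CollectingCommitter(),
                                        TestSinkV2.RecordSerializer::new)
                                .build());
        env.execute();

        // Each element arrives as a Record carrying its value, timestamp and current watermark.
        COMMITTED.forEach(record -> System.out.println(record.getValue()));
    }
}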

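Similarly, a small round-trip sketch for the Record committable and RecordSerializer added to TestSinkV2 (the RecordSerializerRoundTrip class and its main method are illustrative, not part of this change; only the Record/RecordSerializer API from the diff is assumed):

import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2.Record;
import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2.RecordSerializer;

import java.io.IOException;

class RecordSerializerRoundTrip {
    public static void main(String[] args) throws IOException {
        RecordSerializer<Integer> serializer = new RecordSerializer<>();
        // A record as produced by DefaultSinkWriter#write: value, timestamp, current watermark.
        Record<Integer> record = new Record<>(42, null, Long.MIN_VALUE);

        byte[] bytes = serializer.serialize(record);
        Record<Integer> copy = serializer.deserialize(serializer.getVersion(), bytes);

        // Record implements equals/hashCode, so committed data can be compared directly in tests.
        System.out.println(record.equals(copy)); // true

        // withValue returns a modified copy, as used by the pre-commit topology (flipValue) above.
        System.out.println(copy.withValue(-copy.getValue()).getValue()); // -42
    }
}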