This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f05cddbc374b30e3fae95ada5380c3f121ec26bb
Author: Arvid Heise <[email protected]>
AuthorDate: Sun Apr 13 10:10:58 2025 +0200

    [FLINK-37605][runtime] Cleanup writer test
    
    Remove factory methods and InspectableSink because we don't need the
    abstraction anymore. Make test setup and assertions more explicit by using
    the sink builder directly in tests.
    
    Remove unused methods.
    
    (cherry picked from commit 98284c4d24e61d12425c8d5f5fa17cfe40e816f9)
---
 .../SinkV2TransformationTranslatorITCase.java      |  11 +-
 .../sink/SinkV2CommitterOperatorTest.java          |  65 ++-
 .../sink/SinkV2SinkWriterOperatorTest.java         | 574 ++++++++++-----------
 .../runtime/operators/sink/TestSinkV2.java         | 368 +++++++------
 .../flink/test/streaming/runtime/SinkV2ITCase.java | 218 +++++---
 .../streaming/runtime/SinkV2MetricsITCase.java     |  32 +-
 6 files changed, 679 insertions(+), 589 deletions(-)

diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV2TransformationTranslatorITCase.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV2TransformationTranslatorITCase.java
index 9e3753031c0..8912281a080 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV2TransformationTranslatorITCase.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV2TransformationTranslatorITCase.java
@@ -66,12 +66,17 @@ class SinkV2TransformationTranslatorITCase {
     }
 
     Sink<Integer> sinkWithCommitter() {
-        return TestSinkV2.<Integer>newBuilder().setDefaultCommitter().build();
+        return TestSinkV2.<Integer>newBuilder()
+                .setCommitter(new TestSinkV2.DefaultCommitter<>(), 
TestSinkV2.RecordSerializer::new)
+                .build();
     }
 
     Sink<Integer> sinkWithCommitterAndGlobalCommitter() {
-        return TestSinkV2.<Integer>newBuilder()
-                .setDefaultCommitter()
+        return ((TestSinkV2.Builder<Integer, TestSinkV2.Record<Integer>>)
+                        TestSinkV2.<Integer>newBuilder()
+                                .setCommitter(
+                                        new TestSinkV2.DefaultCommitter<>(),
+                                        TestSinkV2.RecordSerializer::new))
                 .setWithPostCommitTopology(true)
                 .build();
     }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java
index 4c7291dd44c..950f76b75d5 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java
@@ -18,8 +18,11 @@
 
 package org.apache.flink.streaming.runtime.operators.sink;
 
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.connector.sink2.SupportsCommitter;
 import org.apache.flink.configuration.SinkOptions;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializerAdapter;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
 import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
@@ -27,6 +30,7 @@ import 
org.apache.flink.streaming.api.connector.sink2.CommittableSummaryAssert;
 import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.util.function.SerializableSupplier;
 
 import org.assertj.core.api.AbstractThrowableAssert;
 import org.assertj.core.api.ListAssert;
@@ -45,35 +49,39 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
 
 class SinkV2CommitterOperatorTest {
+
+    public static final 
SerializableSupplier<SimpleVersionedSerializer<String>> STRING_SERIALIZER =
+            () -> new 
SimpleVersionedSerializerAdapter<>(StringSerializer.INSTANCE);
+
     SinkAndCounters sinkWithPostCommit() {
-        ForwardingCommitter committer = new ForwardingCommitter();
+        ForwardingCommitter<String> committer = new ForwardingCommitter<>();
         return new SinkAndCounters(
-                (SupportsCommitter<String>)
-                        TestSinkV2.newBuilder()
-                                .setCommitter(committer)
-                                .setWithPostCommitTopology(true)
-                                .build(),
+                TestSinkV2.<Integer>newBuilder()
+                        .setWriter(new 
TestSinkV2.ForwardCommittingSinkWriter<String>())
+                        .setCommitter(committer, STRING_SERIALIZER)
+                        .setWithPostCommitTopology(true)
+                        .build(),
                 () -> committer.successfulCommits);
     }
 
     SinkAndCounters sinkWithPostCommitWithRetry() {
         return new SinkAndCounters(
-                (SupportsCommitter<String>)
-                        TestSinkV2.newBuilder()
-                                .setCommitter(new 
TestSinkV2.RetryOnceCommitter())
-                                .setWithPostCommitTopology(true)
-                                .build(),
+                TestSinkV2.newBuilder()
+                        .setWriter(new 
TestSinkV2.ForwardCommittingSinkWriter<String>())
+                        .setCommitter(new TestSinkV2.RetryOnceCommitter<>(), 
STRING_SERIALIZER)
+                        .setWithPostCommitTopology(true)
+                        .build(),
                 () -> 0);
     }
 
     SinkAndCounters sinkWithoutPostCommit() {
-        ForwardingCommitter committer = new ForwardingCommitter();
+        ForwardingCommitter<String> committer = new ForwardingCommitter<>();
         return new SinkAndCounters(
-                TestSinkV2.newBuilder()
-                        .setCommitter(committer)
+                TestSinkV2.<Integer>newBuilder()
+                        .setWriter(new 
TestSinkV2.ForwardCommittingSinkWriter<String>())
+                        .setCommitter(committer, STRING_SERIALIZER)
                         .setWithPostCommitTopology(false)
-                        .build()
-                        .asSupportsCommitter(),
+                        .build(),
                 () -> committer.successfulCommits);
     }
 
@@ -239,20 +247,20 @@ class SinkV2CommitterOperatorTest {
         // create new testHarness but with different parallelism level and 
subtaskId that original
         // one.
         // we will make sure that new subtaskId was used during committable 
recovery.
-        SinkAndCounters sinkAndCounters = sinkWithPostCommit();
+        SinkAndCounters restored = sinkWithPostCommit();
         final OneInputStreamOperatorTestHarness<
                         CommittableMessage<String>, CommittableMessage<String>>
-                restored =
+                restoredHarness =
                         createTestHarness(
-                                sinkAndCounters.sink, false, true, 10, 10, 
subtaskIdAfterRecovery);
+                                restored.sink, false, true, 10, 10, 
subtaskIdAfterRecovery);
 
-        restored.initializeState(snapshot);
-        restored.open();
+        restoredHarness.initializeState(snapshot);
+        restoredHarness.open();
 
         // Previous committables are immediately committed if possible
-        assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(2);
+        assertThat(restored.commitCounter.getAsInt()).isEqualTo(2);
         ListAssert<CommittableMessage<String>> records =
-                assertThat(restored.extractOutputValues()).hasSize(3);
+                assertThat(restoredHarness.extractOutputValues()).hasSize(3);
         CommittableSummaryAssert<Object> objectCommittableSummaryAssert =
                 records.element(0, as(committableSummary()))
                         .hasCheckpointId(checkpointId)
@@ -269,7 +277,7 @@ class SinkV2CommitterOperatorTest {
                 .hasCheckpointId(checkpointId)
                 .hasSubtaskId(subtaskIdAfterRecovery)
                 .hasCommittable(second.getCommittable());
-        restored.close();
+        restoredHarness.close();
     }
 
     @ParameterizedTest
@@ -373,11 +381,11 @@ class SinkV2CommitterOperatorTest {
                 subtaskId);
     }
 
-    private static class ForwardingCommitter extends 
TestSinkV2.DefaultCommitter {
+    private static class ForwardingCommitter<CommT> extends 
TestSinkV2.DefaultCommitter<CommT> {
         private int successfulCommits = 0;
 
         @Override
-        public void commit(Collection<CommitRequest<String>> committables) {
+        public void commit(Collection<CommitRequest<CommT>> committables) {
             successfulCommits += committables.size();
         }
 
@@ -389,8 +397,9 @@ class SinkV2CommitterOperatorTest {
         SupportsCommitter<String> sink;
         IntSupplier commitCounter;
 
-        public SinkAndCounters(SupportsCommitter<String> sink, IntSupplier 
commitCounter) {
-            this.sink = sink;
+        @SuppressWarnings("unchecked")
+        public SinkAndCounters(TestSinkV2<?> sink, IntSupplier commitCounter) {
+            this.sink = (SupportsCommitter<String>) sink;
             this.commitCounter = commitCounter;
         }
     }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest.java
index 6f2c7f48de5..55fe2cbfae9 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest.java
@@ -29,7 +29,6 @@ import 
org.apache.flink.api.common.typeutils.base.StringSerializer;
 import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.WriterInitContext;
-import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.core.io.SimpleVersionedSerialization;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.core.memory.DataOutputSerializer;
@@ -45,13 +44,17 @@ import 
org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
+import 
org.apache.flink.streaming.runtime.operators.sink.TestSinkV2.DefaultCommitter;
+import 
org.apache.flink.streaming.runtime.operators.sink.TestSinkV2.DefaultCommittingSinkWriter;
+import 
org.apache.flink.streaming.runtime.operators.sink.TestSinkV2.DefaultSinkWriter;
+import 
org.apache.flink.streaming.runtime.operators.sink.TestSinkV2.DefaultStatefulSinkWriter;
+import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2.Record;
+import 
org.apache.flink.streaming.runtime.operators.sink.TestSinkV2.RecordSerializer;
 import 
org.apache.flink.streaming.runtime.operators.sink.committables.SinkV1CommittableDeserializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
 
-import org.apache.flink.shaded.guava32.com.google.common.collect.ImmutableList;
-
 import org.assertj.core.api.ListAssert;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -60,7 +63,6 @@ import org.junit.jupiter.params.provider.ValueSource;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
@@ -83,195 +85,187 @@ class SinkV2SinkWriterOperatorTest {
                         "bit", "mention", "thick", "stick", "stir", "easy", 
"sleep", "forth",
                         "cost", "prompt");
 
-        InspectableSink sink = sinkWithState(stateful, 
CompatibleStateSinkOperator.SINK_STATE_NAME);
+        DefaultStatefulSinkWriter<Integer> writer = new 
DefaultStatefulSinkWriter<>();
+        TestSinkV2<Integer> sink =
+                TestSinkV2.newBuilder()
+                        .setCommitter(new DefaultCommitter<>(), 
RecordSerializer::new)
+                        .setWithPostCommitTopology(true)
+                        .setWriter(writer)
+                        .setWriterState(stateful)
+                        
.setCompatibleStateNames(CompatibleStateSinkOperator.SINK_STATE_NAME)
+                        .build();
         int expectedState = 5;
-        final OneInputStreamOperatorTestHarness<String, String> previousSink =
+        OperatorSubtaskState previousSinkState;
+        try (OneInputStreamOperatorTestHarness<String, String> previousSink =
                 new OneInputStreamOperatorTestHarness<>(
                         new CompatibleStateSinkOperator<>(
                                 TestSinkV2.WRITER_SERIALIZER, expectedState),
-                        StringSerializer.INSTANCE);
+                        StringSerializer.INSTANCE)) {
 
-        OperatorSubtaskState previousSinkState =
-                TestHarnessUtil.buildSubtaskState(previousSink, 
previousSinkInputs);
+            previousSinkState = 
TestHarnessUtil.buildSubtaskState(previousSink, previousSinkInputs);
+        }
 
         // 2. Load previous sink state and verify state
-        Sink<Integer> sink3 = sink.getSink();
-        final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>>
+        OperatorSubtaskState snapshot;
+        try (OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>>
                 compatibleWriterOperator =
                         new OneInputStreamOperatorTestHarness<>(
-                                new SinkWriterOperatorFactory<>(sink3));
-
-        // load the state from previous sink
-        compatibleWriterOperator.initializeState(previousSinkState);
-        assertThat(sink.getRecordCountFromState()).isEqualTo(stateful ? 
expectedState : 0);
+                                new SinkWriterOperatorFactory<>(sink))) {
 
-        // 3. do another snapshot and check if this also can be restored 
without compabitible state
-        // name
-        compatibleWriterOperator.prepareSnapshotPreBarrier(1L);
-        OperatorSubtaskState snapshot = compatibleWriterOperator.snapshot(1L, 
1L);
+            // load the state from previous sink
+            compatibleWriterOperator.initializeState(previousSinkState);
+            assertThat(writer.getRecordCount()).isEqualTo(stateful ? 
expectedState : 0);
 
-        compatibleWriterOperator.close();
+            // 3. do another snapshot and check if this also can be restored 
without compatible
+            // state
+            // name
+            compatibleWriterOperator.prepareSnapshotPreBarrier(1L);
+            snapshot = compatibleWriterOperator.snapshot(1L, 1L);
+        }
 
         // 4. Restore the sink without previous sink's state
-        InspectableSink sink2 =
-                sinkWithState(stateful, 
CompatibleStateSinkOperator.SINK_STATE_NAME);
-        final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>>
+        DefaultStatefulSinkWriter<Integer> restoredWriter = new 
DefaultStatefulSinkWriter<>();
+        TestSinkV2<Integer> restoredSink =
+                TestSinkV2.newBuilder()
+                        .setCommitter(new DefaultCommitter<>(), 
RecordSerializer::new)
+                        .setWithPostCommitTopology(true)
+                        .setWriter(restoredWriter)
+                        .setWriterState(stateful)
+                        
.setCompatibleStateNames(CompatibleStateSinkOperator.SINK_STATE_NAME)
+                        .build();
+        try (OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>>
                 restoredSinkOperator =
                         new OneInputStreamOperatorTestHarness<>(
-                                new 
SinkWriterOperatorFactory<>(sink2.getSink()));
-
-        restoredSinkOperator.initializeState(snapshot);
-        assertThat(sink.getRecordCountFromState()).isEqualTo(stateful ? 
expectedState : 0);
-
-        restoredSinkOperator.close();
-    }
+                                new 
SinkWriterOperatorFactory<>(restoredSink))) {
 
-    InspectableSink sinkWithoutCommitter() {
-        TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = new 
TestSinkV2.DefaultSinkWriter<>();
-        return new 
InspectableSink(TestSinkV2.<Integer>newBuilder().setWriter(sinkWriter).build());
-    }
-
-    InspectableSink sinkWithCommitter() {
-        TestSinkV2.DefaultSinkWriter<Integer> sinkWriter =
-                new TestSinkV2.DefaultCommittingSinkWriter<>();
-        return new InspectableSink(
-                TestSinkV2.<Integer>newBuilder()
-                        .setWriter(sinkWriter)
-                        .setDefaultCommitter()
-                        .build());
-    }
-
-    InspectableSink sinkWithTimeBasedWriter() {
-        TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = new 
TimeBasedBufferingSinkWriter();
-        return new InspectableSink(
-                TestSinkV2.<Integer>newBuilder()
-                        .setWriter(sinkWriter)
-                        .setDefaultCommitter()
-                        .build());
-    }
-
-    InspectableSink sinkWithState(boolean withState, String stateName) {
-        TestSinkV2.DefaultSinkWriter<Integer> sinkWriter =
-                new TestSinkV2.DefaultStatefulSinkWriter<>();
-        TestSinkV2.Builder<Integer> builder =
-                TestSinkV2.<Integer>newBuilder()
-                        .setDefaultCommitter()
-                        .setWithPostCommitTopology(true)
-                        .setWriter(sinkWriter);
-        if (withState) {
-            builder.setWriterState(true);
-        }
-        if (stateName != null) {
-            builder.setCompatibleStateNames(stateName);
+            restoredSinkOperator.initializeState(snapshot);
+            assertThat(restoredWriter.getRecordCount()).isEqualTo(stateful ? 
expectedState : 0);
         }
-        return new InspectableSink(builder.build());
     }
 
     @Test
     void testNotEmitCommittablesWithoutCommitter() throws Exception {
-        InspectableSink sink = sinkWithoutCommitter();
-        final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>> testHarness =
-                new OneInputStreamOperatorTestHarness<>(
-                        new SinkWriterOperatorFactory<>(sink.getSink()));
-        testHarness.open();
-        testHarness.processElement(1, 1);
-
-        assertThat(testHarness.extractOutputValues()).isEmpty();
-        assertThat(sink.getRecordsOfCurrentCheckpoint())
-                .containsOnly("(1,1," + Long.MIN_VALUE + ")");
-
-        testHarness.prepareSnapshotPreBarrier(1);
-        assertThat(testHarness.extractOutputValues()).isEmpty();
-        // Elements are flushed
-        assertThat(sink.getRecordsOfCurrentCheckpoint()).isEmpty();
-        testHarness.close();
+        DefaultSinkWriter<Integer> writer = new DefaultSinkWriter<>();
+        TestSinkV2<Integer> sink = 
TestSinkV2.newBuilder().setWriter(writer).build();
+        try (OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>> testHarness =
+                new OneInputStreamOperatorTestHarness<>(new 
SinkWriterOperatorFactory<>(sink))) {
+            testHarness.open();
+            testHarness.processElement(1, 1);
+
+            assertThat(testHarness.extractOutputValues()).isEmpty();
+            assertThat(writer.getRecordsOfCurrentCheckpoint())
+                    .containsOnly(new Record<>(1, 1L, Long.MIN_VALUE));
+
+            testHarness.prepareSnapshotPreBarrier(1);
+            assertThat(testHarness.extractOutputValues()).isEmpty();
+            // Elements are flushed
+            assertThat(writer.getRecordsOfCurrentCheckpoint()).isEmpty();
+        }
     }
 
     @Test
     void testWatermarkPropagatedToSinkWriter() throws Exception {
         final long initialTime = 0;
 
-        InspectableSink sink = sinkWithoutCommitter();
-        final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>> testHarness =
-                new OneInputStreamOperatorTestHarness<>(
-                        new SinkWriterOperatorFactory<>(sink.getSink()));
-        testHarness.open();
-
-        testHarness.processWatermark(initialTime);
-        testHarness.processWatermark(initialTime + 1);
-
-        assertThat(testHarness.getOutput())
-                .containsExactly(
-                        new 
org.apache.flink.streaming.api.watermark.Watermark(initialTime),
-                        new 
org.apache.flink.streaming.api.watermark.Watermark(initialTime + 1));
-        assertThat(sink.getWatermarks())
-                .containsExactly(new Watermark(initialTime), new 
Watermark(initialTime + 1));
-        testHarness.close();
+        DefaultSinkWriter<Integer> writer = new DefaultSinkWriter<>();
+        TestSinkV2<Integer> sink = 
TestSinkV2.newBuilder().setWriter(writer).build();
+        try (OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>> testHarness =
+                new OneInputStreamOperatorTestHarness<>(new 
SinkWriterOperatorFactory<>(sink))) {
+            testHarness.open();
+
+            testHarness.processWatermark(initialTime);
+            testHarness.processWatermark(initialTime + 1);
+
+            assertThat(testHarness.getOutput())
+                    .containsExactly(
+                            new 
org.apache.flink.streaming.api.watermark.Watermark(initialTime),
+                            new 
org.apache.flink.streaming.api.watermark.Watermark(
+                                    initialTime + 1));
+            assertThat(writer.getWatermarks())
+                    .containsExactly(new Watermark(initialTime), new 
Watermark(initialTime + 1));
+        }
     }
 
     @Test
     void testTimeBasedBufferingSinkWriter() throws Exception {
         final long initialTime = 0;
 
-        final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>> testHarness =
-                new OneInputStreamOperatorTestHarness<>(
-                        new 
SinkWriterOperatorFactory<>(sinkWithTimeBasedWriter().getSink()));
+        DefaultSinkWriter<Integer> writer = new TimeBasedBufferingSinkWriter();
+        TestSinkV2<Integer> sink =
+                TestSinkV2.newBuilder()
+                        .setWriter(writer)
+                        .setCommitter(new DefaultCommitter<>(), 
RecordSerializer::new)
+                        .build();
+        try (OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>> testHarness =
+                new OneInputStreamOperatorTestHarness<>(new 
SinkWriterOperatorFactory<>(sink))) {
 
-        testHarness.open();
+            testHarness.open();
 
-        testHarness.setProcessingTime(0L);
+            testHarness.setProcessingTime(0L);
 
-        testHarness.processElement(1, initialTime + 1);
-        testHarness.processElement(2, initialTime + 2);
+            testHarness.processElement(1, initialTime + 1);
+            testHarness.processElement(2, initialTime + 2);
 
-        testHarness.prepareSnapshotPreBarrier(1L);
+            testHarness.prepareSnapshotPreBarrier(1L);
 
-        // Expect empty committableSummary
-        assertBasicOutput(testHarness.extractOutputValues(), 0, 1L);
+            // Expect empty committableSummary
+            assertBasicOutput(testHarness.extractOutputValues(), 0, 1L);
 
-        testHarness.getProcessingTimeService().setCurrentTime(2001);
+            testHarness.getProcessingTimeService().setCurrentTime(2001);
 
-        testHarness.prepareSnapshotPreBarrier(2L);
+            testHarness.prepareSnapshotPreBarrier(2L);
 
-        assertBasicOutput(
-                
testHarness.extractOutputValues().stream().skip(1).collect(Collectors.toList()),
-                2,
-                2L);
-        testHarness.close();
+            assertBasicOutput(
+                    
testHarness.extractOutputValues().stream().skip(1).collect(Collectors.toList()),
+                    2,
+                    2L);
+        }
     }
 
     @Test
     void testEmitOnFlushWithCommitter() throws Exception {
-        final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>> testHarness =
-                new OneInputStreamOperatorTestHarness<>(
-                        new 
SinkWriterOperatorFactory<>(sinkWithCommitter().getSink()));
+        DefaultSinkWriter<Integer> writer = new 
DefaultCommittingSinkWriter<>();
+        TestSinkV2<Integer> sink =
+                TestSinkV2.newBuilder()
+                        .setWriter(writer)
+                        .setCommitter(new DefaultCommitter<>(), 
RecordSerializer::new)
+                        .build();
+        try (OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>> testHarness =
+                new OneInputStreamOperatorTestHarness<>(new 
SinkWriterOperatorFactory<>(sink))) {
 
-        testHarness.open();
-        assertThat(testHarness.extractOutputValues()).isEmpty();
+            testHarness.open();
+            assertThat(testHarness.extractOutputValues()).isEmpty();
 
-        testHarness.processElement(1, 1);
-        testHarness.processElement(2, 2);
+            testHarness.processElement(1, 1);
+            testHarness.processElement(2, 2);
 
-        // flush
-        testHarness.prepareSnapshotPreBarrier(1);
+            // flush
+            testHarness.prepareSnapshotPreBarrier(1);
 
-        assertBasicOutput(testHarness.extractOutputValues(), 2, 1L);
-        testHarness.close();
+            assertBasicOutput(testHarness.extractOutputValues(), 2, 1L);
+        }
     }
 
     @Test
     void testEmitOnEndOfInputInBatchMode() throws Exception {
+        DefaultSinkWriter<Integer> writer = new 
DefaultCommittingSinkWriter<>();
+        TestSinkV2<Integer> sink =
+                TestSinkV2.newBuilder()
+                        .setWriter(writer)
+                        .setCommitter(new DefaultCommitter<>(), 
RecordSerializer::new)
+                        .build();
         final SinkWriterOperatorFactory<Integer, Integer> 
writerOperatorFactory =
-                new SinkWriterOperatorFactory<>(sinkWithCommitter().getSink());
-        final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>> testHarness =
-                new OneInputStreamOperatorTestHarness<>(writerOperatorFactory);
+                new SinkWriterOperatorFactory<>(sink);
+        try (OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>> testHarness =
+                new 
OneInputStreamOperatorTestHarness<>(writerOperatorFactory)) {
 
-        testHarness.open();
-        assertThat(testHarness.extractOutputValues()).isEmpty();
+            testHarness.open();
+            assertThat(testHarness.extractOutputValues()).isEmpty();
 
-        testHarness.processElement(1, 1);
-        testHarness.endInput();
-        assertBasicOutput(testHarness.extractOutputValues(), 1, EOI);
+            testHarness.processElement(1, 1);
+            testHarness.endInput();
+            assertBasicOutput(testHarness.extractOutputValues(), 1, EOI);
+        }
     }
 
     @ParameterizedTest
@@ -280,74 +274,94 @@ class SinkV2SinkWriterOperatorTest {
 
         final long initialTime = 0;
 
-        final InspectableSink sink = sinkWithState(stateful, null);
-        Sink<Integer> sink2 = sink.getSink();
-        final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>> testHarness =
-                new OneInputStreamOperatorTestHarness<>(new 
SinkWriterOperatorFactory<>(sink2));
-
-        testHarness.open();
+        DefaultStatefulSinkWriter<Integer> writer = new 
DefaultStatefulSinkWriter<>();
+        final TestSinkV2<Integer> sink =
+                TestSinkV2.newBuilder()
+                        .setCommitter(new DefaultCommitter<>(), 
RecordSerializer::new)
+                        .setWithPostCommitTopology(true)
+                        .setWriter(writer)
+                        .setWriterState(stateful)
+                        .build();
+        OperatorSubtaskState snapshot;
+        try (OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>> testHarness =
+                new OneInputStreamOperatorTestHarness<>(new 
SinkWriterOperatorFactory<>(sink))) {
 
-        testHarness.processWatermark(initialTime);
-        testHarness.processElement(1, initialTime + 1);
-        testHarness.processElement(2, initialTime + 2);
+            testHarness.open();
 
-        testHarness.prepareSnapshotPreBarrier(1L);
-        OperatorSubtaskState snapshot = testHarness.snapshot(1L, 1L);
+            testHarness.processWatermark(initialTime);
+            testHarness.processElement(1, initialTime + 1);
+            testHarness.processElement(2, initialTime + 2);
 
-        assertThat(sink.getRecordCountFromState()).isEqualTo(2);
-        assertThat(sink.getLastCheckpointId()).isEqualTo(stateful ? 1L : -1L);
+            testHarness.prepareSnapshotPreBarrier(1L);
+            snapshot = testHarness.snapshot(1L, 1L);
 
-        testHarness.close();
+            assertThat(writer.getRecordCount()).isEqualTo(2);
+            assertThat(writer.getLastCheckpointId()).isEqualTo(stateful ? 1L : 
-1L);
+        }
 
-        final InspectableSink restoredSink = sinkWithState(stateful, null);
-        final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>>
+        DefaultStatefulSinkWriter<Integer> restoredWriter = new 
DefaultStatefulSinkWriter<>();
+        final TestSinkV2<Integer> restoredSink =
+                TestSinkV2.newBuilder()
+                        .setCommitter(new DefaultCommitter<>(), 
RecordSerializer::new)
+                        .setWithPostCommitTopology(true)
+                        .setWriter(restoredWriter)
+                        .setWriterState(stateful)
+                        .build();
+        try (OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>>
                 restoredTestHarness =
                         new OneInputStreamOperatorTestHarness<>(
-                                new 
SinkWriterOperatorFactory<>(restoredSink.getSink()));
-
-        restoredTestHarness.initializeState(snapshot);
-        restoredTestHarness.open();
+                                new 
SinkWriterOperatorFactory<>(restoredSink))) {
 
-        // check that the previous state is correctly restored
-        assertThat(restoredSink.getRecordCountFromState()).isEqualTo(stateful 
? 2 : 0);
+            restoredTestHarness.initializeState(snapshot);
+            restoredTestHarness.open();
 
-        restoredTestHarness.close();
+            // check that the previous state is correctly restored
+            assertThat(restoredWriter.getRecordCount()).isEqualTo(stateful ? 2 
: 0);
+        }
     }
 
     @Test
     void testRestoreCommitterState() throws Exception {
-        final List<String> committables = Arrays.asList("state1", "state2");
-
-        InspectableSink sink = sinkWithCommitter();
-        final OneInputStreamOperatorTestHarness<String, String> committer =
+        final List<Record<Integer>> committables =
+                Arrays.asList(new Record<>(1, 1L, 1), new Record<>(2, 2L, 2));
+
+        DefaultSinkWriter<Integer> writer = new 
DefaultCommittingSinkWriter<>();
+        TestSinkV2<Integer> sink =
+                TestSinkV2.newBuilder()
+                        .setWriter(writer)
+                        .setCommitter(new DefaultCommitter<>(), 
RecordSerializer::new)
+                        .build();
+        final OperatorSubtaskState committerState;
+        try (OneInputStreamOperatorTestHarness<Record<Integer>, 
Record<Integer>> committer =
                 new OneInputStreamOperatorTestHarness<>(
-                        new 
TestCommitterOperator(TestSinkV2.COMMITTABLE_SERIALIZER),
-                        StringSerializer.INSTANCE);
+                        new TestCommitterOperator(new RecordSerializer<>()))) {
 
-        final OperatorSubtaskState committerState =
-                TestHarnessUtil.buildSubtaskState(committer, committables);
+            committerState = TestHarnessUtil.buildSubtaskState(committer, 
committables);
+        }
 
-        final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>> testHarness =
-                new OneInputStreamOperatorTestHarness<>(
-                        new SinkWriterOperatorFactory<>(sink.getSink()));
+        final ListAssert<CommittableMessage<Integer>> records;
+        try (OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>> testHarness =
+                new OneInputStreamOperatorTestHarness<>(new 
SinkWriterOperatorFactory<>(sink))) {
 
-        testHarness.initializeState(committerState);
+            testHarness.initializeState(committerState);
 
-        testHarness.open();
+            testHarness.open();
 
-        testHarness.prepareSnapshotPreBarrier(2);
+            testHarness.prepareSnapshotPreBarrier(2);
 
-        final ListAssert<CommittableMessage<Integer>> records =
-                assertThat(testHarness.extractOutputValues()).hasSize(4);
+            records = assertThat(testHarness.extractOutputValues()).hasSize(4);
+        }
 
         records.element(0, as(committableSummary()))
                 .hasCheckpointId(INITIAL_CHECKPOINT_ID)
                 .hasOverallCommittables(committables.size());
-        records.<CommittableWithLineageAssert<String>>element(1, 
as(committableWithLineage()))
+        records.<CommittableWithLineageAssert<Record<Integer>>>element(
+                        1, as(committableWithLineage()))
                 .hasCommittable(committables.get(0))
                 .hasCheckpointId(INITIAL_CHECKPOINT_ID)
                 .hasSubtaskId(0);
-        records.<CommittableWithLineageAssert<String>>element(2, 
as(committableWithLineage()))
+        records.<CommittableWithLineageAssert<Record<Integer>>>element(
+                        2, as(committableWithLineage()))
                 .hasCommittable(committables.get(1))
                 .hasCheckpointId(INITIAL_CHECKPOINT_ID)
                 .hasSubtaskId(0);
@@ -357,35 +371,44 @@ class SinkV2SinkWriterOperatorTest {
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
     void testHandleEndInputInStreamingMode(boolean isCheckpointingEnabled) throws Exception {
-        InspectableSink sink = sinkWithCommitter();
-        final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<String>> testHarness =
-                new OneInputStreamOperatorTestHarness<>(
-                        new SinkWriterOperatorFactory<>(sink.getSink()));
-        testHarness.open();
-        testHarness.processElement(1, 1);
-
-        assertThat(testHarness.extractOutputValues()).isEmpty();
-        final String record = "(1,1," + Long.MIN_VALUE + ")";
-        assertThat(sink.getRecordsOfCurrentCheckpoint()).containsOnly(record);
+        DefaultSinkWriter<Integer> writer = new DefaultCommittingSinkWriter<>();
+        TestSinkV2<Integer> sink =
+                TestSinkV2.newBuilder()
+                        .setWriter(writer)
+                        .setCommitter(new DefaultCommitter<>(), RecordSerializer::new)
+                        .build();
+        try (OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Record<Integer>>>
+                testHarness =
+                        new OneInputStreamOperatorTestHarness<>(
+                                new SinkWriterOperatorFactory<>(sink))) {
+            testHarness.open();
+            testHarness.processElement(1, 1);
 
-        testHarness.endInput();
+            assertThat(testHarness.extractOutputValues()).isEmpty();
+            Record<Integer> record = new Record<>(1, 1L, Long.MIN_VALUE);
+            assertThat(writer.getRecordsOfCurrentCheckpoint()).containsOnly(record);
 
-        if (isCheckpointingEnabled) {
-            testHarness.prepareSnapshotPreBarrier(1);
-        }
+            testHarness.endInput();
 
-        List<String> committables = Collections.singletonList(record);
+            if (isCheckpointingEnabled) {
+                testHarness.prepareSnapshotPreBarrier(1);
+            }
 
-        ListAssert<CommittableMessage<String>> records =
-                
assertThat(testHarness.extractOutputValues()).hasSize(committables.size() + 1);
-        records.element(0, 
as(committableSummary())).hasOverallCommittables(committables.size());
+            List<Record<Integer>> committables = Collections.singletonList(record);
 
-        records.filteredOn(message -> message instanceof 
CommittableWithLineage)
-                .map(message -> ((CommittableWithLineage<String>) 
message).getCommittable())
-                .containsExactlyInAnyOrderElementsOf(committables);
-        assertThat(sink.getRecordsOfCurrentCheckpoint()).isEmpty();
+            ListAssert<CommittableMessage<Record<Integer>>> records =
+                    assertThat(testHarness.extractOutputValues()).hasSize(committables.size() + 1);
+            records.element(0, as(committableSummary()))
+                    .hasOverallCommittables(committables.size());
 
-        testHarness.close();
+            records.filteredOn(message -> message instanceof CommittableWithLineage)
+                    .map(
+                            message ->
+                                    ((CommittableWithLineage<Record<Integer>>) message)
+                                            .getCommittable())
+                    .containsExactlyInAnyOrderElementsOf(committables);
+            assertThat(writer.getRecordsOfCurrentCheckpoint()).isEmpty();
+        }
     }
 
     @Test
@@ -411,47 +434,28 @@ class SinkV2SinkWriterOperatorTest {
                         .setExecutionConfig(new 
ExecutionConfig().enableObjectReuse())
                         .build();
 
-        final OneInputStreamOperatorTestHarness<String, 
CommittableMessage<String>> testHarness =
-                new OneInputStreamOperatorTestHarness<>(
-                        new SinkWriterOperatorFactory<>(sink), typeSerializer, 
environment);
-        testHarness.open();
-
-        assertThat(initContext.get().getUserCodeClassLoader()).isNotNull();
-        assertThat(initContext.get().getMailboxExecutor()).isNotNull();
-        assertThat(initContext.get().getProcessingTimeService()).isNotNull();
-        
assertThat(initContext.get().getTaskInfo().getIndexOfThisSubtask()).isEqualTo(subtaskId);
-        
assertThat(initContext.get().getTaskInfo().getNumberOfParallelSubtasks())
-                .isEqualTo(parallelism);
-        
assertThat(initContext.get().getTaskInfo().getAttemptNumber()).isZero();
-        assertThat(initContext.get().metricGroup()).isNotNull();
-        assertThat(initContext.get().getRestoredCheckpointId()).isNotPresent();
-        assertThat(initContext.get().isObjectReuseEnabled()).isTrue();
-        
assertThat(initContext.get().createInputSerializer()).isEqualTo(typeSerializer);
-        assertThat(initContext.get().getJobInfo().getJobId()).isEqualTo(jobID);
-
-        testHarness.close();
-    }
-
-    private static void assertContextsEqual(
-            WriterInitContext initContext, WriterInitContext original) {
-        assertThat(initContext.getUserCodeClassLoader().asClassLoader())
-                .isEqualTo(original.getUserCodeClassLoader().asClassLoader());
-        
assertThat(initContext.getMailboxExecutor()).isEqualTo(original.getMailboxExecutor());
-        assertThat(initContext.getProcessingTimeService())
-                .isEqualTo(original.getProcessingTimeService());
-        assertThat(initContext.getTaskInfo().getIndexOfThisSubtask())
-                .isEqualTo(original.getTaskInfo().getIndexOfThisSubtask());
-        assertThat(initContext.getTaskInfo().getNumberOfParallelSubtasks())
-                
.isEqualTo(original.getTaskInfo().getNumberOfParallelSubtasks());
-        assertThat(initContext.getTaskInfo().getAttemptNumber())
-                .isEqualTo(original.getTaskInfo().getAttemptNumber());
-        
assertThat(initContext.metricGroup()).isEqualTo(original.metricGroup());
-        assertThat(initContext.getRestoredCheckpointId())
-                .isEqualTo(original.getRestoredCheckpointId());
-        
assertThat(initContext.isObjectReuseEnabled()).isEqualTo(original.isObjectReuseEnabled());
-        
assertThat(initContext.createInputSerializer()).isEqualTo(original.createInputSerializer());
-        
assertThat(initContext.getJobInfo().getJobId()).isEqualTo(original.getJobInfo().getJobId());
-        
assertThat(initContext.metadataConsumer()).isEqualTo(original.metadataConsumer());
+        try (OneInputStreamOperatorTestHarness<String, CommittableMessage<Record<Integer>>>
+                testHarness =
+                        new OneInputStreamOperatorTestHarness<>(
+                                new SinkWriterOperatorFactory<>(sink),
+                                typeSerializer,
+                                environment)) {
+            testHarness.open();
+
+            assertThat(initContext.get().getUserCodeClassLoader()).isNotNull();
+            assertThat(initContext.get().getMailboxExecutor()).isNotNull();
+            assertThat(initContext.get().getProcessingTimeService()).isNotNull();
+            assertThat(initContext.get().getTaskInfo().getIndexOfThisSubtask())
+                    .isEqualTo(subtaskId);
+            assertThat(initContext.get().getTaskInfo().getNumberOfParallelSubtasks())
+                    .isEqualTo(parallelism);
+            assertThat(initContext.get().getTaskInfo().getAttemptNumber()).isZero();
+            assertThat(initContext.get().metricGroup()).isNotNull();
+            assertThat(initContext.get().getRestoredCheckpointId()).isNotPresent();
+            assertThat(initContext.get().isObjectReuseEnabled()).isTrue();
+            assertThat(initContext.get().createInputSerializer()).isEqualTo(typeSerializer);
+            assertThat(initContext.get().getJobInfo().getJobId()).isEqualTo(jobID);
+        }
     }
 
     private static void assertBasicOutput(
@@ -469,17 +473,16 @@ class SinkV2SinkWriterOperatorTest {
                                         .hasSubtaskId(0));
     }
 
-    private static class TimeBasedBufferingSinkWriter
-            extends TestSinkV2.DefaultCommittingSinkWriter<Integer>
+    private static class TimeBasedBufferingSinkWriter extends DefaultCommittingSinkWriter<Integer>
             implements ProcessingTimeService.ProcessingTimeCallback {
 
-        private final List<String> cachedCommittables = new ArrayList<>();
+        private final List<Record<Integer>> cachedCommittables = new ArrayList<>();
         private ProcessingTimeService processingTimeService;
 
         @Override
         public void write(Integer element, Context context) {
             cachedCommittables.add(
-                    Tuple3.of(element, context.timestamp(), 
context.currentWatermark()).toString());
+                    new Record<>(element, context.timestamp(), context.currentWatermark()));
         }
 
         @Override
@@ -496,74 +499,17 @@ class SinkV2SinkWriterOperatorTest {
         }
     }
 
-    private static class SnapshottingBufferingSinkWriter
-            extends TestSinkV2.DefaultStatefulSinkWriter {
-        public static final int NOT_SNAPSHOTTED = -1;
-        long lastCheckpointId = NOT_SNAPSHOTTED;
-        boolean endOfInput = false;
-
-        @Override
-        public void flush(boolean endOfInput) throws IOException, 
InterruptedException {
-            this.endOfInput = endOfInput;
-        }
-
-        @Override
-        public List<String> snapshotState(long checkpointId) throws 
IOException {
-            lastCheckpointId = checkpointId;
-            return super.snapshotState(checkpointId);
-        }
-
-        @Override
-        public Collection<String> prepareCommit() {
-            if (!endOfInput) {
-                return ImmutableList.of();
-            }
-            List<String> result = elements;
-            elements = new ArrayList<>();
-            return result;
-        }
-    }
-
-    static class InspectableSink {
-        private final TestSinkV2<Integer> sink;
-
-        InspectableSink(TestSinkV2<Integer> sink) {
-            this.sink = sink;
-        }
-
-        public TestSinkV2<Integer> getSink() {
-            return sink;
-        }
-
-        public long getLastCheckpointId() {
-            return getSink().getWriter().lastCheckpointId;
-        }
-
-        public List<String> getRecordsOfCurrentCheckpoint() {
-            return getSink().getWriter().elements;
-        }
-
-        public List<Watermark> getWatermarks() {
-            return getSink().getWriter().watermarks;
-        }
-
-        public int getRecordCountFromState() {
-            return ((TestSinkV2.DefaultStatefulSinkWriter<?>) 
getSink().getWriter())
-                    .getRecordCount();
-        }
-    }
-
-    private static class TestCommitterOperator extends 
AbstractStreamOperator<String>
-            implements OneInputStreamOperator<String, String> {
+    private static class TestCommitterOperator extends AbstractStreamOperator<Record<Integer>>
+            implements OneInputStreamOperator<Record<Integer>, Record<Integer>> {
 
         private static final ListStateDescriptor<byte[]> 
STREAMING_COMMITTER_RAW_STATES_DESC =
                 new ListStateDescriptor<>(
                         "streaming_committer_raw_states", 
BytePrimitiveArraySerializer.INSTANCE);
-        private ListState<List<String>> committerState;
-        private final List<String> buffer = new ArrayList<>();
-        private final SimpleVersionedSerializer<String> serializer;
+        private ListState<List<Record<Integer>>> committerState;
+        private final List<Record<Integer>> buffer = new ArrayList<>();
+        private final SimpleVersionedSerializer<Record<Integer>> serializer;
 
-        public TestCommitterOperator(SimpleVersionedSerializer<String> 
serializer) {
+        public TestCommitterOperator(SimpleVersionedSerializer<Record<Integer>> serializer) {
             this.serializer = serializer;
         }
 
@@ -578,7 +524,7 @@ class SinkV2SinkWriterOperatorTest {
         }
 
         @Override
-        public void processElement(StreamRecord<String> element) throws 
Exception {
+        public void processElement(StreamRecord<Record<Integer>> element) {
             buffer.add(element.getValue());
         }
 
@@ -625,18 +571,18 @@ class SinkV2SinkWriterOperatorTest {
     }
 
     private static class TestingCommittableSerializer
-            extends SinkV1WriterCommittableSerializer<String> {
+            extends SinkV1WriterCommittableSerializer<Record<Integer>> {
 
-        private final SimpleVersionedSerializer<String> committableSerializer;
+        private final SimpleVersionedSerializer<Record<Integer>> 
committableSerializer;
 
         public TestingCommittableSerializer(
-                SimpleVersionedSerializer<String> committableSerializer) {
+                SimpleVersionedSerializer<Record<Integer>> 
committableSerializer) {
             super(committableSerializer);
             this.committableSerializer = committableSerializer;
         }
 
         @Override
-        public byte[] serialize(List<String> obj) throws IOException {
+        public byte[] serialize(List<Record<Integer>> obj) throws IOException {
             final DataOutputSerializer out = new DataOutputSerializer(256);
             out.writeInt(SinkV1CommittableDeserializer.MAGIC_NUMBER);
             SimpleVersionedSerialization.writeVersionAndSerializeList(
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java
index 18f934752a3..396af961830 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.runtime.operators.sink;
 
 import org.apache.flink.api.common.eventtime.Watermark;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.connector.sink2.Committer;
 import org.apache.flink.api.connector.sink2.CommitterInitContext;
 import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
@@ -30,7 +29,6 @@ import org.apache.flink.api.connector.sink2.StatefulSinkWriter;
 import org.apache.flink.api.connector.sink2.SupportsCommitter;
 import org.apache.flink.api.connector.sink2.SupportsWriterState;
 import org.apache.flink.api.connector.sink2.WriterInitContext;
-import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.core.io.SimpleVersionedSerializerAdapter;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
@@ -41,12 +39,12 @@ import org.apache.flink.streaming.api.connector.sink2.StandardSinkTopologies;
 import org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology;
 import org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.function.SerializableFunction;
+import org.apache.flink.util.function.SerializableSupplier;
 
 import org.apache.flink.shaded.guava32.com.google.common.collect.ImmutableSet;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -54,94 +52,93 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedHashSet;
 import java.util.List;
-import java.util.Queue;
+import java.util.Objects;
 import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.assertj.core.api.Assertions.assertThat;
 
 /** A {@link Sink} for all the sink related tests. */
 public class TestSinkV2<InputT> implements Sink<InputT> {
-    public static final SimpleVersionedSerializerAdapter<String> 
COMMITTABLE_SERIALIZER =
-            new SimpleVersionedSerializerAdapter<>(StringSerializer.INSTANCE);
-    public static final SimpleVersionedSerializerAdapter<Integer> 
WRITER_SERIALIZER =
+    public static final SimpleVersionedSerializer<Integer> WRITER_SERIALIZER =
             new SimpleVersionedSerializerAdapter<>(IntSerializer.INSTANCE);
 
-    private final DefaultSinkWriter<InputT> writer;
+    private final SinkWriter<InputT> writer;
 
-    private TestSinkV2(DefaultSinkWriter<InputT> writer) {
+    private TestSinkV2(SinkWriter<InputT> writer) {
         this.writer = writer;
     }
 
     public SinkWriter<InputT> createWriter(WriterInitContext context) {
-        writer.init(context);
+        if (writer instanceof DefaultSinkWriter) {
+            ((DefaultSinkWriter<InputT>) writer).init(context);
+        }
         return writer;
     }
 
-    DefaultSinkWriter<InputT> getWriter() {
+    SinkWriter<InputT> getWriter() {
         return writer;
     }
 
-    public static <InputT> Builder<InputT> newBuilder() {
+    public static <InputT> Builder<InputT, Record<InputT>> newBuilder() {
         return new Builder<>();
     }
 
-    public static <InputT> Builder<InputT> 
newBuilder(DefaultSinkWriter<InputT> writer) {
-        return new Builder<InputT>().setWriter(writer);
-    }
-
-    public SupportsCommitter<String> asSupportsCommitter() {
-        throw new UnsupportedOperationException("No committter");
+    public static <InputT> Builder<InputT, Record<InputT>> newBuilder(
+            DefaultSinkWriter<InputT> writer) {
+        return new Builder<InputT, Record<InputT>>().setWriter(writer);
     }
 
     /** A builder class for {@link TestSinkV2}. */
-    public static class Builder<InputT> {
-        private DefaultSinkWriter<InputT> writer = null;
-        private DefaultCommitter committer;
+    public static class Builder<InputT, CommT> {
+        private SinkWriter<InputT> writer = null;
+        private Committer<CommT> committer;
         private boolean withPostCommitTopology = false;
-        private boolean withPreCommitTopology = false;
+        private SerializableFunction<CommT, CommT> preCommitTopology = null;
         private boolean withWriterState = false;
         private String compatibleStateNames;
+        private SerializableSupplier<SimpleVersionedSerializer<CommT>> commSerializerFactory;
 
-        public Builder<InputT> setWriter(DefaultSinkWriter<InputT> writer) {
-            this.writer = checkNotNull(writer);
-            return this;
+        @SuppressWarnings("unchecked")
+        public <NewInputT, NewCommT> Builder<NewInputT, NewCommT> setWriter(
+                CommittingSinkWriter<NewInputT, NewCommT> writer) {
+            Builder<NewInputT, NewCommT> self = (Builder<NewInputT, NewCommT>) this;
+            self.writer = checkNotNull(writer);
+            return self;
         }
 
-        public Builder<InputT> setCommitter(DefaultCommitter committer) {
-            this.committer = committer;
-            return this;
+        @SuppressWarnings("unchecked")
+        public <NewInputT> Builder<NewInputT, CommT> setWriter(SinkWriter<NewInputT> writer) {
+            Builder<NewInputT, CommT> self = (Builder<NewInputT, CommT>) this;
+            self.writer = checkNotNull(writer);
+            return self;
         }
 
-        public Builder<InputT> setDefaultCommitter() {
-            this.committer = new DefaultCommitter();
-            return this;
-        }
-
-        public Builder<InputT> setDefaultCommitter(
-                Supplier<Queue<Committer.CommitRequest<String>>> 
queueSupplier) {
-            this.committer = new DefaultCommitter(queueSupplier);
+        public Builder<InputT, CommT> setCommitter(
+                Committer<CommT> committer,
+                SerializableSupplier<SimpleVersionedSerializer<CommT>> commSerializerFactory) {
+            this.committer = committer;
+            this.commSerializerFactory = commSerializerFactory;
             return this;
         }
 
-        public Builder<InputT> setWithPostCommitTopology(boolean 
withPostCommitTopology) {
+        public Builder<InputT, CommT> setWithPostCommitTopology(boolean withPostCommitTopology) {
             this.withPostCommitTopology = withPostCommitTopology;
             return this;
         }
 
-        public Builder<InputT> setWithPreCommitTopology(boolean 
withPreCommitTopology) {
-            this.withPreCommitTopology = withPreCommitTopology;
+        public Builder<InputT, CommT> setWithPreCommitTopology(
+                SerializableFunction<CommT, CommT> preCommitTopology) {
+            this.preCommitTopology = preCommitTopology;
             return this;
         }
 
-        public Builder<InputT> setWriterState(boolean withWriterState) {
+        public Builder<InputT, CommT> setWriterState(boolean withWriterState) {
             this.withWriterState = withWriterState;
             return this;
         }
 
-        public Builder<InputT> setCompatibleStateNames(String 
compatibleStateNames) {
+        public Builder<InputT, CommT> setCompatibleStateNames(String compatibleStateNames) {
             this.compatibleStateNames = compatibleStateNames;
             return this;
         }
@@ -158,139 +155,130 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
                     writer = new DefaultCommittingSinkWriter<>();
                 }
                 if (!withPostCommitTopology) {
-                    if (!withPreCommitTopology) {
+                    if (preCommitTopology == null) {
                         // TwoPhaseCommittingSink with a stateless writer and 
a committer
                         return new TestSinkV2TwoPhaseCommittingSink<>(
-                                writer, COMMITTABLE_SERIALIZER, committer);
+                                writer, commSerializerFactory, committer);
                     } else {
                         // TwoPhaseCommittingSink with a stateless writer, pre 
commit topology,
                         // committer
-                        Preconditions.checkArgument(
-                                writer instanceof DefaultCommittingSinkWriter,
-                                "Please provide a DefaultCommittingSinkWriter 
instance");
                         return new TestSinkV2WithPreCommitTopology<>(
-                                writer, COMMITTABLE_SERIALIZER, committer);
+                                writer, commSerializerFactory, committer, preCommitTopology);
                     }
                 } else {
                     if (withWriterState) {
                         // TwoPhaseCommittingSink with a stateful writer and a 
committer and post
                         // commit topology
-                        Preconditions.checkArgument(
-                                writer instanceof DefaultStatefulSinkWriter,
-                                "Please provide a DefaultStatefulSinkWriter 
instance");
                         return new TestStatefulSinkV2<>(
-                                (DefaultStatefulSinkWriter<InputT>) writer,
-                                COMMITTABLE_SERIALIZER,
-                                committer,
-                                compatibleStateNames);
+                                writer, commSerializerFactory, committer, compatibleStateNames);
                     } else {
                         // TwoPhaseCommittingSink with a stateless writer and 
a committer and post
                         // commit topology
-                        Preconditions.checkArgument(
-                                writer instanceof DefaultCommittingSinkWriter,
-                                "Please provide a DefaultCommittingSinkWriter 
instance");
                         return new TestSinkV2WithPostCommitTopology<>(
-                                writer, COMMITTABLE_SERIALIZER, committer);
+                                writer, commSerializerFactory, committer);
                     }
                 }
             }
         }
     }
 
-    private static class TestSinkV2TwoPhaseCommittingSink<InputT> extends 
TestSinkV2<InputT>
-            implements SupportsCommitter<String> {
-        private final DefaultCommitter committer;
-        private final SimpleVersionedSerializer<String> committableSerializer;
+    private static class TestSinkV2TwoPhaseCommittingSink<InputT, CommT> extends TestSinkV2<InputT>
+            implements SupportsCommitter<CommT> {
+        private final Committer<CommT> committer;
+        protected final SerializableSupplier<SimpleVersionedSerializer<CommT>>
+                commSerializerFactory;
 
         public TestSinkV2TwoPhaseCommittingSink(
-                DefaultSinkWriter<InputT> writer,
-                SimpleVersionedSerializer<String> committableSerializer,
-                DefaultCommitter committer) {
+                SinkWriter<InputT> writer,
+                SerializableSupplier<SimpleVersionedSerializer<CommT>> commSerializerFactory,
+                Committer<CommT> committer) {
             super(writer);
             this.committer = committer;
-            this.committableSerializer = committableSerializer;
+            this.commSerializerFactory = commSerializerFactory;
         }
 
         @Override
-        public Committer<String> createCommitter(CommitterInitContext context) 
{
-            committer.init();
+        public Committer<CommT> createCommitter(CommitterInitContext context) {
+            if (committer instanceof DefaultCommitter) {
+                ((DefaultCommitter<CommT>) committer).init();
+            }
             return committer;
         }
 
         @Override
-        public SupportsCommitter<String> asSupportsCommitter() {
-            return this;
-        }
-
-        @Override
-        public SimpleVersionedSerializer<String> getCommittableSerializer() {
-            return committableSerializer;
+        public SimpleVersionedSerializer<CommT> getCommittableSerializer() {
+            return commSerializerFactory.get();
         }
     }
 
     // -------------------------------------- Sink With PostCommitTopology 
-------------------------
 
-    private static class TestSinkV2WithPostCommitTopology<InputT>
-            extends TestSinkV2TwoPhaseCommittingSink<InputT>
-            implements SupportsPostCommitTopology<String> {
+    private static class TestSinkV2WithPostCommitTopology<InputT, CommT>
+            extends TestSinkV2TwoPhaseCommittingSink<InputT, CommT>
+            implements SupportsPostCommitTopology<CommT> {
         public TestSinkV2WithPostCommitTopology(
-                DefaultSinkWriter<InputT> writer,
-                SimpleVersionedSerializer<String> committableSerializer,
-                DefaultCommitter committer) {
-            super(writer, committableSerializer, committer);
+                SinkWriter<InputT> writer,
+                SerializableSupplier<SimpleVersionedSerializer<CommT>> 
commSerializerFactory,
+                Committer<CommT> committer) {
+            super(writer, commSerializerFactory, committer);
         }
 
         @Override
-        public void 
addPostCommitTopology(DataStream<CommittableMessage<String>> committables) {
+        public void addPostCommitTopology(DataStream<CommittableMessage<CommT>> committables) {
             StandardSinkTopologies.addGlobalCommitter(
                     committables, this::createCommitter, this::getCommittableSerializer);
         }
     }
 
-    private static class TestSinkV2WithPreCommitTopology<InputT>
-            extends TestSinkV2TwoPhaseCommittingSink<InputT>
-            implements SupportsPreCommitTopology<String, String> {
+    private static class TestSinkV2WithPreCommitTopology<InputT, CommT>
+            extends TestSinkV2TwoPhaseCommittingSink<InputT, CommT>
+            implements SupportsPreCommitTopology<CommT, CommT> {
+        private final SerializableFunction<CommT, CommT> preCommitTopology;
+
         public TestSinkV2WithPreCommitTopology(
-                DefaultSinkWriter<InputT> writer,
-                SimpleVersionedSerializer<String> committableSerializer,
-                DefaultCommitter committer) {
-            super(writer, committableSerializer, committer);
+                SinkWriter<InputT> writer,
+                SerializableSupplier<SimpleVersionedSerializer<CommT>> 
commSerializerFactory,
+                Committer<CommT> committer,
+                SerializableFunction<CommT, CommT> preCommitTopology) {
+            super(writer, commSerializerFactory, committer);
+            this.preCommitTopology = preCommitTopology;
         }
 
         @Override
-        public DataStream<CommittableMessage<String>> addPreCommitTopology(
-                DataStream<CommittableMessage<String>> committables) {
+        public DataStream<CommittableMessage<CommT>> addPreCommitTopology(
+                DataStream<CommittableMessage<CommT>> committables) {
             return committables
                     .map(
                             m -> {
                                 if (m instanceof CommittableSummary) {
                                     return m;
                                 } else {
-                                    CommittableWithLineage<String> withLineage 
=
-                                            (CommittableWithLineage<String>) m;
-                                    return withLineage.map(old -> old + 
"Transformed");
+                                    CommittableWithLineage<CommT> withLineage =
+                                            (CommittableWithLineage<CommT>) m;
+                                    return withLineage.map(preCommitTopology);
                                 }
                             })
-                    .returns(CommittableMessageTypeInfo.of(() -> 
COMMITTABLE_SERIALIZER));
+                    .returns(CommittableMessageTypeInfo.of(commSerializerFactory));
         }
 
         @Override
-        public SimpleVersionedSerializer<String> getWriteResultSerializer() {
-            return new 
SimpleVersionedSerializerAdapter<>(StringSerializer.INSTANCE);
+        public SimpleVersionedSerializer<CommT> getWriteResultSerializer() {
+            return commSerializerFactory.get();
         }
     }
 
-    private static class TestStatefulSinkV2<InputT> extends 
TestSinkV2WithPostCommitTopology<InputT>
+    private static class TestStatefulSinkV2<InputT, CommT>
+            extends TestSinkV2WithPostCommitTopology<InputT, CommT>
             implements SupportsWriterState<InputT, Integer>,
                     SupportsWriterState.WithCompatibleState {
         private final String compatibleState;
 
         public TestStatefulSinkV2(
-                DefaultStatefulSinkWriter<InputT> writer,
-                SimpleVersionedSerializer<String> committableSerializer,
-                DefaultCommitter committer,
+                SinkWriter<InputT> writer,
+                SerializableSupplier<SimpleVersionedSerializer<CommT>> 
commSerializerFactory,
+                Committer<CommT> committer,
                 String compatibleState) {
-            super(writer, committableSerializer, committer);
+            super(writer, commSerializerFactory, committer);
             this.compatibleState = compatibleState;
         }
 
@@ -303,7 +291,7 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
         public StatefulSinkWriter<InputT, Integer> restoreWriter(
                 WriterInitContext context, Collection<Integer> recoveredState) 
{
             DefaultStatefulSinkWriter<InputT> statefulWriter =
-                    (DefaultStatefulSinkWriter) getWriter();
+                    (DefaultStatefulSinkWriter<InputT>) getWriter();
 
             statefulWriter.restore(recoveredState);
             return statefulWriter;
@@ -322,10 +310,90 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
 
     // -------------------------------------- Sink Writer ------------------------------------------
 
+    public static class Record<T> implements Serializable {
+        private final T value;
+        private final Long timestamp;
+        private final long watermark;
+
+        public Record(T value, Long timestamp, long watermark) {
+            this.value = value;
+            this.timestamp = timestamp;
+            this.watermark = watermark;
+        }
+
+        public T getValue() {
+            return value;
+        }
+
+        public Long getTimestamp() {
+            return timestamp;
+        }
+
+        public long getWatermark() {
+            return watermark;
+        }
+
+        @Override
+        public boolean equals(Object object) {
+            if (this == object) {
+                return true;
+            }
+            if (object == null || getClass() != object.getClass()) {
+                return false;
+            }
+            Record<?> record = (Record<?>) object;
+            return Objects.equals(timestamp, record.timestamp)
+                    && watermark == record.watermark
+                    && Objects.equals(value, record.value);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(value, timestamp, watermark);
+        }
+
+        @Override
+        public String toString() {
+            return "Record{"
+                    + "value="
+                    + value
+                    + ", timestamp="
+                    + timestamp
+                    + ", watermark="
+                    + watermark
+                    + '}';
+        }
+
+        public Record<T> withValue(T value) {
+            return new Record<>(value, timestamp, watermark);
+        }
+    }
+
+    public static class RecordSerializer<T> implements 
SimpleVersionedSerializer<Record<T>> {
+        @Override
+        public int getVersion() {
+            return 0;
+        }
+
+        @Override
+        public byte[] serialize(Record<T> record) throws IOException {
+            return InstantiationUtil.serializeObject(record);
+        }
+
+        @Override
+        public Record<T> deserialize(int version, byte[] serialized) throws 
IOException {
+            try {
+                return InstantiationUtil.deserializeObject(serialized, 
getClass().getClassLoader());
+            } catch (ClassNotFoundException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
     /** Base class for out testing {@link SinkWriter}. */
     public static class DefaultSinkWriter<InputT> implements 
SinkWriter<InputT>, Serializable {
 
-        protected List<String> elements;
+        protected List<Record<InputT>> elements;
 
         protected List<Watermark> watermarks;
 
@@ -338,8 +406,7 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
 
         @Override
         public void write(InputT element, Context context) {
-            elements.add(
-                    Tuple3.of(element, context.timestamp(), 
context.currentWatermark()).toString());
+            elements.add(new Record<>(element, context.timestamp(), 
context.currentWatermark()));
         }
 
         @Override
@@ -347,6 +414,18 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
             elements = new ArrayList<>();
         }
 
+        public List<Record<InputT>> getRecordsOfCurrentCheckpoint() {
+            return elements;
+        }
+
+        public List<Watermark> getWatermarks() {
+            return watermarks;
+        }
+
+        public long getLastCheckpointId() {
+            return lastCheckpointId;
+        }
+
         @Override
         public void writeWatermark(Watermark watermark) {
             watermarks.add(watermark);
@@ -364,7 +443,23 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
 
     /** Base class for out testing {@link CommittingSinkWriter}. */
     protected static class DefaultCommittingSinkWriter<InputT> extends 
DefaultSinkWriter<InputT>
-            implements CommittingSinkWriter<InputT, String>, Serializable {
+            implements CommittingSinkWriter<InputT, Record<InputT>>, 
Serializable {
+
+        @Override
+        public void flush(boolean endOfInput) throws IOException, 
InterruptedException {
+            // We empty the elements on prepareCommit
+        }
+
+        @Override
+        public Collection<Record<InputT>> prepareCommit() {
+            List<Record<InputT>> result = elements;
+            elements = new ArrayList<>();
+            return result;
+        }
+    }
+
+    protected static class ForwardCommittingSinkWriter<InputT> extends 
DefaultSinkWriter<InputT>
+            implements CommittingSinkWriter<InputT, InputT>, Serializable {
 
         @Override
         public void flush(boolean endOfInput) throws IOException, 
InterruptedException {
@@ -372,8 +467,9 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
         }
 
         @Override
-        public Collection<String> prepareCommit() {
-            List<String> result = elements;
+        public Collection<InputT> prepareCommit() {
+            List<InputT> result =
+                    
elements.stream().map(Record::getValue).collect(Collectors.toList());
             elements = new ArrayList<>();
             return result;
         }
@@ -412,42 +508,15 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
     // -------------------------------------- Sink Committer ---------------------------------------
 
     /** Base class for testing {@link Committer}. */
-    public static class DefaultCommitter implements Committer<String>, 
Serializable {
-
-        @Nullable protected Queue<CommitRequest<String>> committedData;
-
+    public static class DefaultCommitter<CommT> implements Committer<CommT>, 
Serializable {
         private boolean isClosed;
 
-        @Nullable private final Supplier<Queue<CommitRequest<String>>> 
queueSupplier;
-
         public DefaultCommitter() {
-            this.committedData = new ConcurrentLinkedQueue<>();
             this.isClosed = false;
-            this.queueSupplier = null;
-        }
-
-        public DefaultCommitter(@Nullable 
Supplier<Queue<CommitRequest<String>>> queueSupplier) {
-            this.queueSupplier = queueSupplier;
-            this.isClosed = false;
-            this.committedData = null;
-        }
-
-        public List<CommitRequest<String>> getCommittedData() {
-            if (committedData != null) {
-                return new ArrayList<>(committedData);
-            } else {
-                return Collections.emptyList();
-            }
         }
 
         @Override
-        public void commit(Collection<CommitRequest<String>> committables) {
-            if (committedData == null) {
-                assertThat(queueSupplier).isNotNull();
-                committedData = queueSupplier.get();
-            }
-            committedData.addAll(committables);
-        }
+        public void commit(Collection<CommitRequest<CommT>> committables) {}
 
         public void close() throws Exception {
             isClosed = true;
@@ -463,18 +532,15 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
     }
 
     /** A {@link Committer} that always re-commits the committables data it received. */
-    static class RetryOnceCommitter extends DefaultCommitter {
+    static class RetryOnceCommitter<CommT> extends DefaultCommitter<CommT> {
 
-        private final Set<String> seen = new LinkedHashSet<>();
+        private final Set<CommT> seen = new LinkedHashSet<>();
 
         @Override
-        public void commit(Collection<CommitRequest<String>> committables) {
+        public void commit(Collection<CommitRequest<CommT>> committables) {
             committables.forEach(
                     c -> {
-                        if (seen.remove(c.getCommittable())) {
-                            checkNotNull(committedData);
-                            committedData.add(c);
-                        } else {
+                        if (!seen.remove(c.getCommittable())) {
                             seen.add(c.getCommittable());
                             c.retryLater();
                         }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
index 15e00e70b0f..88e89e3185e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
@@ -18,159 +18,163 @@
 package org.apache.flink.test.streaming.runtime;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.typeinfo.IntegerTypeInfo;
 import org.apache.flink.api.connector.sink2.Committer;
-import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.util.ratelimit.GatedRateLimiter;
+import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter;
+import 
org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2;
-import org.apache.flink.streaming.util.FiniteTestSource;
-import org.apache.flink.test.util.AbstractTestBaseJUnit4;
+import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2.Record;
+import 
org.apache.flink.streaming.runtime.operators.sink.TestSinkV2.RecordSerializer;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.testutils.junit.SharedObjectsExtension;
+import org.apache.flink.testutils.junit.SharedReference;
 
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Queue;
+import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.function.BooleanSupplier;
-import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /**
  * Integration test for {@link org.apache.flink.api.connector.sink2.Sink} run time implementation.
  */
-public class SinkV2ITCase extends AbstractTestBaseJUnit4 {
+public class SinkV2ITCase extends AbstractTestBase {
+    private static final Logger LOG = 
LoggerFactory.getLogger(SinkV2ITCase.class);
+
     static final List<Integer> SOURCE_DATA =
             Arrays.asList(
                     895, 127, 148, 161, 148, 662, 822, 491, 275, 122, 850, 
630, 682, 765, 434, 970,
                     714, 795, 288, 422);
 
-    // source send data two times
-    static final int STREAMING_SOURCE_SEND_ELEMENTS_NUM = SOURCE_DATA.size() * 
2;
-
-    static final List<String> EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE =
+    static final List<Record<Integer>> 
EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE =
             SOURCE_DATA.stream()
                     // source send data two times
                     .flatMap(
                             x ->
-                                    Collections.nCopies(
-                                            2, Tuple3.of(x, null, 
Long.MIN_VALUE).toString())
+                                    Collections.nCopies(2, new Record<>(x, 
null, Long.MIN_VALUE))
                                             .stream())
                     .collect(Collectors.toList());
 
-    static final List<String> EXPECTED_COMMITTED_DATA_IN_BATCH_MODE =
+    static final List<Record<Integer>> EXPECTED_COMMITTED_DATA_IN_BATCH_MODE =
             SOURCE_DATA.stream()
-                    .map(x -> Tuple3.of(x, null, Long.MIN_VALUE).toString())
+                    .map(x -> new Record<>(x, null, Long.MIN_VALUE))
                     .collect(Collectors.toList());
 
-    static final Queue<Committer.CommitRequest<String>> COMMIT_QUEUE =
-            new ConcurrentLinkedQueue<>();
-
-    static final BooleanSupplier COMMIT_QUEUE_RECEIVE_ALL_DATA =
-            (BooleanSupplier & Serializable)
-                    () -> COMMIT_QUEUE.size() == 
STREAMING_SOURCE_SEND_ELEMENTS_NUM;
-
-    @Before
-    public void init() {
-        COMMIT_QUEUE.clear();
-    }
+    @RegisterExtension
+    static final SharedObjectsExtension SHARED_OBJECTS = 
SharedObjectsExtension.create();
 
     @Test
     public void writerAndCommitterExecuteInStreamingMode() throws Exception {
         final StreamExecutionEnvironment env = buildStreamEnv();
-        final FiniteTestSource<Integer> source =
-                new FiniteTestSource<>(COMMIT_QUEUE_RECEIVE_ALL_DATA, 
SOURCE_DATA);
+        SharedReference<Queue<Committer.CommitRequest<Record<Integer>>>> 
committed =
+                SHARED_OBJECTS.add(new ConcurrentLinkedQueue<>());
+        final Source<Integer, ?, ?> source = createStreamingSource();
 
-        env.addSource(source, IntegerTypeInfo.INT_TYPE_INFO)
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), "source")
                 .sinkTo(
                         TestSinkV2.<Integer>newBuilder()
-                                .setDefaultCommitter(
-                                        
(Supplier<Queue<Committer.CommitRequest<String>>>
-                                                        & Serializable)
-                                                () -> COMMIT_QUEUE)
+                                .setCommitter(
+                                        new TrackingCommitter(committed), 
RecordSerializer::new)
                                 .build());
         env.execute();
-        assertThat(
-                COMMIT_QUEUE.stream()
-                        .map(Committer.CommitRequest::getCommittable)
-                        .collect(Collectors.toList()),
-                
containsInAnyOrder(EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE.toArray()));
+        assertThat(committed.get())
+                .extracting(Committer.CommitRequest::getCommittable)
+                
.containsExactlyInAnyOrderElementsOf(EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE);
     }
 
     @Test
     public void writerAndPrecommitToplogyAndCommitterExecuteInStreamingMode() 
throws Exception {
         final StreamExecutionEnvironment env = buildStreamEnv();
-        final FiniteTestSource<Integer> source =
-                new FiniteTestSource<>(COMMIT_QUEUE_RECEIVE_ALL_DATA, 
SOURCE_DATA);
+        SharedReference<Queue<Committer.CommitRequest<Record<Integer>>>> 
committed =
+                SHARED_OBJECTS.add(new ConcurrentLinkedQueue<>());
+        final Source<Integer, ?, ?> source = createStreamingSource();
 
-        env.addSource(source, IntegerTypeInfo.INT_TYPE_INFO)
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), "source")
                 .sinkTo(
                         TestSinkV2.<Integer>newBuilder()
-                                .setDefaultCommitter(
-                                        
(Supplier<Queue<Committer.CommitRequest<String>>>
-                                                        & Serializable)
-                                                () -> COMMIT_QUEUE)
-                                .setWithPreCommitTopology(true)
+                                .setCommitter(
+                                        new TrackingCommitter(committed), 
RecordSerializer::new)
+                                
.setWithPreCommitTopology(SinkV2ITCase::flipValue)
                                 .build());
         env.execute();
-        assertThat(
-                COMMIT_QUEUE.stream()
-                        .map(Committer.CommitRequest::getCommittable)
-                        .collect(Collectors.toList()),
-                containsInAnyOrder(
+        assertThat(committed.get())
+                .extracting(Committer.CommitRequest::getCommittable)
+                .containsExactlyInAnyOrderElementsOf(
                         EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE.stream()
-                                .map(s -> s + "Transformed")
-                                .toArray()));
+                                .map(SinkV2ITCase::flipValue)
+                                .collect(Collectors.toList()));
+    }
+
+    private static Record<Integer> flipValue(Record<Integer> r) {
+        return r.withValue(-r.getValue());
     }
 
     @Test
     public void writerAndCommitterExecuteInBatchMode() throws Exception {
         final StreamExecutionEnvironment env = buildBatchEnv();
+        SharedReference<Queue<Committer.CommitRequest<Record<Integer>>>> 
committed =
+                SHARED_OBJECTS.add(new ConcurrentLinkedQueue<>());
 
-        env.fromData(SOURCE_DATA)
+        final DataGeneratorSource<Integer> source =
+                new DataGeneratorSource<>(
+                        l -> SOURCE_DATA.get(l.intValue()),
+                        SOURCE_DATA.size(),
+                        IntegerTypeInfo.INT_TYPE_INFO);
+
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), "source")
                 .sinkTo(
                         TestSinkV2.<Integer>newBuilder()
-                                .setDefaultCommitter(
-                                        
(Supplier<Queue<Committer.CommitRequest<String>>>
-                                                        & Serializable)
-                                                () -> COMMIT_QUEUE)
+                                .setCommitter(
+                                        new TrackingCommitter(committed), 
RecordSerializer::new)
                                 .build());
         env.execute();
-        assertThat(
-                COMMIT_QUEUE.stream()
-                        .map(Committer.CommitRequest::getCommittable)
-                        .collect(Collectors.toList()),
-                
containsInAnyOrder(EXPECTED_COMMITTED_DATA_IN_BATCH_MODE.toArray()));
+        assertThat(committed.get())
+                .extracting(Committer.CommitRequest::getCommittable)
+                
.containsExactlyInAnyOrderElementsOf(EXPECTED_COMMITTED_DATA_IN_BATCH_MODE);
     }
 
     @Test
     public void writerAndPrecommitToplogyAndCommitterExecuteInBatchMode() 
throws Exception {
         final StreamExecutionEnvironment env = buildBatchEnv();
+        SharedReference<Queue<Committer.CommitRequest<Record<Integer>>>> 
committed =
+                SHARED_OBJECTS.add(new ConcurrentLinkedQueue<>());
+
+        final DataGeneratorSource<Integer> source =
+                new DataGeneratorSource<>(
+                        l -> SOURCE_DATA.get(l.intValue()),
+                        SOURCE_DATA.size(),
+                        IntegerTypeInfo.INT_TYPE_INFO);
 
-        env.fromData(SOURCE_DATA)
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), "source")
                 .sinkTo(
                         TestSinkV2.<Integer>newBuilder()
-                                .setDefaultCommitter(
-                                        
(Supplier<Queue<Committer.CommitRequest<String>>>
-                                                        & Serializable)
-                                                () -> COMMIT_QUEUE)
-                                .setWithPreCommitTopology(true)
+                                .setCommitter(
+                                        new TrackingCommitter(committed), 
RecordSerializer::new)
+                                
.setWithPreCommitTopology(SinkV2ITCase::flipValue)
                                 .build());
         env.execute();
-        assertThat(
-                COMMIT_QUEUE.stream()
-                        .map(Committer.CommitRequest::getCommittable)
-                        .collect(Collectors.toList()),
-                containsInAnyOrder(
+        assertThat(committed.get())
+                .extracting(Committer.CommitRequest::getCommittable)
+                .containsExactlyInAnyOrderElementsOf(
                         EXPECTED_COMMITTED_DATA_IN_BATCH_MODE.stream()
-                                .map(s -> s + "Transformed")
-                                .toArray()));
+                                .map(SinkV2ITCase::flipValue)
+                                .collect(Collectors.toList()));
     }
 
     private StreamExecutionEnvironment buildStreamEnv() {
@@ -185,4 +189,60 @@ public class SinkV2ITCase extends AbstractTestBaseJUnit4 {
         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
         return env;
     }
+
+    /**
+     * A stream source that: 1) emits a list of elements without allowing checkpoints, 2) then waits
+     * for two more checkpoints to complete, 3) then re-emits the same elements before 4) waiting
+     * for another two checkpoints and 5) exiting.
+     */
+    private Source<Integer, ?, ?> createStreamingSource() {
+        RateLimiterStrategy rateLimiterStrategy =
+                parallelism -> new BurstingRateLimiter(SOURCE_DATA.size() / 4, 
2);
+        return new DataGeneratorSource<>(
+                l -> SOURCE_DATA.get(l.intValue() % SOURCE_DATA.size()),
+                SOURCE_DATA.size() * 2L,
+                rateLimiterStrategy,
+                IntegerTypeInfo.INT_TYPE_INFO);
+    }
+
+    private static class BurstingRateLimiter implements RateLimiter {
+        private final RateLimiter rateLimiter;
+        private final int numCheckpointCooldown;
+        private int cooldown;
+
+        public BurstingRateLimiter(int recordPerCycle, int 
numCheckpointCooldown) {
+            rateLimiter = new GatedRateLimiter(recordPerCycle);
+            this.numCheckpointCooldown = numCheckpointCooldown;
+        }
+
+        @Override
+        public CompletionStage<Void> acquire() {
+            CompletionStage<Void> stage = rateLimiter.acquire();
+            cooldown = numCheckpointCooldown;
+            return stage;
+        }
+
+        @Override
+        public void notifyCheckpointComplete(long checkpointId) {
+            if (cooldown-- <= 0) {
+                rateLimiter.notifyCheckpointComplete(checkpointId);
+            }
+        }
+    }
+
+    private static class TrackingCommitter implements 
Committer<Record<Integer>>, Serializable {
+        private final SharedReference<Queue<CommitRequest<Record<Integer>>>> 
committed;
+
+        public 
TrackingCommitter(SharedReference<Queue<CommitRequest<Record<Integer>>>> 
committed) {
+            this.committed = committed;
+        }
+
+        @Override
+        public void commit(Collection<CommitRequest<Record<Integer>>> 
committables) {
+            committed.get().addAll(committables);
+        }
+
+        @Override
+        public void close() {}
+    }
 }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsITCase.java
index a7f9340df08..09388809377 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsITCase.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.testutils.InMemoryReporter;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2;
+import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2.Record;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.testutils.junit.SharedObjects;
 import org.apache.flink.testutils.junit.SharedReference;
@@ -138,12 +139,15 @@ public class SinkV2MetricsITCase extends TestLogger {
                 sharedObjects.add(new CountDownLatch(numCommittables));
         SharedReference<CountDownLatch> afterLatch = sharedObjects.add(new 
CountDownLatch(1));
 
+        TestSinkV2<Long> sink =
+                TestSinkV2.<Long>newBuilder()
+                        .setCommitter(
+                                new MetricCommitter(beforeLatch, afterLatch),
+                                TestSinkV2.RecordSerializer::new)
+                        .build();
         env.fromSequence(0, numCommittables - 1)
                 .returns(BasicTypeInfo.LONG_TYPE_INFO)
-                .sinkTo(
-                        (TestSinkV2.<Long>newBuilder()
-                                        .setCommitter(new 
MetricCommitter(beforeLatch, afterLatch)))
-                                .build())
+                .sinkTo(sink)
                 .name(TEST_SINK_NAME);
         JobClient jobClient = env.executeAsync();
         final JobID jobId = jobClient.getJobID();
@@ -295,7 +299,7 @@ public class SinkV2MetricsITCase extends TestLogger {
         }
     }
 
-    private static class MetricCommitter extends TestSinkV2.DefaultCommitter {
+    private static class MetricCommitter extends 
TestSinkV2.DefaultCommitter<Record<Long>> {
         private int counter = 0;
         private SharedReference<CountDownLatch> beforeLatch;
         private SharedReference<CountDownLatch> afterLatch;
@@ -309,7 +313,7 @@ public class SinkV2MetricsITCase extends TestLogger {
         }
 
         @Override
-        public void commit(Collection<CommitRequest<String>> committables) {
+        public void commit(Collection<CommitRequest<Record<Long>>> 
committables) {
             if (counter == 0) {
                 System.err.println(
                         "Committables arrived "
@@ -333,26 +337,26 @@ public class SinkV2MetricsITCase extends TestLogger {
 
                 committables.forEach(
                         c -> {
-                            switch (c.getCommittable().charAt(1)) {
-                                case '0':
+                            switch (c.getCommittable().getValue().intValue()) {
+                                case 0:
                                     c.signalAlreadyCommitted();
                                     // 1 already committed
                                     break;
-                                case '1':
-                                case '2':
+                                case 1:
+                                case 2:
                                     // 2 failed
                                     c.signalFailedWithKnownReason(new 
RuntimeException());
                                     break;
-                                case '3':
+                                case 3:
                                     // Retry without change
                                     if (counter == 1) {
                                         c.retryLater();
                                     }
                                     break;
-                                case '4':
-                                case '5':
+                                case 4:
+                                case 5:
                                     // Retry with change
-                                    c.updateAndRetryLater("Retry-" + 
c.getCommittable());
+                                    
c.updateAndRetryLater(c.getCommittable().withValue(6L));
                             }
                         });
             }


Reply via email to