This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e34696eff15ce07739183920fa0eb23e29b65000
Author: Arvid Heise <[email protected]>
AuthorDate: Sun Apr 13 09:38:22 2025 +0200

    [FLINK-37605][runtime] Remove obsolete sink tests
    
    With the removal of SinkV1, all adapter tests have also been testing V2. We
    can remove the adapter tests and simplify the test hierarchy.
    
    (cherry picked from commit fe04a6e9d0f8be33cb9801b4e021c0cd7d0a5b29)
---
 .../operators/sink/CommitterOperatorTestBase.java  | 359 -------------
 .../sink/SinkV2CommitterOperatorTest.java          | 335 ++++++++++++-
 .../sink/SinkV2SinkWriterOperatorTest.java         | 509 ++++++++++++++++++-
 .../operators/sink/SinkWriterOperatorTestBase.java | 558 ---------------------
 .../sink/WithAdapterCommitterOperatorTest.java     |  70 ---
 .../sink/WithAdapterSinkWriterOperatorTest.java    | 135 -----
 6 files changed, 828 insertions(+), 1138 deletions(-)
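
For orientation, the change below boils down to inlining an abstract test base into its single
remaining subclass. A minimal, self-contained sketch of that pattern (hypothetical names, not the
actual Flink classes; the real method bodies are in the diff below):

    // Before: an abstract base holds the shared test logic; the subclass only supplies fixtures.
    abstract class CommitterTestBase {
        abstract String fixture();

        void runScenario() {
            if (fixture() == null) {
                throw new AssertionError("fixture must not be null");
            }
        }
    }

    // After: with a single implementation left, fixtures and test logic are merged into one class.
    class CommitterTest {
        String fixture() {
            return "sink-under-test";
        }

        void runScenario() {
            if (fixture() == null) {
                throw new AssertionError("fixture must not be null");
            }
        }

        public static void main(String[] args) {
            new CommitterTest().runScenario();
        }
    }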

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java
deleted file mode 100644
index 5fdb36a3953..00000000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java
+++ /dev/null
@@ -1,359 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.sink;
-
-import org.apache.flink.api.connector.sink2.SupportsCommitter;
-import org.apache.flink.configuration.SinkOptions;
-import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
-import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
-import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
-import org.apache.flink.streaming.api.connector.sink2.CommittableSummaryAssert;
-import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-
-import org.assertj.core.api.AbstractThrowableAssert;
-import org.assertj.core.api.ListAssert;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
-
-import java.util.function.IntSupplier;
-
-import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI;
-import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableSummary;
-import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableWithLineage;
-import static org.assertj.core.api.Assertions.as;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatCode;
-
-abstract class CommitterOperatorTestBase {
-
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    void testEmitCommittables(boolean withPostCommitTopology) throws Exception {
-        SinkAndCounters sinkAndCounters;
-        if (withPostCommitTopology) {
-            // Insert global committer to simulate post commit topology
-            sinkAndCounters = sinkWithPostCommit();
-        } else {
-            sinkAndCounters = sinkWithoutPostCommit();
-        }
-        final OneInputStreamOperatorTestHarness<
-                        CommittableMessage<String>, CommittableMessage<String>>
-                testHarness =
-                        new OneInputStreamOperatorTestHarness<>(
-                                new CommitterOperatorFactory<>(sinkAndCounters.sink, false, true));
-        testHarness.open();
-
-        final CommittableSummary<String> committableSummary =
-                new CommittableSummary<>(1, 1, 1L, 1, 0);
-        testHarness.processElement(new StreamRecord<>(committableSummary));
-        final CommittableWithLineage<String> committableWithLineage =
-                new CommittableWithLineage<>("1", 1L, 1);
-        testHarness.processElement(new StreamRecord<>(committableWithLineage));
-
-        // Trigger commit
-        testHarness.notifyOfCompletedCheckpoint(1);
-
-        assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(1);
-        if (withPostCommitTopology) {
-            ListAssert<CommittableMessage<String>> records =
-                    assertThat(testHarness.extractOutputValues()).hasSize(2);
-            records.element(0, as(committableSummary()))
-                    .hasFailedCommittables(committableSummary.getNumberOfFailedCommittables())
-                    .hasOverallCommittables(committableSummary.getNumberOfCommittables());
-            records.element(1, as(committableWithLineage()))
-                    .isEqualTo(committableWithLineage.withSubtaskId(0));
-        } else {
-            assertThat(testHarness.getOutput()).isEmpty();
-        }
-        testHarness.close();
-    }
-
-    @Test
-    void ensureAllCommittablesArrivedBeforeCommitting() throws Exception {
-        SinkAndCounters sinkAndCounters = sinkWithPostCommit();
-        final OneInputStreamOperatorTestHarness<
-                        CommittableMessage<String>, CommittableMessage<String>>
-                testHarness = createTestHarness(sinkAndCounters.sink, false, true);
-        testHarness.open();
-        testHarness.setProcessingTime(0);
-
-        // Only send first committable
-        final CommittableSummary<String> committableSummary =
-                new CommittableSummary<>(1, 1, 1L, 2, 0);
-        testHarness.processElement(new StreamRecord<>(committableSummary));
-        final CommittableWithLineage<String> first = new CommittableWithLineage<>("1", 1L, 1);
-        testHarness.processElement(new StreamRecord<>(first));
-
-        assertThatCode(() -> testHarness.notifyOfCompletedCheckpoint(1))
-                .hasMessageContaining("Trying to commit incomplete batch of committables");
-
-        assertThat(testHarness.getOutput()).isEmpty();
-        assertThat(sinkAndCounters.commitCounter.getAsInt()).isZero();
-
-        final CommittableWithLineage<String> second = new CommittableWithLineage<>("2", 1L, 1);
-        testHarness.processElement(new StreamRecord<>(second));
-
-        assertThatCode(() -> testHarness.notifyOfCompletedCheckpoint(1)).doesNotThrowAnyException();
-
-        assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(2);
-        ListAssert<CommittableMessage<String>> records =
-                assertThat(testHarness.extractOutputValues()).hasSize(3);
-        records.element(0, as(committableSummary()))
-                .hasFailedCommittables(committableSummary.getNumberOfFailedCommittables())
-                .hasOverallCommittables(committableSummary.getNumberOfCommittables());
-        records.element(1, as(committableWithLineage())).isEqualTo(first.withSubtaskId(0));
-        records.element(2, as(committableWithLineage())).isEqualTo(second.withSubtaskId(0));
-        testHarness.close();
-    }
-
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    void testEmitAllCommittablesOnEndOfInput(boolean isBatchMode) throws Exception {
-        SinkAndCounters sinkAndCounters = sinkWithPostCommit();
-        final OneInputStreamOperatorTestHarness<
-                        CommittableMessage<String>, CommittableMessage<String>>
-                testHarness = createTestHarness(sinkAndCounters.sink, isBatchMode, !isBatchMode);
-        testHarness.open();
-
-        final CommittableSummary<String> committableSummary =
-                new CommittableSummary<>(1, 2, EOI, 1, 0);
-        testHarness.processElement(new StreamRecord<>(committableSummary));
-        final CommittableSummary<String> committableSummary2 =
-                new CommittableSummary<>(2, 2, EOI, 1, 0);
-        testHarness.processElement(new StreamRecord<>(committableSummary2));
-
-        final CommittableWithLineage<String> first = new CommittableWithLineage<>("1", EOI, 1);
-        testHarness.processElement(new StreamRecord<>(first));
-        final CommittableWithLineage<String> second = new CommittableWithLineage<>("1", EOI, 2);
-        testHarness.processElement(new StreamRecord<>(second));
-
-        testHarness.endInput();
-        if (!isBatchMode) {
-            assertThat(testHarness.getOutput()).isEmpty();
-            // notify final checkpoint complete
-            testHarness.notifyOfCompletedCheckpoint(1);
-        }
-
-        ListAssert<CommittableMessage<String>> records =
-                assertThat(testHarness.extractOutputValues()).hasSize(3);
-        records.element(0, as(committableSummary()))
-                .hasFailedCommittables(0)
-                .hasOverallCommittables(2);
-        records.element(1, as(committableWithLineage())).isEqualTo(first.withSubtaskId(0));
-        records.element(2, as(committableWithLineage())).isEqualTo(second.withSubtaskId(0));
-        testHarness.close();
-    }
-
-    @Test
-    void testStateRestore() throws Exception {
-
-        final int originalSubtaskId = 0;
-        final int subtaskIdAfterRecovery = 9;
-
-        final OneInputStreamOperatorTestHarness<
-                        CommittableMessage<String>, CommittableMessage<String>>
-                testHarness =
-                        createTestHarness(
-                                sinkWithPostCommitWithRetry().sink,
-                                false,
-                                true,
-                                1,
-                                1,
-                                originalSubtaskId);
-        testHarness.open();
-
-        // We cannot test a different checkpoint than 0 because when using the OperatorTestHarness
-        // for recovery the lastCompleted checkpoint is always reset to 0.
-        long checkpointId = 0L;
-
-        final CommittableSummary<String> committableSummary =
-                new CommittableSummary<>(originalSubtaskId, 1, checkpointId, 1, 0);
-        testHarness.processElement(new StreamRecord<>(committableSummary));
-        final CommittableWithLineage<String> first =
-                new CommittableWithLineage<>("1", checkpointId, originalSubtaskId);
-        testHarness.processElement(new StreamRecord<>(first));
-
-        // another committable for the same checkpointId but from a different subtask.
-        final CommittableSummary<String> committableSummary2 =
-                new CommittableSummary<>(originalSubtaskId + 1, 1, checkpointId, 1, 0);
-        testHarness.processElement(new StreamRecord<>(committableSummary2));
-        final CommittableWithLineage<String> second =
-                new CommittableWithLineage<>("2", checkpointId, originalSubtaskId + 1);
-        testHarness.processElement(new StreamRecord<>(second));
-
-        final OperatorSubtaskState snapshot = testHarness.snapshot(checkpointId, 2L);
-        assertThat(testHarness.getOutput()).isEmpty();
-        testHarness.close();
-
-        // create a new testHarness with a different parallelism level and subtaskId than the
-        // original one.
-        // we will make sure that the new subtaskId is used during committable recovery.
-        SinkAndCounters sinkAndCounters = sinkWithPostCommit();
-        final OneInputStreamOperatorTestHarness<
-                        CommittableMessage<String>, CommittableMessage<String>>
-                restored =
-                        createTestHarness(
-                                sinkAndCounters.sink, false, true, 10, 10, subtaskIdAfterRecovery);
-
-        restored.initializeState(snapshot);
-        restored.open();
-
-        // Previous committables are immediately committed if possible
-        assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(2);
-        ListAssert<CommittableMessage<String>> records =
-                assertThat(restored.extractOutputValues()).hasSize(3);
-        CommittableSummaryAssert<Object> objectCommittableSummaryAssert =
-                records.element(0, as(committableSummary()))
-                        .hasCheckpointId(checkpointId)
-                        .hasFailedCommittables(0)
-                        .hasSubtaskId(subtaskIdAfterRecovery);
-        objectCommittableSummaryAssert.hasOverallCommittables(2);
-
-        // Expect the same checkpointId that the original snapshot was made with.
-        records.element(1, as(committableWithLineage()))
-                .hasCheckpointId(checkpointId)
-                .hasSubtaskId(subtaskIdAfterRecovery)
-                .hasCommittable(first.getCommittable());
-        records.element(2, as(committableWithLineage()))
-                .hasCheckpointId(checkpointId)
-                .hasSubtaskId(subtaskIdAfterRecovery)
-                .hasCommittable(second.getCommittable());
-        restored.close();
-    }
-
-    @ParameterizedTest
-    @ValueSource(ints = {0, 1})
-    void testNumberOfRetries(int numRetries) throws Exception {
-        try (OneInputStreamOperatorTestHarness<
-                        CommittableMessage<String>, CommittableMessage<String>>
-                testHarness =
-                        createTestHarness(
-                                sinkWithPostCommitWithRetry().sink, false, true, 1, 1, 0)) {
-            testHarness
-                    .getStreamConfig()
-                    .getConfiguration()
-                    .set(SinkOptions.COMMITTER_RETRIES, numRetries);
-            testHarness.open();
-
-            long ckdId = 1L;
-            testHarness.processElement(
-                    new StreamRecord<>(new CommittableSummary<>(0, 1, ckdId, 1, 0)));
-            testHarness.processElement(
-                    new StreamRecord<>(new CommittableWithLineage<>("1", ckdId, 0)));
-            AbstractThrowableAssert<?, ? extends Throwable> throwableAssert =
-                    assertThatCode(() -> testHarness.notifyOfCompletedCheckpoint(ckdId));
-            if (numRetries == 0) {
-                throwableAssert.hasMessageContaining("Failed to commit 1 committables");
-            } else {
-                throwableAssert.doesNotThrowAnyException();
-            }
-        }
-    }
-
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    void testHandleEndInputInStreamingMode(boolean isCheckpointingEnabled) throws Exception {
-        final SinkAndCounters sinkAndCounters = sinkWithPostCommit();
-
-        try (OneInputStreamOperatorTestHarness<
-                        CommittableMessage<String>, CommittableMessage<String>>
-                testHarness =
-                        new OneInputStreamOperatorTestHarness<>(
-                                new CommitterOperatorFactory<>(
-                                        sinkAndCounters.sink, false, isCheckpointingEnabled))) {
-            testHarness.open();
-
-            final CommittableSummary<String> committableSummary =
-                    new CommittableSummary<>(1, 1, 1L, 1, 0);
-            testHarness.processElement(new StreamRecord<>(committableSummary));
-            final CommittableWithLineage<String> committableWithLineage =
-                    new CommittableWithLineage<>("1", 1L, 1);
-            testHarness.processElement(new StreamRecord<>(committableWithLineage));
-
-            testHarness.endInput();
-
-            // If checkpointing is enabled, endInput does not emit anything because a final
-            // checkpoint follows
-            if (isCheckpointingEnabled) {
-                testHarness.notifyOfCompletedCheckpoint(1);
-            }
-
-            ListAssert<CommittableMessage<String>> records =
-                    assertThat(testHarness.extractOutputValues()).hasSize(2);
-            CommittableSummaryAssert<Object> objectCommittableSummaryAssert =
-                    records.element(0, as(committableSummary())).hasCheckpointId(1L);
-            objectCommittableSummaryAssert.hasOverallCommittables(1);
-            records.element(1, as(committableWithLineage()))
-                    .isEqualTo(committableWithLineage.withSubtaskId(0));
-
-            // Future emission calls should not change the output
-            testHarness.notifyOfCompletedCheckpoint(2);
-            testHarness.endInput();
-
-            assertThat(testHarness.getOutput()).hasSize(2);
-        }
-    }
-
-    private OneInputStreamOperatorTestHarness<
-                    CommittableMessage<String>, CommittableMessage<String>>
-            createTestHarness(
-                    SupportsCommitter<String> sink,
-                    boolean isBatchMode,
-                    boolean isCheckpointingEnabled)
-                    throws Exception {
-        return new OneInputStreamOperatorTestHarness<>(
-                new CommitterOperatorFactory<>(sink, isBatchMode, isCheckpointingEnabled));
-    }
-
-    private OneInputStreamOperatorTestHarness<
-                    CommittableMessage<String>, CommittableMessage<String>>
-            createTestHarness(
-                    SupportsCommitter<String> sink,
-                    boolean isBatchMode,
-                    boolean isCheckpointingEnabled,
-                    int maxParallelism,
-                    int parallelism,
-                    int subtaskId)
-                    throws Exception {
-        return new OneInputStreamOperatorTestHarness<>(
-                new CommitterOperatorFactory<>(sink, isBatchMode, isCheckpointingEnabled),
-                maxParallelism,
-                parallelism,
-                subtaskId);
-    }
-
-    abstract SinkAndCounters sinkWithPostCommit();
-
-    abstract SinkAndCounters sinkWithPostCommitWithRetry();
-
-    abstract SinkAndCounters sinkWithoutPostCommit();
-
-    static class SinkAndCounters {
-        SupportsCommitter<String> sink;
-        IntSupplier commitCounter;
-
-        public SinkAndCounters(SupportsCommitter<String> sink, IntSupplier commitCounter) {
-            this.sink = sink;
-            this.commitCounter = commitCounter;
-        }
-    }
-}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java
index 0112b5cf862..4c7291dd44c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java
@@ -19,11 +19,32 @@
 package org.apache.flink.streaming.runtime.operators.sink;
 
 import org.apache.flink.api.connector.sink2.SupportsCommitter;
+import org.apache.flink.configuration.SinkOptions;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummaryAssert;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import org.assertj.core.api.AbstractThrowableAssert;
+import org.assertj.core.api.ListAssert;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.Collection;
+import java.util.function.IntSupplier;
+
+import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI;
+import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableSummary;
+import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableWithLineage;
+import static org.assertj.core.api.Assertions.as;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
 
-class SinkV2CommitterOperatorTest extends CommitterOperatorTestBase {
-    @Override
+class SinkV2CommitterOperatorTest {
     SinkAndCounters sinkWithPostCommit() {
         ForwardingCommitter committer = new ForwardingCommitter();
         return new SinkAndCounters(
@@ -35,9 +56,8 @@ class SinkV2CommitterOperatorTest extends CommitterOperatorTestBase {
                 () -> committer.successfulCommits);
     }
 
-    @Override
     SinkAndCounters sinkWithPostCommitWithRetry() {
-        return new CommitterOperatorTestBase.SinkAndCounters(
+        return new SinkAndCounters(
                 (SupportsCommitter<String>)
                         TestSinkV2.newBuilder()
                                 .setCommitter(new TestSinkV2.RetryOnceCommitter())
@@ -46,7 +66,6 @@ class SinkV2CommitterOperatorTest extends CommitterOperatorTestBase {
                 () -> 0);
     }
 
-    @Override
     SinkAndCounters sinkWithoutPostCommit() {
         ForwardingCommitter committer = new ForwardingCommitter();
         return new SinkAndCounters(
@@ -58,6 +77,302 @@ class SinkV2CommitterOperatorTest extends CommitterOperatorTestBase {
                 () -> committer.successfulCommits);
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testEmitCommittables(boolean withPostCommitTopology) throws Exception {
+        SinkAndCounters sinkAndCounters;
+        if (withPostCommitTopology) {
+            // Insert global committer to simulate post commit topology
+            sinkAndCounters = sinkWithPostCommit();
+        } else {
+            sinkAndCounters = sinkWithoutPostCommit();
+        }
+        final OneInputStreamOperatorTestHarness<
+                        CommittableMessage<String>, CommittableMessage<String>>
+                testHarness =
+                        new OneInputStreamOperatorTestHarness<>(
+                                new CommitterOperatorFactory<>(sinkAndCounters.sink, false, true));
+        testHarness.open();
+
+        final CommittableSummary<String> committableSummary =
+                new CommittableSummary<>(1, 1, 1L, 1, 0);
+        testHarness.processElement(new StreamRecord<>(committableSummary));
+        final CommittableWithLineage<String> committableWithLineage =
+                new CommittableWithLineage<>("1", 1L, 1);
+        testHarness.processElement(new StreamRecord<>(committableWithLineage));
+
+        // Trigger commit
+        testHarness.notifyOfCompletedCheckpoint(1);
+
+        assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(1);
+        if (withPostCommitTopology) {
+            ListAssert<CommittableMessage<String>> records =
+                    assertThat(testHarness.extractOutputValues()).hasSize(2);
+            records.element(0, as(committableSummary()))
+                    .hasFailedCommittables(committableSummary.getNumberOfFailedCommittables())
+                    .hasOverallCommittables(committableSummary.getNumberOfCommittables());
+            records.element(1, as(committableWithLineage()))
+                    .isEqualTo(committableWithLineage.withSubtaskId(0));
+        } else {
+            assertThat(testHarness.getOutput()).isEmpty();
+        }
+        testHarness.close();
+    }
+
+    @Test
+    void ensureAllCommittablesArrivedBeforeCommitting() throws Exception {
+        SinkAndCounters sinkAndCounters = sinkWithPostCommit();
+        final OneInputStreamOperatorTestHarness<
+                        CommittableMessage<String>, CommittableMessage<String>>
+                testHarness = createTestHarness(sinkAndCounters.sink, false, true);
+        testHarness.open();
+        testHarness.setProcessingTime(0);
+
+        // Only send first committable
+        final CommittableSummary<String> committableSummary =
+                new CommittableSummary<>(1, 1, 1L, 2, 0);
+        testHarness.processElement(new StreamRecord<>(committableSummary));
+        final CommittableWithLineage<String> first = new CommittableWithLineage<>("1", 1L, 1);
+        testHarness.processElement(new StreamRecord<>(first));
+
+        assertThatCode(() -> testHarness.notifyOfCompletedCheckpoint(1))
+                .hasMessageContaining("Trying to commit incomplete batch of committables");
+
+        assertThat(testHarness.getOutput()).isEmpty();
+        assertThat(sinkAndCounters.commitCounter.getAsInt()).isZero();
+
+        final CommittableWithLineage<String> second = new CommittableWithLineage<>("2", 1L, 1);
+        testHarness.processElement(new StreamRecord<>(second));
+
+        assertThatCode(() -> testHarness.notifyOfCompletedCheckpoint(1)).doesNotThrowAnyException();
+
+        assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(2);
+        ListAssert<CommittableMessage<String>> records =
+                assertThat(testHarness.extractOutputValues()).hasSize(3);
+        records.element(0, as(committableSummary()))
+                .hasFailedCommittables(committableSummary.getNumberOfFailedCommittables())
+                .hasOverallCommittables(committableSummary.getNumberOfCommittables());
+        records.element(1, as(committableWithLineage())).isEqualTo(first.withSubtaskId(0));
+        records.element(2, as(committableWithLineage())).isEqualTo(second.withSubtaskId(0));
+        testHarness.close();
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testEmitAllCommittablesOnEndOfInput(boolean isBatchMode) throws Exception {
+        SinkAndCounters sinkAndCounters = sinkWithPostCommit();
+        final OneInputStreamOperatorTestHarness<
+                        CommittableMessage<String>, CommittableMessage<String>>
+                testHarness = createTestHarness(sinkAndCounters.sink, isBatchMode, !isBatchMode);
+        testHarness.open();
+
+        final CommittableSummary<String> committableSummary =
+                new CommittableSummary<>(1, 2, EOI, 1, 0);
+        testHarness.processElement(new StreamRecord<>(committableSummary));
+        final CommittableSummary<String> committableSummary2 =
+                new CommittableSummary<>(2, 2, EOI, 1, 0);
+        testHarness.processElement(new StreamRecord<>(committableSummary2));
+
+        final CommittableWithLineage<String> first = new CommittableWithLineage<>("1", EOI, 1);
+        testHarness.processElement(new StreamRecord<>(first));
+        final CommittableWithLineage<String> second = new CommittableWithLineage<>("1", EOI, 2);
+        testHarness.processElement(new StreamRecord<>(second));
+
+        testHarness.endInput();
+        if (!isBatchMode) {
+            assertThat(testHarness.getOutput()).isEmpty();
+            // notify final checkpoint complete
+            testHarness.notifyOfCompletedCheckpoint(1);
+        }
+
+        ListAssert<CommittableMessage<String>> records =
+                assertThat(testHarness.extractOutputValues()).hasSize(3);
+        records.element(0, as(committableSummary()))
+                .hasFailedCommittables(0)
+                .hasOverallCommittables(2);
+        records.element(1, as(committableWithLineage())).isEqualTo(first.withSubtaskId(0));
+        records.element(2, as(committableWithLineage())).isEqualTo(second.withSubtaskId(0));
+        testHarness.close();
+    }
+
+    @Test
+    void testStateRestore() throws Exception {
+
+        final int originalSubtaskId = 0;
+        final int subtaskIdAfterRecovery = 9;
+
+        final OneInputStreamOperatorTestHarness<
+                        CommittableMessage<String>, CommittableMessage<String>>
+                testHarness =
+                        createTestHarness(
+                                sinkWithPostCommitWithRetry().sink,
+                                false,
+                                true,
+                                1,
+                                1,
+                                originalSubtaskId);
+        testHarness.open();
+
+        // We cannot test a different checkpoint than 0 because when using the OperatorTestHarness
+        // for recovery the lastCompleted checkpoint is always reset to 0.
+        long checkpointId = 0L;
+
+        final CommittableSummary<String> committableSummary =
+                new CommittableSummary<>(originalSubtaskId, 1, checkpointId, 1, 0);
+        testHarness.processElement(new StreamRecord<>(committableSummary));
+        final CommittableWithLineage<String> first =
+                new CommittableWithLineage<>("1", checkpointId, originalSubtaskId);
+        testHarness.processElement(new StreamRecord<>(first));
+
+        // another committable for the same checkpointId but from a different subtask.
+        final CommittableSummary<String> committableSummary2 =
+                new CommittableSummary<>(originalSubtaskId + 1, 1, checkpointId, 1, 0);
+        testHarness.processElement(new StreamRecord<>(committableSummary2));
+        final CommittableWithLineage<String> second =
+                new CommittableWithLineage<>("2", checkpointId, originalSubtaskId + 1);
+        testHarness.processElement(new StreamRecord<>(second));
+
+        final OperatorSubtaskState snapshot = testHarness.snapshot(checkpointId, 2L);
+        assertThat(testHarness.getOutput()).isEmpty();
+        testHarness.close();
+
+        // create a new testHarness with a different parallelism level and subtaskId than the
+        // original one.
+        // we will make sure that the new subtaskId is used during committable recovery.
+        SinkAndCounters sinkAndCounters = sinkWithPostCommit();
+        final OneInputStreamOperatorTestHarness<
+                        CommittableMessage<String>, CommittableMessage<String>>
+                restored =
+                        createTestHarness(
+                                sinkAndCounters.sink, false, true, 10, 10, subtaskIdAfterRecovery);
+
+        restored.initializeState(snapshot);
+        restored.open();
+
+        // Previous committables are immediately committed if possible
+        assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(2);
+        ListAssert<CommittableMessage<String>> records =
+                assertThat(restored.extractOutputValues()).hasSize(3);
+        CommittableSummaryAssert<Object> objectCommittableSummaryAssert =
+                records.element(0, as(committableSummary()))
+                        .hasCheckpointId(checkpointId)
+                        .hasFailedCommittables(0)
+                        .hasSubtaskId(subtaskIdAfterRecovery);
+        objectCommittableSummaryAssert.hasOverallCommittables(2);
+
+        // Expect the same checkpointId that the original snapshot was made with.
+        records.element(1, as(committableWithLineage()))
+                .hasCheckpointId(checkpointId)
+                .hasSubtaskId(subtaskIdAfterRecovery)
+                .hasCommittable(first.getCommittable());
+        records.element(2, as(committableWithLineage()))
+                .hasCheckpointId(checkpointId)
+                .hasSubtaskId(subtaskIdAfterRecovery)
+                .hasCommittable(second.getCommittable());
+        restored.close();
+    }
+
+    @ParameterizedTest
+    @ValueSource(ints = {0, 1})
+    void testNumberOfRetries(int numRetries) throws Exception {
+        try (OneInputStreamOperatorTestHarness<
+                        CommittableMessage<String>, CommittableMessage<String>>
+                testHarness =
+                        createTestHarness(
+                                sinkWithPostCommitWithRetry().sink, false, true, 1, 1, 0)) {
+            testHarness
+                    .getStreamConfig()
+                    .getConfiguration()
+                    .set(SinkOptions.COMMITTER_RETRIES, numRetries);
+            testHarness.open();
+
+            long ckdId = 1L;
+            testHarness.processElement(
+                    new StreamRecord<>(new CommittableSummary<>(0, 1, ckdId, 1, 0)));
+            testHarness.processElement(
+                    new StreamRecord<>(new CommittableWithLineage<>("1", ckdId, 0)));
+            AbstractThrowableAssert<?, ? extends Throwable> throwableAssert =
+                    assertThatCode(() -> testHarness.notifyOfCompletedCheckpoint(ckdId));
+            if (numRetries == 0) {
+                throwableAssert.hasMessageContaining("Failed to commit 1 committables");
+            } else {
+                throwableAssert.doesNotThrowAnyException();
+            }
+        }
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testHandleEndInputInStreamingMode(boolean isCheckpointingEnabled) throws Exception {
+        final SinkAndCounters sinkAndCounters = sinkWithPostCommit();
+
+        try (OneInputStreamOperatorTestHarness<
+                        CommittableMessage<String>, CommittableMessage<String>>
+                testHarness =
+                        new OneInputStreamOperatorTestHarness<>(
+                                new CommitterOperatorFactory<>(
+                                        sinkAndCounters.sink, false, isCheckpointingEnabled))) {
+            testHarness.open();
+
+            final CommittableSummary<String> committableSummary =
+                    new CommittableSummary<>(1, 1, 1L, 1, 0);
+            testHarness.processElement(new StreamRecord<>(committableSummary));
+            final CommittableWithLineage<String> committableWithLineage =
+                    new CommittableWithLineage<>("1", 1L, 1);
+            testHarness.processElement(new StreamRecord<>(committableWithLineage));
+
+            testHarness.endInput();
+
+            // If checkpointing is enabled, endInput does not emit anything because a final
+            // checkpoint follows
+            if (isCheckpointingEnabled) {
+                testHarness.notifyOfCompletedCheckpoint(1);
+            }
+
+            ListAssert<CommittableMessage<String>> records =
+                    assertThat(testHarness.extractOutputValues()).hasSize(2);
+            CommittableSummaryAssert<Object> objectCommittableSummaryAssert =
+                    records.element(0, as(committableSummary())).hasCheckpointId(1L);
+            objectCommittableSummaryAssert.hasOverallCommittables(1);
+            records.element(1, as(committableWithLineage()))
+                    .isEqualTo(committableWithLineage.withSubtaskId(0));
+
+            // Future emission calls should not change the output
+            testHarness.notifyOfCompletedCheckpoint(2);
+            testHarness.endInput();
+
+            assertThat(testHarness.getOutput()).hasSize(2);
+        }
+    }
+
+    private OneInputStreamOperatorTestHarness<
+                    CommittableMessage<String>, CommittableMessage<String>>
+            createTestHarness(
+                    SupportsCommitter<String> sink,
+                    boolean isBatchMode,
+                    boolean isCheckpointingEnabled)
+                    throws Exception {
+        return new OneInputStreamOperatorTestHarness<>(
+                new CommitterOperatorFactory<>(sink, isBatchMode, isCheckpointingEnabled));
+    }
+
+    private OneInputStreamOperatorTestHarness<
+                    CommittableMessage<String>, CommittableMessage<String>>
+            createTestHarness(
+                    SupportsCommitter<String> sink,
+                    boolean isBatchMode,
+                    boolean isCheckpointingEnabled,
+                    int maxParallelism,
+                    int parallelism,
+                    int subtaskId)
+                    throws Exception {
+        return new OneInputStreamOperatorTestHarness<>(
+                new CommitterOperatorFactory<>(sink, isBatchMode, isCheckpointingEnabled),
+                maxParallelism,
+                parallelism,
+                subtaskId);
+    }
+
     private static class ForwardingCommitter extends TestSinkV2.DefaultCommitter {
         private int successfulCommits = 0;
 
@@ -69,4 +384,14 @@ class SinkV2CommitterOperatorTest extends CommitterOperatorTestBase {
         @Override
         public void close() throws Exception {}
     }
+
+    static class SinkAndCounters {
+        SupportsCommitter<String> sink;
+        IntSupplier commitCounter;
+
+        public SinkAndCounters(SupportsCommitter<String> sink, IntSupplier commitCounter) {
+            this.sink = sink;
+            this.commitCounter = commitCounter;
+        }
+    }
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest.java
index 46974679e83..6f2c7f48de5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest.java
@@ -18,26 +18,119 @@
 
 package org.apache.flink.streaming.runtime.operators.sink;
 
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.eventtime.Watermark;
 import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.WriterInitContext;
 import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummaryAssert;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineageAssert;
+import org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
+import org.apache.flink.streaming.runtime.operators.sink.committables.SinkV1CommittableDeserializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
 
 import org.apache.flink.shaded.guava32.com.google.common.collect.ImmutableList;
 
+import org.assertj.core.api.ListAssert;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.api.connector.sink2.InitContext.INITIAL_CHECKPOINT_ID;
+import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI;
+import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableSummary;
+import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableWithLineage;
+import static org.assertj.core.api.Assertions.as;
+import static org.assertj.core.api.Assertions.assertThat;
+
+class SinkV2SinkWriterOperatorTest {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testLoadPreviousSinkState(boolean stateful) throws Exception {
+        // 1. Build previous sink state
+        final List<String> previousSinkInputs =
+                Arrays.asList(
+                        "bit", "mention", "thick", "stick", "stir", "easy", "sleep", "forth",
+                        "cost", "prompt");
+
+        InspectableSink sink = sinkWithState(stateful, CompatibleStateSinkOperator.SINK_STATE_NAME);
+        int expectedState = 5;
+        final OneInputStreamOperatorTestHarness<String, String> previousSink =
+                new OneInputStreamOperatorTestHarness<>(
+                        new CompatibleStateSinkOperator<>(
+                                TestSinkV2.WRITER_SERIALIZER, expectedState),
+                        StringSerializer.INSTANCE);
+
+        OperatorSubtaskState previousSinkState =
+                TestHarnessUtil.buildSubtaskState(previousSink, previousSinkInputs);
+
+        // 2. Load previous sink state and verify state
+        Sink<Integer> sink3 = sink.getSink();
+        final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>>
+                compatibleWriterOperator =
+                        new OneInputStreamOperatorTestHarness<>(
+                                new SinkWriterOperatorFactory<>(sink3));
+
+        // load the state from previous sink
+        compatibleWriterOperator.initializeState(previousSinkState);
+        assertThat(sink.getRecordCountFromState()).isEqualTo(stateful ? expectedState : 0);
+
+        // 3. do another snapshot and check if this can also be restored without the compatible
+        // state name
+        compatibleWriterOperator.prepareSnapshotPreBarrier(1L);
+        OperatorSubtaskState snapshot = compatibleWriterOperator.snapshot(1L, 1L);
+
+        compatibleWriterOperator.close();
+
+        // 4. Restore the sink without previous sink's state
+        InspectableSink sink2 =
+                sinkWithState(stateful, CompatibleStateSinkOperator.SINK_STATE_NAME);
+        final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>>
+                restoredSinkOperator =
+                        new OneInputStreamOperatorTestHarness<>(
+                                new SinkWriterOperatorFactory<>(sink2.getSink()));
+
+        restoredSinkOperator.initializeState(snapshot);
+        assertThat(sink.getRecordCountFromState()).isEqualTo(stateful ? expectedState : 0);
+
+        restoredSinkOperator.close();
+    }
 
-class SinkV2SinkWriterOperatorTest extends SinkWriterOperatorTestBase {
-    @Override
     InspectableSink sinkWithoutCommitter() {
         TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = new TestSinkV2.DefaultSinkWriter<>();
         return new InspectableSink(TestSinkV2.<Integer>newBuilder().setWriter(sinkWriter).build());
     }
 
-    @Override
     InspectableSink sinkWithCommitter() {
         TestSinkV2.DefaultSinkWriter<Integer> sinkWriter =
                 new TestSinkV2.DefaultCommittingSinkWriter<>();
@@ -48,7 +141,6 @@ class SinkV2SinkWriterOperatorTest extends SinkWriterOperatorTestBase {
                         .build());
     }
 
-    @Override
     InspectableSink sinkWithTimeBasedWriter() {
         TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = new TimeBasedBufferingSinkWriter();
         return new InspectableSink(
@@ -58,7 +150,6 @@ class SinkV2SinkWriterOperatorTest extends SinkWriterOperatorTestBase {
                         .build());
     }
 
-    @Override
     InspectableSink sinkWithState(boolean withState, String stateName) {
         TestSinkV2.DefaultSinkWriter<Integer> sinkWriter =
                 new TestSinkV2.DefaultStatefulSinkWriter<>();
@@ -76,6 +167,308 @@ class SinkV2SinkWriterOperatorTest extends SinkWriterOperatorTestBase {
         return new InspectableSink(builder.build());
     }
 
+    @Test
+    void testNotEmitCommittablesWithoutCommitter() throws Exception {
+        InspectableSink sink = sinkWithoutCommitter();
+        final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness =
+                new OneInputStreamOperatorTestHarness<>(
+                        new SinkWriterOperatorFactory<>(sink.getSink()));
+        testHarness.open();
+        testHarness.processElement(1, 1);
+
+        assertThat(testHarness.extractOutputValues()).isEmpty();
+        assertThat(sink.getRecordsOfCurrentCheckpoint())
+                .containsOnly("(1,1," + Long.MIN_VALUE + ")");
+
+        testHarness.prepareSnapshotPreBarrier(1);
+        assertThat(testHarness.extractOutputValues()).isEmpty();
+        // Elements are flushed
+        assertThat(sink.getRecordsOfCurrentCheckpoint()).isEmpty();
+        testHarness.close();
+    }
+
+    @Test
+    void testWatermarkPropagatedToSinkWriter() throws Exception {
+        final long initialTime = 0;
+
+        InspectableSink sink = sinkWithoutCommitter();
+        final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness =
+                new OneInputStreamOperatorTestHarness<>(
+                        new SinkWriterOperatorFactory<>(sink.getSink()));
+        testHarness.open();
+
+        testHarness.processWatermark(initialTime);
+        testHarness.processWatermark(initialTime + 1);
+
+        assertThat(testHarness.getOutput())
+                .containsExactly(
+                        new org.apache.flink.streaming.api.watermark.Watermark(initialTime),
+                        new org.apache.flink.streaming.api.watermark.Watermark(initialTime + 1));
+        assertThat(sink.getWatermarks())
+                .containsExactly(new Watermark(initialTime), new Watermark(initialTime + 1));
+        testHarness.close();
+    }
+
+    @Test
+    void testTimeBasedBufferingSinkWriter() throws Exception {
+        final long initialTime = 0;
+
+        final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness =
+                new OneInputStreamOperatorTestHarness<>(
+                        new SinkWriterOperatorFactory<>(sinkWithTimeBasedWriter().getSink()));
+
+        testHarness.open();
+
+        testHarness.setProcessingTime(0L);
+
+        testHarness.processElement(1, initialTime + 1);
+        testHarness.processElement(2, initialTime + 2);
+
+        testHarness.prepareSnapshotPreBarrier(1L);
+
+        // Expect empty committableSummary
+        assertBasicOutput(testHarness.extractOutputValues(), 0, 1L);
+
+        testHarness.getProcessingTimeService().setCurrentTime(2001);
+
+        testHarness.prepareSnapshotPreBarrier(2L);
+
+        assertBasicOutput(
+                testHarness.extractOutputValues().stream().skip(1).collect(Collectors.toList()),
+                2,
+                2L);
+        testHarness.close();
+    }
+
+    @Test
+    void testEmitOnFlushWithCommitter() throws Exception {
+        final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness =
+                new OneInputStreamOperatorTestHarness<>(
+                        new SinkWriterOperatorFactory<>(sinkWithCommitter().getSink()));
+
+        testHarness.open();
+        assertThat(testHarness.extractOutputValues()).isEmpty();
+
+        testHarness.processElement(1, 1);
+        testHarness.processElement(2, 2);
+
+        // flush
+        testHarness.prepareSnapshotPreBarrier(1);
+
+        assertBasicOutput(testHarness.extractOutputValues(), 2, 1L);
+        testHarness.close();
+    }
+
+    @Test
+    void testEmitOnEndOfInputInBatchMode() throws Exception {
+        final SinkWriterOperatorFactory<Integer, Integer> writerOperatorFactory =
+                new SinkWriterOperatorFactory<>(sinkWithCommitter().getSink());
+        final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness =
+                new OneInputStreamOperatorTestHarness<>(writerOperatorFactory);
+
+        testHarness.open();
+        assertThat(testHarness.extractOutputValues()).isEmpty();
+
+        testHarness.processElement(1, 1);
+        testHarness.endInput();
+        assertBasicOutput(testHarness.extractOutputValues(), 1, EOI);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testStateRestore(boolean stateful) throws Exception {
+
+        final long initialTime = 0;
+
+        final InspectableSink sink = sinkWithState(stateful, null);
+        Sink<Integer> sink2 = sink.getSink();
+        final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness =
+                new OneInputStreamOperatorTestHarness<>(new SinkWriterOperatorFactory<>(sink2));
+
+        testHarness.open();
+
+        testHarness.processWatermark(initialTime);
+        testHarness.processElement(1, initialTime + 1);
+        testHarness.processElement(2, initialTime + 2);
+
+        testHarness.prepareSnapshotPreBarrier(1L);
+        OperatorSubtaskState snapshot = testHarness.snapshot(1L, 1L);
+
+        assertThat(sink.getRecordCountFromState()).isEqualTo(2);
+        assertThat(sink.getLastCheckpointId()).isEqualTo(stateful ? 1L : -1L);
+
+        testHarness.close();
+
+        final InspectableSink restoredSink = sinkWithState(stateful, null);
+        final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>>
+                restoredTestHarness =
+                        new OneInputStreamOperatorTestHarness<>(
+                                new SinkWriterOperatorFactory<>(restoredSink.getSink()));
+
+        restoredTestHarness.initializeState(snapshot);
+        restoredTestHarness.open();
+
+        // check that the previous state is correctly restored
+        assertThat(restoredSink.getRecordCountFromState()).isEqualTo(stateful ? 2 : 0);
+
+        restoredTestHarness.close();
+    }
+
+    @Test
+    void testRestoreCommitterState() throws Exception {
+        final List<String> committables = Arrays.asList("state1", "state2");
+
+        InspectableSink sink = sinkWithCommitter();
+        final OneInputStreamOperatorTestHarness<String, String> committer =
+                new OneInputStreamOperatorTestHarness<>(
+                        new TestCommitterOperator(TestSinkV2.COMMITTABLE_SERIALIZER),
+                        StringSerializer.INSTANCE);
+
+        final OperatorSubtaskState committerState =
+                TestHarnessUtil.buildSubtaskState(committer, committables);
+
+        final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness =
+                new OneInputStreamOperatorTestHarness<>(
+                        new SinkWriterOperatorFactory<>(sink.getSink()));
+
+        testHarness.initializeState(committerState);
+
+        testHarness.open();
+
+        testHarness.prepareSnapshotPreBarrier(2);
+
+        final ListAssert<CommittableMessage<Integer>> records =
+                assertThat(testHarness.extractOutputValues()).hasSize(4);
+
+        records.element(0, as(committableSummary()))
+                .hasCheckpointId(INITIAL_CHECKPOINT_ID)
+                .hasOverallCommittables(committables.size());
+        records.<CommittableWithLineageAssert<String>>element(1, as(committableWithLineage()))
+                .hasCommittable(committables.get(0))
+                .hasCheckpointId(INITIAL_CHECKPOINT_ID)
+                .hasSubtaskId(0);
+        records.<CommittableWithLineageAssert<String>>element(2, as(committableWithLineage()))
+                .hasCommittable(committables.get(1))
+                .hasCheckpointId(INITIAL_CHECKPOINT_ID)
+                .hasSubtaskId(0);
+        records.element(3, as(committableSummary())).hasCheckpointId(2L).hasOverallCommittables(0);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testHandleEndInputInStreamingMode(boolean isCheckpointingEnabled) throws Exception {
+        InspectableSink sink = sinkWithCommitter();
+        final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<String>> testHarness =
+                new OneInputStreamOperatorTestHarness<>(
+                        new SinkWriterOperatorFactory<>(sink.getSink()));
+        testHarness.open();
+        testHarness.processElement(1, 1);
+
+        assertThat(testHarness.extractOutputValues()).isEmpty();
+        final String record = "(1,1," + Long.MIN_VALUE + ")";
+        assertThat(sink.getRecordsOfCurrentCheckpoint()).containsOnly(record);
+
+        testHarness.endInput();
+
+        if (isCheckpointingEnabled) {
+            testHarness.prepareSnapshotPreBarrier(1);
+        }
+
+        List<String> committables = Collections.singletonList(record);
+
+        ListAssert<CommittableMessage<String>> records =
+                assertThat(testHarness.extractOutputValues()).hasSize(committables.size() + 1);
+        records.element(0, as(committableSummary())).hasOverallCommittables(committables.size());
+
+        records.filteredOn(message -> message instanceof CommittableWithLineage)
+                .map(message -> ((CommittableWithLineage<String>) message).getCommittable())
+                .containsExactlyInAnyOrderElementsOf(committables);
+        assertThat(sink.getRecordsOfCurrentCheckpoint()).isEmpty();
+
+        testHarness.close();
+    }
+
+    @Test
+    void testInitContext() throws Exception {
+        final AtomicReference<WriterInitContext> initContext = new AtomicReference<>();
+        final Sink<String> sink =
+                context -> {
+                    initContext.set(context);
+                    return null;
+                };
+
+        final int subtaskId = 1;
+        final int parallelism = 10;
+        final TypeSerializer<String> typeSerializer = StringSerializer.INSTANCE;
+        final JobID jobID = new JobID();
+
+        final MockEnvironment environment =
+                MockEnvironment.builder()
+                        .setSubtaskIndex(subtaskId)
+                        .setParallelism(parallelism)
+                        .setMaxParallelism(parallelism)
+                        .setJobID(jobID)
+                        .setExecutionConfig(new ExecutionConfig().enableObjectReuse())
+                        .build();
+
+        final OneInputStreamOperatorTestHarness<String, CommittableMessage<String>> testHarness =
+                new OneInputStreamOperatorTestHarness<>(
+                        new SinkWriterOperatorFactory<>(sink), typeSerializer, environment);
+        testHarness.open();
+
+        assertThat(initContext.get().getUserCodeClassLoader()).isNotNull();
+        assertThat(initContext.get().getMailboxExecutor()).isNotNull();
+        assertThat(initContext.get().getProcessingTimeService()).isNotNull();
+        assertThat(initContext.get().getTaskInfo().getIndexOfThisSubtask()).isEqualTo(subtaskId);
+        assertThat(initContext.get().getTaskInfo().getNumberOfParallelSubtasks())
+                .isEqualTo(parallelism);
+        assertThat(initContext.get().getTaskInfo().getAttemptNumber()).isZero();
+        assertThat(initContext.get().metricGroup()).isNotNull();
+        assertThat(initContext.get().getRestoredCheckpointId()).isNotPresent();
+        assertThat(initContext.get().isObjectReuseEnabled()).isTrue();
+        assertThat(initContext.get().createInputSerializer()).isEqualTo(typeSerializer);
+        assertThat(initContext.get().getJobInfo().getJobId()).isEqualTo(jobID);
+
+        testHarness.close();
+    }
+
+    private static void assertContextsEqual(
+            WriterInitContext initContext, WriterInitContext original) {
+        assertThat(initContext.getUserCodeClassLoader().asClassLoader())
+                .isEqualTo(original.getUserCodeClassLoader().asClassLoader());
+        assertThat(initContext.getMailboxExecutor()).isEqualTo(original.getMailboxExecutor());
+        assertThat(initContext.getProcessingTimeService())
+                .isEqualTo(original.getProcessingTimeService());
+        assertThat(initContext.getTaskInfo().getIndexOfThisSubtask())
+                .isEqualTo(original.getTaskInfo().getIndexOfThisSubtask());
+        assertThat(initContext.getTaskInfo().getNumberOfParallelSubtasks())
+                .isEqualTo(original.getTaskInfo().getNumberOfParallelSubtasks());
+        assertThat(initContext.getTaskInfo().getAttemptNumber())
+                .isEqualTo(original.getTaskInfo().getAttemptNumber());
+        assertThat(initContext.metricGroup()).isEqualTo(original.metricGroup());
+        assertThat(initContext.getRestoredCheckpointId())
+                .isEqualTo(original.getRestoredCheckpointId());
+        assertThat(initContext.isObjectReuseEnabled()).isEqualTo(original.isObjectReuseEnabled());
+        assertThat(initContext.createInputSerializer()).isEqualTo(original.createInputSerializer());
+        assertThat(initContext.getJobInfo().getJobId()).isEqualTo(original.getJobInfo().getJobId());
+        assertThat(initContext.metadataConsumer()).isEqualTo(original.metadataConsumer());
+    }
+
+    private static void assertBasicOutput(
+            List<CommittableMessage<Integer>> output, int numberOfCommittables, long checkpointId) {
+        ListAssert<CommittableMessage<Integer>> records =
+                assertThat(output).hasSize(numberOfCommittables + 1);
+        CommittableSummaryAssert<Object> objectCommittableSummaryAssert =
+                records.element(0, as(committableSummary()))
+                        .hasOverallCommittables(numberOfCommittables);
+        records.filteredOn(r -> r instanceof CommittableWithLineage)
+                .allSatisfy(
+                        cl ->
+                                SinkV2Assertions.assertThat((CommittableWithLineage<?>) cl)
+                                        .hasCheckpointId(checkpointId)
+                                        .hasSubtaskId(0));
+    }
+
     private static class TimeBasedBufferingSinkWriter
             extends TestSinkV2.DefaultCommittingSinkWriter<Integer>
             implements ProcessingTimeService.ProcessingTimeCallback {
@@ -131,30 +524,124 @@ class SinkV2SinkWriterOperatorTest extends SinkWriterOperatorTestBase {
         }
     }
 
-    static class InspectableSink extends AbstractInspectableSink<TestSinkV2<Integer>> {
+    static class InspectableSink {
+        private final TestSinkV2<Integer> sink;
+
         InspectableSink(TestSinkV2<Integer> sink) {
-            super(sink);
+            this.sink = sink;
+        }
+
+        public TestSinkV2<Integer> getSink() {
+            return sink;
         }
 
-        @Override
         public long getLastCheckpointId() {
             return getSink().getWriter().lastCheckpointId;
         }
 
-        @Override
         public List<String> getRecordsOfCurrentCheckpoint() {
             return getSink().getWriter().elements;
         }
 
-        @Override
         public List<Watermark> getWatermarks() {
             return getSink().getWriter().watermarks;
         }
 
-        @Override
         public int getRecordCountFromState() {
             return ((TestSinkV2.DefaultStatefulSinkWriter<?>) getSink().getWriter())
                     .getRecordCount();
         }
     }
+
+    private static class TestCommitterOperator extends AbstractStreamOperator<String>
+            implements OneInputStreamOperator<String, String> {
+
+        private static final ListStateDescriptor<byte[]> STREAMING_COMMITTER_RAW_STATES_DESC =
+                new ListStateDescriptor<>(
+                        "streaming_committer_raw_states", BytePrimitiveArraySerializer.INSTANCE);
+        private ListState<List<String>> committerState;
+        private final List<String> buffer = new ArrayList<>();
+        private final SimpleVersionedSerializer<String> serializer;
+
+        public TestCommitterOperator(SimpleVersionedSerializer<String> serializer) {
+            this.serializer = serializer;
+        }
+
+        @Override
+        public void initializeState(StateInitializationContext context) throws Exception {
+            super.initializeState(context);
+            committerState =
+                    new SimpleVersionedListState<>(
+                            context.getOperatorStateStore()
+                                    .getListState(STREAMING_COMMITTER_RAW_STATES_DESC),
+                            new TestingCommittableSerializer(serializer));
+        }
+
+        @Override
+        public void processElement(StreamRecord<String> element) throws Exception {
+            buffer.add(element.getValue());
+        }
+
+        @Override
+        public void snapshotState(StateSnapshotContext context) throws Exception {
+            super.snapshotState(context);
+            committerState.add(buffer);
+        }
+    }
+
+    /** Writes state to test whether the sink can read from alternative state names. */
+    private static class CompatibleStateSinkOperator<T> extends AbstractStreamOperator<String>
+            implements OneInputStreamOperator<String, String> {
+
+        static final String SINK_STATE_NAME = "compatible_sink_state";
+
+        static final ListStateDescriptor<byte[]> SINK_STATE_DESC =
+                new ListStateDescriptor<>(SINK_STATE_NAME, BytePrimitiveArraySerializer.INSTANCE);
+        ListState<T> sinkState;
+        private final SimpleVersionedSerializer<T> serializer;
+        private final T initialState;
+
+        public CompatibleStateSinkOperator(
+                SimpleVersionedSerializer<T> serializer, T initialState) {
+            this.serializer = serializer;
+            this.initialState = initialState;
+        }
+
+        public void initializeState(StateInitializationContext context) throws Exception {
+            super.initializeState(context);
+            sinkState =
+                    new SimpleVersionedListState<>(
+                            context.getOperatorStateStore().getListState(SINK_STATE_DESC),
+                            serializer);
+            if (!context.isRestored()) {
+                sinkState.add(initialState);
+            }
+        }
+
+        @Override
+        public void processElement(StreamRecord<String> element) {
+            // do nothing
+        }
+    }
+
+    private static class TestingCommittableSerializer
+            extends SinkV1WriterCommittableSerializer<String> {
+
+        private final SimpleVersionedSerializer<String> committableSerializer;
+
+        public TestingCommittableSerializer(
+                SimpleVersionedSerializer<String> committableSerializer) {
+            super(committableSerializer);
+            this.committableSerializer = committableSerializer;
+        }
+
+        @Override
+        public byte[] serialize(List<String> obj) throws IOException {
+            final DataOutputSerializer out = new DataOutputSerializer(256);
+            out.writeInt(SinkV1CommittableDeserializer.MAGIC_NUMBER);
+            SimpleVersionedSerialization.writeVersionAndSerializeList(
+                    committableSerializer, obj, out);
+            return out.getCopyOfBuffer();
+        }
+    }
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java
deleted file mode 100644
index 6bde86216b9..00000000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java
+++ /dev/null
@@ -1,558 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.sink;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
-import org.apache.flink.api.connector.sink2.Sink;
-import org.apache.flink.api.connector.sink2.WriterInitContext;
-import org.apache.flink.core.io.SimpleVersionedSerialization;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.core.memory.DataOutputSerializer;
-import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
-import org.apache.flink.runtime.operators.testutils.MockEnvironment;
-import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.runtime.state.StateSnapshotContext;
-import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
-import org.apache.flink.streaming.api.connector.sink2.CommittableSummaryAssert;
-import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
-import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineageAssert;
-import org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.operators.sink.committables.SinkV1CommittableDeserializer;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.TestHarnessUtil;
-
-import org.assertj.core.api.ListAssert;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.api.connector.sink2.InitContext.INITIAL_CHECKPOINT_ID;
-import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI;
-import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableSummary;
-import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableWithLineage;
-import static org.assertj.core.api.Assertions.as;
-import static org.assertj.core.api.Assertions.assertThat;
-
-abstract class SinkWriterOperatorTestBase {
-
-    @Test
-    void testNotEmitCommittablesWithoutCommitter() throws Exception {
-        InspectableSink sink = sinkWithoutCommitter();
-        final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness =
-                new OneInputStreamOperatorTestHarness<>(
-                        new SinkWriterOperatorFactory<>(sink.getSink()));
-        testHarness.open();
-        testHarness.processElement(1, 1);
-
-        assertThat(testHarness.extractOutputValues()).isEmpty();
-        assertThat(sink.getRecordsOfCurrentCheckpoint())
-                .containsOnly("(1,1," + Long.MIN_VALUE + ")");
-
-        testHarness.prepareSnapshotPreBarrier(1);
-        assertThat(testHarness.extractOutputValues()).isEmpty();
-        // Elements are flushed
-        assertThat(sink.getRecordsOfCurrentCheckpoint()).isEmpty();
-        testHarness.close();
-    }
-
-    @Test
-    void testWatermarkPropagatedToSinkWriter() throws Exception {
-        final long initialTime = 0;
-
-        InspectableSink sink = sinkWithoutCommitter();
-        final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness =
-                new OneInputStreamOperatorTestHarness<>(
-                        new SinkWriterOperatorFactory<>(sink.getSink()));
-        testHarness.open();
-
-        testHarness.processWatermark(initialTime);
-        testHarness.processWatermark(initialTime + 1);
-
-        assertThat(testHarness.getOutput())
-                .containsExactly(new Watermark(initialTime), new Watermark(initialTime + 1));
-        assertThat(sink.getWatermarks())
-                .containsExactly(
-                        new org.apache.flink.api.common.eventtime.Watermark(initialTime),
-                        new org.apache.flink.api.common.eventtime.Watermark(initialTime + 1));
-        testHarness.close();
-    }
-
-    @Test
-    void testTimeBasedBufferingSinkWriter() throws Exception {
-        final long initialTime = 0;
-
-        final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness =
-                new OneInputStreamOperatorTestHarness<>(
-                        new SinkWriterOperatorFactory<>(sinkWithTimeBasedWriter().getSink()));
-
-        testHarness.open();
-
-        testHarness.setProcessingTime(0L);
-
-        testHarness.processElement(1, initialTime + 1);
-        testHarness.processElement(2, initialTime + 2);
-
-        testHarness.prepareSnapshotPreBarrier(1L);
-
-        // Expect empty committableSummary
-        assertBasicOutput(testHarness.extractOutputValues(), 0, 1L);
-
-        testHarness.getProcessingTimeService().setCurrentTime(2001);
-
-        testHarness.prepareSnapshotPreBarrier(2L);
-
-        assertBasicOutput(
-                testHarness.extractOutputValues().stream().skip(1).collect(Collectors.toList()),
-                2,
-                2L);
-        testHarness.close();
-    }
-
-    @Test
-    void testEmitOnFlushWithCommitter() throws Exception {
-        final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness =
-                new OneInputStreamOperatorTestHarness<>(
-                        new SinkWriterOperatorFactory<>(sinkWithCommitter().getSink()));
-
-        testHarness.open();
-        assertThat(testHarness.extractOutputValues()).isEmpty();
-
-        testHarness.processElement(1, 1);
-        testHarness.processElement(2, 2);
-
-        // flush
-        testHarness.prepareSnapshotPreBarrier(1);
-
-        assertBasicOutput(testHarness.extractOutputValues(), 2, 1L);
-        testHarness.close();
-    }
-
-    @Test
-    void testEmitOnEndOfInputInBatchMode() throws Exception {
-        final SinkWriterOperatorFactory<Integer, Integer> writerOperatorFactory =
-                new SinkWriterOperatorFactory<>(sinkWithCommitter().getSink());
-        final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness =
-                new OneInputStreamOperatorTestHarness<>(writerOperatorFactory);
-
-        testHarness.open();
-        assertThat(testHarness.extractOutputValues()).isEmpty();
-
-        testHarness.processElement(1, 1);
-        testHarness.endInput();
-        assertBasicOutput(testHarness.extractOutputValues(), 1, EOI);
-    }
-
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    void testStateRestore(boolean stateful) throws Exception {
-
-        final long initialTime = 0;
-
-        final InspectableSink sink = sinkWithState(stateful, null);
-        Sink<Integer> sink2 = sink.getSink();
-        final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness =
-                new OneInputStreamOperatorTestHarness<>(new SinkWriterOperatorFactory<>(sink2));
-
-        testHarness.open();
-
-        testHarness.processWatermark(initialTime);
-        testHarness.processElement(1, initialTime + 1);
-        testHarness.processElement(2, initialTime + 2);
-
-        testHarness.prepareSnapshotPreBarrier(1L);
-        OperatorSubtaskState snapshot = testHarness.snapshot(1L, 1L);
-
-        assertThat(sink.getRecordCountFromState()).isEqualTo(2);
-        assertThat(sink.getLastCheckpointId()).isEqualTo(stateful ? 1L : -1L);
-
-        testHarness.close();
-
-        final InspectableSink restoredSink = sinkWithState(stateful, null);
-        final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>>
-                restoredTestHarness =
-                        new OneInputStreamOperatorTestHarness<>(
-                                new SinkWriterOperatorFactory<>(restoredSink.getSink()));
-
-        restoredTestHarness.initializeState(snapshot);
-        restoredTestHarness.open();
-
-        // check that the previous state is correctly restored
-        assertThat(restoredSink.getRecordCountFromState()).isEqualTo(stateful ? 2 : 0);
-
-        restoredTestHarness.close();
-    }
-
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testLoadPreviousSinkState(boolean stateful) throws Exception {
-        // 1. Build previous sink state
-        final List<String> previousSinkInputs =
-                Arrays.asList(
-                        "bit", "mention", "thick", "stick", "stir", "easy", 
"sleep", "forth",
-                        "cost", "prompt");
-
-        InspectableSink sink = sinkWithState(stateful, CompatibleStateSinkOperator.SINK_STATE_NAME);
-        int expectedState = 5;
-        final OneInputStreamOperatorTestHarness<String, String> previousSink =
-                new OneInputStreamOperatorTestHarness<>(
-                        new CompatibleStateSinkOperator<>(
-                                TestSinkV2.WRITER_SERIALIZER, expectedState),
-                        StringSerializer.INSTANCE);
-
-        OperatorSubtaskState previousSinkState =
-                TestHarnessUtil.buildSubtaskState(previousSink, previousSinkInputs);
-
-        // 2. Load previous sink state and verify state
-        Sink<Integer> sink3 = sink.getSink();
-        final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>>
-                compatibleWriterOperator =
-                        new OneInputStreamOperatorTestHarness<>(
-                                new SinkWriterOperatorFactory<>(sink3));
-
-        // load the state from previous sink
-        compatibleWriterOperator.initializeState(previousSinkState);
-        assertThat(sink.getRecordCountFromState()).isEqualTo(stateful ? expectedState : 0);
-
-        // 3. do another snapshot and check if this also can be restored without compabitible state
-        // name
-        compatibleWriterOperator.prepareSnapshotPreBarrier(1L);
-        OperatorSubtaskState snapshot = compatibleWriterOperator.snapshot(1L, 1L);
-
-        compatibleWriterOperator.close();
-
-        // 4. Restore the sink without previous sink's state
-        InspectableSink sink2 =
-                sinkWithState(stateful, CompatibleStateSinkOperator.SINK_STATE_NAME);
-        final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>>
-                restoredSinkOperator =
-                        new OneInputStreamOperatorTestHarness<>(
-                                new SinkWriterOperatorFactory<>(sink2.getSink()));
-
-        restoredSinkOperator.initializeState(snapshot);
-        assertThat(sink.getRecordCountFromState()).isEqualTo(stateful ? expectedState : 0);
-
-        restoredSinkOperator.close();
-    }
-
-    @Test
-    void testRestoreCommitterState() throws Exception {
-        final List<String> committables = Arrays.asList("state1", "state2");
-
-        InspectableSink sink = sinkWithCommitter();
-        final OneInputStreamOperatorTestHarness<String, String> committer =
-                new OneInputStreamOperatorTestHarness<>(
-                        new TestCommitterOperator(TestSinkV2.COMMITTABLE_SERIALIZER),
-                        StringSerializer.INSTANCE);
-
-        final OperatorSubtaskState committerState =
-                TestHarnessUtil.buildSubtaskState(committer, committables);
-
-        final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness =
-                new OneInputStreamOperatorTestHarness<>(
-                        new SinkWriterOperatorFactory<>(sink.getSink()));
-
-        testHarness.initializeState(committerState);
-
-        testHarness.open();
-
-        testHarness.prepareSnapshotPreBarrier(2);
-
-        final ListAssert<CommittableMessage<Integer>> records =
-                assertThat(testHarness.extractOutputValues()).hasSize(4);
-
-        records.element(0, as(committableSummary()))
-                .hasCheckpointId(INITIAL_CHECKPOINT_ID)
-                .hasOverallCommittables(committables.size());
-        records.<CommittableWithLineageAssert<String>>element(1, as(committableWithLineage()))
-                .hasCommittable(committables.get(0))
-                .hasCheckpointId(INITIAL_CHECKPOINT_ID)
-                .hasSubtaskId(0);
-        records.<CommittableWithLineageAssert<String>>element(2, as(committableWithLineage()))
-                .hasCommittable(committables.get(1))
-                .hasCheckpointId(INITIAL_CHECKPOINT_ID)
-                .hasSubtaskId(0);
-        records.element(3, as(committableSummary())).hasCheckpointId(2L).hasOverallCommittables(0);
-    }
-
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    void testHandleEndInputInStreamingMode(boolean isCheckpointingEnabled) throws Exception {
-        InspectableSink sink = sinkWithCommitter();
-        final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<String>> testHarness =
-                new OneInputStreamOperatorTestHarness<>(
-                        new SinkWriterOperatorFactory<>(sink.getSink()));
-        testHarness.open();
-        testHarness.processElement(1, 1);
-
-        assertThat(testHarness.extractOutputValues()).isEmpty();
-        final String record = "(1,1," + Long.MIN_VALUE + ")";
-        assertThat(sink.getRecordsOfCurrentCheckpoint()).containsOnly(record);
-
-        testHarness.endInput();
-
-        if (isCheckpointingEnabled) {
-            testHarness.prepareSnapshotPreBarrier(1);
-        }
-
-        List<String> committables = Collections.singletonList(record);
-
-        ListAssert<CommittableMessage<String>> records =
-                assertThat(testHarness.extractOutputValues()).hasSize(committables.size() + 1);
-        records.element(0, as(committableSummary())).hasOverallCommittables(committables.size());
-
-        records.filteredOn(message -> message instanceof CommittableWithLineage)
-                .map(message -> ((CommittableWithLineage<String>) message).getCommittable())
-                .containsExactlyInAnyOrderElementsOf(committables);
-        assertThat(sink.getRecordsOfCurrentCheckpoint()).isEmpty();
-
-        testHarness.close();
-    }
-
-    @Test
-    void testInitContext() throws Exception {
-        final AtomicReference<org.apache.flink.api.connector.sink2.WriterInitContext> initContext =
-                new AtomicReference<>();
-        final org.apache.flink.api.connector.sink2.Sink<String> sink =
-                context -> {
-                    initContext.set(context);
-                    return null;
-                };
-
-        final int subtaskId = 1;
-        final int parallelism = 10;
-        final TypeSerializer<String> typeSerializer = StringSerializer.INSTANCE;
-        final JobID jobID = new JobID();
-
-        final MockEnvironment environment =
-                MockEnvironment.builder()
-                        .setSubtaskIndex(subtaskId)
-                        .setParallelism(parallelism)
-                        .setMaxParallelism(parallelism)
-                        .setJobID(jobID)
-                        .setExecutionConfig(new ExecutionConfig().enableObjectReuse())
-                        .build();
-
-        final OneInputStreamOperatorTestHarness<String, CommittableMessage<String>> testHarness =
-                new OneInputStreamOperatorTestHarness<>(
-                        new SinkWriterOperatorFactory<>(sink), typeSerializer, environment);
-        testHarness.open();
-
-        assertThat(initContext.get().getUserCodeClassLoader()).isNotNull();
-        assertThat(initContext.get().getMailboxExecutor()).isNotNull();
-        assertThat(initContext.get().getProcessingTimeService()).isNotNull();
-        assertThat(initContext.get().getTaskInfo().getIndexOfThisSubtask()).isEqualTo(subtaskId);
-        assertThat(initContext.get().getTaskInfo().getNumberOfParallelSubtasks())
-                .isEqualTo(parallelism);
-        assertThat(initContext.get().getTaskInfo().getAttemptNumber()).isZero();
-        assertThat(initContext.get().metricGroup()).isNotNull();
-        assertThat(initContext.get().getRestoredCheckpointId()).isNotPresent();
-        assertThat(initContext.get().isObjectReuseEnabled()).isTrue();
-        assertThat(initContext.get().createInputSerializer()).isEqualTo(typeSerializer);
-        assertThat(initContext.get().getJobInfo().getJobId()).isEqualTo(jobID);
-
-        testHarness.close();
-    }
-
-    private static void assertContextsEqual(
-            WriterInitContext initContext, WriterInitContext original) {
-        assertThat(initContext.getUserCodeClassLoader().asClassLoader())
-                .isEqualTo(original.getUserCodeClassLoader().asClassLoader());
-        assertThat(initContext.getMailboxExecutor()).isEqualTo(original.getMailboxExecutor());
-        assertThat(initContext.getProcessingTimeService())
-                .isEqualTo(original.getProcessingTimeService());
-        assertThat(initContext.getTaskInfo().getIndexOfThisSubtask())
-                .isEqualTo(original.getTaskInfo().getIndexOfThisSubtask());
-        assertThat(initContext.getTaskInfo().getNumberOfParallelSubtasks())
-                .isEqualTo(original.getTaskInfo().getNumberOfParallelSubtasks());
-        assertThat(initContext.getTaskInfo().getAttemptNumber())
-                .isEqualTo(original.getTaskInfo().getAttemptNumber());
-        assertThat(initContext.metricGroup()).isEqualTo(original.metricGroup());
-        assertThat(initContext.getRestoredCheckpointId())
-                .isEqualTo(original.getRestoredCheckpointId());
-        assertThat(initContext.isObjectReuseEnabled()).isEqualTo(original.isObjectReuseEnabled());
-        assertThat(initContext.createInputSerializer()).isEqualTo(original.createInputSerializer());
-        assertThat(initContext.getJobInfo().getJobId()).isEqualTo(original.getJobInfo().getJobId());
-        assertThat(initContext.metadataConsumer()).isEqualTo(original.metadataConsumer());
-    }
-
-    private static void assertBasicOutput(
-            List<CommittableMessage<Integer>> output, int numberOfCommittables, long checkpointId) {
-        ListAssert<CommittableMessage<Integer>> records =
-                assertThat(output).hasSize(numberOfCommittables + 1);
-        CommittableSummaryAssert<Object> objectCommittableSummaryAssert =
-                records.element(0, as(committableSummary()))
-                        .hasOverallCommittables(numberOfCommittables);
-        records.filteredOn(r -> r instanceof CommittableWithLineage)
-                .allSatisfy(
-                        cl ->
-                                SinkV2Assertions.assertThat((CommittableWithLineage<?>) cl)
-                                        .hasCheckpointId(checkpointId)
-                                        .hasSubtaskId(0));
-    }
-
-    private static class TestCommitterOperator extends AbstractStreamOperator<String>
-            implements OneInputStreamOperator<String, String> {
-
-        private static final ListStateDescriptor<byte[]> STREAMING_COMMITTER_RAW_STATES_DESC =
-                new ListStateDescriptor<>(
-                        "streaming_committer_raw_states", BytePrimitiveArraySerializer.INSTANCE);
-        private ListState<List<String>> committerState;
-        private final List<String> buffer = new ArrayList<>();
-        private final SimpleVersionedSerializer<String> serializer;
-
-        public TestCommitterOperator(SimpleVersionedSerializer<String> serializer) {
-            this.serializer = serializer;
-        }
-
-        @Override
-        public void initializeState(StateInitializationContext context) throws Exception {
-            super.initializeState(context);
-            committerState =
-                    new SimpleVersionedListState<>(
-                            context.getOperatorStateStore()
-                                    .getListState(STREAMING_COMMITTER_RAW_STATES_DESC),
-                            new TestingCommittableSerializer(serializer));
-        }
-
-        @Override
-        public void processElement(StreamRecord<String> element) throws Exception {
-            buffer.add(element.getValue());
-        }
-
-        @Override
-        public void snapshotState(StateSnapshotContext context) throws Exception {
-            super.snapshotState(context);
-            committerState.add(buffer);
-        }
-    }
-
-    /** Writes state to test whether the sink can read from alternative state names. */
-    private static class CompatibleStateSinkOperator<T> extends AbstractStreamOperator<String>
-            implements OneInputStreamOperator<String, String> {
-
-        static final String SINK_STATE_NAME = "compatible_sink_state";
-
-        static final ListStateDescriptor<byte[]> SINK_STATE_DESC =
-                new ListStateDescriptor<>(SINK_STATE_NAME, BytePrimitiveArraySerializer.INSTANCE);
-        ListState<T> sinkState;
-        private final SimpleVersionedSerializer<T> serializer;
-        private final T initialState;
-
-        public CompatibleStateSinkOperator(
-                SimpleVersionedSerializer<T> serializer, T initialState) {
-            this.serializer = serializer;
-            this.initialState = initialState;
-        }
-
-        public void initializeState(StateInitializationContext context) throws Exception {
-            super.initializeState(context);
-            sinkState =
-                    new SimpleVersionedListState<>(
-                            context.getOperatorStateStore().getListState(SINK_STATE_DESC),
-                            serializer);
-            if (!context.isRestored()) {
-                sinkState.add(initialState);
-            }
-        }
-
-        @Override
-        public void processElement(StreamRecord<String> element) {
-            // do nothing
-        }
-    }
-
-    private static class TestingCommittableSerializer
-            extends SinkV1WriterCommittableSerializer<String> {
-
-        private final SimpleVersionedSerializer<String> committableSerializer;
-
-        public TestingCommittableSerializer(
-                SimpleVersionedSerializer<String> committableSerializer) {
-            super(committableSerializer);
-            this.committableSerializer = committableSerializer;
-        }
-
-        @Override
-        public byte[] serialize(List<String> obj) throws IOException {
-            final DataOutputSerializer out = new DataOutputSerializer(256);
-            out.writeInt(SinkV1CommittableDeserializer.MAGIC_NUMBER);
-            SimpleVersionedSerialization.writeVersionAndSerializeList(
-                    committableSerializer, obj, out);
-            return out.getCopyOfBuffer();
-        }
-    }
-
-    abstract InspectableSink sinkWithoutCommitter();
-
-    abstract InspectableSink sinkWithTimeBasedWriter();
-
-    abstract InspectableSink sinkWithState(boolean withState, String stateName);
-
-    abstract InspectableSink sinkWithCommitter();
-
-    /**
-     * Basic abstraction to access the different flavors of sinks. Remove once the older interfaces
-     * are removed.
-     */
-    interface InspectableSink {
-        long getLastCheckpointId();
-
-        List<String> getRecordsOfCurrentCheckpoint();
-
-        List<org.apache.flink.api.common.eventtime.Watermark> getWatermarks();
-
-        int getRecordCountFromState();
-
-        Sink<Integer> getSink();
-    }
-
-    abstract static class AbstractInspectableSink<
-                    S extends org.apache.flink.api.connector.sink2.Sink<Integer>>
-            implements InspectableSink {
-        private final S sink;
-
-        protected AbstractInspectableSink(S sink) {
-            this.sink = sink;
-        }
-
-        @Override
-        public S getSink() {
-            return sink;
-        }
-    }
-}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterCommitterOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterCommitterOperatorTest.java
deleted file mode 100644
index bb7795e38d4..00000000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterCommitterOperatorTest.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.sink;
-
-import org.apache.flink.api.connector.sink2.SupportsCommitter;
-
-import java.util.Collection;
-
-class WithAdapterCommitterOperatorTest extends CommitterOperatorTestBase {
-
-    @Override
-    SinkAndCounters sinkWithPostCommit() {
-        ForwardingCommitter committer = new ForwardingCommitter();
-        return new SinkAndCounters(
-                (SupportsCommitter<String>)
-                        TestSinkV2.<String>newBuilder()
-                                .setCommitter(committer)
-                                .setWithPostCommitTopology(true)
-                                .build(),
-                () -> committer.successfulCommits);
-    }
-
-    @Override
-    SinkAndCounters sinkWithPostCommitWithRetry() {
-        return new SinkAndCounters(
-                (SupportsCommitter<String>)
-                        TestSinkV2.<String>newBuilder()
-                                .setCommitter(new TestSinkV2.RetryOnceCommitter())
-                                .setWithPostCommitTopology(true)
-                                .build(),
-                () -> 0);
-    }
-
-    @Override
-    SinkAndCounters sinkWithoutPostCommit() {
-        ForwardingCommitter committer = new ForwardingCommitter();
-        return new SinkAndCounters(
-                (SupportsCommitter<String>)
-                        TestSinkV2.<String>newBuilder().setCommitter(committer).build(),
-                () -> committer.successfulCommits);
-    }
-
-    private static class ForwardingCommitter extends TestSinkV2.DefaultCommitter {
-        private int successfulCommits = 0;
-
-        @Override
-        public void commit(Collection<CommitRequest<String>> committables) {
-            successfulCommits += committables.size();
-        }
-
-        @Override
-        public void close() throws Exception {}
-    }
-}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterSinkWriterOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterSinkWriterOperatorTest.java
deleted file mode 100644
index b0bf67fe4d2..00000000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterSinkWriterOperatorTest.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.sink;
-
-import org.apache.flink.api.common.eventtime.Watermark;
-import org.apache.flink.api.common.operators.ProcessingTimeService;
-import org.apache.flink.api.connector.sink2.WriterInitContext;
-import org.apache.flink.api.java.tuple.Tuple3;
-
-import java.util.ArrayList;
-import java.util.List;
-
-class WithAdapterSinkWriterOperatorTest extends SinkWriterOperatorTestBase {
-    @Override
-    InspectableSink sinkWithoutCommitter() {
-        TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = new TestSinkV2.DefaultSinkWriter<>();
-        return new InspectableSink(TestSinkV2.<Integer>newBuilder().setWriter(sinkWriter).build());
-    }
-
-    @Override
-    InspectableSink sinkWithCommitter() {
-        TestSinkV2.DefaultSinkWriter<Integer> sinkWriter =
-                new TestSinkV2.DefaultCommittingSinkWriter<>();
-        return new InspectableSink(
-                TestSinkV2.<Integer>newBuilder()
-                        .setWriter(sinkWriter)
-                        .setDefaultCommitter()
-                        .build());
-    }
-
-    @Override
-    InspectableSink sinkWithTimeBasedWriter() {
-        TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = new TimeBasedBufferingSinkWriter();
-        return new InspectableSink(
-                TestSinkV2.<Integer>newBuilder()
-                        .setWriter(sinkWriter)
-                        .setDefaultCommitter()
-                        .build());
-    }
-
-    @Override
-    InspectableSink sinkWithState(boolean withState, String stateName) {
-        TestSinkV2.DefaultSinkWriter<Integer> sinkWriter =
-                new TestSinkV2.DefaultStatefulSinkWriter<>();
-        TestSinkV2.Builder<Integer> builder =
-                TestSinkV2.<Integer>newBuilder()
-                        .setWriter(sinkWriter)
-                        .setDefaultCommitter()
-                        .setWithPostCommitTopology(true);
-        builder.setWriterState(withState);
-        if (stateName != null) {
-            builder.setCompatibleStateNames(stateName);
-        }
-        return new InspectableSink(builder.build());
-    }
-
-    private static class TimeBasedBufferingSinkWriter
-            extends TestSinkV2.DefaultStatefulSinkWriter<Integer>
-            implements ProcessingTimeService.ProcessingTimeCallback {
-
-        private final List<String> cachedCommittables = new ArrayList<>();
-        private ProcessingTimeService processingTimeService;
-
-        @Override
-        public void write(Integer element, Context context) {
-            cachedCommittables.add(
-                    Tuple3.of(element, context.timestamp(), context.currentWatermark()).toString());
-        }
-
-        @Override
-        public void onProcessingTime(long time) {
-            elements.addAll(cachedCommittables);
-            cachedCommittables.clear();
-            this.processingTimeService.registerTimer(time + 1000, this);
-        }
-
-        @Override
-        public void init(WriterInitContext context) {
-            this.processingTimeService = context.getProcessingTimeService();
-            this.processingTimeService.registerTimer(1000, this);
-        }
-    }
-
-    static class InspectableSink
-            extends AbstractInspectableSink<org.apache.flink.api.connector.sink2.Sink<Integer>> {
-        private final TestSinkV2<Integer> sink;
-
-        InspectableSink(TestSinkV2<Integer> sink) {
-            super(sink);
-            this.sink = sink;
-        }
-
-        @Override
-        public long getLastCheckpointId() {
-            return sink.getWriter().lastCheckpointId;
-        }
-
-        @Override
-        public List<String> getRecordsOfCurrentCheckpoint() {
-            return sink.getWriter().elements;
-        }
-
-        @Override
-        public List<Watermark> getWatermarks() {
-            return sink.getWriter().watermarks;
-        }
-
-        @Override
-        public int getRecordCountFromState() {
-            TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = sink.getWriter();
-            if (sinkWriter instanceof TestSinkV2.DefaultStatefulSinkWriter) {
-                return ((TestSinkV2.DefaultStatefulSinkWriter<Integer>) sinkWriter)
-                        .getRecordCount();
-            } else {
-                return 0;
-            }
-        }
-    }
-}
