This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fe04a6e9d0f8be33cb9801b4e021c0cd7d0a5b29
Author: Arvid Heise <[email protected]>
AuthorDate: Sun Apr 13 09:38:22 2025 +0200

    [FLINK-37605][runtime] Remove obsolete sink tests
    
    With the removal of SinkV1, all adapter tests have also been testing V2. We 
can remove the adapter tests and simplify test hierarchy.
---
 .../operators/sink/CommitterOperatorTestBase.java  | 359 -------------
 .../sink/SinkV2CommitterOperatorTest.java          | 335 ++++++++++++-
 .../sink/SinkV2SinkWriterOperatorTest.java         | 509 ++++++++++++++++++-
 .../operators/sink/SinkWriterOperatorTestBase.java | 558 ---------------------
 .../sink/WithAdapterCommitterOperatorTest.java     |  70 ---
 .../sink/WithAdapterSinkWriterOperatorTest.java    | 135 -----
 6 files changed, 828 insertions(+), 1138 deletions(-)

diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java
deleted file mode 100644
index 5fdb36a3953..00000000000
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java
+++ /dev/null
@@ -1,359 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.sink;
-
-import org.apache.flink.api.connector.sink2.SupportsCommitter;
-import org.apache.flink.configuration.SinkOptions;
-import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
-import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
-import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
-import org.apache.flink.streaming.api.connector.sink2.CommittableSummaryAssert;
-import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-
-import org.assertj.core.api.AbstractThrowableAssert;
-import org.assertj.core.api.ListAssert;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
-
-import java.util.function.IntSupplier;
-
-import static 
org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI;
-import static 
org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableSummary;
-import static 
org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableWithLineage;
-import static org.assertj.core.api.Assertions.as;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatCode;
-
-abstract class CommitterOperatorTestBase {
-
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    void testEmitCommittables(boolean withPostCommitTopology) throws Exception 
{
-        SinkAndCounters sinkAndCounters;
-        if (withPostCommitTopology) {
-            // Insert global committer to simulate post commit topology
-            sinkAndCounters = sinkWithPostCommit();
-        } else {
-            sinkAndCounters = sinkWithoutPostCommit();
-        }
-        final OneInputStreamOperatorTestHarness<
-                        CommittableMessage<String>, CommittableMessage<String>>
-                testHarness =
-                        new OneInputStreamOperatorTestHarness<>(
-                                new 
CommitterOperatorFactory<>(sinkAndCounters.sink, false, true));
-        testHarness.open();
-
-        final CommittableSummary<String> committableSummary =
-                new CommittableSummary<>(1, 1, 1L, 1, 0);
-        testHarness.processElement(new StreamRecord<>(committableSummary));
-        final CommittableWithLineage<String> committableWithLineage =
-                new CommittableWithLineage<>("1", 1L, 1);
-        testHarness.processElement(new StreamRecord<>(committableWithLineage));
-
-        // Trigger commit
-        testHarness.notifyOfCompletedCheckpoint(1);
-
-        assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(1);
-        if (withPostCommitTopology) {
-            ListAssert<CommittableMessage<String>> records =
-                    assertThat(testHarness.extractOutputValues()).hasSize(2);
-            records.element(0, as(committableSummary()))
-                    
.hasFailedCommittables(committableSummary.getNumberOfFailedCommittables())
-                    
.hasOverallCommittables(committableSummary.getNumberOfCommittables());
-            records.element(1, as(committableWithLineage()))
-                    .isEqualTo(committableWithLineage.withSubtaskId(0));
-        } else {
-            assertThat(testHarness.getOutput()).isEmpty();
-        }
-        testHarness.close();
-    }
-
-    @Test
-    void ensureAllCommittablesArrivedBeforeCommitting() throws Exception {
-        SinkAndCounters sinkAndCounters = sinkWithPostCommit();
-        final OneInputStreamOperatorTestHarness<
-                        CommittableMessage<String>, CommittableMessage<String>>
-                testHarness = createTestHarness(sinkAndCounters.sink, false, 
true);
-        testHarness.open();
-        testHarness.setProcessingTime(0);
-
-        // Only send first committable
-        final CommittableSummary<String> committableSummary =
-                new CommittableSummary<>(1, 1, 1L, 2, 0);
-        testHarness.processElement(new StreamRecord<>(committableSummary));
-        final CommittableWithLineage<String> first = new 
CommittableWithLineage<>("1", 1L, 1);
-        testHarness.processElement(new StreamRecord<>(first));
-
-        assertThatCode(() -> testHarness.notifyOfCompletedCheckpoint(1))
-                .hasMessageContaining("Trying to commit incomplete batch of 
committables");
-
-        assertThat(testHarness.getOutput()).isEmpty();
-        assertThat(sinkAndCounters.commitCounter.getAsInt()).isZero();
-
-        final CommittableWithLineage<String> second = new 
CommittableWithLineage<>("2", 1L, 1);
-        testHarness.processElement(new StreamRecord<>(second));
-
-        assertThatCode(() -> 
testHarness.notifyOfCompletedCheckpoint(1)).doesNotThrowAnyException();
-
-        assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(2);
-        ListAssert<CommittableMessage<String>> records =
-                assertThat(testHarness.extractOutputValues()).hasSize(3);
-        records.element(0, as(committableSummary()))
-                
.hasFailedCommittables(committableSummary.getNumberOfFailedCommittables())
-                
.hasOverallCommittables(committableSummary.getNumberOfCommittables());
-        records.element(1, 
as(committableWithLineage())).isEqualTo(first.withSubtaskId(0));
-        records.element(2, 
as(committableWithLineage())).isEqualTo(second.withSubtaskId(0));
-        testHarness.close();
-    }
-
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    void testEmitAllCommittablesOnEndOfInput(boolean isBatchMode) throws 
Exception {
-        SinkAndCounters sinkAndCounters = sinkWithPostCommit();
-        final OneInputStreamOperatorTestHarness<
-                        CommittableMessage<String>, CommittableMessage<String>>
-                testHarness = createTestHarness(sinkAndCounters.sink, 
isBatchMode, !isBatchMode);
-        testHarness.open();
-
-        final CommittableSummary<String> committableSummary =
-                new CommittableSummary<>(1, 2, EOI, 1, 0);
-        testHarness.processElement(new StreamRecord<>(committableSummary));
-        final CommittableSummary<String> committableSummary2 =
-                new CommittableSummary<>(2, 2, EOI, 1, 0);
-        testHarness.processElement(new StreamRecord<>(committableSummary2));
-
-        final CommittableWithLineage<String> first = new 
CommittableWithLineage<>("1", EOI, 1);
-        testHarness.processElement(new StreamRecord<>(first));
-        final CommittableWithLineage<String> second = new 
CommittableWithLineage<>("1", EOI, 2);
-        testHarness.processElement(new StreamRecord<>(second));
-
-        testHarness.endInput();
-        if (!isBatchMode) {
-            assertThat(testHarness.getOutput()).isEmpty();
-            // notify final checkpoint complete
-            testHarness.notifyOfCompletedCheckpoint(1);
-        }
-
-        ListAssert<CommittableMessage<String>> records =
-                assertThat(testHarness.extractOutputValues()).hasSize(3);
-        records.element(0, as(committableSummary()))
-                .hasFailedCommittables(0)
-                .hasOverallCommittables(2);
-        records.element(1, 
as(committableWithLineage())).isEqualTo(first.withSubtaskId(0));
-        records.element(2, 
as(committableWithLineage())).isEqualTo(second.withSubtaskId(0));
-        testHarness.close();
-    }
-
-    @Test
-    void testStateRestore() throws Exception {
-
-        final int originalSubtaskId = 0;
-        final int subtaskIdAfterRecovery = 9;
-
-        final OneInputStreamOperatorTestHarness<
-                        CommittableMessage<String>, CommittableMessage<String>>
-                testHarness =
-                        createTestHarness(
-                                sinkWithPostCommitWithRetry().sink,
-                                false,
-                                true,
-                                1,
-                                1,
-                                originalSubtaskId);
-        testHarness.open();
-
-        // We cannot test a different checkpoint thant 0 because when using 
the OperatorTestHarness
-        // for recovery the lastCompleted checkpoint is always reset to 0.
-        long checkpointId = 0L;
-
-        final CommittableSummary<String> committableSummary =
-                new CommittableSummary<>(originalSubtaskId, 1, checkpointId, 
1, 0);
-        testHarness.processElement(new StreamRecord<>(committableSummary));
-        final CommittableWithLineage<String> first =
-                new CommittableWithLineage<>("1", checkpointId, 
originalSubtaskId);
-        testHarness.processElement(new StreamRecord<>(first));
-
-        // another committable for the same checkpointId but from different 
subtask.
-        final CommittableSummary<String> committableSummary2 =
-                new CommittableSummary<>(originalSubtaskId + 1, 1, 
checkpointId, 1, 0);
-        testHarness.processElement(new StreamRecord<>(committableSummary2));
-        final CommittableWithLineage<String> second =
-                new CommittableWithLineage<>("2", checkpointId, 
originalSubtaskId + 1);
-        testHarness.processElement(new StreamRecord<>(second));
-
-        final OperatorSubtaskState snapshot = 
testHarness.snapshot(checkpointId, 2L);
-        assertThat(testHarness.getOutput()).isEmpty();
-        testHarness.close();
-
-        // create new testHarness but with different parallelism level and 
subtaskId that original
-        // one.
-        // we will make sure that new subtaskId was used during committable 
recovery.
-        SinkAndCounters sinkAndCounters = sinkWithPostCommit();
-        final OneInputStreamOperatorTestHarness<
-                        CommittableMessage<String>, CommittableMessage<String>>
-                restored =
-                        createTestHarness(
-                                sinkAndCounters.sink, false, true, 10, 10, 
subtaskIdAfterRecovery);
-
-        restored.initializeState(snapshot);
-        restored.open();
-
-        // Previous committables are immediately committed if possible
-        assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(2);
-        ListAssert<CommittableMessage<String>> records =
-                assertThat(restored.extractOutputValues()).hasSize(3);
-        CommittableSummaryAssert<Object> objectCommittableSummaryAssert =
-                records.element(0, as(committableSummary()))
-                        .hasCheckpointId(checkpointId)
-                        .hasFailedCommittables(0)
-                        .hasSubtaskId(subtaskIdAfterRecovery);
-        objectCommittableSummaryAssert.hasOverallCommittables(2);
-
-        // Expect the same checkpointId that the original snapshot was made 
with.
-        records.element(1, as(committableWithLineage()))
-                .hasCheckpointId(checkpointId)
-                .hasSubtaskId(subtaskIdAfterRecovery)
-                .hasCommittable(first.getCommittable());
-        records.element(2, as(committableWithLineage()))
-                .hasCheckpointId(checkpointId)
-                .hasSubtaskId(subtaskIdAfterRecovery)
-                .hasCommittable(second.getCommittable());
-        restored.close();
-    }
-
-    @ParameterizedTest
-    @ValueSource(ints = {0, 1})
-    void testNumberOfRetries(int numRetries) throws Exception {
-        try (OneInputStreamOperatorTestHarness<
-                        CommittableMessage<String>, CommittableMessage<String>>
-                testHarness =
-                        createTestHarness(
-                                sinkWithPostCommitWithRetry().sink, false, 
true, 1, 1, 0)) {
-            testHarness
-                    .getStreamConfig()
-                    .getConfiguration()
-                    .set(SinkOptions.COMMITTER_RETRIES, numRetries);
-            testHarness.open();
-
-            long ckdId = 1L;
-            testHarness.processElement(
-                    new StreamRecord<>(new CommittableSummary<>(0, 1, ckdId, 
1, 0)));
-            testHarness.processElement(
-                    new StreamRecord<>(new CommittableWithLineage<>("1", 
ckdId, 0)));
-            AbstractThrowableAssert<?, ? extends Throwable> throwableAssert =
-                    assertThatCode(() -> 
testHarness.notifyOfCompletedCheckpoint(ckdId));
-            if (numRetries == 0) {
-                throwableAssert.hasMessageContaining("Failed to commit 1 
committables");
-            } else {
-                throwableAssert.doesNotThrowAnyException();
-            }
-        }
-    }
-
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    void testHandleEndInputInStreamingMode(boolean isCheckpointingEnabled) 
throws Exception {
-        final SinkAndCounters sinkAndCounters = sinkWithPostCommit();
-
-        try (OneInputStreamOperatorTestHarness<
-                        CommittableMessage<String>, CommittableMessage<String>>
-                testHarness =
-                        new OneInputStreamOperatorTestHarness<>(
-                                new CommitterOperatorFactory<>(
-                                        sinkAndCounters.sink, false, 
isCheckpointingEnabled))) {
-            testHarness.open();
-
-            final CommittableSummary<String> committableSummary =
-                    new CommittableSummary<>(1, 1, 1L, 1, 0);
-            testHarness.processElement(new StreamRecord<>(committableSummary));
-            final CommittableWithLineage<String> committableWithLineage =
-                    new CommittableWithLineage<>("1", 1L, 1);
-            testHarness.processElement(new 
StreamRecord<>(committableWithLineage));
-
-            testHarness.endInput();
-
-            // If checkpointing enabled endInput does not emit anything 
because a final checkpoint
-            // follows
-            if (isCheckpointingEnabled) {
-                testHarness.notifyOfCompletedCheckpoint(1);
-            }
-
-            ListAssert<CommittableMessage<String>> records =
-                    assertThat(testHarness.extractOutputValues()).hasSize(2);
-            CommittableSummaryAssert<Object> objectCommittableSummaryAssert =
-                    records.element(0, 
as(committableSummary())).hasCheckpointId(1L);
-            objectCommittableSummaryAssert.hasOverallCommittables(1);
-            records.element(1, as(committableWithLineage()))
-                    .isEqualTo(committableWithLineage.withSubtaskId(0));
-
-            // Future emission calls should change the output
-            testHarness.notifyOfCompletedCheckpoint(2);
-            testHarness.endInput();
-
-            assertThat(testHarness.getOutput()).hasSize(2);
-        }
-    }
-
-    private OneInputStreamOperatorTestHarness<
-                    CommittableMessage<String>, CommittableMessage<String>>
-            createTestHarness(
-                    SupportsCommitter<String> sink,
-                    boolean isBatchMode,
-                    boolean isCheckpointingEnabled)
-                    throws Exception {
-        return new OneInputStreamOperatorTestHarness<>(
-                new CommitterOperatorFactory<>(sink, isBatchMode, 
isCheckpointingEnabled));
-    }
-
-    private OneInputStreamOperatorTestHarness<
-                    CommittableMessage<String>, CommittableMessage<String>>
-            createTestHarness(
-                    SupportsCommitter<String> sink,
-                    boolean isBatchMode,
-                    boolean isCheckpointingEnabled,
-                    int maxParallelism,
-                    int parallelism,
-                    int subtaskId)
-                    throws Exception {
-        return new OneInputStreamOperatorTestHarness<>(
-                new CommitterOperatorFactory<>(sink, isBatchMode, 
isCheckpointingEnabled),
-                maxParallelism,
-                parallelism,
-                subtaskId);
-    }
-
-    abstract SinkAndCounters sinkWithPostCommit();
-
-    abstract SinkAndCounters sinkWithPostCommitWithRetry();
-
-    abstract SinkAndCounters sinkWithoutPostCommit();
-
-    static class SinkAndCounters {
-        SupportsCommitter<String> sink;
-        IntSupplier commitCounter;
-
-        public SinkAndCounters(SupportsCommitter<String> sink, IntSupplier 
commitCounter) {
-            this.sink = sink;
-            this.commitCounter = commitCounter;
-        }
-    }
-}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java
index 0112b5cf862..4c7291dd44c 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java
@@ -19,11 +19,32 @@
 package org.apache.flink.streaming.runtime.operators.sink;
 
 import org.apache.flink.api.connector.sink2.SupportsCommitter;
+import org.apache.flink.configuration.SinkOptions;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummaryAssert;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import org.assertj.core.api.AbstractThrowableAssert;
+import org.assertj.core.api.ListAssert;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.Collection;
+import java.util.function.IntSupplier;
+
+import static 
org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI;
+import static 
org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableSummary;
+import static 
org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableWithLineage;
+import static org.assertj.core.api.Assertions.as;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
 
-class SinkV2CommitterOperatorTest extends CommitterOperatorTestBase {
-    @Override
+class SinkV2CommitterOperatorTest {
     SinkAndCounters sinkWithPostCommit() {
         ForwardingCommitter committer = new ForwardingCommitter();
         return new SinkAndCounters(
@@ -35,9 +56,8 @@ class SinkV2CommitterOperatorTest extends 
CommitterOperatorTestBase {
                 () -> committer.successfulCommits);
     }
 
-    @Override
     SinkAndCounters sinkWithPostCommitWithRetry() {
-        return new CommitterOperatorTestBase.SinkAndCounters(
+        return new SinkAndCounters(
                 (SupportsCommitter<String>)
                         TestSinkV2.newBuilder()
                                 .setCommitter(new 
TestSinkV2.RetryOnceCommitter())
@@ -46,7 +66,6 @@ class SinkV2CommitterOperatorTest extends 
CommitterOperatorTestBase {
                 () -> 0);
     }
 
-    @Override
     SinkAndCounters sinkWithoutPostCommit() {
         ForwardingCommitter committer = new ForwardingCommitter();
         return new SinkAndCounters(
@@ -58,6 +77,302 @@ class SinkV2CommitterOperatorTest extends 
CommitterOperatorTestBase {
                 () -> committer.successfulCommits);
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testEmitCommittables(boolean withPostCommitTopology) throws Exception 
{
+        SinkAndCounters sinkAndCounters;
+        if (withPostCommitTopology) {
+            // Insert global committer to simulate post commit topology
+            sinkAndCounters = sinkWithPostCommit();
+        } else {
+            sinkAndCounters = sinkWithoutPostCommit();
+        }
+        final OneInputStreamOperatorTestHarness<
+                        CommittableMessage<String>, CommittableMessage<String>>
+                testHarness =
+                        new OneInputStreamOperatorTestHarness<>(
+                                new 
CommitterOperatorFactory<>(sinkAndCounters.sink, false, true));
+        testHarness.open();
+
+        final CommittableSummary<String> committableSummary =
+                new CommittableSummary<>(1, 1, 1L, 1, 0);
+        testHarness.processElement(new StreamRecord<>(committableSummary));
+        final CommittableWithLineage<String> committableWithLineage =
+                new CommittableWithLineage<>("1", 1L, 1);
+        testHarness.processElement(new StreamRecord<>(committableWithLineage));
+
+        // Trigger commit
+        testHarness.notifyOfCompletedCheckpoint(1);
+
+        assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(1);
+        if (withPostCommitTopology) {
+            ListAssert<CommittableMessage<String>> records =
+                    assertThat(testHarness.extractOutputValues()).hasSize(2);
+            records.element(0, as(committableSummary()))
+                    
.hasFailedCommittables(committableSummary.getNumberOfFailedCommittables())
+                    
.hasOverallCommittables(committableSummary.getNumberOfCommittables());
+            records.element(1, as(committableWithLineage()))
+                    .isEqualTo(committableWithLineage.withSubtaskId(0));
+        } else {
+            assertThat(testHarness.getOutput()).isEmpty();
+        }
+        testHarness.close();
+    }
+
+    @Test
+    void ensureAllCommittablesArrivedBeforeCommitting() throws Exception {
+        SinkAndCounters sinkAndCounters = sinkWithPostCommit();
+        final OneInputStreamOperatorTestHarness<
+                        CommittableMessage<String>, CommittableMessage<String>>
+                testHarness = createTestHarness(sinkAndCounters.sink, false, 
true);
+        testHarness.open();
+        testHarness.setProcessingTime(0);
+
+        // Only send first committable
+        final CommittableSummary<String> committableSummary =
+                new CommittableSummary<>(1, 1, 1L, 2, 0);
+        testHarness.processElement(new StreamRecord<>(committableSummary));
+        final CommittableWithLineage<String> first = new 
CommittableWithLineage<>("1", 1L, 1);
+        testHarness.processElement(new StreamRecord<>(first));
+
+        assertThatCode(() -> testHarness.notifyOfCompletedCheckpoint(1))
+                .hasMessageContaining("Trying to commit incomplete batch of 
committables");
+
+        assertThat(testHarness.getOutput()).isEmpty();
+        assertThat(sinkAndCounters.commitCounter.getAsInt()).isZero();
+
+        final CommittableWithLineage<String> second = new 
CommittableWithLineage<>("2", 1L, 1);
+        testHarness.processElement(new StreamRecord<>(second));
+
+        assertThatCode(() -> 
testHarness.notifyOfCompletedCheckpoint(1)).doesNotThrowAnyException();
+
+        assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(2);
+        ListAssert<CommittableMessage<String>> records =
+                assertThat(testHarness.extractOutputValues()).hasSize(3);
+        records.element(0, as(committableSummary()))
+                
.hasFailedCommittables(committableSummary.getNumberOfFailedCommittables())
+                
.hasOverallCommittables(committableSummary.getNumberOfCommittables());
+        records.element(1, 
as(committableWithLineage())).isEqualTo(first.withSubtaskId(0));
+        records.element(2, 
as(committableWithLineage())).isEqualTo(second.withSubtaskId(0));
+        testHarness.close();
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testEmitAllCommittablesOnEndOfInput(boolean isBatchMode) throws 
Exception {
+        SinkAndCounters sinkAndCounters = sinkWithPostCommit();
+        final OneInputStreamOperatorTestHarness<
+                        CommittableMessage<String>, CommittableMessage<String>>
+                testHarness = createTestHarness(sinkAndCounters.sink, 
isBatchMode, !isBatchMode);
+        testHarness.open();
+
+        final CommittableSummary<String> committableSummary =
+                new CommittableSummary<>(1, 2, EOI, 1, 0);
+        testHarness.processElement(new StreamRecord<>(committableSummary));
+        final CommittableSummary<String> committableSummary2 =
+                new CommittableSummary<>(2, 2, EOI, 1, 0);
+        testHarness.processElement(new StreamRecord<>(committableSummary2));
+
+        final CommittableWithLineage<String> first = new 
CommittableWithLineage<>("1", EOI, 1);
+        testHarness.processElement(new StreamRecord<>(first));
+        final CommittableWithLineage<String> second = new 
CommittableWithLineage<>("1", EOI, 2);
+        testHarness.processElement(new StreamRecord<>(second));
+
+        testHarness.endInput();
+        if (!isBatchMode) {
+            assertThat(testHarness.getOutput()).isEmpty();
+            // notify final checkpoint complete
+            testHarness.notifyOfCompletedCheckpoint(1);
+        }
+
+        ListAssert<CommittableMessage<String>> records =
+                assertThat(testHarness.extractOutputValues()).hasSize(3);
+        records.element(0, as(committableSummary()))
+                .hasFailedCommittables(0)
+                .hasOverallCommittables(2);
+        records.element(1, 
as(committableWithLineage())).isEqualTo(first.withSubtaskId(0));
+        records.element(2, 
as(committableWithLineage())).isEqualTo(second.withSubtaskId(0));
+        testHarness.close();
+    }
+
+    @Test
+    void testStateRestore() throws Exception {
+
+        final int originalSubtaskId = 0;
+        final int subtaskIdAfterRecovery = 9;
+
+        final OneInputStreamOperatorTestHarness<
+                        CommittableMessage<String>, CommittableMessage<String>>
+                testHarness =
+                        createTestHarness(
+                                sinkWithPostCommitWithRetry().sink,
+                                false,
+                                true,
+                                1,
+                                1,
+                                originalSubtaskId);
+        testHarness.open();
+
+        // We cannot test a different checkpoint thant 0 because when using 
the OperatorTestHarness
+        // for recovery the lastCompleted checkpoint is always reset to 0.
+        long checkpointId = 0L;
+
+        final CommittableSummary<String> committableSummary =
+                new CommittableSummary<>(originalSubtaskId, 1, checkpointId, 
1, 0);
+        testHarness.processElement(new StreamRecord<>(committableSummary));
+        final CommittableWithLineage<String> first =
+                new CommittableWithLineage<>("1", checkpointId, 
originalSubtaskId);
+        testHarness.processElement(new StreamRecord<>(first));
+
+        // another committable for the same checkpointId but from different 
subtask.
+        final CommittableSummary<String> committableSummary2 =
+                new CommittableSummary<>(originalSubtaskId + 1, 1, 
checkpointId, 1, 0);
+        testHarness.processElement(new StreamRecord<>(committableSummary2));
+        final CommittableWithLineage<String> second =
+                new CommittableWithLineage<>("2", checkpointId, 
originalSubtaskId + 1);
+        testHarness.processElement(new StreamRecord<>(second));
+
+        final OperatorSubtaskState snapshot = 
testHarness.snapshot(checkpointId, 2L);
+        assertThat(testHarness.getOutput()).isEmpty();
+        testHarness.close();
+
+        // create new testHarness but with different parallelism level and 
subtaskId that original
+        // one.
+        // we will make sure that new subtaskId was used during committable 
recovery.
+        SinkAndCounters sinkAndCounters = sinkWithPostCommit();
+        final OneInputStreamOperatorTestHarness<
+                        CommittableMessage<String>, CommittableMessage<String>>
+                restored =
+                        createTestHarness(
+                                sinkAndCounters.sink, false, true, 10, 10, 
subtaskIdAfterRecovery);
+
+        restored.initializeState(snapshot);
+        restored.open();
+
+        // Previous committables are immediately committed if possible
+        assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(2);
+        ListAssert<CommittableMessage<String>> records =
+                assertThat(restored.extractOutputValues()).hasSize(3);
+        CommittableSummaryAssert<Object> objectCommittableSummaryAssert =
+                records.element(0, as(committableSummary()))
+                        .hasCheckpointId(checkpointId)
+                        .hasFailedCommittables(0)
+                        .hasSubtaskId(subtaskIdAfterRecovery);
+        objectCommittableSummaryAssert.hasOverallCommittables(2);
+
+        // Expect the same checkpointId that the original snapshot was made 
with.
+        records.element(1, as(committableWithLineage()))
+                .hasCheckpointId(checkpointId)
+                .hasSubtaskId(subtaskIdAfterRecovery)
+                .hasCommittable(first.getCommittable());
+        records.element(2, as(committableWithLineage()))
+                .hasCheckpointId(checkpointId)
+                .hasSubtaskId(subtaskIdAfterRecovery)
+                .hasCommittable(second.getCommittable());
+        restored.close();
+    }
+
+    @ParameterizedTest
+    @ValueSource(ints = {0, 1})
+    void testNumberOfRetries(int numRetries) throws Exception {
+        try (OneInputStreamOperatorTestHarness<
+                        CommittableMessage<String>, CommittableMessage<String>>
+                testHarness =
+                        createTestHarness(
+                                sinkWithPostCommitWithRetry().sink, false, 
true, 1, 1, 0)) {
+            testHarness
+                    .getStreamConfig()
+                    .getConfiguration()
+                    .set(SinkOptions.COMMITTER_RETRIES, numRetries);
+            testHarness.open();
+
+            long ckdId = 1L;
+            testHarness.processElement(
+                    new StreamRecord<>(new CommittableSummary<>(0, 1, ckdId, 
1, 0)));
+            testHarness.processElement(
+                    new StreamRecord<>(new CommittableWithLineage<>("1", 
ckdId, 0)));
+            AbstractThrowableAssert<?, ? extends Throwable> throwableAssert =
+                    assertThatCode(() -> 
testHarness.notifyOfCompletedCheckpoint(ckdId));
+            if (numRetries == 0) {
+                throwableAssert.hasMessageContaining("Failed to commit 1 
committables");
+            } else {
+                throwableAssert.doesNotThrowAnyException();
+            }
+        }
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testHandleEndInputInStreamingMode(boolean isCheckpointingEnabled) 
throws Exception {
+        final SinkAndCounters sinkAndCounters = sinkWithPostCommit();
+
+        try (OneInputStreamOperatorTestHarness<
+                        CommittableMessage<String>, CommittableMessage<String>>
+                testHarness =
+                        new OneInputStreamOperatorTestHarness<>(
+                                new CommitterOperatorFactory<>(
+                                        sinkAndCounters.sink, false, 
isCheckpointingEnabled))) {
+            testHarness.open();
+
+            final CommittableSummary<String> committableSummary =
+                    new CommittableSummary<>(1, 1, 1L, 1, 0);
+            testHarness.processElement(new StreamRecord<>(committableSummary));
+            final CommittableWithLineage<String> committableWithLineage =
+                    new CommittableWithLineage<>("1", 1L, 1);
+            testHarness.processElement(new 
StreamRecord<>(committableWithLineage));
+
+            testHarness.endInput();
+
+            // If checkpointing enabled endInput does not emit anything 
because a final checkpoint
+            // follows
+            if (isCheckpointingEnabled) {
+                testHarness.notifyOfCompletedCheckpoint(1);
+            }
+
+            ListAssert<CommittableMessage<String>> records =
+                    assertThat(testHarness.extractOutputValues()).hasSize(2);
+            CommittableSummaryAssert<Object> objectCommittableSummaryAssert =
+                    records.element(0, 
as(committableSummary())).hasCheckpointId(1L);
+            objectCommittableSummaryAssert.hasOverallCommittables(1);
+            records.element(1, as(committableWithLineage()))
+                    .isEqualTo(committableWithLineage.withSubtaskId(0));
+
+            // Future emission calls should change the output
+            testHarness.notifyOfCompletedCheckpoint(2);
+            testHarness.endInput();
+
+            assertThat(testHarness.getOutput()).hasSize(2);
+        }
+    }
+
+    private OneInputStreamOperatorTestHarness<
+                    CommittableMessage<String>, CommittableMessage<String>>
+            createTestHarness(
+                    SupportsCommitter<String> sink,
+                    boolean isBatchMode,
+                    boolean isCheckpointingEnabled)
+                    throws Exception {
+        return new OneInputStreamOperatorTestHarness<>(
+                new CommitterOperatorFactory<>(sink, isBatchMode, 
isCheckpointingEnabled));
+    }
+
+    private OneInputStreamOperatorTestHarness<
+                    CommittableMessage<String>, CommittableMessage<String>>
+            createTestHarness(
+                    SupportsCommitter<String> sink,
+                    boolean isBatchMode,
+                    boolean isCheckpointingEnabled,
+                    int maxParallelism,
+                    int parallelism,
+                    int subtaskId)
+                    throws Exception {
+        return new OneInputStreamOperatorTestHarness<>(
+                new CommitterOperatorFactory<>(sink, isBatchMode, 
isCheckpointingEnabled),
+                maxParallelism,
+                parallelism,
+                subtaskId);
+    }
+
     private static class ForwardingCommitter extends 
TestSinkV2.DefaultCommitter {
         private int successfulCommits = 0;
 
@@ -69,4 +384,14 @@ class SinkV2CommitterOperatorTest extends 
CommitterOperatorTestBase {
         @Override
         public void close() throws Exception {}
     }
+
+    static class SinkAndCounters {
+        SupportsCommitter<String> sink;
+        IntSupplier commitCounter;
+
+        public SinkAndCounters(SupportsCommitter<String> sink, IntSupplier 
commitCounter) {
+            this.sink = sink;
+            this.commitCounter = commitCounter;
+        }
+    }
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest.java
index ae0616e8b97..4cac3953d4f 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest.java
@@ -18,26 +18,119 @@
 
 package org.apache.flink.streaming.runtime.operators.sink;
 
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.eventtime.Watermark;
 import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.WriterInitContext;
 import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummaryAssert;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+import 
org.apache.flink.streaming.api.connector.sink2.CommittableWithLineageAssert;
+import org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
+import 
org.apache.flink.streaming.runtime.operators.sink.committables.SinkV1CommittableDeserializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
 
 import org.apache.flink.shaded.guava33.com.google.common.collect.ImmutableList;
 
+import org.assertj.core.api.ListAssert;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.api.connector.sink2.InitContext.INITIAL_CHECKPOINT_ID;
+import static 
org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI;
+import static 
org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableSummary;
+import static 
org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableWithLineage;
+import static org.assertj.core.api.Assertions.as;
+import static org.assertj.core.api.Assertions.assertThat;
+
+class SinkV2SinkWriterOperatorTest {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testLoadPreviousSinkState(boolean stateful) throws Exception {
+        // 1. Build previous sink state
+        final List<String> previousSinkInputs =
+                Arrays.asList(
+                        "bit", "mention", "thick", "stick", "stir", "easy", 
"sleep", "forth",
+                        "cost", "prompt");
+
+        InspectableSink sink = sinkWithState(stateful, 
CompatibleStateSinkOperator.SINK_STATE_NAME);
+        int expectedState = 5;
+        final OneInputStreamOperatorTestHarness<String, String> previousSink =
+                new OneInputStreamOperatorTestHarness<>(
+                        new CompatibleStateSinkOperator<>(
+                                TestSinkV2.WRITER_SERIALIZER, expectedState),
+                        StringSerializer.INSTANCE);
+
+        OperatorSubtaskState previousSinkState =
+                TestHarnessUtil.buildSubtaskState(previousSink, 
previousSinkInputs);
+
+        // 2. Load previous sink state and verify state
+        Sink<Integer> sink3 = sink.getSink();
+        final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>>
+                compatibleWriterOperator =
+                        new OneInputStreamOperatorTestHarness<>(
+                                new SinkWriterOperatorFactory<>(sink3));
+
+        // load the state from previous sink
+        compatibleWriterOperator.initializeState(previousSinkState);
+        assertThat(sink.getRecordCountFromState()).isEqualTo(stateful ? 
expectedState : 0);
+
+        // 3. do another snapshot and check if this also can be restored 
without compabitible state
+        // name
+        compatibleWriterOperator.prepareSnapshotPreBarrier(1L);
+        OperatorSubtaskState snapshot = compatibleWriterOperator.snapshot(1L, 
1L);
+
+        compatibleWriterOperator.close();
+
+        // 4. Restore the sink without previous sink's state
+        InspectableSink sink2 =
+                sinkWithState(stateful, 
CompatibleStateSinkOperator.SINK_STATE_NAME);
+        final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>>
+                restoredSinkOperator =
+                        new OneInputStreamOperatorTestHarness<>(
+                                new 
SinkWriterOperatorFactory<>(sink2.getSink()));
+
+        restoredSinkOperator.initializeState(snapshot);
+        assertThat(sink.getRecordCountFromState()).isEqualTo(stateful ? 
expectedState : 0);
+
+        restoredSinkOperator.close();
+    }
 
-class SinkV2SinkWriterOperatorTest extends SinkWriterOperatorTestBase {
-    @Override
     InspectableSink sinkWithoutCommitter() {
         TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = new 
TestSinkV2.DefaultSinkWriter<>();
         return new 
InspectableSink(TestSinkV2.<Integer>newBuilder().setWriter(sinkWriter).build());
     }
 
-    @Override
     InspectableSink sinkWithCommitter() {
         TestSinkV2.DefaultSinkWriter<Integer> sinkWriter =
                 new TestSinkV2.DefaultCommittingSinkWriter<>();
@@ -48,7 +141,6 @@ class SinkV2SinkWriterOperatorTest extends 
SinkWriterOperatorTestBase {
                         .build());
     }
 
-    @Override
     InspectableSink sinkWithTimeBasedWriter() {
         TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = new 
TimeBasedBufferingSinkWriter();
         return new InspectableSink(
@@ -58,7 +150,6 @@ class SinkV2SinkWriterOperatorTest extends 
SinkWriterOperatorTestBase {
                         .build());
     }
 
-    @Override
     InspectableSink sinkWithState(boolean withState, String stateName) {
         TestSinkV2.DefaultSinkWriter<Integer> sinkWriter =
                 new TestSinkV2.DefaultStatefulSinkWriter<>();
@@ -76,6 +167,308 @@ class SinkV2SinkWriterOperatorTest extends 
SinkWriterOperatorTestBase {
         return new InspectableSink(builder.build());
     }
 
+    @Test
+    void testNotEmitCommittablesWithoutCommitter() throws Exception {
+        InspectableSink sink = sinkWithoutCommitter();
+        final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>> testHarness =
+                new OneInputStreamOperatorTestHarness<>(
+                        new SinkWriterOperatorFactory<>(sink.getSink()));
+        testHarness.open();
+        testHarness.processElement(1, 1);
+
+        assertThat(testHarness.extractOutputValues()).isEmpty();
+        assertThat(sink.getRecordsOfCurrentCheckpoint())
+                .containsOnly("(1,1," + Long.MIN_VALUE + ")");
+
+        testHarness.prepareSnapshotPreBarrier(1);
+        assertThat(testHarness.extractOutputValues()).isEmpty();
+        // Elements are flushed
+        assertThat(sink.getRecordsOfCurrentCheckpoint()).isEmpty();
+        testHarness.close();
+    }
+
+    @Test
+    void testWatermarkPropagatedToSinkWriter() throws Exception {
+        final long initialTime = 0;
+
+        InspectableSink sink = sinkWithoutCommitter();
+        final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>> testHarness =
+                new OneInputStreamOperatorTestHarness<>(
+                        new SinkWriterOperatorFactory<>(sink.getSink()));
+        testHarness.open();
+
+        testHarness.processWatermark(initialTime);
+        testHarness.processWatermark(initialTime + 1);
+
+        assertThat(testHarness.getOutput())
+                .containsExactly(
+                        new 
org.apache.flink.streaming.api.watermark.Watermark(initialTime),
+                        new 
org.apache.flink.streaming.api.watermark.Watermark(initialTime + 1));
+        assertThat(sink.getWatermarks())
+                .containsExactly(new Watermark(initialTime), new 
Watermark(initialTime + 1));
+        testHarness.close();
+    }
+
+    @Test
+    void testTimeBasedBufferingSinkWriter() throws Exception {
+        final long initialTime = 0;
+
+        final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>> testHarness =
+                new OneInputStreamOperatorTestHarness<>(
+                        new 
SinkWriterOperatorFactory<>(sinkWithTimeBasedWriter().getSink()));
+
+        testHarness.open();
+
+        testHarness.setProcessingTime(0L);
+
+        testHarness.processElement(1, initialTime + 1);
+        testHarness.processElement(2, initialTime + 2);
+
+        testHarness.prepareSnapshotPreBarrier(1L);
+
+        // Expect empty committableSummary
+        assertBasicOutput(testHarness.extractOutputValues(), 0, 1L);
+
+        testHarness.getProcessingTimeService().setCurrentTime(2001);
+
+        testHarness.prepareSnapshotPreBarrier(2L);
+
+        assertBasicOutput(
+                
testHarness.extractOutputValues().stream().skip(1).collect(Collectors.toList()),
+                2,
+                2L);
+        testHarness.close();
+    }
+
+    @Test
+    void testEmitOnFlushWithCommitter() throws Exception {
+        final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>> testHarness =
+                new OneInputStreamOperatorTestHarness<>(
+                        new 
SinkWriterOperatorFactory<>(sinkWithCommitter().getSink()));
+
+        testHarness.open();
+        assertThat(testHarness.extractOutputValues()).isEmpty();
+
+        testHarness.processElement(1, 1);
+        testHarness.processElement(2, 2);
+
+        // flush
+        testHarness.prepareSnapshotPreBarrier(1);
+
+        assertBasicOutput(testHarness.extractOutputValues(), 2, 1L);
+        testHarness.close();
+    }
+
+    @Test
+    void testEmitOnEndOfInputInBatchMode() throws Exception {
+        final SinkWriterOperatorFactory<Integer, Integer> 
writerOperatorFactory =
+                new SinkWriterOperatorFactory<>(sinkWithCommitter().getSink());
+        final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>> testHarness =
+                new OneInputStreamOperatorTestHarness<>(writerOperatorFactory);
+
+        testHarness.open();
+        assertThat(testHarness.extractOutputValues()).isEmpty();
+
+        testHarness.processElement(1, 1);
+        testHarness.endInput();
+        assertBasicOutput(testHarness.extractOutputValues(), 1, EOI);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testStateRestore(boolean stateful) throws Exception {
+
+        final long initialTime = 0;
+
+        final InspectableSink sink = sinkWithState(stateful, null);
+        Sink<Integer> sink2 = sink.getSink();
+        final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>> testHarness =
+                new OneInputStreamOperatorTestHarness<>(new 
SinkWriterOperatorFactory<>(sink2));
+
+        testHarness.open();
+
+        testHarness.processWatermark(initialTime);
+        testHarness.processElement(1, initialTime + 1);
+        testHarness.processElement(2, initialTime + 2);
+
+        testHarness.prepareSnapshotPreBarrier(1L);
+        OperatorSubtaskState snapshot = testHarness.snapshot(1L, 1L);
+
+        assertThat(sink.getRecordCountFromState()).isEqualTo(2);
+        assertThat(sink.getLastCheckpointId()).isEqualTo(stateful ? 1L : -1L);
+
+        testHarness.close();
+
+        final InspectableSink restoredSink = sinkWithState(stateful, null);
+        final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>>
+                restoredTestHarness =
+                        new OneInputStreamOperatorTestHarness<>(
+                                new 
SinkWriterOperatorFactory<>(restoredSink.getSink()));
+
+        restoredTestHarness.initializeState(snapshot);
+        restoredTestHarness.open();
+
+        // check that the previous state is correctly restored
+        assertThat(restoredSink.getRecordCountFromState()).isEqualTo(stateful 
? 2 : 0);
+
+        restoredTestHarness.close();
+    }
+
+    @Test
+    void testRestoreCommitterState() throws Exception {
+        final List<String> committables = Arrays.asList("state1", "state2");
+
+        InspectableSink sink = sinkWithCommitter();
+        final OneInputStreamOperatorTestHarness<String, String> committer =
+                new OneInputStreamOperatorTestHarness<>(
+                        new 
TestCommitterOperator(TestSinkV2.COMMITTABLE_SERIALIZER),
+                        StringSerializer.INSTANCE);
+
+        final OperatorSubtaskState committerState =
+                TestHarnessUtil.buildSubtaskState(committer, committables);
+
+        final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>> testHarness =
+                new OneInputStreamOperatorTestHarness<>(
+                        new SinkWriterOperatorFactory<>(sink.getSink()));
+
+        testHarness.initializeState(committerState);
+
+        testHarness.open();
+
+        testHarness.prepareSnapshotPreBarrier(2);
+
+        final ListAssert<CommittableMessage<Integer>> records =
+                assertThat(testHarness.extractOutputValues()).hasSize(4);
+
+        records.element(0, as(committableSummary()))
+                .hasCheckpointId(INITIAL_CHECKPOINT_ID)
+                .hasOverallCommittables(committables.size());
+        records.<CommittableWithLineageAssert<String>>element(1, 
as(committableWithLineage()))
+                .hasCommittable(committables.get(0))
+                .hasCheckpointId(INITIAL_CHECKPOINT_ID)
+                .hasSubtaskId(0);
+        records.<CommittableWithLineageAssert<String>>element(2, 
as(committableWithLineage()))
+                .hasCommittable(committables.get(1))
+                .hasCheckpointId(INITIAL_CHECKPOINT_ID)
+                .hasSubtaskId(0);
+        records.element(3, 
as(committableSummary())).hasCheckpointId(2L).hasOverallCommittables(0);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testHandleEndInputInStreamingMode(boolean isCheckpointingEnabled) 
throws Exception {
+        InspectableSink sink = sinkWithCommitter();
+        final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<String>> testHarness =
+                new OneInputStreamOperatorTestHarness<>(
+                        new SinkWriterOperatorFactory<>(sink.getSink()));
+        testHarness.open();
+        testHarness.processElement(1, 1);
+
+        assertThat(testHarness.extractOutputValues()).isEmpty();
+        final String record = "(1,1," + Long.MIN_VALUE + ")";
+        assertThat(sink.getRecordsOfCurrentCheckpoint()).containsOnly(record);
+
+        testHarness.endInput();
+
+        if (isCheckpointingEnabled) {
+            testHarness.prepareSnapshotPreBarrier(1);
+        }
+
+        List<String> committables = Collections.singletonList(record);
+
+        ListAssert<CommittableMessage<String>> records =
+                
assertThat(testHarness.extractOutputValues()).hasSize(committables.size() + 1);
+        records.element(0, 
as(committableSummary())).hasOverallCommittables(committables.size());
+
+        records.filteredOn(message -> message instanceof 
CommittableWithLineage)
+                .map(message -> ((CommittableWithLineage<String>) 
message).getCommittable())
+                .containsExactlyInAnyOrderElementsOf(committables);
+        assertThat(sink.getRecordsOfCurrentCheckpoint()).isEmpty();
+
+        testHarness.close();
+    }
+
+    @Test
+    void testInitContext() throws Exception {
+        final AtomicReference<WriterInitContext> initContext = new 
AtomicReference<>();
+        final Sink<String> sink =
+                context -> {
+                    initContext.set(context);
+                    return null;
+                };
+
+        final int subtaskId = 1;
+        final int parallelism = 10;
+        final TypeSerializer<String> typeSerializer = 
StringSerializer.INSTANCE;
+        final JobID jobID = new JobID();
+
+        final MockEnvironment environment =
+                MockEnvironment.builder()
+                        .setSubtaskIndex(subtaskId)
+                        .setParallelism(parallelism)
+                        .setMaxParallelism(parallelism)
+                        .setJobID(jobID)
+                        .setExecutionConfig(new 
ExecutionConfig().enableObjectReuse())
+                        .build();
+
+        final OneInputStreamOperatorTestHarness<String, 
CommittableMessage<String>> testHarness =
+                new OneInputStreamOperatorTestHarness<>(
+                        new SinkWriterOperatorFactory<>(sink), typeSerializer, 
environment);
+        testHarness.open();
+
+        assertThat(initContext.get().getUserCodeClassLoader()).isNotNull();
+        assertThat(initContext.get().getMailboxExecutor()).isNotNull();
+        assertThat(initContext.get().getProcessingTimeService()).isNotNull();
+        
assertThat(initContext.get().getTaskInfo().getIndexOfThisSubtask()).isEqualTo(subtaskId);
+        
assertThat(initContext.get().getTaskInfo().getNumberOfParallelSubtasks())
+                .isEqualTo(parallelism);
+        
assertThat(initContext.get().getTaskInfo().getAttemptNumber()).isZero();
+        assertThat(initContext.get().metricGroup()).isNotNull();
+        assertThat(initContext.get().getRestoredCheckpointId()).isNotPresent();
+        assertThat(initContext.get().isObjectReuseEnabled()).isTrue();
+        
assertThat(initContext.get().createInputSerializer()).isEqualTo(typeSerializer);
+        assertThat(initContext.get().getJobInfo().getJobId()).isEqualTo(jobID);
+
+        testHarness.close();
+    }
+
+    private static void assertContextsEqual(
+            WriterInitContext initContext, WriterInitContext original) {
+        assertThat(initContext.getUserCodeClassLoader().asClassLoader())
+                .isEqualTo(original.getUserCodeClassLoader().asClassLoader());
+        
assertThat(initContext.getMailboxExecutor()).isEqualTo(original.getMailboxExecutor());
+        assertThat(initContext.getProcessingTimeService())
+                .isEqualTo(original.getProcessingTimeService());
+        assertThat(initContext.getTaskInfo().getIndexOfThisSubtask())
+                .isEqualTo(original.getTaskInfo().getIndexOfThisSubtask());
+        assertThat(initContext.getTaskInfo().getNumberOfParallelSubtasks())
+                
.isEqualTo(original.getTaskInfo().getNumberOfParallelSubtasks());
+        assertThat(initContext.getTaskInfo().getAttemptNumber())
+                .isEqualTo(original.getTaskInfo().getAttemptNumber());
+        
assertThat(initContext.metricGroup()).isEqualTo(original.metricGroup());
+        assertThat(initContext.getRestoredCheckpointId())
+                .isEqualTo(original.getRestoredCheckpointId());
+        
assertThat(initContext.isObjectReuseEnabled()).isEqualTo(original.isObjectReuseEnabled());
+        
assertThat(initContext.createInputSerializer()).isEqualTo(original.createInputSerializer());
+        
assertThat(initContext.getJobInfo().getJobId()).isEqualTo(original.getJobInfo().getJobId());
+        
assertThat(initContext.metadataConsumer()).isEqualTo(original.metadataConsumer());
+    }
+
+    private static void assertBasicOutput(
+            List<CommittableMessage<Integer>> output, int 
numberOfCommittables, long checkpointId) {
+        ListAssert<CommittableMessage<Integer>> records =
+                assertThat(output).hasSize(numberOfCommittables + 1);
+        CommittableSummaryAssert<Object> objectCommittableSummaryAssert =
+                records.element(0, as(committableSummary()))
+                        .hasOverallCommittables(numberOfCommittables);
+        records.filteredOn(r -> r instanceof CommittableWithLineage)
+                .allSatisfy(
+                        cl ->
+                                
SinkV2Assertions.assertThat((CommittableWithLineage<?>) cl)
+                                        .hasCheckpointId(checkpointId)
+                                        .hasSubtaskId(0));
+    }
+
     private static class TimeBasedBufferingSinkWriter
             extends TestSinkV2.DefaultCommittingSinkWriter<Integer>
             implements ProcessingTimeService.ProcessingTimeCallback {
@@ -131,30 +524,124 @@ class SinkV2SinkWriterOperatorTest extends 
SinkWriterOperatorTestBase {
         }
     }
 
-    static class InspectableSink extends 
AbstractInspectableSink<TestSinkV2<Integer>> {
+    static class InspectableSink {
+        private final TestSinkV2<Integer> sink;
+
         InspectableSink(TestSinkV2<Integer> sink) {
-            super(sink);
+            this.sink = sink;
+        }
+
+        public TestSinkV2<Integer> getSink() {
+            return sink;
         }
 
-        @Override
         public long getLastCheckpointId() {
             return getSink().getWriter().lastCheckpointId;
         }
 
-        @Override
         public List<String> getRecordsOfCurrentCheckpoint() {
             return getSink().getWriter().elements;
         }
 
-        @Override
         public List<Watermark> getWatermarks() {
             return getSink().getWriter().watermarks;
         }
 
-        @Override
         public int getRecordCountFromState() {
             return ((TestSinkV2.DefaultStatefulSinkWriter<?>) 
getSink().getWriter())
                     .getRecordCount();
         }
     }
+
+    private static class TestCommitterOperator extends 
AbstractStreamOperator<String>
+            implements OneInputStreamOperator<String, String> {
+
+        private static final ListStateDescriptor<byte[]> 
STREAMING_COMMITTER_RAW_STATES_DESC =
+                new ListStateDescriptor<>(
+                        "streaming_committer_raw_states", 
BytePrimitiveArraySerializer.INSTANCE);
+        private ListState<List<String>> committerState;
+        private final List<String> buffer = new ArrayList<>();
+        private final SimpleVersionedSerializer<String> serializer;
+
+        public TestCommitterOperator(SimpleVersionedSerializer<String> 
serializer) {
+            this.serializer = serializer;
+        }
+
+        @Override
+        public void initializeState(StateInitializationContext context) throws 
Exception {
+            super.initializeState(context);
+            committerState =
+                    new SimpleVersionedListState<>(
+                            context.getOperatorStateStore()
+                                    
.getListState(STREAMING_COMMITTER_RAW_STATES_DESC),
+                            new TestingCommittableSerializer(serializer));
+        }
+
+        @Override
+        public void processElement(StreamRecord<String> element) throws 
Exception {
+            buffer.add(element.getValue());
+        }
+
+        @Override
+        public void snapshotState(StateSnapshotContext context) throws 
Exception {
+            super.snapshotState(context);
+            committerState.add(buffer);
+        }
+    }
+
+    /** Writes state to test whether the sink can read from alternative state 
names. */
+    private static class CompatibleStateSinkOperator<T> extends 
AbstractStreamOperator<String>
+            implements OneInputStreamOperator<String, String> {
+
+        static final String SINK_STATE_NAME = "compatible_sink_state";
+
+        static final ListStateDescriptor<byte[]> SINK_STATE_DESC =
+                new ListStateDescriptor<>(SINK_STATE_NAME, 
BytePrimitiveArraySerializer.INSTANCE);
+        ListState<T> sinkState;
+        private final SimpleVersionedSerializer<T> serializer;
+        private final T initialState;
+
+        public CompatibleStateSinkOperator(
+                SimpleVersionedSerializer<T> serializer, T initialState) {
+            this.serializer = serializer;
+            this.initialState = initialState;
+        }
+
+        public void initializeState(StateInitializationContext context) throws 
Exception {
+            super.initializeState(context);
+            sinkState =
+                    new SimpleVersionedListState<>(
+                            
context.getOperatorStateStore().getListState(SINK_STATE_DESC),
+                            serializer);
+            if (!context.isRestored()) {
+                sinkState.add(initialState);
+            }
+        }
+
+        @Override
+        public void processElement(StreamRecord<String> element) {
+            // do nothing
+        }
+    }
+
+    private static class TestingCommittableSerializer
+            extends SinkV1WriterCommittableSerializer<String> {
+
+        private final SimpleVersionedSerializer<String> committableSerializer;
+
+        public TestingCommittableSerializer(
+                SimpleVersionedSerializer<String> committableSerializer) {
+            super(committableSerializer);
+            this.committableSerializer = committableSerializer;
+        }
+
+        @Override
+        public byte[] serialize(List<String> obj) throws IOException {
+            final DataOutputSerializer out = new DataOutputSerializer(256);
+            out.writeInt(SinkV1CommittableDeserializer.MAGIC_NUMBER);
+            SimpleVersionedSerialization.writeVersionAndSerializeList(
+                    committableSerializer, obj, out);
+            return out.getCopyOfBuffer();
+        }
+    }
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java
deleted file mode 100644
index 6bde86216b9..00000000000
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java
+++ /dev/null
@@ -1,558 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.sink;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
-import org.apache.flink.api.connector.sink2.Sink;
-import org.apache.flink.api.connector.sink2.WriterInitContext;
-import org.apache.flink.core.io.SimpleVersionedSerialization;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.core.memory.DataOutputSerializer;
-import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
-import org.apache.flink.runtime.operators.testutils.MockEnvironment;
-import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.runtime.state.StateSnapshotContext;
-import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
-import org.apache.flink.streaming.api.connector.sink2.CommittableSummaryAssert;
-import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
-import 
org.apache.flink.streaming.api.connector.sink2.CommittableWithLineageAssert;
-import org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import 
org.apache.flink.streaming.runtime.operators.sink.committables.SinkV1CommittableDeserializer;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.TestHarnessUtil;
-
-import org.assertj.core.api.ListAssert;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
-
-import static 
org.apache.flink.api.connector.sink2.InitContext.INITIAL_CHECKPOINT_ID;
-import static 
org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI;
-import static 
org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableSummary;
-import static 
org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableWithLineage;
-import static org.assertj.core.api.Assertions.as;
-import static org.assertj.core.api.Assertions.assertThat;
-
-abstract class SinkWriterOperatorTestBase {
-
-    @Test
-    void testNotEmitCommittablesWithoutCommitter() throws Exception {
-        InspectableSink sink = sinkWithoutCommitter();
-        final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>> testHarness =
-                new OneInputStreamOperatorTestHarness<>(
-                        new SinkWriterOperatorFactory<>(sink.getSink()));
-        testHarness.open();
-        testHarness.processElement(1, 1);
-
-        assertThat(testHarness.extractOutputValues()).isEmpty();
-        assertThat(sink.getRecordsOfCurrentCheckpoint())
-                .containsOnly("(1,1," + Long.MIN_VALUE + ")");
-
-        testHarness.prepareSnapshotPreBarrier(1);
-        assertThat(testHarness.extractOutputValues()).isEmpty();
-        // Elements are flushed
-        assertThat(sink.getRecordsOfCurrentCheckpoint()).isEmpty();
-        testHarness.close();
-    }
-
-    @Test
-    void testWatermarkPropagatedToSinkWriter() throws Exception {
-        final long initialTime = 0;
-
-        InspectableSink sink = sinkWithoutCommitter();
-        final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>> testHarness =
-                new OneInputStreamOperatorTestHarness<>(
-                        new SinkWriterOperatorFactory<>(sink.getSink()));
-        testHarness.open();
-
-        testHarness.processWatermark(initialTime);
-        testHarness.processWatermark(initialTime + 1);
-
-        assertThat(testHarness.getOutput())
-                .containsExactly(new Watermark(initialTime), new 
Watermark(initialTime + 1));
-        assertThat(sink.getWatermarks())
-                .containsExactly(
-                        new 
org.apache.flink.api.common.eventtime.Watermark(initialTime),
-                        new 
org.apache.flink.api.common.eventtime.Watermark(initialTime + 1));
-        testHarness.close();
-    }
-
-    @Test
-    void testTimeBasedBufferingSinkWriter() throws Exception {
-        final long initialTime = 0;
-
-        final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>> testHarness =
-                new OneInputStreamOperatorTestHarness<>(
-                        new 
SinkWriterOperatorFactory<>(sinkWithTimeBasedWriter().getSink()));
-
-        testHarness.open();
-
-        testHarness.setProcessingTime(0L);
-
-        testHarness.processElement(1, initialTime + 1);
-        testHarness.processElement(2, initialTime + 2);
-
-        testHarness.prepareSnapshotPreBarrier(1L);
-
-        // Expect empty committableSummary
-        assertBasicOutput(testHarness.extractOutputValues(), 0, 1L);
-
-        testHarness.getProcessingTimeService().setCurrentTime(2001);
-
-        testHarness.prepareSnapshotPreBarrier(2L);
-
-        assertBasicOutput(
-                
testHarness.extractOutputValues().stream().skip(1).collect(Collectors.toList()),
-                2,
-                2L);
-        testHarness.close();
-    }
-
-    @Test
-    void testEmitOnFlushWithCommitter() throws Exception {
-        final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>> testHarness =
-                new OneInputStreamOperatorTestHarness<>(
-                        new 
SinkWriterOperatorFactory<>(sinkWithCommitter().getSink()));
-
-        testHarness.open();
-        assertThat(testHarness.extractOutputValues()).isEmpty();
-
-        testHarness.processElement(1, 1);
-        testHarness.processElement(2, 2);
-
-        // flush
-        testHarness.prepareSnapshotPreBarrier(1);
-
-        assertBasicOutput(testHarness.extractOutputValues(), 2, 1L);
-        testHarness.close();
-    }
-
-    @Test
-    void testEmitOnEndOfInputInBatchMode() throws Exception {
-        final SinkWriterOperatorFactory<Integer, Integer> 
writerOperatorFactory =
-                new SinkWriterOperatorFactory<>(sinkWithCommitter().getSink());
-        final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>> testHarness =
-                new OneInputStreamOperatorTestHarness<>(writerOperatorFactory);
-
-        testHarness.open();
-        assertThat(testHarness.extractOutputValues()).isEmpty();
-
-        testHarness.processElement(1, 1);
-        testHarness.endInput();
-        assertBasicOutput(testHarness.extractOutputValues(), 1, EOI);
-    }
-
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    void testStateRestore(boolean stateful) throws Exception {
-
-        final long initialTime = 0;
-
-        final InspectableSink sink = sinkWithState(stateful, null);
-        Sink<Integer> sink2 = sink.getSink();
-        final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>> testHarness =
-                new OneInputStreamOperatorTestHarness<>(new 
SinkWriterOperatorFactory<>(sink2));
-
-        testHarness.open();
-
-        testHarness.processWatermark(initialTime);
-        testHarness.processElement(1, initialTime + 1);
-        testHarness.processElement(2, initialTime + 2);
-
-        testHarness.prepareSnapshotPreBarrier(1L);
-        OperatorSubtaskState snapshot = testHarness.snapshot(1L, 1L);
-
-        assertThat(sink.getRecordCountFromState()).isEqualTo(2);
-        assertThat(sink.getLastCheckpointId()).isEqualTo(stateful ? 1L : -1L);
-
-        testHarness.close();
-
-        final InspectableSink restoredSink = sinkWithState(stateful, null);
-        final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>>
-                restoredTestHarness =
-                        new OneInputStreamOperatorTestHarness<>(
-                                new 
SinkWriterOperatorFactory<>(restoredSink.getSink()));
-
-        restoredTestHarness.initializeState(snapshot);
-        restoredTestHarness.open();
-
-        // check that the previous state is correctly restored
-        assertThat(restoredSink.getRecordCountFromState()).isEqualTo(stateful 
? 2 : 0);
-
-        restoredTestHarness.close();
-    }
-
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testLoadPreviousSinkState(boolean stateful) throws Exception {
-        // 1. Build previous sink state
-        final List<String> previousSinkInputs =
-                Arrays.asList(
-                        "bit", "mention", "thick", "stick", "stir", "easy", 
"sleep", "forth",
-                        "cost", "prompt");
-
-        InspectableSink sink = sinkWithState(stateful, 
CompatibleStateSinkOperator.SINK_STATE_NAME);
-        int expectedState = 5;
-        final OneInputStreamOperatorTestHarness<String, String> previousSink =
-                new OneInputStreamOperatorTestHarness<>(
-                        new CompatibleStateSinkOperator<>(
-                                TestSinkV2.WRITER_SERIALIZER, expectedState),
-                        StringSerializer.INSTANCE);
-
-        OperatorSubtaskState previousSinkState =
-                TestHarnessUtil.buildSubtaskState(previousSink, 
previousSinkInputs);
-
-        // 2. Load previous sink state and verify state
-        Sink<Integer> sink3 = sink.getSink();
-        final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>>
-                compatibleWriterOperator =
-                        new OneInputStreamOperatorTestHarness<>(
-                                new SinkWriterOperatorFactory<>(sink3));
-
-        // load the state from previous sink
-        compatibleWriterOperator.initializeState(previousSinkState);
-        assertThat(sink.getRecordCountFromState()).isEqualTo(stateful ? 
expectedState : 0);
-
-        // 3. do another snapshot and check if this also can be restored 
without compabitible state
-        // name
-        compatibleWriterOperator.prepareSnapshotPreBarrier(1L);
-        OperatorSubtaskState snapshot = compatibleWriterOperator.snapshot(1L, 
1L);
-
-        compatibleWriterOperator.close();
-
-        // 4. Restore the sink without previous sink's state
-        InspectableSink sink2 =
-                sinkWithState(stateful, 
CompatibleStateSinkOperator.SINK_STATE_NAME);
-        final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>>
-                restoredSinkOperator =
-                        new OneInputStreamOperatorTestHarness<>(
-                                new 
SinkWriterOperatorFactory<>(sink2.getSink()));
-
-        restoredSinkOperator.initializeState(snapshot);
-        assertThat(sink.getRecordCountFromState()).isEqualTo(stateful ? 
expectedState : 0);
-
-        restoredSinkOperator.close();
-    }
-
-    @Test
-    void testRestoreCommitterState() throws Exception {
-        final List<String> committables = Arrays.asList("state1", "state2");
-
-        InspectableSink sink = sinkWithCommitter();
-        final OneInputStreamOperatorTestHarness<String, String> committer =
-                new OneInputStreamOperatorTestHarness<>(
-                        new 
TestCommitterOperator(TestSinkV2.COMMITTABLE_SERIALIZER),
-                        StringSerializer.INSTANCE);
-
-        final OperatorSubtaskState committerState =
-                TestHarnessUtil.buildSubtaskState(committer, committables);
-
-        final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>> testHarness =
-                new OneInputStreamOperatorTestHarness<>(
-                        new SinkWriterOperatorFactory<>(sink.getSink()));
-
-        testHarness.initializeState(committerState);
-
-        testHarness.open();
-
-        testHarness.prepareSnapshotPreBarrier(2);
-
-        final ListAssert<CommittableMessage<Integer>> records =
-                assertThat(testHarness.extractOutputValues()).hasSize(4);
-
-        records.element(0, as(committableSummary()))
-                .hasCheckpointId(INITIAL_CHECKPOINT_ID)
-                .hasOverallCommittables(committables.size());
-        records.<CommittableWithLineageAssert<String>>element(1, 
as(committableWithLineage()))
-                .hasCommittable(committables.get(0))
-                .hasCheckpointId(INITIAL_CHECKPOINT_ID)
-                .hasSubtaskId(0);
-        records.<CommittableWithLineageAssert<String>>element(2, 
as(committableWithLineage()))
-                .hasCommittable(committables.get(1))
-                .hasCheckpointId(INITIAL_CHECKPOINT_ID)
-                .hasSubtaskId(0);
-        records.element(3, 
as(committableSummary())).hasCheckpointId(2L).hasOverallCommittables(0);
-    }
-
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    void testHandleEndInputInStreamingMode(boolean isCheckpointingEnabled) 
throws Exception {
-        InspectableSink sink = sinkWithCommitter();
-        final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<String>> testHarness =
-                new OneInputStreamOperatorTestHarness<>(
-                        new SinkWriterOperatorFactory<>(sink.getSink()));
-        testHarness.open();
-        testHarness.processElement(1, 1);
-
-        assertThat(testHarness.extractOutputValues()).isEmpty();
-        final String record = "(1,1," + Long.MIN_VALUE + ")";
-        assertThat(sink.getRecordsOfCurrentCheckpoint()).containsOnly(record);
-
-        testHarness.endInput();
-
-        if (isCheckpointingEnabled) {
-            testHarness.prepareSnapshotPreBarrier(1);
-        }
-
-        List<String> committables = Collections.singletonList(record);
-
-        ListAssert<CommittableMessage<String>> records =
-                
assertThat(testHarness.extractOutputValues()).hasSize(committables.size() + 1);
-        records.element(0, 
as(committableSummary())).hasOverallCommittables(committables.size());
-
-        records.filteredOn(message -> message instanceof 
CommittableWithLineage)
-                .map(message -> ((CommittableWithLineage<String>) 
message).getCommittable())
-                .containsExactlyInAnyOrderElementsOf(committables);
-        assertThat(sink.getRecordsOfCurrentCheckpoint()).isEmpty();
-
-        testHarness.close();
-    }
-
-    @Test
-    void testInitContext() throws Exception {
-        final 
AtomicReference<org.apache.flink.api.connector.sink2.WriterInitContext> 
initContext =
-                new AtomicReference<>();
-        final org.apache.flink.api.connector.sink2.Sink<String> sink =
-                context -> {
-                    initContext.set(context);
-                    return null;
-                };
-
-        final int subtaskId = 1;
-        final int parallelism = 10;
-        final TypeSerializer<String> typeSerializer = 
StringSerializer.INSTANCE;
-        final JobID jobID = new JobID();
-
-        final MockEnvironment environment =
-                MockEnvironment.builder()
-                        .setSubtaskIndex(subtaskId)
-                        .setParallelism(parallelism)
-                        .setMaxParallelism(parallelism)
-                        .setJobID(jobID)
-                        .setExecutionConfig(new 
ExecutionConfig().enableObjectReuse())
-                        .build();
-
-        final OneInputStreamOperatorTestHarness<String, 
CommittableMessage<String>> testHarness =
-                new OneInputStreamOperatorTestHarness<>(
-                        new SinkWriterOperatorFactory<>(sink), typeSerializer, 
environment);
-        testHarness.open();
-
-        assertThat(initContext.get().getUserCodeClassLoader()).isNotNull();
-        assertThat(initContext.get().getMailboxExecutor()).isNotNull();
-        assertThat(initContext.get().getProcessingTimeService()).isNotNull();
-        
assertThat(initContext.get().getTaskInfo().getIndexOfThisSubtask()).isEqualTo(subtaskId);
-        
assertThat(initContext.get().getTaskInfo().getNumberOfParallelSubtasks())
-                .isEqualTo(parallelism);
-        
assertThat(initContext.get().getTaskInfo().getAttemptNumber()).isZero();
-        assertThat(initContext.get().metricGroup()).isNotNull();
-        assertThat(initContext.get().getRestoredCheckpointId()).isNotPresent();
-        assertThat(initContext.get().isObjectReuseEnabled()).isTrue();
-        
assertThat(initContext.get().createInputSerializer()).isEqualTo(typeSerializer);
-        assertThat(initContext.get().getJobInfo().getJobId()).isEqualTo(jobID);
-
-        testHarness.close();
-    }
-
-    private static void assertContextsEqual(
-            WriterInitContext initContext, WriterInitContext original) {
-        assertThat(initContext.getUserCodeClassLoader().asClassLoader())
-                .isEqualTo(original.getUserCodeClassLoader().asClassLoader());
-        
assertThat(initContext.getMailboxExecutor()).isEqualTo(original.getMailboxExecutor());
-        assertThat(initContext.getProcessingTimeService())
-                .isEqualTo(original.getProcessingTimeService());
-        assertThat(initContext.getTaskInfo().getIndexOfThisSubtask())
-                .isEqualTo(original.getTaskInfo().getIndexOfThisSubtask());
-        assertThat(initContext.getTaskInfo().getNumberOfParallelSubtasks())
-                
.isEqualTo(original.getTaskInfo().getNumberOfParallelSubtasks());
-        assertThat(initContext.getTaskInfo().getAttemptNumber())
-                .isEqualTo(original.getTaskInfo().getAttemptNumber());
-        
assertThat(initContext.metricGroup()).isEqualTo(original.metricGroup());
-        assertThat(initContext.getRestoredCheckpointId())
-                .isEqualTo(original.getRestoredCheckpointId());
-        
assertThat(initContext.isObjectReuseEnabled()).isEqualTo(original.isObjectReuseEnabled());
-        
assertThat(initContext.createInputSerializer()).isEqualTo(original.createInputSerializer());
-        
assertThat(initContext.getJobInfo().getJobId()).isEqualTo(original.getJobInfo().getJobId());
-        
assertThat(initContext.metadataConsumer()).isEqualTo(original.metadataConsumer());
-    }
-
-    private static void assertBasicOutput(
-            List<CommittableMessage<Integer>> output, int 
numberOfCommittables, long checkpointId) {
-        ListAssert<CommittableMessage<Integer>> records =
-                assertThat(output).hasSize(numberOfCommittables + 1);
-        CommittableSummaryAssert<Object> objectCommittableSummaryAssert =
-                records.element(0, as(committableSummary()))
-                        .hasOverallCommittables(numberOfCommittables);
-        records.filteredOn(r -> r instanceof CommittableWithLineage)
-                .allSatisfy(
-                        cl ->
-                                
SinkV2Assertions.assertThat((CommittableWithLineage<?>) cl)
-                                        .hasCheckpointId(checkpointId)
-                                        .hasSubtaskId(0));
-    }
-
-    private static class TestCommitterOperator extends 
AbstractStreamOperator<String>
-            implements OneInputStreamOperator<String, String> {
-
-        private static final ListStateDescriptor<byte[]> 
STREAMING_COMMITTER_RAW_STATES_DESC =
-                new ListStateDescriptor<>(
-                        "streaming_committer_raw_states", 
BytePrimitiveArraySerializer.INSTANCE);
-        private ListState<List<String>> committerState;
-        private final List<String> buffer = new ArrayList<>();
-        private final SimpleVersionedSerializer<String> serializer;
-
-        public TestCommitterOperator(SimpleVersionedSerializer<String> 
serializer) {
-            this.serializer = serializer;
-        }
-
-        @Override
-        public void initializeState(StateInitializationContext context) throws 
Exception {
-            super.initializeState(context);
-            committerState =
-                    new SimpleVersionedListState<>(
-                            context.getOperatorStateStore()
-                                    
.getListState(STREAMING_COMMITTER_RAW_STATES_DESC),
-                            new TestingCommittableSerializer(serializer));
-        }
-
-        @Override
-        public void processElement(StreamRecord<String> element) throws 
Exception {
-            buffer.add(element.getValue());
-        }
-
-        @Override
-        public void snapshotState(StateSnapshotContext context) throws 
Exception {
-            super.snapshotState(context);
-            committerState.add(buffer);
-        }
-    }
-
-    /** Writes state to test whether the sink can read from alternative state 
names. */
-    private static class CompatibleStateSinkOperator<T> extends 
AbstractStreamOperator<String>
-            implements OneInputStreamOperator<String, String> {
-
-        static final String SINK_STATE_NAME = "compatible_sink_state";
-
-        static final ListStateDescriptor<byte[]> SINK_STATE_DESC =
-                new ListStateDescriptor<>(SINK_STATE_NAME, 
BytePrimitiveArraySerializer.INSTANCE);
-        ListState<T> sinkState;
-        private final SimpleVersionedSerializer<T> serializer;
-        private final T initialState;
-
-        public CompatibleStateSinkOperator(
-                SimpleVersionedSerializer<T> serializer, T initialState) {
-            this.serializer = serializer;
-            this.initialState = initialState;
-        }
-
-        public void initializeState(StateInitializationContext context) throws 
Exception {
-            super.initializeState(context);
-            sinkState =
-                    new SimpleVersionedListState<>(
-                            
context.getOperatorStateStore().getListState(SINK_STATE_DESC),
-                            serializer);
-            if (!context.isRestored()) {
-                sinkState.add(initialState);
-            }
-        }
-
-        @Override
-        public void processElement(StreamRecord<String> element) {
-            // do nothing
-        }
-    }
-
-    private static class TestingCommittableSerializer
-            extends SinkV1WriterCommittableSerializer<String> {
-
-        private final SimpleVersionedSerializer<String> committableSerializer;
-
-        public TestingCommittableSerializer(
-                SimpleVersionedSerializer<String> committableSerializer) {
-            super(committableSerializer);
-            this.committableSerializer = committableSerializer;
-        }
-
-        @Override
-        public byte[] serialize(List<String> obj) throws IOException {
-            final DataOutputSerializer out = new DataOutputSerializer(256);
-            out.writeInt(SinkV1CommittableDeserializer.MAGIC_NUMBER);
-            SimpleVersionedSerialization.writeVersionAndSerializeList(
-                    committableSerializer, obj, out);
-            return out.getCopyOfBuffer();
-        }
-    }
-
-    abstract InspectableSink sinkWithoutCommitter();
-
-    abstract InspectableSink sinkWithTimeBasedWriter();
-
-    abstract InspectableSink sinkWithState(boolean withState, String 
stateName);
-
-    abstract InspectableSink sinkWithCommitter();
-
-    /**
-     * Basic abstraction to access the different flavors of sinks. Remove once 
the older interfaces
-     * are removed.
-     */
-    interface InspectableSink {
-        long getLastCheckpointId();
-
-        List<String> getRecordsOfCurrentCheckpoint();
-
-        List<org.apache.flink.api.common.eventtime.Watermark> getWatermarks();
-
-        int getRecordCountFromState();
-
-        Sink<Integer> getSink();
-    }
-
-    abstract static class AbstractInspectableSink<
-                    S extends 
org.apache.flink.api.connector.sink2.Sink<Integer>>
-            implements InspectableSink {
-        private final S sink;
-
-        protected AbstractInspectableSink(S sink) {
-            this.sink = sink;
-        }
-
-        @Override
-        public S getSink() {
-            return sink;
-        }
-    }
-}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterCommitterOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterCommitterOperatorTest.java
deleted file mode 100644
index bb7795e38d4..00000000000
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterCommitterOperatorTest.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.sink;
-
-import org.apache.flink.api.connector.sink2.SupportsCommitter;
-
-import java.util.Collection;
-
-class WithAdapterCommitterOperatorTest extends CommitterOperatorTestBase {
-
-    @Override
-    SinkAndCounters sinkWithPostCommit() {
-        ForwardingCommitter committer = new ForwardingCommitter();
-        return new SinkAndCounters(
-                (SupportsCommitter<String>)
-                        TestSinkV2.<String>newBuilder()
-                                .setCommitter(committer)
-                                .setWithPostCommitTopology(true)
-                                .build(),
-                () -> committer.successfulCommits);
-    }
-
-    @Override
-    SinkAndCounters sinkWithPostCommitWithRetry() {
-        return new SinkAndCounters(
-                (SupportsCommitter<String>)
-                        TestSinkV2.<String>newBuilder()
-                                .setCommitter(new 
TestSinkV2.RetryOnceCommitter())
-                                .setWithPostCommitTopology(true)
-                                .build(),
-                () -> 0);
-    }
-
-    @Override
-    SinkAndCounters sinkWithoutPostCommit() {
-        ForwardingCommitter committer = new ForwardingCommitter();
-        return new SinkAndCounters(
-                (SupportsCommitter<String>)
-                        
TestSinkV2.<String>newBuilder().setCommitter(committer).build(),
-                () -> committer.successfulCommits);
-    }
-
-    private static class ForwardingCommitter extends 
TestSinkV2.DefaultCommitter {
-        private int successfulCommits = 0;
-
-        @Override
-        public void commit(Collection<CommitRequest<String>> committables) {
-            successfulCommits += committables.size();
-        }
-
-        @Override
-        public void close() throws Exception {}
-    }
-}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterSinkWriterOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterSinkWriterOperatorTest.java
deleted file mode 100644
index b0bf67fe4d2..00000000000
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterSinkWriterOperatorTest.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.sink;
-
-import org.apache.flink.api.common.eventtime.Watermark;
-import org.apache.flink.api.common.operators.ProcessingTimeService;
-import org.apache.flink.api.connector.sink2.WriterInitContext;
-import org.apache.flink.api.java.tuple.Tuple3;
-
-import java.util.ArrayList;
-import java.util.List;
-
-class WithAdapterSinkWriterOperatorTest extends SinkWriterOperatorTestBase {
-    @Override
-    InspectableSink sinkWithoutCommitter() {
-        TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = new 
TestSinkV2.DefaultSinkWriter<>();
-        return new 
InspectableSink(TestSinkV2.<Integer>newBuilder().setWriter(sinkWriter).build());
-    }
-
-    @Override
-    InspectableSink sinkWithCommitter() {
-        TestSinkV2.DefaultSinkWriter<Integer> sinkWriter =
-                new TestSinkV2.DefaultCommittingSinkWriter<>();
-        return new InspectableSink(
-                TestSinkV2.<Integer>newBuilder()
-                        .setWriter(sinkWriter)
-                        .setDefaultCommitter()
-                        .build());
-    }
-
-    @Override
-    InspectableSink sinkWithTimeBasedWriter() {
-        TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = new 
TimeBasedBufferingSinkWriter();
-        return new InspectableSink(
-                TestSinkV2.<Integer>newBuilder()
-                        .setWriter(sinkWriter)
-                        .setDefaultCommitter()
-                        .build());
-    }
-
-    @Override
-    InspectableSink sinkWithState(boolean withState, String stateName) {
-        TestSinkV2.DefaultSinkWriter<Integer> sinkWriter =
-                new TestSinkV2.DefaultStatefulSinkWriter<>();
-        TestSinkV2.Builder<Integer> builder =
-                TestSinkV2.<Integer>newBuilder()
-                        .setWriter(sinkWriter)
-                        .setDefaultCommitter()
-                        .setWithPostCommitTopology(true);
-        builder.setWriterState(withState);
-        if (stateName != null) {
-            builder.setCompatibleStateNames(stateName);
-        }
-        return new InspectableSink(builder.build());
-    }
-
-    private static class TimeBasedBufferingSinkWriter
-            extends TestSinkV2.DefaultStatefulSinkWriter<Integer>
-            implements ProcessingTimeService.ProcessingTimeCallback {
-
-        private final List<String> cachedCommittables = new ArrayList<>();
-        private ProcessingTimeService processingTimeService;
-
-        @Override
-        public void write(Integer element, Context context) {
-            cachedCommittables.add(
-                    Tuple3.of(element, context.timestamp(), 
context.currentWatermark()).toString());
-        }
-
-        @Override
-        public void onProcessingTime(long time) {
-            elements.addAll(cachedCommittables);
-            cachedCommittables.clear();
-            this.processingTimeService.registerTimer(time + 1000, this);
-        }
-
-        @Override
-        public void init(WriterInitContext context) {
-            this.processingTimeService = context.getProcessingTimeService();
-            this.processingTimeService.registerTimer(1000, this);
-        }
-    }
-
-    static class InspectableSink
-            extends 
AbstractInspectableSink<org.apache.flink.api.connector.sink2.Sink<Integer>> {
-        private final TestSinkV2<Integer> sink;
-
-        InspectableSink(TestSinkV2<Integer> sink) {
-            super(sink);
-            this.sink = sink;
-        }
-
-        @Override
-        public long getLastCheckpointId() {
-            return sink.getWriter().lastCheckpointId;
-        }
-
-        @Override
-        public List<String> getRecordsOfCurrentCheckpoint() {
-            return sink.getWriter().elements;
-        }
-
-        @Override
-        public List<Watermark> getWatermarks() {
-            return sink.getWriter().watermarks;
-        }
-
-        @Override
-        public int getRecordCountFromState() {
-            TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = 
sink.getWriter();
-            if (sinkWriter instanceof TestSinkV2.DefaultStatefulSinkWriter) {
-                return ((TestSinkV2.DefaultStatefulSinkWriter<Integer>) 
sinkWriter)
-                        .getRecordCount();
-            } else {
-                return 0;
-            }
-        }
-    }
-}

Reply via email to