This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 6afe98daf61 [FLINK-33973] Add new interfaces for SinkV2 to synchronize 
the API with the SourceV2 API
6afe98daf61 is described below

commit 6afe98daf6190a77a67a1fa8ac8f12337a75f8e7
Author: pvary <peter.vary.apa...@gmail.com>
AuthorDate: Fri Jan 12 10:16:00 2024 +0100

    [FLINK-33973] Add new interfaces for SinkV2 to synchronize the API with the 
SourceV2 API
---
 .../api/connector/sink2/CommitterInitContext.java  | 25 ++----
 .../api/connector/sink2/CommittingSinkWriter.java  | 27 +++---
 .../org/apache/flink/api/connector/sink2/Sink.java | 99 +++++++++++++++++++++-
 .../flink/api/connector/sink2/StatefulSink.java    | 52 +++++-------
 .../api/connector/sink2/StatefulSinkWriter.java    | 29 ++++---
 .../api/connector/sink2/SupportsCommitter.java     | 55 ++++++++++++
 ...{StatefulSink.java => SupportsWriterState.java} | 50 +++--------
 .../connector/sink2/TwoPhaseCommittingSink.java    | 43 ++--------
 .../api/connector/sink2/WriterInitContext.java     | 85 +++++++++++++++++++
 .../api/connector/sink2/CommittableSummary.java    | 10 +++
 .../connector/sink2/CommittableWithLineage.java    |  5 ++
 ...pology.java => SupportsPostCommitTopology.java} |  6 +-
 ...opology.java => SupportsPreCommitTopology.java} | 12 +--
 ...Topology.java => SupportsPreWriteTopology.java} |  3 +-
 .../connector/sink2/WithPostCommitTopology.java    | 23 ++---
 .../api/connector/sink2/WithPreCommitTopology.java | 23 +++--
 .../api/connector/sink2/WithPreWriteTopology.java  | 21 ++---
 .../api/transformations/SinkV1Adapter.java         |  1 +
 .../runtime/operators/sink/CommitterOperator.java  |  2 +-
 .../runtime/operators/sink/SinkWriterOperator.java |  7 +-
 .../operators/sink/SinkWriterStateHandler.java     |  6 +-
 .../sink/StatefulSinkWriterStateHandler.java       | 29 +++++--
 .../sink/StatelessSinkWriterStateHandler.java      |  4 +-
 .../api/graph/StreamingJobGraphGeneratorTest.java  |  1 +
 .../flink/streaming/util/TestExpandingSink.java    |  1 +
 .../scheduling/SpeculativeSchedulerITCase.java     |  1 +
 26 files changed, 400 insertions(+), 220 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/CommitterInitContext.java
similarity index 51%
copy from 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java
copy to 
flink-core/src/main/java/org/apache/flink/api/connector/sink2/CommitterInitContext.java
index 6a16b420439..d44865a6491 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/CommitterInitContext.java
@@ -16,23 +16,14 @@
  * limitations under the License.
  */
 
-package org.apache.flink.streaming.api.connector.sink2;
+package org.apache.flink.api.connector.sink2;
 
-import org.apache.flink.annotation.Experimental;
-import org.apache.flink.api.connector.sink2.Sink;
-import org.apache.flink.api.connector.sink2.SinkWriter;
-import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
 
-/** Allows expert users to implement a custom topology before {@link 
SinkWriter}. */
-@Experimental
-public interface WithPreWriteTopology<InputT> extends Sink<InputT> {
-
-    /**
-     * Adds an arbitrary topology before the writer. The topology may be used 
to repartition the
-     * data.
-     *
-     * @param inputDataStream the stream of input records.
-     * @return the custom topology before {@link SinkWriter}.
-     */
-    DataStream<InputT> addPreWriteTopology(DataStream<InputT> inputDataStream);
+/** The interface exposes some runtime info for creating a {@link Committer}. 
*/
+@PublicEvolving
+public interface CommitterInitContext extends InitContext {
+    /** @return The metric group this committer belongs to. */
+    SinkCommitterMetricGroup metricGroup();
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/CommittingSinkWriter.java
similarity index 52%
copy from 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java
copy to 
flink-core/src/main/java/org/apache/flink/api/connector/sink2/CommittingSinkWriter.java
index 6a16b420439..980bcb32ef1 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/CommittingSinkWriter.java
@@ -16,23 +16,24 @@
  * limitations under the License.
  */
 
-package org.apache.flink.streaming.api.connector.sink2;
+package org.apache.flink.api.connector.sink2;
 
-import org.apache.flink.annotation.Experimental;
-import org.apache.flink.api.connector.sink2.Sink;
-import org.apache.flink.api.connector.sink2.SinkWriter;
-import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.annotation.PublicEvolving;
 
-/** Allows expert users to implement a custom topology before {@link 
SinkWriter}. */
-@Experimental
-public interface WithPreWriteTopology<InputT> extends Sink<InputT> {
+import java.io.IOException;
+import java.util.Collection;
 
+/** A {@link SinkWriter} that performs the first part of a two-phase commit 
protocol. */
+@PublicEvolving
+public interface CommittingSinkWriter<InputT, CommittableT> extends 
SinkWriter<InputT> {
     /**
-     * Adds an arbitrary topology before the writer. The topology may be used 
to repartition the
-     * data.
+     * Prepares for a commit.
      *
-     * @param inputDataStream the stream of input records.
-     * @return the custom topology before {@link SinkWriter}.
+     * <p>This method will be called after {@link #flush(boolean)} and before 
{@link
+     * StatefulSinkWriter#snapshotState(long)}.
+     *
+     * @return The data to commit as the second step of the two-phase commit 
protocol.
+     * @throws IOException if fail to prepare for a commit.
      */
-    DataStream<InputT> addPreWriteTopology(DataStream<InputT> inputDataStream);
+    Collection<CommittableT> prepareCommit() throws IOException, 
InterruptedException;
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java 
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java
index bc769ddcd06..f5522c497d1 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.connector.sink2;
 
 import org.apache.flink.annotation.Experimental;
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.api.common.operators.ProcessingTimeService;
 import org.apache.flink.api.common.serialization.SerializationSchema;
@@ -30,12 +31,13 @@ import org.apache.flink.util.UserCodeClassLoader;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Optional;
+import java.util.OptionalLong;
 import java.util.function.Consumer;
 
 /**
  * Base interface for developing a sink. A basic {@link Sink} is a stateless 
sink that can flush
  * data on checkpoint to achieve at-least-once consistency. Sinks with 
additional requirements
- * should implement {@link StatefulSink} or {@link TwoPhaseCommittingSink}.
+ * should implement {@link SupportsWriterState} or {@link SupportsCommitter}.
  *
  * <p>The {@link Sink} needs to be serializable. All configuration should be 
validated eagerly. The
  * respective sink writers are transient and will only be created in the 
subtasks on the
@@ -52,11 +54,30 @@ public interface Sink<InputT> extends Serializable {
      * @param context the runtime context.
      * @return A sink writer.
      * @throws IOException for any failure during creation.
+     * @deprecated Please implement {@link #createWriter(WriterInitContext)}. 
For backward
+     *     compatibility reasons - to keep {@link Sink} a functional interface 
- Flink did not
+     *     provide a default implementation. New {@link Sink} implementations 
should implement this
+     *     method, but it will not be used, and it will be removed in 1.20.0 
release. Do not use
+     *     {@link Override} annotation when implementing this method, to 
prevent compilation errors
+     *     when migrating to 1.20.x release.
      */
+    @Deprecated
     SinkWriter<InputT> createWriter(InitContext context) throws IOException;
 
+    /**
+     * Creates a {@link SinkWriter}.
+     *
+     * @param context the runtime context.
+     * @return A sink writer.
+     * @throws IOException for any failure during creation.
+     */
+    default SinkWriter<InputT> createWriter(WriterInitContext context) throws 
IOException {
+        return createWriter(new InitContextWrapper(context));
+    }
+
     /** The interface exposes some runtime info for creating a {@link 
SinkWriter}. */
     @PublicEvolving
+    @Deprecated
     interface InitContext extends 
org.apache.flink.api.connector.sink2.InitContext {
         /**
          * Gets the {@link UserCodeClassLoader} to load classes that are not 
in system's classpath,
@@ -110,4 +131,80 @@ public interface Sink<InputT> extends Serializable {
             return Optional.empty();
         }
     }
+
+    /**
+     * Class for wrapping a new {@link WriterInitContext} to an old {@link 
InitContext} until
+     * deprecation.
+     *
+     * @deprecated Internal, do not use it.
+     */
+    @Deprecated
+    class InitContextWrapper implements InitContext {
+        private final WriterInitContext wrapped;
+
+        InitContextWrapper(WriterInitContext wrapped) {
+            this.wrapped = wrapped;
+        }
+
+        @Override
+        public int getSubtaskId() {
+            return wrapped.getSubtaskId();
+        }
+
+        @Override
+        public int getNumberOfParallelSubtasks() {
+            return wrapped.getNumberOfParallelSubtasks();
+        }
+
+        @Override
+        public int getAttemptNumber() {
+            return wrapped.getAttemptNumber();
+        }
+
+        @Override
+        public OptionalLong getRestoredCheckpointId() {
+            return wrapped.getRestoredCheckpointId();
+        }
+
+        @Override
+        public JobID getJobId() {
+            return wrapped.getJobId();
+        }
+
+        @Override
+        public UserCodeClassLoader getUserCodeClassLoader() {
+            return wrapped.getUserCodeClassLoader();
+        }
+
+        @Override
+        public MailboxExecutor getMailboxExecutor() {
+            return wrapped.getMailboxExecutor();
+        }
+
+        @Override
+        public ProcessingTimeService getProcessingTimeService() {
+            return wrapped.getProcessingTimeService();
+        }
+
+        @Override
+        public SinkWriterMetricGroup metricGroup() {
+            return wrapped.metricGroup();
+        }
+
+        @Override
+        public SerializationSchema.InitializationContext
+                asSerializationSchemaInitializationContext() {
+            return wrapped.asSerializationSchemaInitializationContext();
+        }
+
+        @Override
+        public boolean isObjectReuseEnabled() {
+            return wrapped.isObjectReuseEnabled();
+        }
+
+        @Override
+        public <IN> TypeSerializer<IN> createInputSerializer() {
+            return wrapped.createInputSerializer();
+        }
+    }
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSink.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSink.java
index a1814669fbc..5a3772b0d9e 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSink.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSink.java
@@ -19,11 +19,9 @@
 package org.apache.flink.api.connector.sink2;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
 
 import java.io.IOException;
 import java.util.Collection;
-import java.util.List;
 
 /**
  * A {@link Sink} with a stateful {@link SinkWriter}.
@@ -34,51 +32,46 @@ import java.util.List;
  *
  * @param <InputT> The type of the sink's input
  * @param <WriterStateT> The type of the sink writer's state
+ * @deprecated Please implement {@link Sink} and {@link SupportsWriterState} 
instead.
  */
 @PublicEvolving
-public interface StatefulSink<InputT, WriterStateT> extends Sink<InputT> {
+@Deprecated
+public interface StatefulSink<InputT, WriterStateT>
+        extends Sink<InputT>, SupportsWriterState<InputT, WriterStateT> {
 
     /**
-     * Create a {@link StatefulSinkWriter}.
+     * Create a {@link 
org.apache.flink.api.connector.sink2.StatefulSinkWriter} from a recovered
+     * state.
      *
      * @param context the runtime context.
      * @return A sink writer.
      * @throws IOException for any failure during creation.
      */
-    StatefulSinkWriter<InputT, WriterStateT> createWriter(InitContext context) 
throws IOException;
+    default StatefulSinkWriter<InputT, WriterStateT> restoreWriter(
+            Sink.InitContext context, Collection<WriterStateT> recoveredState) 
throws IOException {
+        throw new UnsupportedOperationException(
+                "Deprecated, please use restoreWriter(WriterInitContext, 
Collection<WriterStateT>)");
+    }
 
     /**
-     * Create a {@link StatefulSinkWriter} from a recovered state.
+     * Create a {@link 
org.apache.flink.api.connector.sink2.StatefulSinkWriter} from a recovered
+     * state.
      *
      * @param context the runtime context.
      * @return A sink writer.
      * @throws IOException for any failure during creation.
      */
-    StatefulSinkWriter<InputT, WriterStateT> restoreWriter(
-            InitContext context, Collection<WriterStateT> recoveredState) 
throws IOException;
-
-    /**
-     * Any stateful sink needs to provide this state serializer and implement 
{@link
-     * StatefulSinkWriter#snapshotState(long)} properly. The respective state 
is used in {@link
-     * #restoreWriter(InitContext, Collection)} on recovery.
-     *
-     * @return the serializer of the writer's state type.
-     */
-    SimpleVersionedSerializer<WriterStateT> getWriterStateSerializer();
+    default StatefulSinkWriter<InputT, WriterStateT> restoreWriter(
+            WriterInitContext context, Collection<WriterStateT> 
recoveredState) throws IOException {
+        return restoreWriter(new InitContextWrapper(context), recoveredState);
+    }
 
     /**
      * A mix-in for {@link StatefulSink} that allows users to migrate from a 
sink with a compatible
      * state to this sink.
      */
     @PublicEvolving
-    interface WithCompatibleState {
-        /**
-         * A list of state names of sinks from which the state can be 
restored. For example, the new
-         * {@code FileSink} can resume from the state of an old {@code 
StreamingFileSink} as a
-         * drop-in replacement when resuming from a checkpoint/savepoint.
-         */
-        Collection<String> getCompatibleWriterStateNames();
-    }
+    interface WithCompatibleState extends 
SupportsWriterState.WithCompatibleState {}
 
     /**
      * A {@link SinkWriter} whose state needs to be checkpointed.
@@ -87,11 +80,6 @@ public interface StatefulSink<InputT, WriterStateT> extends 
Sink<InputT> {
      * @param <WriterStateT> The type of the writer's state
      */
     @PublicEvolving
-    interface StatefulSinkWriter<InputT, WriterStateT> extends 
SinkWriter<InputT> {
-        /**
-         * @return The writer's state.
-         * @throws IOException if fail to snapshot writer's state.
-         */
-        List<WriterStateT> snapshotState(long checkpointId) throws IOException;
-    }
+    interface StatefulSinkWriter<InputT, WriterStateT>
+            extends 
org.apache.flink.api.connector.sink2.StatefulSinkWriter<InputT, WriterStateT> {}
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSinkWriter.java
similarity index 52%
copy from 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java
copy to 
flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSinkWriter.java
index 6a16b420439..2f0d82045e6 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSinkWriter.java
@@ -16,23 +16,24 @@
  * limitations under the License.
  */
 
-package org.apache.flink.streaming.api.connector.sink2;
+package org.apache.flink.api.connector.sink2;
 
-import org.apache.flink.annotation.Experimental;
-import org.apache.flink.api.connector.sink2.Sink;
-import org.apache.flink.api.connector.sink2.SinkWriter;
-import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.annotation.PublicEvolving;
 
-/** Allows expert users to implement a custom topology before {@link 
SinkWriter}. */
-@Experimental
-public interface WithPreWriteTopology<InputT> extends Sink<InputT> {
+import java.io.IOException;
+import java.util.List;
 
+/**
+ * A {@link SinkWriter} whose state needs to be checkpointed.
+ *
+ * @param <InputT> The type of the sink writer's input
+ * @param <WriterStateT> The type of the writer's state
+ */
+@PublicEvolving
+public interface StatefulSinkWriter<InputT, WriterStateT> extends 
SinkWriter<InputT> {
     /**
-     * Adds an arbitrary topology before the writer. The topology may be used 
to repartition the
-     * data.
-     *
-     * @param inputDataStream the stream of input records.
-     * @return the custom topology before {@link SinkWriter}.
+     * @return The writer's state.
+     * @throws IOException if fail to snapshot writer's state.
      */
-    DataStream<InputT> addPreWriteTopology(DataStream<InputT> inputDataStream);
+    List<WriterStateT> snapshotState(long checkpointId) throws IOException;
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/SupportsCommitter.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/SupportsCommitter.java
new file mode 100644
index 00000000000..12a714e8382
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/SupportsCommitter.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.sink2;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * A mixin interface for a {@link Sink} which supports exactly-once semantics 
using a two-phase
+ * commit protocol. The {@link Sink} consists of a {@link 
CommittingSinkWriter} that performs the
+ * precommits and a {@link Committer} that actually commits the data. To 
facilitate the separation
+ * the {@link CommittingSinkWriter} creates <i>committables</i> on checkpoint 
or end of input and
+ * the sends it to the {@link Committer}.
+ *
+ * <p>The {@link Sink} needs to be serializable. All configuration should be 
validated eagerly. The
+ * respective sink writers and committers are transient and will only be 
created in the subtasks on
+ * the taskmanagers.
+ *
+ * @param <CommittableT> The type of the committables.
+ */
+@PublicEvolving
+public interface SupportsCommitter<CommittableT> {
+
+    /**
+     * Creates a {@link Committer} that permanently makes the previously 
written data visible
+     * through {@link Committer#commit(Collection)}.
+     *
+     * @param context The context information for the committer initialization.
+     * @return A committer for the two-phase commit protocol.
+     * @throws IOException for any failure during creation.
+     */
+    Committer<CommittableT> createCommitter(CommitterInitContext context) 
throws IOException;
+
+    /** Returns the serializer of the committable type. */
+    SimpleVersionedSerializer<CommittableT> getCommittableSerializer();
+}
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSink.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/SupportsWriterState.java
similarity index 52%
copy from 
flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSink.java
copy to 
flink-core/src/main/java/org/apache/flink/api/connector/sink2/SupportsWriterState.java
index a1814669fbc..5ee49bcdaa1 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSink.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/SupportsWriterState.java
@@ -23,75 +23,51 @@ import org.apache.flink.core.io.SimpleVersionedSerializer;
 
 import java.io.IOException;
 import java.util.Collection;
-import java.util.List;
 
 /**
- * A {@link Sink} with a stateful {@link SinkWriter}.
+ * A mixin interface for a {@link Sink} which supports a stateful {@link 
StatefulSinkWriter}.
  *
- * <p>The {@link StatefulSink} needs to be serializable. All configuration 
should be validated
- * eagerly. The respective sink writers are transient and will only be created 
in the subtasks on
- * the taskmanagers.
+ * <p>The {@link Sink} needs to be serializable. All configuration should be 
validated eagerly. The
+ * respective sink writers are transient and will only be created in the 
subtasks on the
+ * taskmanagers.
  *
  * @param <InputT> The type of the sink's input
  * @param <WriterStateT> The type of the sink writer's state
  */
 @PublicEvolving
-public interface StatefulSink<InputT, WriterStateT> extends Sink<InputT> {
-
-    /**
-     * Create a {@link StatefulSinkWriter}.
-     *
-     * @param context the runtime context.
-     * @return A sink writer.
-     * @throws IOException for any failure during creation.
-     */
-    StatefulSinkWriter<InputT, WriterStateT> createWriter(InitContext context) 
throws IOException;
+public interface SupportsWriterState<InputT, WriterStateT> {
 
     /**
      * Create a {@link StatefulSinkWriter} from a recovered state.
      *
      * @param context the runtime context.
+     * @param recoveredState the state to recover from.
      * @return A sink writer.
      * @throws IOException for any failure during creation.
      */
     StatefulSinkWriter<InputT, WriterStateT> restoreWriter(
-            InitContext context, Collection<WriterStateT> recoveredState) 
throws IOException;
+            WriterInitContext context, Collection<WriterStateT> 
recoveredState) throws IOException;
 
     /**
      * Any stateful sink needs to provide this state serializer and implement 
{@link
      * StatefulSinkWriter#snapshotState(long)} properly. The respective state 
is used in {@link
-     * #restoreWriter(InitContext, Collection)} on recovery.
+     * #restoreWriter(WriterInitContext, Collection)} on recovery.
      *
      * @return the serializer of the writer's state type.
      */
     SimpleVersionedSerializer<WriterStateT> getWriterStateSerializer();
 
     /**
-     * A mix-in for {@link StatefulSink} that allows users to migrate from a 
sink with a compatible
-     * state to this sink.
+     * A mix-in for {@link SupportsWriterState} that allows users to migrate 
from a sink with a
+     * compatible state to this sink.
      */
     @PublicEvolving
     interface WithCompatibleState {
         /**
-         * A list of state names of sinks from which the state can be 
restored. For example, the new
-         * {@code FileSink} can resume from the state of an old {@code 
StreamingFileSink} as a
-         * drop-in replacement when resuming from a checkpoint/savepoint.
+         * A collection of state names of sinks from which the state can be 
restored. For example,
+         * the new {@code FileSink} can resume from the state of an old {@code 
StreamingFileSink} as
+         * a drop-in replacement when resuming from a checkpoint/savepoint.
          */
         Collection<String> getCompatibleWriterStateNames();
     }
-
-    /**
-     * A {@link SinkWriter} whose state needs to be checkpointed.
-     *
-     * @param <InputT> The type of the sink writer's input
-     * @param <WriterStateT> The type of the writer's state
-     */
-    @PublicEvolving
-    interface StatefulSinkWriter<InputT, WriterStateT> extends 
SinkWriter<InputT> {
-        /**
-         * @return The writer's state.
-         * @throws IOException if fail to snapshot writer's state.
-         */
-        List<WriterStateT> snapshotState(long checkpointId) throws IOException;
-    }
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.java
index b2cf15565fb..d5f10ec3320 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.java
@@ -19,9 +19,6 @@
 package org.apache.flink.api.connector.sink2;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.connector.sink2.StatefulSink.StatefulSinkWriter;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -38,19 +35,12 @@ import java.util.Collection;
  *
  * @param <InputT> The type of the sink's input
  * @param <CommT> The type of the committables.
+ * @deprecated Please implement {@link Sink} {@link SupportsCommitter} instead.
  */
 @PublicEvolving
-public interface TwoPhaseCommittingSink<InputT, CommT> extends Sink<InputT> {
-
-    /**
-     * Creates a {@link PrecommittingSinkWriter} that creates committables on 
checkpoint or end of
-     * input.
-     *
-     * @param context the runtime context.
-     * @return A sink writer for the two-phase commit protocol.
-     * @throws IOException for any failure during creation.
-     */
-    PrecommittingSinkWriter<InputT, CommT> createWriter(InitContext context) 
throws IOException;
+@Deprecated
+public interface TwoPhaseCommittingSink<InputT, CommT>
+        extends Sink<InputT>, SupportsCommitter<CommT> {
 
     /**
      * Creates a {@link Committer} that permanently makes the previously 
written data visible
@@ -78,29 +68,8 @@ public interface TwoPhaseCommittingSink<InputT, CommT> 
extends Sink<InputT> {
         return createCommitter();
     }
 
-    /** Returns the serializer of the committable type. */
-    SimpleVersionedSerializer<CommT> getCommittableSerializer();
-
     /** A {@link SinkWriter} that performs the first part of a two-phase 
commit protocol. */
     @PublicEvolving
-    interface PrecommittingSinkWriter<InputT, CommT> extends 
SinkWriter<InputT> {
-        /**
-         * Prepares for a commit.
-         *
-         * <p>This method will be called after {@link #flush(boolean)} and 
before {@link
-         * StatefulSinkWriter#snapshotState(long)}.
-         *
-         * @return The data to commit as the second step of the two-phase 
commit protocol.
-         * @throws IOException if fail to prepare for a commit.
-         */
-        Collection<CommT> prepareCommit() throws IOException, 
InterruptedException;
-    }
-
-    /** The interface exposes some runtime info for creating a {@link 
Committer}. */
-    @PublicEvolving
-    interface CommitterInitContext extends 
org.apache.flink.api.connector.sink2.InitContext {
-
-        /** @return The metric group this committer belongs to. */
-        SinkCommitterMetricGroup metricGroup();
-    }
+    @Deprecated
+    interface PrecommittingSinkWriter<InputT, CommT> extends 
CommittingSinkWriter<InputT, CommT> {}
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/WriterInitContext.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/WriterInitContext.java
new file mode 100644
index 00000000000..38e2e38c318
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/WriterInitContext.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.sink2;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import java.util.Optional;
+import java.util.function.Consumer;
+
+/** The interface exposes some runtime info for creating a {@link SinkWriter}. 
*/
+@PublicEvolving
+public interface WriterInitContext extends 
org.apache.flink.api.connector.sink2.InitContext {
+    /**
+     * Gets the {@link UserCodeClassLoader} to load classes that are not in 
system's classpath, but
+     * are part of the jar file of a user job.
+     *
+     * @see UserCodeClassLoader
+     */
+    UserCodeClassLoader getUserCodeClassLoader();
+
+    /**
+     * Returns the mailbox executor that allows to execute {@link Runnable}s 
inside the task thread
+     * in between record processing.
+     *
+     * <p>Note that this method should not be used per-record for performance 
reasons in the same
+     * way as records should not be sent to the external system individually. 
Rather, implementers
+     * are expected to batch records and only enqueue a single {@link 
Runnable} per batch to handle
+     * the result.
+     */
+    MailboxExecutor getMailboxExecutor();
+
+    /**
+     * Returns a {@link ProcessingTimeService} that can be used to get the 
current time and register
+     * timers.
+     */
+    ProcessingTimeService getProcessingTimeService();
+
+    /** @return The metric group this writer belongs to. */
+    SinkWriterMetricGroup metricGroup();
+
+    /** Provides a view on this context as a {@link 
SerializationSchema.InitializationContext}. */
+    SerializationSchema.InitializationContext 
asSerializationSchemaInitializationContext();
+
+    /** Returns whether object reuse has been enabled or disabled. */
+    boolean isObjectReuseEnabled();
+
+    /** Creates a serializer for the type of sink's input. */
+    <IN> TypeSerializer<IN> createInputSerializer();
+
+    /**
+     * Returns a metadata consumer, the {@link SinkWriter} can publish 
metadata events of type
+     * {@link MetaT} to the consumer.
+     *
+     * <p>It is recommended to use a separate thread pool to publish the 
metadata because enqueuing
+     * a lot of these messages in the mailbox may lead to a performance 
decrease. thread, and the
+     * {@link Consumer#accept} method is executed very fast.
+     */
+    @Experimental
+    default <MetaT> Optional<Consumer<MetaT>> metadataConsumer() {
+        return Optional.empty();
+    }
+}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummary.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummary.java
index baaa8714fff..7171a5168a7 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummary.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummary.java
@@ -84,4 +84,14 @@ public class CommittableSummary<CommT> implements 
CommittableMessage<CommT> {
     public int getNumberOfFailedCommittables() {
         return numberOfFailedCommittables;
     }
+
+    public <NewCommT> CommittableSummary<NewCommT> map() {
+        return new CommittableSummary<>(
+                subtaskId,
+                numberOfSubtasks,
+                checkpointId,
+                numberOfCommittables,
+                numberOfPendingCommittables,
+                numberOfFailedCommittables);
+    }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineage.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineage.java
index 1dddcc79256..a792a3ad48c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineage.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineage.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
 import javax.annotation.Nullable;
 
 import java.util.OptionalLong;
+import java.util.function.Function;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -55,4 +56,8 @@ public class CommittableWithLineage<CommT> implements 
CommittableMessage<CommT>
     public OptionalLong getCheckpointId() {
         return checkpointId == null ? OptionalLong.empty() : 
OptionalLong.of(checkpointId);
     }
+
+    public <NewCommT> CommittableWithLineage<NewCommT> map(Function<CommT, 
NewCommT> mapper) {
+        return new CommittableWithLineage<>(mapper.apply(committable), 
checkpointId, subtaskId);
+    }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPostCommitTopology.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/SupportsPostCommitTopology.java
similarity index 90%
copy from 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPostCommitTopology.java
copy to 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/SupportsPostCommitTopology.java
index 17d1c685841..dc89423a824 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPostCommitTopology.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/SupportsPostCommitTopology.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.connector.sink2;
 
 import org.apache.flink.annotation.Experimental;
 import org.apache.flink.api.connector.sink2.Committer;
-import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
 import org.apache.flink.streaming.api.datastream.DataStream;
 
 /**
@@ -30,8 +29,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
  * unexpected side-effects.
  */
 @Experimental
-public interface WithPostCommitTopology<InputT, CommT>
-        extends TwoPhaseCommittingSink<InputT, CommT> {
+public interface SupportsPostCommitTopology<CommittableT> {
 
     /**
      * Adds a custom post-commit topology where all committables can be 
processed.
@@ -45,5 +43,5 @@ public interface WithPostCommitTopology<InputT, CommT>
      *
      * @param committables the stream of committables.
      */
-    void addPostCommitTopology(DataStream<CommittableMessage<CommT>> 
committables);
+    void addPostCommitTopology(DataStream<CommittableMessage<CommittableT>> 
committables);
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreCommitTopology.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/SupportsPreCommitTopology.java
similarity index 80%
copy from 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreCommitTopology.java
copy to 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/SupportsPreCommitTopology.java
index 6d7219d7d9d..67f277b1b45 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreCommitTopology.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/SupportsPreCommitTopology.java
@@ -21,7 +21,7 @@ package org.apache.flink.streaming.api.connector.sink2;
 import org.apache.flink.annotation.Experimental;
 import org.apache.flink.api.connector.sink2.Committer;
 import org.apache.flink.api.connector.sink2.SinkWriter;
-import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.streaming.api.datastream.DataStream;
 
 /**
@@ -32,8 +32,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
  * unexpected side-effects.
  */
 @Experimental
-public interface WithPreCommitTopology<InputT, CommT>
-        extends TwoPhaseCommittingSink<InputT, CommT> {
+public interface SupportsPreCommitTopology<WriterResultT, CommittableT> {
 
     /**
      * Intercepts and modifies the committables sent on checkpoint or at end 
of input. Implementers
@@ -42,6 +41,9 @@ public interface WithPreCommitTopology<InputT, CommT>
      * @param committables the stream of committables.
      * @return the custom topology before {@link Committer}.
      */
-    DataStream<CommittableMessage<CommT>> addPreCommitTopology(
-            DataStream<CommittableMessage<CommT>> committables);
+    DataStream<CommittableMessage<CommittableT>> addPreCommitTopology(
+            DataStream<CommittableMessage<WriterResultT>> committables);
+
+    /** Returns the serializer of the WriteResult type. */
+    SimpleVersionedSerializer<WriterResultT> getWriteResultSerializer();
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/SupportsPreWriteTopology.java
similarity index 92%
copy from 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java
copy to 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/SupportsPreWriteTopology.java
index 6a16b420439..3e84b1ef4b9 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/SupportsPreWriteTopology.java
@@ -19,13 +19,12 @@
 package org.apache.flink.streaming.api.connector.sink2;
 
 import org.apache.flink.annotation.Experimental;
-import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.streaming.api.datastream.DataStream;
 
 /** Allows expert users to implement a custom topology before {@link 
SinkWriter}. */
 @Experimental
-public interface WithPreWriteTopology<InputT> extends Sink<InputT> {
+public interface SupportsPreWriteTopology<InputT> {
 
     /**
      * Adds an arbitrary topology before the writer. The topology may be used 
to repartition the
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPostCommitTopology.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPostCommitTopology.java
index 17d1c685841..8fb516be550 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPostCommitTopology.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPostCommitTopology.java
@@ -21,29 +21,18 @@ package org.apache.flink.streaming.api.connector.sink2;
 import org.apache.flink.annotation.Experimental;
 import org.apache.flink.api.connector.sink2.Committer;
 import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
-import org.apache.flink.streaming.api.datastream.DataStream;
 
 /**
  * Allows expert users to implement a custom topology after {@link Committer}.
  *
  * <p>It is recommended to use immutable committables because mutating 
committables can have
  * unexpected side-effects.
+ *
+ * @deprecated Please implement {@link 
org.apache.flink.api.connector.sink2.Sink}, {@link
+ *     org.apache.flink.api.connector.sink2.SupportsCommitter} and {@link
+ *     SupportsPostCommitTopology} instead.
  */
 @Experimental
+@Deprecated
 public interface WithPostCommitTopology<InputT, CommT>
-        extends TwoPhaseCommittingSink<InputT, CommT> {
-
-    /**
-     * Adds a custom post-commit topology where all committables can be 
processed.
-     *
-     * <p>It is strongly recommended to keep this pipeline stateless such that 
batch and streaming
-     * modes do not require special cases.
-     *
-     * <p>All operations need to be idempotent: on recovery, any number of 
committables may be
-     * replayed that have already been committed. It's mandatory that these 
committables have no
-     * effect on the external system.
-     *
-     * @param committables the stream of committables.
-     */
-    void addPostCommitTopology(DataStream<CommittableMessage<CommT>> 
committables);
-}
+        extends TwoPhaseCommittingSink<InputT, CommT>, 
SupportsPostCommitTopology<CommT> {}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreCommitTopology.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreCommitTopology.java
index 6d7219d7d9d..88f4a007e79 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreCommitTopology.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreCommitTopology.java
@@ -22,7 +22,7 @@ import org.apache.flink.annotation.Experimental;
 import org.apache.flink.api.connector.sink2.Committer;
 import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
-import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
 
 /**
  * Allows expert users to implement a custom topology after {@link SinkWriter} 
and before {@link
@@ -30,18 +30,17 @@ import org.apache.flink.streaming.api.datastream.DataStream;
  *
  * <p>It is recommended to use immutable committables because mutating 
committables can have
  * unexpected side-effects.
+ *
+ * @deprecated Please implement {@link 
org.apache.flink.api.connector.sink2.Sink}, {@link
+ *     org.apache.flink.api.connector.sink2.SupportsCommitter} and {@link 
SupportsPreCommitTopology}
+ *     instead.
  */
 @Experimental
+@Deprecated
 public interface WithPreCommitTopology<InputT, CommT>
-        extends TwoPhaseCommittingSink<InputT, CommT> {
-
-    /**
-     * Intercepts and modifies the committables sent on checkpoint or at end 
of input. Implementers
-     * need to ensure to modify all {@link CommittableMessage}s appropriately.
-     *
-     * @param committables the stream of committables.
-     * @return the custom topology before {@link Committer}.
-     */
-    DataStream<CommittableMessage<CommT>> addPreCommitTopology(
-            DataStream<CommittableMessage<CommT>> committables);
+        extends TwoPhaseCommittingSink<InputT, CommT>, 
SupportsPreCommitTopology<CommT, CommT> {
+    /** Defaults to {@link #getCommittableSerializer} for backward 
compatibility. */
+    default SimpleVersionedSerializer<CommT> getWriteResultSerializer() {
+        return getCommittableSerializer();
+    }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java
index 6a16b420439..dccd892cced 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java
@@ -21,18 +21,13 @@ package org.apache.flink.streaming.api.connector.sink2;
 import org.apache.flink.annotation.Experimental;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.SinkWriter;
-import org.apache.flink.streaming.api.datastream.DataStream;
 
-/** Allows expert users to implement a custom topology before {@link 
SinkWriter}. */
+/**
+ * Allows expert users to implement a custom topology before {@link 
SinkWriter}.
+ *
+ * @deprecated Please implement {@link Sink} and {@link 
SupportsPreWriteTopology} instead.
+ */
 @Experimental
-public interface WithPreWriteTopology<InputT> extends Sink<InputT> {
-
-    /**
-     * Adds an arbitrary topology before the writer. The topology may be used 
to repartition the
-     * data.
-     *
-     * @param inputDataStream the stream of input records.
-     * @return the custom topology before {@link SinkWriter}.
-     */
-    DataStream<InputT> addPreWriteTopology(DataStream<InputT> inputDataStream);
-}
+@Deprecated
+public interface WithPreWriteTopology<InputT>
+        extends Sink<InputT>, SupportsPreWriteTopology<InputT> {}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java
index 66104b0c6b4..478898ff19b 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.connector.sink.GlobalCommitter;
 import org.apache.flink.api.connector.sink.Sink.ProcessingTimeService;
 import org.apache.flink.api.connector.sink.SinkWriter;
 import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.connector.sink2.CommitterInitContext;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.StatefulSink;
 import org.apache.flink.api.connector.sink2.StatefulSink.StatefulSinkWriter;
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
index cb5044be232..028d8317d80 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
 import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.connector.sink2.CommitterInitContext;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
 import 
org.apache.flink.runtime.metrics.groups.InternalSinkCommitterMetricGroup;
@@ -49,7 +50,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.OptionalLong;
 
-import static 
org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.CommitterInitContext;
 import static org.apache.flink.util.IOUtils.closeAll;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
index 499e868f8c4..c0a9892d5ee 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.api.connector.sink2.StatefulSink;
 import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
 import 
org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
 import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
@@ -132,7 +133,7 @@ class SinkWriterOperator<InputT, CommT> extends 
AbstractStreamOperator<Committab
     @Override
     public void initializeState(StateInitializationContext context) throws 
Exception {
         super.initializeState(context);
-        InitContext initContext = 
createInitContext(context.getRestoredCheckpointId());
+        WriterInitContext initContext = 
createInitContext(context.getRestoredCheckpointId());
         if (context.isRestored()) {
             if (committableSerializer != null) {
                 final ListState<List<CommT>> legacyCommitterState =
@@ -239,7 +240,7 @@ class SinkWriterOperator<InputT, CommT> extends 
AbstractStreamOperator<Committab
         }
     }
 
-    private InitContext createInitContext(OptionalLong restoredCheckpointId) {
+    private WriterInitContext createInitContext(OptionalLong 
restoredCheckpointId) {
         return new InitContextImpl(
                 getRuntimeContext(),
                 processingTimeService,
@@ -268,7 +269,7 @@ class SinkWriterOperator<InputT, CommT> extends 
AbstractStreamOperator<Committab
         }
     }
 
-    private static class InitContextImpl extends InitContextBase implements 
InitContext {
+    private static class InitContextImpl extends InitContextBase implements 
WriterInitContext {
 
         private final ProcessingTimeService processingTimeService;
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterStateHandler.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterStateHandler.java
index 0caf7bdb64b..6babf8582fd 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterStateHandler.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterStateHandler.java
@@ -17,8 +17,8 @@
 
 package org.apache.flink.streaming.runtime.operators.sink;
 
-import org.apache.flink.api.connector.sink2.Sink.InitContext;
 import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
 import org.apache.flink.runtime.state.StateInitializationContext;
 
 /**
@@ -38,6 +38,6 @@ interface SinkWriterStateHandler<InputT> {
     void snapshotState(long checkpointId) throws Exception;
 
     /** Creates a writer, potentially using state from {@link 
StateInitializationContext}. */
-    SinkWriter<InputT> createWriter(InitContext initContext, 
StateInitializationContext context)
-            throws Exception;
+    SinkWriter<InputT> createWriter(
+            WriterInitContext initContext, StateInitializationContext context) 
throws Exception;
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterStateHandler.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterStateHandler.java
index b2b5a5d5108..5847425fbca 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterStateHandler.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterStateHandler.java
@@ -22,15 +22,18 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
-import org.apache.flink.api.connector.sink2.Sink.InitContext;
+import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.api.connector.sink2.StatefulSink;
-import org.apache.flink.api.connector.sink2.StatefulSink.StatefulSinkWriter;
-import org.apache.flink.api.connector.sink2.StatefulSink.WithCompatibleState;
+import org.apache.flink.api.connector.sink2.StatefulSinkWriter;
+import org.apache.flink.api.connector.sink2.SupportsWriterState;
+import 
org.apache.flink.api.connector.sink2.SupportsWriterState.WithCompatibleState;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
 import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.guava31.com.google.common.collect.Iterables;
 
@@ -59,7 +62,7 @@ final class StatefulSinkWriterStateHandler<InputT, 
WriterStateT>
      */
     private final Collection<String> previousSinkStateNames;
 
-    private final StatefulSink<InputT, WriterStateT> sink;
+    private final Sink<InputT> sink;
 
     // ------------------------------- runtime fields 
---------------------------------------
 
@@ -88,7 +91,7 @@ final class StatefulSinkWriterStateHandler<InputT, 
WriterStateT>
 
     @Override
     public SinkWriter<InputT> createWriter(
-            InitContext initContext, StateInitializationContext context) 
throws Exception {
+            WriterInitContext initContext, StateInitializationContext context) 
throws Exception {
         final ListState<byte[]> rawState =
                 
context.getOperatorStateStore().getListState(WRITER_RAW_STATES_DESC);
         writerState =
@@ -112,9 +115,14 @@ final class StatefulSinkWriterStateHandler<InputT, 
WriterStateT>
                 previousSinkStates.add(previousSinkState);
                 Iterables.addAll(states, previousSinkState.get());
             }
-            sinkWriter = sink.restoreWriter(initContext, states);
+
+            if (!(sink instanceof SupportsWriterState)) {
+                throw new IllegalArgumentException("Sink should implement 
SupportsWriterState");
+            }
+
+            sinkWriter = ((SupportsWriterState) 
sink).restoreWriter(initContext, states);
         } else {
-            sinkWriter = sink.createWriter(initContext);
+            sinkWriter = cast(sink.createWriter(initContext));
         }
         return sinkWriter;
     }
@@ -124,4 +132,11 @@ final class StatefulSinkWriterStateHandler<InputT, 
WriterStateT>
         writerState.update(sinkWriter.snapshotState(checkpointId));
         previousSinkStates.forEach(ListState::clear);
     }
+
+    private static StatefulSinkWriter cast(SinkWriter writer) {
+        Preconditions.checkArgument(
+                writer instanceof StatefulSinkWriter,
+                "The writer should implement StatefulSinkWriter");
+        return (StatefulSinkWriter) writer;
+    }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatelessSinkWriterStateHandler.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatelessSinkWriterStateHandler.java
index f49da996224..f8f79da727d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatelessSinkWriterStateHandler.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatelessSinkWriterStateHandler.java
@@ -18,8 +18,8 @@
 package org.apache.flink.streaming.runtime.operators.sink;
 
 import org.apache.flink.api.connector.sink2.Sink;
-import org.apache.flink.api.connector.sink2.Sink.InitContext;
 import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
 import org.apache.flink.runtime.state.StateInitializationContext;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -35,7 +35,7 @@ final class StatelessSinkWriterStateHandler<InputT> 
implements SinkWriterStateHa
 
     @Override
     public SinkWriter<InputT> createWriter(
-            InitContext initContext, StateInitializationContext context) 
throws Exception {
+            WriterInitContext initContext, StateInitializationContext context) 
throws Exception {
         return sink.createWriter(initContext);
     }
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 34d3b2dfeb5..fec52f6fa11 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.connector.sink2.CommitterInitContext;
 import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestExpandingSink.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestExpandingSink.java
index 5f26d045960..af00546d09a 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestExpandingSink.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestExpandingSink.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.util;
 
 import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.connector.sink2.CommitterInitContext;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeSchedulerITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeSchedulerITCase.java
index c6f9613c1c5..44a86e81b94 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeSchedulerITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeSchedulerITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.connector.sink2.CommitterInitContext;
 import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
 import 
org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
 import org.apache.flink.api.connector.source.Boundedness;

Reply via email to