This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 6afe98daf61 [FLINK-33973] Add new interfaces for SinkV2 to synchronize the API with the SourceV2 API 6afe98daf61 is described below commit 6afe98daf6190a77a67a1fa8ac8f12337a75f8e7 Author: pvary <peter.vary.apa...@gmail.com> AuthorDate: Fri Jan 12 10:16:00 2024 +0100 [FLINK-33973] Add new interfaces for SinkV2 to synchronize the API with the SourceV2 API --- .../api/connector/sink2/CommitterInitContext.java | 25 ++---- .../api/connector/sink2/CommittingSinkWriter.java | 27 +++--- .../org/apache/flink/api/connector/sink2/Sink.java | 99 +++++++++++++++++++++- .../flink/api/connector/sink2/StatefulSink.java | 52 +++++------- .../api/connector/sink2/StatefulSinkWriter.java | 29 ++++--- .../api/connector/sink2/SupportsCommitter.java | 55 ++++++++++++ ...{StatefulSink.java => SupportsWriterState.java} | 50 +++-------- .../connector/sink2/TwoPhaseCommittingSink.java | 43 ++-------- .../api/connector/sink2/WriterInitContext.java | 85 +++++++++++++++++++ .../api/connector/sink2/CommittableSummary.java | 10 +++ .../connector/sink2/CommittableWithLineage.java | 5 ++ ...pology.java => SupportsPostCommitTopology.java} | 6 +- ...opology.java => SupportsPreCommitTopology.java} | 12 +-- ...Topology.java => SupportsPreWriteTopology.java} | 3 +- .../connector/sink2/WithPostCommitTopology.java | 23 ++--- .../api/connector/sink2/WithPreCommitTopology.java | 23 +++-- .../api/connector/sink2/WithPreWriteTopology.java | 21 ++--- .../api/transformations/SinkV1Adapter.java | 1 + .../runtime/operators/sink/CommitterOperator.java | 2 +- .../runtime/operators/sink/SinkWriterOperator.java | 7 +- .../operators/sink/SinkWriterStateHandler.java | 6 +- .../sink/StatefulSinkWriterStateHandler.java | 29 +++++-- .../sink/StatelessSinkWriterStateHandler.java | 4 +- .../api/graph/StreamingJobGraphGeneratorTest.java | 1 + .../flink/streaming/util/TestExpandingSink.java | 1 + .../scheduling/SpeculativeSchedulerITCase.java | 1 + 26 
files changed, 400 insertions(+), 220 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/CommitterInitContext.java similarity index 51% copy from flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java copy to flink-core/src/main/java/org/apache/flink/api/connector/sink2/CommitterInitContext.java index 6a16b420439..d44865a6491 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/CommitterInitContext.java @@ -16,23 +16,14 @@ * limitations under the License. */ -package org.apache.flink.streaming.api.connector.sink2; +package org.apache.flink.api.connector.sink2; -import org.apache.flink.annotation.Experimental; -import org.apache.flink.api.connector.sink2.Sink; -import org.apache.flink.api.connector.sink2.SinkWriter; -import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.metrics.groups.SinkCommitterMetricGroup; -/** Allows expert users to implement a custom topology before {@link SinkWriter}. */ -@Experimental -public interface WithPreWriteTopology<InputT> extends Sink<InputT> { - - /** - * Adds an arbitrary topology before the writer. The topology may be used to repartition the - * data. - * - * @param inputDataStream the stream of input records. - * @return the custom topology before {@link SinkWriter}. - */ - DataStream<InputT> addPreWriteTopology(DataStream<InputT> inputDataStream); +/** The interface exposes some runtime info for creating a {@link Committer}. */ +@PublicEvolving +public interface CommitterInitContext extends InitContext { + /** @return The metric group this committer belongs to. 
*/ + SinkCommitterMetricGroup metricGroup(); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/CommittingSinkWriter.java similarity index 52% copy from flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java copy to flink-core/src/main/java/org/apache/flink/api/connector/sink2/CommittingSinkWriter.java index 6a16b420439..980bcb32ef1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/CommittingSinkWriter.java @@ -16,23 +16,24 @@ * limitations under the License. */ -package org.apache.flink.streaming.api.connector.sink2; +package org.apache.flink.api.connector.sink2; -import org.apache.flink.annotation.Experimental; -import org.apache.flink.api.connector.sink2.Sink; -import org.apache.flink.api.connector.sink2.SinkWriter; -import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.annotation.PublicEvolving; -/** Allows expert users to implement a custom topology before {@link SinkWriter}. */ -@Experimental -public interface WithPreWriteTopology<InputT> extends Sink<InputT> { +import java.io.IOException; +import java.util.Collection; +/** A {@link SinkWriter} that performs the first part of a two-phase commit protocol. */ +@PublicEvolving +public interface CommittingSinkWriter<InputT, CommittableT> extends SinkWriter<InputT> { /** - * Adds an arbitrary topology before the writer. The topology may be used to repartition the - * data. + * Prepares for a commit. * - * @param inputDataStream the stream of input records. - * @return the custom topology before {@link SinkWriter}. + * <p>This method will be called after {@link #flush(boolean)} and before {@link + * StatefulSinkWriter#snapshotState(long)}. 
+ * + * @return The data to commit as the second step of the two-phase commit protocol. + * @throws IOException if fail to prepare for a commit. */ - DataStream<InputT> addPreWriteTopology(DataStream<InputT> inputDataStream); + Collection<CommittableT> prepareCommit() throws IOException, InterruptedException; } diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java index bc769ddcd06..f5522c497d1 100644 --- a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java @@ -20,6 +20,7 @@ package org.apache.flink.api.connector.sink2; import org.apache.flink.annotation.Experimental; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.operators.MailboxExecutor; import org.apache.flink.api.common.operators.ProcessingTimeService; import org.apache.flink.api.common.serialization.SerializationSchema; @@ -30,12 +31,13 @@ import org.apache.flink.util.UserCodeClassLoader; import java.io.IOException; import java.io.Serializable; import java.util.Optional; +import java.util.OptionalLong; import java.util.function.Consumer; /** * Base interface for developing a sink. A basic {@link Sink} is a stateless sink that can flush * data on checkpoint to achieve at-least-once consistency. Sinks with additional requirements - * should implement {@link StatefulSink} or {@link TwoPhaseCommittingSink}. + * should implement {@link SupportsWriterState} or {@link SupportsCommitter}. * * <p>The {@link Sink} needs to be serializable. All configuration should be validated eagerly. The * respective sink writers are transient and will only be created in the subtasks on the @@ -52,11 +54,30 @@ public interface Sink<InputT> extends Serializable { * @param context the runtime context. * @return A sink writer. 
* @throws IOException for any failure during creation. + * @deprecated Please implement {@link #createWriter(WriterInitContext)}. For backward + * compatibility reasons - to keep {@link Sink} a functional interface - Flink did not + * provide a default implementation. New {@link Sink} implementations should implement this + * method, but it will not be used, and it will be removed in 1.20.0 release. Do not use + * {@link Override} annotation when implementing this method, to prevent compilation errors + * when migrating to 1.20.x release. */ + @Deprecated SinkWriter<InputT> createWriter(InitContext context) throws IOException; + /** + * Creates a {@link SinkWriter}. + * + * @param context the runtime context. + * @return A sink writer. + * @throws IOException for any failure during creation. + */ + default SinkWriter<InputT> createWriter(WriterInitContext context) throws IOException { + return createWriter(new InitContextWrapper(context)); + } + /** The interface exposes some runtime info for creating a {@link SinkWriter}. */ @PublicEvolving + @Deprecated interface InitContext extends org.apache.flink.api.connector.sink2.InitContext { /** * Gets the {@link UserCodeClassLoader} to load classes that are not in system's classpath, @@ -110,4 +131,80 @@ public interface Sink<InputT> extends Serializable { return Optional.empty(); } } + + /** + * Class for wrapping a new {@link WriterInitContext} to an old {@link InitContext} until + * deprecation. + * + * @deprecated Internal, do not use it. 
+ */ + @Deprecated + class InitContextWrapper implements InitContext { + private final WriterInitContext wrapped; + + InitContextWrapper(WriterInitContext wrapped) { + this.wrapped = wrapped; + } + + @Override + public int getSubtaskId() { + return wrapped.getSubtaskId(); + } + + @Override + public int getNumberOfParallelSubtasks() { + return wrapped.getNumberOfParallelSubtasks(); + } + + @Override + public int getAttemptNumber() { + return wrapped.getAttemptNumber(); + } + + @Override + public OptionalLong getRestoredCheckpointId() { + return wrapped.getRestoredCheckpointId(); + } + + @Override + public JobID getJobId() { + return wrapped.getJobId(); + } + + @Override + public UserCodeClassLoader getUserCodeClassLoader() { + return wrapped.getUserCodeClassLoader(); + } + + @Override + public MailboxExecutor getMailboxExecutor() { + return wrapped.getMailboxExecutor(); + } + + @Override + public ProcessingTimeService getProcessingTimeService() { + return wrapped.getProcessingTimeService(); + } + + @Override + public SinkWriterMetricGroup metricGroup() { + return wrapped.metricGroup(); + } + + @Override + public SerializationSchema.InitializationContext + asSerializationSchemaInitializationContext() { + return wrapped.asSerializationSchemaInitializationContext(); + } + + @Override + public boolean isObjectReuseEnabled() { + return wrapped.isObjectReuseEnabled(); + } + + @Override + public <IN> TypeSerializer<IN> createInputSerializer() { + return wrapped.createInputSerializer(); + } + } } diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSink.java b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSink.java index a1814669fbc..5a3772b0d9e 100644 --- a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSink.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSink.java @@ -19,11 +19,9 @@ package org.apache.flink.api.connector.sink2; import 
org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.core.io.SimpleVersionedSerializer; import java.io.IOException; import java.util.Collection; -import java.util.List; /** * A {@link Sink} with a stateful {@link SinkWriter}. @@ -34,51 +32,46 @@ import java.util.List; * * @param <InputT> The type of the sink's input * @param <WriterStateT> The type of the sink writer's state + * @deprecated Please implement {@link Sink} and {@link SupportsWriterState} instead. */ @PublicEvolving -public interface StatefulSink<InputT, WriterStateT> extends Sink<InputT> { +@Deprecated +public interface StatefulSink<InputT, WriterStateT> + extends Sink<InputT>, SupportsWriterState<InputT, WriterStateT> { /** - * Create a {@link StatefulSinkWriter}. + * Create a {@link org.apache.flink.api.connector.sink2.StatefulSinkWriter} from a recovered + * state. * * @param context the runtime context. * @return A sink writer. * @throws IOException for any failure during creation. */ - StatefulSinkWriter<InputT, WriterStateT> createWriter(InitContext context) throws IOException; + default StatefulSinkWriter<InputT, WriterStateT> restoreWriter( + Sink.InitContext context, Collection<WriterStateT> recoveredState) throws IOException { + throw new UnsupportedOperationException( + "Deprecated, please use restoreWriter(WriterInitContext, Collection<WriterStateT>)"); + } /** - * Create a {@link StatefulSinkWriter} from a recovered state. + * Create a {@link org.apache.flink.api.connector.sink2.StatefulSinkWriter} from a recovered + * state. * * @param context the runtime context. * @return A sink writer. * @throws IOException for any failure during creation. */ - StatefulSinkWriter<InputT, WriterStateT> restoreWriter( - InitContext context, Collection<WriterStateT> recoveredState) throws IOException; - - /** - * Any stateful sink needs to provide this state serializer and implement {@link - * StatefulSinkWriter#snapshotState(long)} properly. 
The respective state is used in {@link - * #restoreWriter(InitContext, Collection)} on recovery. - * - * @return the serializer of the writer's state type. - */ - SimpleVersionedSerializer<WriterStateT> getWriterStateSerializer(); + default StatefulSinkWriter<InputT, WriterStateT> restoreWriter( + WriterInitContext context, Collection<WriterStateT> recoveredState) throws IOException { + return restoreWriter(new InitContextWrapper(context), recoveredState); + } /** * A mix-in for {@link StatefulSink} that allows users to migrate from a sink with a compatible * state to this sink. */ @PublicEvolving - interface WithCompatibleState { - /** - * A list of state names of sinks from which the state can be restored. For example, the new - * {@code FileSink} can resume from the state of an old {@code StreamingFileSink} as a - * drop-in replacement when resuming from a checkpoint/savepoint. - */ - Collection<String> getCompatibleWriterStateNames(); - } + interface WithCompatibleState extends SupportsWriterState.WithCompatibleState {} /** * A {@link SinkWriter} whose state needs to be checkpointed. @@ -87,11 +80,6 @@ public interface StatefulSink<InputT, WriterStateT> extends Sink<InputT> { * @param <WriterStateT> The type of the writer's state */ @PublicEvolving - interface StatefulSinkWriter<InputT, WriterStateT> extends SinkWriter<InputT> { - /** - * @return The writer's state. - * @throws IOException if fail to snapshot writer's state. 
- */ - List<WriterStateT> snapshotState(long checkpointId) throws IOException; - } + interface StatefulSinkWriter<InputT, WriterStateT> + extends org.apache.flink.api.connector.sink2.StatefulSinkWriter<InputT, WriterStateT> {} } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSinkWriter.java similarity index 52% copy from flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java copy to flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSinkWriter.java index 6a16b420439..2f0d82045e6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSinkWriter.java @@ -16,23 +16,24 @@ * limitations under the License. */ -package org.apache.flink.streaming.api.connector.sink2; +package org.apache.flink.api.connector.sink2; -import org.apache.flink.annotation.Experimental; -import org.apache.flink.api.connector.sink2.Sink; -import org.apache.flink.api.connector.sink2.SinkWriter; -import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.annotation.PublicEvolving; -/** Allows expert users to implement a custom topology before {@link SinkWriter}. */ -@Experimental -public interface WithPreWriteTopology<InputT> extends Sink<InputT> { +import java.io.IOException; +import java.util.List; +/** + * A {@link SinkWriter} whose state needs to be checkpointed. + * + * @param <InputT> The type of the sink writer's input + * @param <WriterStateT> The type of the writer's state + */ +@PublicEvolving +public interface StatefulSinkWriter<InputT, WriterStateT> extends SinkWriter<InputT> { /** - * Adds an arbitrary topology before the writer. The topology may be used to repartition the - * data. 
- * - * @param inputDataStream the stream of input records. - * @return the custom topology before {@link SinkWriter}. + * @return The writer's state. + * @throws IOException if fail to snapshot writer's state. */ - DataStream<InputT> addPreWriteTopology(DataStream<InputT> inputDataStream); + List<WriterStateT> snapshotState(long checkpointId) throws IOException; } diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/SupportsCommitter.java b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/SupportsCommitter.java new file mode 100644 index 00000000000..12a714e8382 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/SupportsCommitter.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.connector.sink2; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import java.io.IOException; +import java.util.Collection; + +/** + * A mixin interface for a {@link Sink} which supports exactly-once semantics using a two-phase + * commit protocol. 
The {@link Sink} consists of a {@link CommittingSinkWriter} that performs the + * precommits and a {@link Committer} that actually commits the data. To facilitate the separation + * the {@link CommittingSinkWriter} creates <i>committables</i> on checkpoint or end of input and + * then sends them to the {@link Committer}. + * + * <p>The {@link Sink} needs to be serializable. All configuration should be validated eagerly. The + * respective sink writers and committers are transient and will only be created in the subtasks on + * the taskmanagers. + * + * @param <CommittableT> The type of the committables. + */ +@PublicEvolving +public interface SupportsCommitter<CommittableT> { + + /** + * Creates a {@link Committer} that permanently makes the previously written data visible + * through {@link Committer#commit(Collection)}. + * + * @param context The context information for the committer initialization. + * @return A committer for the two-phase commit protocol. + * @throws IOException for any failure during creation. + */ + Committer<CommittableT> createCommitter(CommitterInitContext context) throws IOException; + + /** Returns the serializer of the committable type.
*/ + SimpleVersionedSerializer<CommittableT> getCommittableSerializer(); +} diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSink.java b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/SupportsWriterState.java similarity index 52% copy from flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSink.java copy to flink-core/src/main/java/org/apache/flink/api/connector/sink2/SupportsWriterState.java index a1814669fbc..5ee49bcdaa1 100644 --- a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSink.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/SupportsWriterState.java @@ -23,75 +23,51 @@ import org.apache.flink.core.io.SimpleVersionedSerializer; import java.io.IOException; import java.util.Collection; -import java.util.List; /** - * A {@link Sink} with a stateful {@link SinkWriter}. + * A mixin interface for a {@link Sink} which supports a stateful {@link StatefulSinkWriter}. * - * <p>The {@link StatefulSink} needs to be serializable. All configuration should be validated - * eagerly. The respective sink writers are transient and will only be created in the subtasks on - * the taskmanagers. + * <p>The {@link Sink} needs to be serializable. All configuration should be validated eagerly. The + * respective sink writers are transient and will only be created in the subtasks on the + * taskmanagers. * * @param <InputT> The type of the sink's input * @param <WriterStateT> The type of the sink writer's state */ @PublicEvolving -public interface StatefulSink<InputT, WriterStateT> extends Sink<InputT> { - - /** - * Create a {@link StatefulSinkWriter}. - * - * @param context the runtime context. - * @return A sink writer. - * @throws IOException for any failure during creation. 
- */ - StatefulSinkWriter<InputT, WriterStateT> createWriter(InitContext context) throws IOException; +public interface SupportsWriterState<InputT, WriterStateT> { /** * Create a {@link StatefulSinkWriter} from a recovered state. * * @param context the runtime context. + * @param recoveredState the state to recover from. * @return A sink writer. * @throws IOException for any failure during creation. */ StatefulSinkWriter<InputT, WriterStateT> restoreWriter( - InitContext context, Collection<WriterStateT> recoveredState) throws IOException; + WriterInitContext context, Collection<WriterStateT> recoveredState) throws IOException; /** * Any stateful sink needs to provide this state serializer and implement {@link * StatefulSinkWriter#snapshotState(long)} properly. The respective state is used in {@link - * #restoreWriter(InitContext, Collection)} on recovery. + * #restoreWriter(WriterInitContext, Collection)} on recovery. * * @return the serializer of the writer's state type. */ SimpleVersionedSerializer<WriterStateT> getWriterStateSerializer(); /** - * A mix-in for {@link StatefulSink} that allows users to migrate from a sink with a compatible - * state to this sink. + * A mix-in for {@link SupportsWriterState} that allows users to migrate from a sink with a + * compatible state to this sink. */ @PublicEvolving interface WithCompatibleState { /** - * A list of state names of sinks from which the state can be restored. For example, the new - * {@code FileSink} can resume from the state of an old {@code StreamingFileSink} as a - * drop-in replacement when resuming from a checkpoint/savepoint. + * A collection of state names of sinks from which the state can be restored. For example, + * the new {@code FileSink} can resume from the state of an old {@code StreamingFileSink} as + * a drop-in replacement when resuming from a checkpoint/savepoint. */ Collection<String> getCompatibleWriterStateNames(); } - - /** - * A {@link SinkWriter} whose state needs to be checkpointed. 
- * - * @param <InputT> The type of the sink writer's input - * @param <WriterStateT> The type of the writer's state - */ - @PublicEvolving - interface StatefulSinkWriter<InputT, WriterStateT> extends SinkWriter<InputT> { - /** - * @return The writer's state. - * @throws IOException if fail to snapshot writer's state. - */ - List<WriterStateT> snapshotState(long checkpointId) throws IOException; - } } diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.java b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.java index b2cf15565fb..d5f10ec3320 100644 --- a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.java @@ -19,9 +19,6 @@ package org.apache.flink.api.connector.sink2; import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.api.connector.sink2.StatefulSink.StatefulSinkWriter; -import org.apache.flink.core.io.SimpleVersionedSerializer; -import org.apache.flink.metrics.groups.SinkCommitterMetricGroup; import java.io.IOException; import java.util.Collection; @@ -38,19 +35,12 @@ import java.util.Collection; * * @param <InputT> The type of the sink's input * @param <CommT> The type of the committables. + * @deprecated Please implement {@link Sink} {@link SupportsCommitter} instead. */ @PublicEvolving -public interface TwoPhaseCommittingSink<InputT, CommT> extends Sink<InputT> { - - /** - * Creates a {@link PrecommittingSinkWriter} that creates committables on checkpoint or end of - * input. - * - * @param context the runtime context. - * @return A sink writer for the two-phase commit protocol. - * @throws IOException for any failure during creation. 
- */ - PrecommittingSinkWriter<InputT, CommT> createWriter(InitContext context) throws IOException; +@Deprecated +public interface TwoPhaseCommittingSink<InputT, CommT> + extends Sink<InputT>, SupportsCommitter<CommT> { /** * Creates a {@link Committer} that permanently makes the previously written data visible @@ -78,29 +68,8 @@ public interface TwoPhaseCommittingSink<InputT, CommT> extends Sink<InputT> { return createCommitter(); } - /** Returns the serializer of the committable type. */ - SimpleVersionedSerializer<CommT> getCommittableSerializer(); - /** A {@link SinkWriter} that performs the first part of a two-phase commit protocol. */ @PublicEvolving - interface PrecommittingSinkWriter<InputT, CommT> extends SinkWriter<InputT> { - /** - * Prepares for a commit. - * - * <p>This method will be called after {@link #flush(boolean)} and before {@link - * StatefulSinkWriter#snapshotState(long)}. - * - * @return The data to commit as the second step of the two-phase commit protocol. - * @throws IOException if fail to prepare for a commit. - */ - Collection<CommT> prepareCommit() throws IOException, InterruptedException; - } - - /** The interface exposes some runtime info for creating a {@link Committer}. */ - @PublicEvolving - interface CommitterInitContext extends org.apache.flink.api.connector.sink2.InitContext { - - /** @return The metric group this committer belongs to. 
*/ - SinkCommitterMetricGroup metricGroup(); - } + @Deprecated + interface PrecommittingSinkWriter<InputT, CommT> extends CommittingSinkWriter<InputT, CommT> {} } diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/WriterInitContext.java b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/WriterInitContext.java new file mode 100644 index 00000000000..38e2e38c318 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/WriterInitContext.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.connector.sink2; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.common.operators.ProcessingTimeService; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.metrics.groups.SinkWriterMetricGroup; +import org.apache.flink.util.UserCodeClassLoader; + +import java.util.Optional; +import java.util.function.Consumer; + +/** The interface exposes some runtime info for creating a {@link SinkWriter}. */ +@PublicEvolving +public interface WriterInitContext extends org.apache.flink.api.connector.sink2.InitContext { + /** + * Gets the {@link UserCodeClassLoader} to load classes that are not in system's classpath, but + * are part of the jar file of a user job. + * + * @see UserCodeClassLoader + */ + UserCodeClassLoader getUserCodeClassLoader(); + + /** + * Returns the mailbox executor that allows to execute {@link Runnable}s inside the task thread + * in between record processing. + * + * <p>Note that this method should not be used per-record for performance reasons in the same + * way as records should not be sent to the external system individually. Rather, implementers + * are expected to batch records and only enqueue a single {@link Runnable} per batch to handle + * the result. + */ + MailboxExecutor getMailboxExecutor(); + + /** + * Returns a {@link ProcessingTimeService} that can be used to get the current time and register + * timers. + */ + ProcessingTimeService getProcessingTimeService(); + + /** @return The metric group this writer belongs to. */ + SinkWriterMetricGroup metricGroup(); + + /** Provides a view on this context as a {@link SerializationSchema.InitializationContext}. 
*/ + SerializationSchema.InitializationContext asSerializationSchemaInitializationContext(); + + /** Returns whether object reuse has been enabled or disabled. */ + boolean isObjectReuseEnabled(); + + /** Creates a serializer for the type of sink's input. */ + <IN> TypeSerializer<IN> createInputSerializer(); + + /** + * Returns a metadata consumer, the {@link SinkWriter} can publish metadata events of type + * {@link MetaT} to the consumer. + * + * <p>It is recommended to use a separate thread pool to publish the metadata because enqueuing + * a lot of these messages in the mailbox may lead to a performance decrease. Make sure the + * {@link Consumer#accept} method is executed very fast. + */ + @Experimental + default <MetaT> Optional<Consumer<MetaT>> metadataConsumer() { + return Optional.empty(); + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummary.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummary.java index baaa8714fff..7171a5168a7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummary.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummary.java @@ -84,4 +84,14 @@ public class CommittableSummary<CommT> implements CommittableMessage<CommT> { public int getNumberOfFailedCommittables() { return numberOfFailedCommittables; } + + public <NewCommT> CommittableSummary<NewCommT> map() { + return new CommittableSummary<>( + subtaskId, + numberOfSubtasks, + checkpointId, + numberOfCommittables, + numberOfPendingCommittables, + numberOfFailedCommittables); + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineage.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineage.java index 1dddcc79256..a792a3ad48c 100644 ---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineage.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineage.java @@ -25,6 +25,7 @@ import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; import javax.annotation.Nullable; import java.util.OptionalLong; +import java.util.function.Function; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -55,4 +56,8 @@ public class CommittableWithLineage<CommT> implements CommittableMessage<CommT> public OptionalLong getCheckpointId() { return checkpointId == null ? OptionalLong.empty() : OptionalLong.of(checkpointId); } + + public <NewCommT> CommittableWithLineage<NewCommT> map(Function<CommT, NewCommT> mapper) { + return new CommittableWithLineage<>(mapper.apply(committable), checkpointId, subtaskId); + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPostCommitTopology.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/SupportsPostCommitTopology.java similarity index 90% copy from flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPostCommitTopology.java copy to flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/SupportsPostCommitTopology.java index 17d1c685841..dc89423a824 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPostCommitTopology.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/SupportsPostCommitTopology.java @@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.connector.sink2; import org.apache.flink.annotation.Experimental; import org.apache.flink.api.connector.sink2.Committer; -import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; import org.apache.flink.streaming.api.datastream.DataStream; /** @@ -30,8 +29,7 
@@ import org.apache.flink.streaming.api.datastream.DataStream; * unexpected side-effects. */ @Experimental -public interface WithPostCommitTopology<InputT, CommT> - extends TwoPhaseCommittingSink<InputT, CommT> { +public interface SupportsPostCommitTopology<CommittableT> { /** * Adds a custom post-commit topology where all committables can be processed. @@ -45,5 +43,5 @@ public interface WithPostCommitTopology<InputT, CommT> * * @param committables the stream of committables. */ - void addPostCommitTopology(DataStream<CommittableMessage<CommT>> committables); + void addPostCommitTopology(DataStream<CommittableMessage<CommittableT>> committables); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreCommitTopology.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/SupportsPreCommitTopology.java similarity index 80% copy from flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreCommitTopology.java copy to flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/SupportsPreCommitTopology.java index 6d7219d7d9d..67f277b1b45 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreCommitTopology.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/SupportsPreCommitTopology.java @@ -21,7 +21,7 @@ package org.apache.flink.streaming.api.connector.sink2; import org.apache.flink.annotation.Experimental; import org.apache.flink.api.connector.sink2.Committer; import org.apache.flink.api.connector.sink2.SinkWriter; -import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; +import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.streaming.api.datastream.DataStream; /** @@ -32,8 +32,7 @@ import org.apache.flink.streaming.api.datastream.DataStream; * unexpected side-effects. 
*/ @Experimental -public interface WithPreCommitTopology<InputT, CommT> - extends TwoPhaseCommittingSink<InputT, CommT> { +public interface SupportsPreCommitTopology<WriterResultT, CommittableT> { /** * Intercepts and modifies the committables sent on checkpoint or at end of input. Implementers @@ -42,6 +41,9 @@ public interface WithPreCommitTopology<InputT, CommT> * @param committables the stream of committables. * @return the custom topology before {@link Committer}. */ - DataStream<CommittableMessage<CommT>> addPreCommitTopology( - DataStream<CommittableMessage<CommT>> committables); + DataStream<CommittableMessage<CommittableT>> addPreCommitTopology( + DataStream<CommittableMessage<WriterResultT>> committables); + + /** Returns the serializer of the WriteResult type. */ + SimpleVersionedSerializer<WriterResultT> getWriteResultSerializer(); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/SupportsPreWriteTopology.java similarity index 92% copy from flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java copy to flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/SupportsPreWriteTopology.java index 6a16b420439..3e84b1ef4b9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/SupportsPreWriteTopology.java @@ -19,13 +19,12 @@ package org.apache.flink.streaming.api.connector.sink2; import org.apache.flink.annotation.Experimental; -import org.apache.flink.api.connector.sink2.Sink; import org.apache.flink.api.connector.sink2.SinkWriter; import org.apache.flink.streaming.api.datastream.DataStream; /** Allows expert users to implement a custom topology before {@link SinkWriter}. 
*/ @Experimental -public interface WithPreWriteTopology<InputT> extends Sink<InputT> { +public interface SupportsPreWriteTopology<InputT> { /** * Adds an arbitrary topology before the writer. The topology may be used to repartition the diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPostCommitTopology.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPostCommitTopology.java index 17d1c685841..8fb516be550 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPostCommitTopology.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPostCommitTopology.java @@ -21,29 +21,18 @@ package org.apache.flink.streaming.api.connector.sink2; import org.apache.flink.annotation.Experimental; import org.apache.flink.api.connector.sink2.Committer; import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; -import org.apache.flink.streaming.api.datastream.DataStream; /** * Allows expert users to implement a custom topology after {@link Committer}. * * <p>It is recommended to use immutable committables because mutating committables can have * unexpected side-effects. + * + * @deprecated Please implement {@link org.apache.flink.api.connector.sink2.Sink}, {@link + * org.apache.flink.api.connector.sink2.SupportsCommitter} and {@link + * SupportsPostCommitTopology} instead. */ @Experimental +@Deprecated public interface WithPostCommitTopology<InputT, CommT> - extends TwoPhaseCommittingSink<InputT, CommT> { - - /** - * Adds a custom post-commit topology where all committables can be processed. - * - * <p>It is strongly recommended to keep this pipeline stateless such that batch and streaming - * modes do not require special cases. - * - * <p>All operations need to be idempotent: on recovery, any number of committables may be - * replayed that have already been committed. 
It's mandatory that these committables have no - * effect on the external system. - * - * @param committables the stream of committables. - */ - void addPostCommitTopology(DataStream<CommittableMessage<CommT>> committables); -} + extends TwoPhaseCommittingSink<InputT, CommT>, SupportsPostCommitTopology<CommT> {} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreCommitTopology.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreCommitTopology.java index 6d7219d7d9d..88f4a007e79 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreCommitTopology.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreCommitTopology.java @@ -22,7 +22,7 @@ import org.apache.flink.annotation.Experimental; import org.apache.flink.api.connector.sink2.Committer; import org.apache.flink.api.connector.sink2.SinkWriter; import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; -import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.core.io.SimpleVersionedSerializer; /** * Allows expert users to implement a custom topology after {@link SinkWriter} and before {@link @@ -30,18 +30,17 @@ import org.apache.flink.streaming.api.datastream.DataStream; * * <p>It is recommended to use immutable committables because mutating committables can have * unexpected side-effects. + * + * @deprecated Please implement {@link org.apache.flink.api.connector.sink2.Sink}, {@link + * org.apache.flink.api.connector.sink2.SupportsCommitter} and {@link SupportsPreCommitTopology} + * instead. */ @Experimental +@Deprecated public interface WithPreCommitTopology<InputT, CommT> - extends TwoPhaseCommittingSink<InputT, CommT> { - - /** - * Intercepts and modifies the committables sent on checkpoint or at end of input. 
Implementers - * need to ensure to modify all {@link CommittableMessage}s appropriately. - * - * @param committables the stream of committables. - * @return the custom topology before {@link Committer}. - */ - DataStream<CommittableMessage<CommT>> addPreCommitTopology( - DataStream<CommittableMessage<CommT>> committables); + extends TwoPhaseCommittingSink<InputT, CommT>, SupportsPreCommitTopology<CommT, CommT> { + /** Defaults to {@link #getCommittableSerializer} for backward compatibility. */ + default SimpleVersionedSerializer<CommT> getWriteResultSerializer() { + return getCommittableSerializer(); + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java index 6a16b420439..dccd892cced 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java @@ -21,18 +21,13 @@ package org.apache.flink.streaming.api.connector.sink2; import org.apache.flink.annotation.Experimental; import org.apache.flink.api.connector.sink2.Sink; import org.apache.flink.api.connector.sink2.SinkWriter; -import org.apache.flink.streaming.api.datastream.DataStream; -/** Allows expert users to implement a custom topology before {@link SinkWriter}. */ +/** + * Allows expert users to implement a custom topology before {@link SinkWriter}. + * + * @deprecated Please implement {@link Sink} and {@link SupportsPreWriteTopology} instead. + */ @Experimental -public interface WithPreWriteTopology<InputT> extends Sink<InputT> { - - /** - * Adds an arbitrary topology before the writer. The topology may be used to repartition the - * data. - * - * @param inputDataStream the stream of input records. - * @return the custom topology before {@link SinkWriter}. 
- */ - DataStream<InputT> addPreWriteTopology(DataStream<InputT> inputDataStream); -} +@Deprecated +public interface WithPreWriteTopology<InputT> + extends Sink<InputT>, SupportsPreWriteTopology<InputT> {} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java index 66104b0c6b4..478898ff19b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java @@ -27,6 +27,7 @@ import org.apache.flink.api.connector.sink.GlobalCommitter; import org.apache.flink.api.connector.sink.Sink.ProcessingTimeService; import org.apache.flink.api.connector.sink.SinkWriter; import org.apache.flink.api.connector.sink2.Committer; +import org.apache.flink.api.connector.sink2.CommitterInitContext; import org.apache.flink.api.connector.sink2.Sink; import org.apache.flink.api.connector.sink2.StatefulSink; import org.apache.flink.api.connector.sink2.StatefulSink.StatefulSinkWriter; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java index cb5044be232..028d8317d80 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java @@ -21,6 +21,7 @@ import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; import org.apache.flink.api.connector.sink2.Committer; +import 
org.apache.flink.api.connector.sink2.CommitterInitContext; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.metrics.groups.SinkCommitterMetricGroup; import org.apache.flink.runtime.metrics.groups.InternalSinkCommitterMetricGroup; @@ -49,7 +50,6 @@ import java.util.Collection; import java.util.Collections; import java.util.OptionalLong; -import static org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.CommitterInitContext; import static org.apache.flink.util.IOUtils.closeAll; import static org.apache.flink.util.Preconditions.checkNotNull; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java index 499e868f8c4..c0a9892d5ee 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java @@ -30,6 +30,7 @@ import org.apache.flink.api.connector.sink2.SinkWriter; import org.apache.flink.api.connector.sink2.StatefulSink; import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter; +import org.apache.flink.api.connector.sink2.WriterInitContext; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.metrics.groups.SinkWriterMetricGroup; import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup; @@ -132,7 +133,7 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab @Override public void initializeState(StateInitializationContext context) throws Exception { super.initializeState(context); - InitContext initContext = createInitContext(context.getRestoredCheckpointId()); + WriterInitContext initContext = 
createInitContext(context.getRestoredCheckpointId()); if (context.isRestored()) { if (committableSerializer != null) { final ListState<List<CommT>> legacyCommitterState = @@ -239,7 +240,7 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab } } - private InitContext createInitContext(OptionalLong restoredCheckpointId) { + private WriterInitContext createInitContext(OptionalLong restoredCheckpointId) { return new InitContextImpl( getRuntimeContext(), processingTimeService, @@ -268,7 +269,7 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab } } - private static class InitContextImpl extends InitContextBase implements InitContext { + private static class InitContextImpl extends InitContextBase implements WriterInitContext { private final ProcessingTimeService processingTimeService; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterStateHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterStateHandler.java index 0caf7bdb64b..6babf8582fd 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterStateHandler.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterStateHandler.java @@ -17,8 +17,8 @@ package org.apache.flink.streaming.runtime.operators.sink; -import org.apache.flink.api.connector.sink2.Sink.InitContext; import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.api.connector.sink2.WriterInitContext; import org.apache.flink.runtime.state.StateInitializationContext; /** @@ -38,6 +38,6 @@ interface SinkWriterStateHandler<InputT> { void snapshotState(long checkpointId) throws Exception; /** Creates a writer, potentially using state from {@link StateInitializationContext}. 
*/ - SinkWriter<InputT> createWriter(InitContext initContext, StateInitializationContext context) - throws Exception; + SinkWriter<InputT> createWriter( + WriterInitContext initContext, StateInitializationContext context) throws Exception; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterStateHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterStateHandler.java index b2b5a5d5108..5847425fbca 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterStateHandler.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterStateHandler.java @@ -22,15 +22,18 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; -import org.apache.flink.api.connector.sink2.Sink.InitContext; +import org.apache.flink.api.connector.sink2.Sink; import org.apache.flink.api.connector.sink2.SinkWriter; import org.apache.flink.api.connector.sink2.StatefulSink; -import org.apache.flink.api.connector.sink2.StatefulSink.StatefulSinkWriter; -import org.apache.flink.api.connector.sink2.StatefulSink.WithCompatibleState; +import org.apache.flink.api.connector.sink2.StatefulSinkWriter; +import org.apache.flink.api.connector.sink2.SupportsWriterState; +import org.apache.flink.api.connector.sink2.SupportsWriterState.WithCompatibleState; +import org.apache.flink.api.connector.sink2.WriterInitContext; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState; import org.apache.flink.util.CollectionUtil; +import 
org.apache.flink.util.Preconditions; import org.apache.flink.shaded.guava31.com.google.common.collect.Iterables; @@ -59,7 +62,7 @@ final class StatefulSinkWriterStateHandler<InputT, WriterStateT> */ private final Collection<String> previousSinkStateNames; - private final StatefulSink<InputT, WriterStateT> sink; + private final Sink<InputT> sink; // ------------------------------- runtime fields --------------------------------------- @@ -88,7 +91,7 @@ final class StatefulSinkWriterStateHandler<InputT, WriterStateT> @Override public SinkWriter<InputT> createWriter( - InitContext initContext, StateInitializationContext context) throws Exception { + WriterInitContext initContext, StateInitializationContext context) throws Exception { final ListState<byte[]> rawState = context.getOperatorStateStore().getListState(WRITER_RAW_STATES_DESC); writerState = @@ -112,9 +115,14 @@ final class StatefulSinkWriterStateHandler<InputT, WriterStateT> previousSinkStates.add(previousSinkState); Iterables.addAll(states, previousSinkState.get()); } - sinkWriter = sink.restoreWriter(initContext, states); + + if (!(sink instanceof SupportsWriterState)) { + throw new IllegalArgumentException("Sink should implement SupportsWriterState"); + } + + sinkWriter = ((SupportsWriterState) sink).restoreWriter(initContext, states); } else { - sinkWriter = sink.createWriter(initContext); + sinkWriter = cast(sink.createWriter(initContext)); } return sinkWriter; } @@ -124,4 +132,11 @@ final class StatefulSinkWriterStateHandler<InputT, WriterStateT> writerState.update(sinkWriter.snapshotState(checkpointId)); previousSinkStates.forEach(ListState::clear); } + + private static StatefulSinkWriter cast(SinkWriter writer) { + Preconditions.checkArgument( + writer instanceof StatefulSinkWriter, + "The writer should implement StatefulSinkWriter"); + return (StatefulSinkWriter) writer; + } } diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatelessSinkWriterStateHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatelessSinkWriterStateHandler.java index f49da996224..f8f79da727d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatelessSinkWriterStateHandler.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatelessSinkWriterStateHandler.java @@ -18,8 +18,8 @@ package org.apache.flink.streaming.runtime.operators.sink; import org.apache.flink.api.connector.sink2.Sink; -import org.apache.flink.api.connector.sink2.Sink.InitContext; import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.api.connector.sink2.WriterInitContext; import org.apache.flink.runtime.state.StateInitializationContext; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -35,7 +35,7 @@ final class StatelessSinkWriterStateHandler<InputT> implements SinkWriterStateHa @Override public SinkWriter<InputT> createWriter( - InitContext initContext, StateInitializationContext context) throws Exception { + WriterInitContext initContext, StateInitializationContext context) throws Exception { return sink.createWriter(initContext); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java index 34d3b2dfeb5..fec52f6fa11 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java @@ -35,6 +35,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import 
org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.connector.sink2.Committer; +import org.apache.flink.api.connector.sink2.CommitterInitContext; import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.connector.source.lib.NumberSequenceSource; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestExpandingSink.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestExpandingSink.java index 5f26d045960..af00546d09a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestExpandingSink.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestExpandingSink.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.util; import org.apache.flink.api.connector.sink2.Committer; +import org.apache.flink.api.connector.sink2.CommitterInitContext; import org.apache.flink.api.connector.sink2.Sink; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; diff --git a/flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeSchedulerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeSchedulerITCase.java index c6f9613c1c5..44a86e81b94 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeSchedulerITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeSchedulerITCase.java @@ -28,6 +28,7 @@ import org.apache.flink.api.common.io.OutputFormat; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.sink2.Committer; +import org.apache.flink.api.connector.sink2.CommitterInitContext; import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; import 
org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter; import org.apache.flink.api.connector.source.Boundedness;