This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git
commit a76854f535f4af4251b038c5c87bfb73e36fdf0f Author: Stephan Ewen <se...@apache.org> AuthorDate: Mon Apr 19 18:22:07 2021 +0200 [FLINK-22358][connector base] Add stability annotations to connector base and iterator sources. --- .../org/apache/flink/connector/base/source/reader/RecordEmitter.java | 2 ++ .../apache/flink/connector/base/source/reader/RecordsBySplits.java | 2 ++ .../flink/connector/base/source/reader/RecordsWithSplitIds.java | 3 +++ .../base/source/reader/SingleThreadMultiplexSourceReaderBase.java | 2 ++ .../apache/flink/connector/base/source/reader/SourceReaderBase.java | 2 ++ .../flink/connector/base/source/reader/SourceReaderOptions.java | 4 +++- .../flink/connector/base/source/reader/fetcher/AddSplitsTask.java | 2 ++ .../apache/flink/connector/base/source/reader/fetcher/FetchTask.java | 2 ++ .../base/source/reader/fetcher/SingleThreadFetcherManager.java | 2 ++ .../flink/connector/base/source/reader/fetcher/SplitFetcher.java | 2 ++ .../connector/base/source/reader/fetcher/SplitFetcherManager.java | 2 ++ .../flink/connector/base/source/reader/fetcher/SplitFetcherTask.java | 3 +++ .../flink/connector/base/source/reader/splitreader/SplitReader.java | 2 ++ .../connector/base/source/reader/splitreader/SplitsAddition.java | 3 +++ .../flink/connector/base/source/reader/splitreader/SplitsChange.java | 3 +++ .../source/reader/synchronization/FutureCompletingBlockingQueue.java | 2 ++ .../java/org/apache/flink/connector/base/source/utils/SerdeUtils.java | 2 ++ 17 files changed, 39 insertions(+), 1 deletion(-) diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordEmitter.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordEmitter.java index 99b40ae..9c2772f 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordEmitter.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordEmitter.java @@ -18,6 +18,7 @@ package org.apache.flink.connector.base.source.reader; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.connector.source.SourceOutput; import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; @@ -28,6 +29,7 @@ import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; * @param <T> the type of records that are eventually emitted to the {@link SourceOutput}. * @param <SplitStateT> the mutable type of split state. */ +@PublicEvolving public interface RecordEmitter<E, T, SplitStateT> { /** diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsBySplits.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsBySplits.java index e5e18dc..293de7c 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsBySplits.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsBySplits.java @@ -18,6 +18,7 @@ package org.apache.flink.connector.base.source.reader; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.connector.source.SourceSplit; import javax.annotation.Nullable; @@ -34,6 +35,7 @@ import java.util.Set; import static org.apache.flink.util.Preconditions.checkNotNull; /** An implementation of RecordsWithSplitIds to host all the records by splits. */ +@PublicEvolving public class RecordsBySplits<E> implements RecordsWithSplitIds<E> { private final Set<String> finishedSplits; diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java index 88174fd..5233da0 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java @@ -18,11 +18,14 @@ package org.apache.flink.connector.base.source.reader; +import org.apache.flink.annotation.PublicEvolving; + import javax.annotation.Nullable; import java.util.Set; /** An interface for the elements passed from the fetchers to the source reader. */ +@PublicEvolving public interface RecordsWithSplitIds<E> { /** diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java index 377a72d..0a6baf6 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java @@ -18,6 +18,7 @@ package org.apache.flink.connector.base.source.reader; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.api.connector.source.SourceSplit; @@ -56,6 +57,7 @@ import java.util.function.Supplier; * @param <SplitT> The type of the splits processed by the source. * @param <SplitStateT> The type of the mutable state per split. */ +@PublicEvolving public abstract class SingleThreadMultiplexSourceReaderBase< E, T, SplitT extends SourceSplit, SplitStateT> extends SourceReaderBase<E, T, SplitT, SplitStateT> { diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java index 696e21d..416dbb2 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java @@ -18,6 +18,7 @@ package org.apache.flink.connector.base.source.reader; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.connector.source.ReaderOutput; import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SourceOutput; @@ -63,6 +64,7 @@ import static org.apache.flink.util.Preconditions.checkState; * @param <SplitT> the immutable split type. * @param <SplitStateT> the mutable type of split state. */ +@PublicEvolving public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT> implements SourceReader<T, SplitT> { private static final Logger LOG = LoggerFactory.getLogger(SourceReaderBase.class); diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderOptions.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderOptions.java index e0d436a..641acf0 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderOptions.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderOptions.java @@ -18,11 +18,13 @@ package org.apache.flink.connector.base.source.reader; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; -/** The options tht can be set for the {@link SourceReaderBase}. */ +/** The options that can be set for the {@link SourceReaderBase}. */ +@PublicEvolving public class SourceReaderOptions { public static final ConfigOption<Long> SOURCE_READER_CLOSE_TIMEOUT = diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/AddSplitsTask.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/AddSplitsTask.java index fade54f..97a072b 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/AddSplitsTask.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/AddSplitsTask.java @@ -18,6 +18,7 @@ package org.apache.flink.connector.base.source.reader.fetcher; +import org.apache.flink.annotation.Internal; import org.apache.flink.api.connector.source.SourceSplit; import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition; @@ -26,6 +27,7 @@ import java.util.List; import java.util.Map; /** The task to add splits. */ +@Internal class AddSplitsTask<SplitT extends SourceSplit> implements SplitFetcherTask { private final SplitReader<?, SplitT> splitReader; diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java index 38140b8..34ac618 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java @@ -18,6 +18,7 @@ package org.apache.flink.connector.base.source.reader.fetcher; +import org.apache.flink.annotation.Internal; import org.apache.flink.api.connector.source.SourceSplit; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; @@ -28,6 +29,7 @@ import java.util.Collection; import java.util.function.Consumer; /** The default fetch task that fetches the records into the element queue. */ +@Internal class FetchTask<E, SplitT extends SourceSplit> implements SplitFetcherTask { private final SplitReader<E, SplitT> splitReader; private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue; diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java index 91d0d4d..19ffb8b 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java @@ -18,6 +18,7 @@ package org.apache.flink.connector.base.source.reader.fetcher; +import org.apache.flink.annotation.Internal; import org.apache.flink.api.connector.source.SourceSplit; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.SourceReaderBase; @@ -36,6 +37,7 @@ import java.util.function.Supplier; * via the same client. In the example of the file source, there is a single thread that reads the * files after another. */ +@Internal public class SingleThreadFetcherManager<E, SplitT extends SourceSplit> extends SplitFetcherManager<E, SplitT> { diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java index 3b1af0f..0b18a01 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java @@ -18,6 +18,7 @@ package org.apache.flink.connector.base.source.reader.fetcher; +import org.apache.flink.annotation.Internal; import org.apache.flink.api.connector.source.SourceSplit; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; @@ -37,6 +38,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; /** The internal fetcher runnable responsible for polling message from the external system. */ +@Internal public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(SplitFetcher.class); private static final SplitFetcherTask WAKEUP_TASK = new DummySplitFetcherTask("WAKEUP_TASK"); diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java index 894efa3..91fdd89 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java @@ -18,6 +18,7 @@ package org.apache.flink.connector.base.source.reader.fetcher; +import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.connector.source.SourceSplit; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; @@ -49,6 +50,7 @@ import java.util.function.Supplier; * manager would only start a single fetcher and assign all the splits to it. A one-thread-per-split * fetcher may spawn a new thread every time a new split is assigned. */ +@Internal public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> { private static final Logger LOG = LoggerFactory.getLogger(SplitFetcherManager.class); diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java index ceb5ec1..62a21d3 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java @@ -18,9 +18,12 @@ package org.apache.flink.connector.base.source.reader.fetcher; +import org.apache.flink.annotation.Internal; + import java.io.IOException; /** An interface similar to {@link Runnable} but allows throwing exceptions and wakeup. */ +@Internal public interface SplitFetcherTask { /** diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java index 97b2125..4f2ff6a 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java @@ -18,6 +18,7 @@ package org.apache.flink.connector.base.source.reader.splitreader; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.connector.source.SourceSplit; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; @@ -30,6 +31,7 @@ import java.io.IOException; * @param <E> the element type. * @param <SplitT> the split type. */ +@PublicEvolving public interface SplitReader<E, SplitT extends SourceSplit> { /** diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitsAddition.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitsAddition.java index b26d564..ccc8e19 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitsAddition.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitsAddition.java @@ -18,6 +18,8 @@ package org.apache.flink.connector.base.source.reader.splitreader; +import org.apache.flink.annotation.PublicEvolving; + import java.util.List; /** @@ -25,6 +27,7 @@ import java.util.List; * * @param <SplitT> the split type. */ +@PublicEvolving public class SplitsAddition<SplitT> extends SplitsChange<SplitT> { public SplitsAddition(List<SplitT> splits) { diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitsChange.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitsChange.java index 4c89a4f..ce9f05f 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitsChange.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitsChange.java @@ -18,10 +18,13 @@ package org.apache.flink.connector.base.source.reader.splitreader; +import org.apache.flink.annotation.PublicEvolving; + import java.util.Collections; import java.util.List; /** An abstract class to host splits change. */ +@PublicEvolving public abstract class SplitsChange<SplitT> { private final List<SplitT> splits; diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java index bea5709..9977bcd 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java @@ -18,6 +18,7 @@ package org.apache.flink.connector.base.source.reader.synchronization; +import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.util.FlinkRuntimeException; @@ -69,6 +70,7 @@ import static org.apache.flink.util.Preconditions.checkArgument; * * @param <T> the type of the elements in the queue. */ +@Internal public class FutureCompletingBlockingQueue<T> { /** diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/utils/SerdeUtils.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/utils/SerdeUtils.java index 2bf219d..6b2de99 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/utils/SerdeUtils.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/utils/SerdeUtils.java @@ -18,6 +18,7 @@ limitations under the License. package org.apache.flink.connector.base.source.utils; +import org.apache.flink.annotation.Internal; import org.apache.flink.api.connector.source.SourceSplit; import org.apache.flink.core.io.SimpleVersionedSerializer; @@ -32,6 +33,7 @@ import java.util.Map; import java.util.function.Function; /** A util class with some helper method for serde in the sources. */ +@Internal public class SerdeUtils { /** Private constructor for util class. */