This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 80fd59ccfc35dc3f6a077c170a26d3252c9e4dee Author: Alexander Fedulov <1492164+afedu...@users.noreply.github.com> AuthorDate: Thu Oct 12 21:17:49 2023 +0200 [FLINK-28050][connectors] Deprecate OutputTypeConfigurable in flink-streaming-java Reason for this change: OutputTypeConfigurable needs to be used in DataGeneratorSource (see StreamExecutionEnvironmentTest#testFromElementsPostConstructionType). OutputTypeConfigurable is currently located in the flink-streaming-java module. When DataGeneratorSource gets added into flink-streaming-java, this creates a cycle. Marker interfaces should ideally reside in flink-core (InputTypeConfigurable already does). - Deprecates OutputTypeConfigurable in flink-streaming-java - Adds a new marker OutputTypeConfigurable interface to flink-core - Modifies operators that use this interface to support both versions --- .../java/typeutils}/OutputTypeConfigurable.java | 12 ++++++------ .../api/operators/OutputTypeConfigurable.java | 3 +++ .../api/operators/SimpleOperatorFactory.java | 13 +++++++++++-- .../api/operators/SourceOperatorFactory.java | 22 ++++++++++++++++++++++ 4 files changed, 42 insertions(+), 8 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/OutputTypeConfigurable.java similarity index 86% copy from flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java copy to flink-core/src/main/java/org/apache/flink/api/java/typeutils/OutputTypeConfigurable.java index a296fa7d5cb..2ff3ceabcd6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/OutputTypeConfigurable.java @@ -16,7 +16,7 @@ * limitations under the License. 
*/ -package org.apache.flink.streaming.api.operators; +package org.apache.flink.api.java.typeutils; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.ExecutionConfig; @@ -24,7 +24,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; /** * Stream operators can implement this interface if they need access to the output type information - * at {@link org.apache.flink.streaming.api.graph.StreamGraph} generation. This can be useful for + * at {@code org.apache.flink.streaming.api.graph.StreamGraph} generation. This can be useful for * cases where the output type is specified by the returns method and, thus, after the stream * operator has been created. */ @@ -32,13 +32,13 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; public interface OutputTypeConfigurable<OUT> { /** - * Is called by the {@link org.apache.flink.streaming.api.graph.StreamGraph#addOperator(Integer, - * String, StreamOperator, TypeInformation, TypeInformation, String)} method when the {@link + * Is called by the {@code org.apache.flink.streaming.api.graph.StreamGraph#addOperator(Integer, + * String, StreamOperator, TypeInformation, TypeInformation, String)} method when the {@code * org.apache.flink.streaming.api.graph.StreamGraph} is generated. The method is called with the - * output {@link TypeInformation} which is also used for the {@link + * output {@link TypeInformation} which is also used for the {@code * org.apache.flink.streaming.runtime.tasks.StreamTask} output serializer. 
* - * @param outTypeInfo Output type information of the {@link + * @param outTypeInfo Output type information of the {@code * org.apache.flink.streaming.runtime.tasks.StreamTask} * @param executionConfig Execution configuration */ diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java index a296fa7d5cb..f358aea0c92 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java @@ -27,7 +27,10 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; * at {@link org.apache.flink.streaming.api.graph.StreamGraph} generation. This can be useful for * cases where the output type is specified by the returns method and, thus, after the stream * operator has been created. 
+ * + * @deprecated Use {@link org.apache.flink.api.java.typeutils.OutputTypeConfigurable} instead */ +@Deprecated @PublicEvolving public interface OutputTypeConfigurable<OUT> { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SimpleOperatorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SimpleOperatorFactory.java index 95329baaef4..fbb734eb07e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SimpleOperatorFactory.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SimpleOperatorFactory.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.InputTypeConfigurable; +import org.apache.flink.api.java.typeutils.OutputTypeConfigurable; import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction; import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction; @@ -109,13 +110,21 @@ public class SimpleOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU @Override public boolean isOutputTypeConfigurable() { - return operator instanceof OutputTypeConfigurable; + return operator instanceof OutputTypeConfigurable + || operator // legacy kept for compatibility purposes + instanceof org.apache.flink.streaming.api.operators.OutputTypeConfigurable; } @SuppressWarnings("unchecked") @Override public void setOutputType(TypeInformation<OUT> type, ExecutionConfig executionConfig) { - ((OutputTypeConfigurable<OUT>) operator).setOutputType(type, executionConfig); + if (operator instanceof OutputTypeConfigurable) { + ((OutputTypeConfigurable<OUT>) operator).setOutputType(type, executionConfig); + } else if (operator // legacy kept for compatibility purposes + instanceof 
org.apache.flink.streaming.api.operators.OutputTypeConfigurable) { + ((org.apache.flink.streaming.api.operators.OutputTypeConfigurable<OUT>) operator) + .setOutputType(type, executionConfig); + } } @Override diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java index 479c5224365..247cb456d88 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java @@ -18,12 +18,15 @@ limitations under the License. package org.apache.flink.streaming.api.operators; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.connector.source.Source; import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.java.typeutils.OutputTypeConfigurable; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.runtime.jobgraph.OperatorID; @@ -153,6 +156,25 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU return true; } + @Override + public boolean isOutputTypeConfigurable() { + return source instanceof OutputTypeConfigurable + || source // legacy kept for compatibility purposes + instanceof org.apache.flink.streaming.api.operators.OutputTypeConfigurable; + } + + @Override + @SuppressWarnings("unchecked") + public void setOutputType(TypeInformation<OUT> type, ExecutionConfig executionConfig) { + if 
(source instanceof OutputTypeConfigurable) { + ((OutputTypeConfigurable<OUT>) source).setOutputType(type, executionConfig); + } else if (source // legacy kept for compatibility purposes + instanceof org.apache.flink.streaming.api.operators.OutputTypeConfigurable) { + ((org.apache.flink.streaming.api.operators.OutputTypeConfigurable<OUT>) source) + .setOutputType(type, executionConfig); + } + } + /** * This is a utility method to conjure up a "SplitT" generics variable binding so that we can * construct the SourceOperator without resorting to "all raw types". That way, this methods