This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 80fd59ccfc35dc3f6a077c170a26d3252c9e4dee
Author: Alexander Fedulov <1492164+afedu...@users.noreply.github.com>
AuthorDate: Thu Oct 12 21:17:49 2023 +0200

    [FLINK-28050][connectors] Deprecate OutputTypeConfigurable in 
flink-streaming-java
    
    Reason for this change: OutputTypeConfigurable needs to be used in 
DataGeneratorSource (see 
StreamExecutionEnvironmentTest#testFromElementsPostConstructionType). 
OutputTypeConfigurable is located in the flink-streaming-java module. When 
DataGeneratorSource gets added into flink-streaming-java, this creates a cycle. 
Marker interfaces should ideally reside in flink-core (InputTypeConfigurable 
already does).
    - Deprecates OutputTypeConfigurable in flink-streaming-java
    - Adds a new marker OutputTypeConfigurable interface to flink-core
    - Modifies operators that use this interface to support both versions
---
 .../java/typeutils}/OutputTypeConfigurable.java    | 12 ++++++------
 .../api/operators/OutputTypeConfigurable.java      |  3 +++
 .../api/operators/SimpleOperatorFactory.java       | 13 +++++++++++--
 .../api/operators/SourceOperatorFactory.java       | 22 ++++++++++++++++++++++
 4 files changed, 42 insertions(+), 8 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/OutputTypeConfigurable.java
similarity index 86%
copy from 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java
copy to 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/OutputTypeConfigurable.java
index a296fa7d5cb..2ff3ceabcd6 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/OutputTypeConfigurable.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.streaming.api.operators;
+package org.apache.flink.api.java.typeutils;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 
 /**
  * Stream operators can implement this interface if they need access to the 
output type information
- * at {@link org.apache.flink.streaming.api.graph.StreamGraph} generation. 
This can be useful for
+ * at {@code org.apache.flink.streaming.api.graph.StreamGraph} generation. 
This can be useful for
  * cases where the output type is specified by the returns method and, thus, 
after the stream
  * operator has been created.
  */
@@ -32,13 +32,13 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 public interface OutputTypeConfigurable<OUT> {
 
     /**
-     * Is called by the {@link 
org.apache.flink.streaming.api.graph.StreamGraph#addOperator(Integer,
-     * String, StreamOperator, TypeInformation, TypeInformation, String)} 
method when the {@link
+     * Is called by the {@code 
org.apache.flink.streaming.api.graph.StreamGraph#addOperator(Integer,
+     * String, StreamOperator, TypeInformation, TypeInformation, String)} 
method when the {@code
      * org.apache.flink.streaming.api.graph.StreamGraph} is generated. The 
method is called with the
-     * output {@link TypeInformation} which is also used for the {@link
+     * output {@link TypeInformation} which is also used for the {@code
      * org.apache.flink.streaming.runtime.tasks.StreamTask} output serializer.
      *
-     * @param outTypeInfo Output type information of the {@link
+     * @param outTypeInfo Output type information of the {@code
      *     org.apache.flink.streaming.runtime.tasks.StreamTask}
      * @param executionConfig Execution configuration
      */
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java
index a296fa7d5cb..f358aea0c92 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java
@@ -27,7 +27,10 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
  * at {@link org.apache.flink.streaming.api.graph.StreamGraph} generation. 
This can be useful for
  * cases where the output type is specified by the returns method and, thus, 
after the stream
  * operator has been created.
+ *
+ * @deprecated Use {@link 
org.apache.flink.api.java.typeutils.OutputTypeConfigurable} instead
  */
+@Deprecated
 @PublicEvolving
 public interface OutputTypeConfigurable<OUT> {
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SimpleOperatorFactory.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SimpleOperatorFactory.java
index 95329baaef4..fbb734eb07e 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SimpleOperatorFactory.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SimpleOperatorFactory.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
+import org.apache.flink.api.java.typeutils.OutputTypeConfigurable;
 import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction;
 import 
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
 
@@ -109,13 +110,21 @@ public class SimpleOperatorFactory<OUT> extends 
AbstractStreamOperatorFactory<OU
 
     @Override
     public boolean isOutputTypeConfigurable() {
-        return operator instanceof OutputTypeConfigurable;
+        return operator instanceof OutputTypeConfigurable
+                || operator // legacy kept for compatibility purposes
+                        instanceof 
org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
     }
 
     @SuppressWarnings("unchecked")
     @Override
     public void setOutputType(TypeInformation<OUT> type, ExecutionConfig 
executionConfig) {
-        ((OutputTypeConfigurable<OUT>) operator).setOutputType(type, 
executionConfig);
+        if (operator instanceof OutputTypeConfigurable) {
+            ((OutputTypeConfigurable<OUT>) operator).setOutputType(type, 
executionConfig);
+        } else if (operator // legacy kept for compatibility purposes
+                instanceof 
org.apache.flink.streaming.api.operators.OutputTypeConfigurable) {
+            
((org.apache.flink.streaming.api.operators.OutputTypeConfigurable<OUT>) 
operator)
+                    .setOutputType(type, executionConfig);
+        }
     }
 
     @Override
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
index 479c5224365..247cb456d88 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
@@ -18,12 +18,15 @@ limitations under the License.
 
 package org.apache.flink.streaming.api.operators;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.java.typeutils.OutputTypeConfigurable;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -153,6 +156,25 @@ public class SourceOperatorFactory<OUT> extends 
AbstractStreamOperatorFactory<OU
         return true;
     }
 
+    @Override
+    public boolean isOutputTypeConfigurable() {
+        return source instanceof OutputTypeConfigurable
+                || source // legacy kept for compatibility purposes
+                        instanceof 
org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void setOutputType(TypeInformation<OUT> type, ExecutionConfig 
executionConfig) {
+        if (source instanceof OutputTypeConfigurable) {
+            ((OutputTypeConfigurable<OUT>) source).setOutputType(type, 
executionConfig);
+        } else if (source // legacy kept for compatibility purposes
+                instanceof 
org.apache.flink.streaming.api.operators.OutputTypeConfigurable) {
+            
((org.apache.flink.streaming.api.operators.OutputTypeConfigurable<OUT>) source)
+                    .setOutputType(type, executionConfig);
+        }
+    }
+
     /**
      * This is a utility method to conjure up a "SplitT" generics variable 
binding so that we can
      * construct the SourceOperator without resorting to "all raw types". That 
way, this methods

Reply via email to