This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch FLINK-18808-record-out
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 49a320688c46e9f1992b1486e1dff08be876e6ac
Author: Weijie Guo <res...@163.com>
AuthorDate: Fri May 5 15:06:54 2023 +0800

    [FLINK-18808][streaming] Introduce helper builder class for StreamConfigChainer.
---
 .../runtime/tasks/StreamConfigChainer.java         | 242 +++++++++++++++++++--
 1 file changed, 218 insertions(+), 24 deletions(-)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
index 100340a6ac5..7411a3e2fd9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.ManagedMemoryUseCase;
@@ -36,13 +37,17 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import org.apache.flink.util.OutputTag;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -58,6 +63,10 @@ public class StreamConfigChainer<OWNER> {
     private StreamConfig tailConfig;
     private int chainIndex = MAIN_NODE_ID;
 
+    /**
+     * Non-chained outputs collected per operator, in registration order: outputs of operators
+     * built with explicit non-chained outputs first, then the default outputs of the tail added by
+     * {@link #finish()}, which reverses and flattens this list into the head's vertex outputs.
+     */
+    private final List<List<NonChainedOutput>> outEdgesInOrder = new ArrayList<>();
+
+    /**
+     * Whether {@link #finish()} attaches the default {@code numberOfNonChainedOutputs} outputs to
+     * the tail operator; cleared when the tail was added with explicit non-chained outputs.
+     */
+    private boolean setTailNonChainedOutputs = true;
+
     StreamConfigChainer(
             OperatorID headOperatorID,
             StreamConfig headConfig,
@@ -77,6 +86,11 @@ public class StreamConfigChainer<OWNER> {
         headConfig.setChainIndex(chainIndex);
     }
 
+    /**
+     * @deprecated Use {@link #chain(TypeSerializer)} or {@link 
#chain(TypeSerializer,
+     *     TypeSerializer)} instead.
+     */
+    @Deprecated
     public <T> StreamConfigChainer<OWNER> chain(
             OperatorID operatorID,
             OneInputStreamOperator<T, T> operator,
@@ -85,11 +99,21 @@ public class StreamConfigChainer<OWNER> {
         return chain(operatorID, operator, typeSerializer, typeSerializer, 
createKeyedStateBackend);
     }
 
+    /**
+     * @deprecated Use {@link #chain(TypeSerializer)} or {@link 
#chain(TypeSerializer,
+     *     TypeSerializer)} instead.
+     */
+    @Deprecated
     public <T> StreamConfigChainer<OWNER> chain(
             OneInputStreamOperator<T, T> operator, TypeSerializer<T> 
typeSerializer) {
         return chain(new OperatorID(), operator, typeSerializer);
     }
 
+    /**
+     * @deprecated Use {@link #chain(TypeSerializer)} or {@link 
#chain(TypeSerializer,
+     *     TypeSerializer)} instead.
+     */
+    @Deprecated
     public <T> StreamConfigChainer<OWNER> chain(
             OperatorID operatorID,
             OneInputStreamOperator<T, T> operator,
@@ -97,11 +121,21 @@ public class StreamConfigChainer<OWNER> {
         return chain(operatorID, operator, typeSerializer, typeSerializer, 
false);
     }
 
+    /**
+     * @deprecated Use {@link #chain(TypeSerializer)} or {@link 
#chain(TypeSerializer,
+     *     TypeSerializer)} instead.
+     */
+    @Deprecated
     public <T> StreamConfigChainer<OWNER> chain(
             OneInputStreamOperatorFactory<T, T> operatorFactory, 
TypeSerializer<T> typeSerializer) {
         return chain(new OperatorID(), operatorFactory, typeSerializer);
     }
 
+    /**
+     * @deprecated Use {@link #chain(TypeSerializer)} or {@link 
#chain(TypeSerializer,
+     *     TypeSerializer)} instead.
+     */
+    @Deprecated
     public <T> StreamConfigChainer<OWNER> chain(
             OperatorID operatorID,
             OneInputStreamOperatorFactory<T, T> operatorFactory,
@@ -109,6 +143,11 @@ public class StreamConfigChainer<OWNER> {
         return chain(operatorID, operatorFactory, typeSerializer, 
typeSerializer, false);
     }
 
+    /**
+     * @deprecated Use {@link #chain(TypeSerializer)} or {@link 
#chain(TypeSerializer,
+     *     TypeSerializer)} instead.
+     */
+    @Deprecated
     private <IN, OUT> StreamConfigChainer<OWNER> chain(
             OperatorID operatorID,
             OneInputStreamOperator<IN, OUT> operator,
@@ -123,6 +162,11 @@ public class StreamConfigChainer<OWNER> {
                 createKeyedStateBackend);
     }
 
+    /**
+     * @deprecated Use {@link #chain(TypeSerializer)} or {@link 
#chain(TypeSerializer,
+     *     TypeSerializer)} instead.
+     */
+    @Deprecated
     public <IN, OUT> StreamConfigChainer<OWNER> chain(
             OperatorID operatorID,
             StreamOperatorFactory<OUT> operatorFactory,
@@ -168,36 +212,47 @@ public class StreamConfigChainer<OWNER> {
         return this;
     }
 
-    public OWNER finish() {
-        checkState(chainIndex > 0, "Use finishForSingletonOperatorChain");
-        List<NonChainedOutput> outEdgesInOrder = new LinkedList<>();
+    /**
+     * Starts appending an operator whose input and output records share one serializer.
+     *
+     * @param typeSerializer serializer used for both the input and the output of the operator
+     * @return a builder; call {@link StreamConfigEdgeChainer#build()} to append the operator
+     */
+    public <T> StreamConfigEdgeChainer<OWNER, T, T> chain(TypeSerializer<T> typeSerializer) {
+        return new StreamConfigEdgeChainer<>(this, typeSerializer, typeSerializer);
+    }
 
-        StreamNode sourceVertex =
-                new StreamNode(chainIndex, null, null, (StreamOperator<?>) 
null, null, null);
-        for (int i = 0; i < numberOfNonChainedOutputs; ++i) {
-            NonChainedOutput streamOutput =
-                    new NonChainedOutput(
-                            true,
-                            sourceVertex.getId(),
-                            1,
-                            1,
-                            100,
-                            false,
-                            new IntermediateDataSetID(),
-                            null,
-                            new BroadcastPartitioner<>(),
-                            ResultPartitionType.PIPELINED_BOUNDED);
-            outEdgesInOrder.add(streamOutput);
+    /**
+     * Starts appending an operator with distinct input and output serializers.
+     *
+     * @param inputSerializer serializer of the records the operator consumes
+     * @param outputSerializer serializer of the records the operator emits
+     * @return a builder; call {@link StreamConfigEdgeChainer#build()} to append the operator
+     */
+    public <IN, OUT> StreamConfigEdgeChainer<OWNER, IN, OUT> chain(
+            TypeSerializer<IN> inputSerializer, TypeSerializer<OUT> outputSerializer) {
+        StreamConfigEdgeChainer<OWNER, IN, OUT> edgeChainer =
+                new StreamConfigEdgeChainer<>(this, inputSerializer, outputSerializer);
+        return edgeChainer;
+    }
+
+    public OWNER finish() {
+        if (setTailNonChainedOutputs) {
+            List<NonChainedOutput> nonChainedOutputs = new ArrayList<>();
+            for (int i = 0; i < numberOfNonChainedOutputs; ++i) {
+                NonChainedOutput streamOutput =
+                        new NonChainedOutput(
+                                true,
+                                chainIndex,
+                                1,
+                                1,
+                                100,
+                                false,
+                                new IntermediateDataSetID(),
+                                null,
+                                new BroadcastPartitioner<>(),
+                                ResultPartitionType.PIPELINED_BOUNDED);
+                nonChainedOutputs.add(streamOutput);
+            }
+            outEdgesInOrder.add(nonChainedOutputs);
+            tailConfig.setNumberOfOutputs(numberOfNonChainedOutputs);
+            tailConfig.setVertexNonChainedOutputs(nonChainedOutputs);
+            tailConfig.setOperatorNonChainedOutputs(nonChainedOutputs);
         }
 
+        Collections.reverse(outEdgesInOrder);
+        List<NonChainedOutput> allOutEdgesInOrder =
+                
outEdgesInOrder.stream().flatMap(List::stream).collect(Collectors.toList());
+
         tailConfig.setChainEnd();
-        tailConfig.setNumberOfOutputs(numberOfNonChainedOutputs);
-        tailConfig.setVertexNonChainedOutputs(outEdgesInOrder);
-        tailConfig.setOperatorNonChainedOutputs(outEdgesInOrder);
         chainedConfigs.values().forEach(StreamConfig::serializeAllConfigs);
-
         headConfig.setAndSerializeTransitiveChainedTaskConfigs(chainedConfigs);
-        headConfig.setVertexNonChainedOutputs(outEdgesInOrder);
+        headConfig.setVertexNonChainedOutputs(allOutEdgesInOrder);
         headConfig.serializeAllConfigs();
 
         return owner;
@@ -262,4 +317,143 @@ public class StreamConfigChainer<OWNER> {
+    /** Sets the buffer timeout applied to chained stream edges, e.g. those created on build(). */
     public void setBufferTimeout(int bufferTimeout) {
         this.bufferTimeout = bufferTimeout;
     }
+
+    /** Helper class to build operator node. */
+    public static class StreamConfigEdgeChainer<OWNER, IN, OUT> {
+        private final OutputTag<Void> placeHolderTag;
+        private StreamConfigChainer<OWNER> parent;
+        private OperatorID operatorID;
+
+        private final TypeSerializer<IN> inputSerializer;
+        private final TypeSerializer<OUT> outputSerializer;
+
+        private StreamOperatorFactory<OUT> operatorFactory;
+
+        private Map<OutputTag<?>, Integer> nonChainedOutPuts;
+        private boolean createKeyedStateBackend;
+
+        private StreamConfigEdgeChainer(
+                StreamConfigChainer<OWNER> parent,
+                TypeSerializer<IN> inputSerializer,
+                TypeSerializer<OUT> outputSerializer) {
+            this.parent = parent;
+            this.parent.setTailNonChainedOutputs = true;
+
+            this.inputSerializer = inputSerializer;
+            this.outputSerializer = outputSerializer;
+            this.placeHolderTag =
+                    new OutputTag<>("FLINK_PLACEHOLDER", 
BasicTypeInfo.VOID_TYPE_INFO);
+            this.nonChainedOutPuts = new HashMap<>(4);
+        }
+
+        public StreamConfigEdgeChainer<OWNER, IN, OUT> 
setOperatorID(OperatorID operatorID) {
+            this.operatorID = operatorID;
+            return this;
+        }
+
+        public StreamConfigEdgeChainer<OWNER, IN, OUT> setOperatorFactory(
+                StreamOperatorFactory operatorFactory) {
+            this.operatorFactory = operatorFactory;
+            return this;
+        }
+
+        public StreamConfigEdgeChainer<OWNER, IN, OUT> 
addNonChainedOutputsCount(
+                int nonChainedOutputsCount) {
+            return addNonChainedOutputsCount(placeHolderTag, 
nonChainedOutputsCount);
+        }
+
+        public StreamConfigEdgeChainer<OWNER, IN, OUT> 
addNonChainedOutputsCount(
+                OutputTag<?> outputTag, int nonChainedOutputsCount) {
+            checkArgument(nonChainedOutputsCount >= 0 && outputTag != null);
+            this.nonChainedOutPuts.put(outputTag, nonChainedOutputsCount);
+            return this;
+        }
+
+        public StreamConfigEdgeChainer<OWNER, IN, OUT> 
setCreateKeyedStateBackend(
+                boolean createKeyedStateBackend) {
+            this.createKeyedStateBackend = createKeyedStateBackend;
+            return this;
+        }
+
+        public StreamConfigChainer<OWNER> build() {
+            parent.chainIndex++;
+
+            StreamEdge streamEdge =
+                    new StreamEdge(
+                            new StreamNode(
+                                    parent.tailConfig.getChainIndex(),
+                                    null,
+                                    null,
+                                    (StreamOperator<?>) null,
+                                    null,
+                                    null),
+                            new StreamNode(
+                                    parent.chainIndex,
+                                    null,
+                                    null,
+                                    (StreamOperator<?>) null,
+                                    null,
+                                    null),
+                            0,
+                            null,
+                            null);
+            streamEdge.setBufferTimeout(parent.bufferTimeout);
+            
parent.tailConfig.setChainedOutputs(Collections.singletonList(streamEdge));
+            parent.tailConfig = new StreamConfig(new Configuration());
+            
parent.tailConfig.setStreamOperatorFactory(checkNotNull(operatorFactory));
+            parent.tailConfig.setOperatorID(operatorID == null ? new 
OperatorID() : operatorID);
+            parent.tailConfig.setupNetworkInputs(inputSerializer);
+            parent.tailConfig.setTypeSerializerOut(outputSerializer);
+            if (createKeyedStateBackend) {
+                // used to test multiple stateful operators chained in a 
single task.
+                parent.tailConfig.setStateKeySerializer(inputSerializer);
+                parent.tailConfig.setStateBackendUsesManagedMemory(true);
+                parent.tailConfig.setManagedMemoryFractionOperatorOfUseCase(
+                        ManagedMemoryUseCase.STATE_BACKEND, 1.0);
+            }
+            if (!nonChainedOutPuts.isEmpty()) {
+                List<NonChainedOutput> nonChainedOutputs =
+                        createNonChainedOutputs(nonChainedOutPuts, streamEdge);
+
+                
parent.tailConfig.setVertexNonChainedOutputs(nonChainedOutputs);
+                
parent.tailConfig.setOperatorNonChainedOutputs(nonChainedOutputs);
+                
parent.chainedConfigs.values().forEach(StreamConfig::serializeAllConfigs);
+                parent.tailConfig.setNumberOfOutputs(nonChainedOutputs.size());
+                parent.outEdgesInOrder.add(nonChainedOutputs);
+                parent.setTailNonChainedOutputs = false;
+            }
+            parent.tailConfig.setChainIndex(parent.chainIndex);
+            parent.tailConfig.serializeAllConfigs();
+
+            parent.chainedConfigs.put(parent.chainIndex, parent.tailConfig);
+            return parent;
+        }
+
+        private List<NonChainedOutput> createNonChainedOutputs(
+                Map<OutputTag<?>, Integer> nonChainedOutputsCount, StreamEdge 
streamEdge) {
+            List<NonChainedOutput> nonChainedOutputs = new ArrayList<>();
+            nonChainedOutputsCount.forEach(
+                    (outputTag, value) -> {
+                        for (int i = 0; i < value; i++) {
+                            nonChainedOutputs.add(
+                                    new NonChainedOutput(
+                                            true,
+                                            streamEdge.getTargetId(),
+                                            1,
+                                            1,
+                                            100,
+                                            false,
+                                            new IntermediateDataSetID(),
+                                            placeHolderTag.equals(outputTag) ? 
null : outputTag,
+                                            new BroadcastPartitioner<>(),
+                                            
ResultPartitionType.PIPELINED_BOUNDED));
+                            if (!placeHolderTag.equals(outputTag)) {
+                                parent.tailConfig.setTypeSerializerSideOut(
+                                        outputTag, outputSerializer);
+                            }
+                        }
+                    });
+            return nonChainedOutputs;
+        }
+    }
 }

Reply via email to