This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch FLINK-18808-record-out in repository https://gitbox.apache.org/repos/asf/flink.git
commit 49a320688c46e9f1992b1486e1dff08be876e6ac Author: Weijie Guo <res...@163.com> AuthorDate: Fri May 5 15:06:54 2023 +0800 [FLINK-18808][streaming] Introduce helper builder class for StreamConfigChainer. --- .../runtime/tasks/StreamConfigChainer.java | 242 +++++++++++++++++++-- 1 file changed, 218 insertions(+), 24 deletions(-)

Review notes (placed after the '---' line, so they are not part of the commit message or the patch):

Summary: the patch adds chain(TypeSerializer) and chain(TypeSerializer, TypeSerializer),
which return a StreamConfigEdgeChainer builder (setOperatorID, setOperatorFactory,
addNonChainedOutputsCount, setCreateKeyedStateBackend). Its build() appends the
configured operator to the parent StreamConfigChainer and returns that parent. The
existing chain(...) overloads are marked @Deprecated. finish() now reverses the
per-operator output lists collected in outEdgesInOrder, flattens them, and sets the
result as the head config's vertex non-chained outputs.

Points to check:
- StreamConfigEdgeChainer.nonChainedOutPuts is a HashMap, and createNonChainedOutputs()
  iterates it with forEach. With several OutputTags, the NonChainedOutput order therefore
  follows hash order rather than registration order. A LinkedHashMap would keep it
  deterministic.
- addNonChainedOutputsCount() uses Map.put, so registering the same tag twice replaces
  the earlier count instead of adding to it. Its checkArgument() has no error message.
- setOperatorFactory() takes a raw StreamOperatorFactory and stores it into the
  StreamOperatorFactory<OUT> field, which is an unchecked assignment.
- createNonChainedOutputs() calls setTypeSerializerSideOut(outputTag, outputSerializer)
  inside the per-output loop, i.e. once per output of the same tag. It also registers the
  main output serializer for the side-output tag. Presumably the tag's own serializer is
  intended - TODO confirm.
- build() calls parent.chainedConfigs.values().forEach(StreamConfig::serializeAllConfigs)
  in the non-chained-outputs branch. finish() does the same for all chained configs, so
  this call looks redundant.
- Only the builder writes setTailNonChainedOutputs. The @Deprecated chain(...) overloads
  never reset it to true. So if a builder with non-chained outputs is followed by a legacy
  chain(...) call, finish() skips creating the numberOfNonChainedOutputs outputs for the
  actual tail.
- finish() reverses the outEdgesInOrder field in place, after possibly appending to it.
  Calling finish() more than once is therefore not idempotent.
- finish() drops the former checkState(chainIndex > 0, "Use finishForSingletonOperatorChain")
  guard; confirm this is intended.
- The private chain(OperatorID, OneInputStreamOperator<IN, OUT>, ...) overload is also
  marked @Deprecated, which has no effect on a private method.
- Naming: nonChainedOutPuts -> nonChainedOutputs.

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java index 100340a6ac5..7411a3e2fd9 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.runtime.tasks; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.memory.ManagedMemoryUseCase; @@ -36,13 +37,17 @@ import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.operators.StreamOperatorFactory; import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner; import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; +import org.apache.flink.util.OutputTag; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; +import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -58,6 +63,10 @@ public class StreamConfigChainer<OWNER> { private StreamConfig tailConfig; private int chainIndex = MAIN_NODE_ID; + private final 
List<List<NonChainedOutput>> outEdgesInOrder = new LinkedList<>(); + + private boolean setTailNonChainedOutputs = true; + StreamConfigChainer( OperatorID headOperatorID, StreamConfig headConfig, @@ -77,6 +86,11 @@ public class StreamConfigChainer<OWNER> { headConfig.setChainIndex(chainIndex); } + /** + * @deprecated Use {@link #chain(TypeSerializer)} or {@link #chain(TypeSerializer, + * TypeSerializer)} instead. + */ + @Deprecated public <T> StreamConfigChainer<OWNER> chain( OperatorID operatorID, OneInputStreamOperator<T, T> operator, @@ -85,11 +99,21 @@ public class StreamConfigChainer<OWNER> { return chain(operatorID, operator, typeSerializer, typeSerializer, createKeyedStateBackend); } + /** + * @deprecated Use {@link #chain(TypeSerializer)} or {@link #chain(TypeSerializer, + * TypeSerializer)} instead. + */ + @Deprecated public <T> StreamConfigChainer<OWNER> chain( OneInputStreamOperator<T, T> operator, TypeSerializer<T> typeSerializer) { return chain(new OperatorID(), operator, typeSerializer); } + /** + * @deprecated Use {@link #chain(TypeSerializer)} or {@link #chain(TypeSerializer, + * TypeSerializer)} instead. + */ + @Deprecated public <T> StreamConfigChainer<OWNER> chain( OperatorID operatorID, OneInputStreamOperator<T, T> operator, @@ -97,11 +121,21 @@ public class StreamConfigChainer<OWNER> { return chain(operatorID, operator, typeSerializer, typeSerializer, false); } + /** + * @deprecated Use {@link #chain(TypeSerializer)} or {@link #chain(TypeSerializer, + * TypeSerializer)} instead. + */ + @Deprecated public <T> StreamConfigChainer<OWNER> chain( OneInputStreamOperatorFactory<T, T> operatorFactory, TypeSerializer<T> typeSerializer) { return chain(new OperatorID(), operatorFactory, typeSerializer); } + /** + * @deprecated Use {@link #chain(TypeSerializer)} or {@link #chain(TypeSerializer, + * TypeSerializer)} instead.
+ */ + @Deprecated public <T> StreamConfigChainer<OWNER> chain( OperatorID operatorID, OneInputStreamOperatorFactory<T, T> operatorFactory, @@ -109,6 +143,11 @@ public class StreamConfigChainer<OWNER> { return chain(operatorID, operatorFactory, typeSerializer, typeSerializer, false); } + /** + * @deprecated Use {@link #chain(TypeSerializer)} or {@link #chain(TypeSerializer, + * TypeSerializer)} instead. + */ + @Deprecated private <IN, OUT> StreamConfigChainer<OWNER> chain( OperatorID operatorID, OneInputStreamOperator<IN, OUT> operator, @@ -123,6 +162,11 @@ public class StreamConfigChainer<OWNER> { createKeyedStateBackend); } + /** + * @deprecated Use {@link #chain(TypeSerializer)} or {@link #chain(TypeSerializer, + * TypeSerializer)} instead. + */ + @Deprecated public <IN, OUT> StreamConfigChainer<OWNER> chain( OperatorID operatorID, StreamOperatorFactory<OUT> operatorFactory, @@ -168,36 +212,47 @@ public class StreamConfigChainer<OWNER> { return this; } - public OWNER finish() { - checkState(chainIndex > 0, "Use finishForSingletonOperatorChain"); - List<NonChainedOutput> outEdgesInOrder = new LinkedList<>(); + public <T> StreamConfigEdgeChainer<OWNER, T, T> chain(TypeSerializer<T> typeSerializer) { + return chain(typeSerializer, typeSerializer); + } - StreamNode sourceVertex = - new StreamNode(chainIndex, null, null, (StreamOperator<?>) null, null, null); - for (int i = 0; i < numberOfNonChainedOutputs; ++i) { - NonChainedOutput streamOutput = - new NonChainedOutput( - true, - sourceVertex.getId(), - 1, - 1, - 100, - false, - new IntermediateDataSetID(), - null, - new BroadcastPartitioner<>(), - ResultPartitionType.PIPELINED_BOUNDED); - outEdgesInOrder.add(streamOutput); + public <IN, OUT> StreamConfigEdgeChainer<OWNER, IN, OUT> chain( + TypeSerializer<IN> inputSerializer, TypeSerializer<OUT> outputSerializer) { + return new StreamConfigEdgeChainer<>(this, inputSerializer, outputSerializer); + } + + public OWNER finish() { + if (setTailNonChainedOutputs) { + 
List<NonChainedOutput> nonChainedOutputs = new ArrayList<>(); + for (int i = 0; i < numberOfNonChainedOutputs; ++i) { + NonChainedOutput streamOutput = + new NonChainedOutput( + true, + chainIndex, + 1, + 1, + 100, + false, + new IntermediateDataSetID(), + null, + new BroadcastPartitioner<>(), + ResultPartitionType.PIPELINED_BOUNDED); + nonChainedOutputs.add(streamOutput); + } + outEdgesInOrder.add(nonChainedOutputs); + tailConfig.setNumberOfOutputs(numberOfNonChainedOutputs); + tailConfig.setVertexNonChainedOutputs(nonChainedOutputs); + tailConfig.setOperatorNonChainedOutputs(nonChainedOutputs); } + Collections.reverse(outEdgesInOrder); + List<NonChainedOutput> allOutEdgesInOrder = + outEdgesInOrder.stream().flatMap(List::stream).collect(Collectors.toList()); + tailConfig.setChainEnd(); - tailConfig.setNumberOfOutputs(numberOfNonChainedOutputs); - tailConfig.setVertexNonChainedOutputs(outEdgesInOrder); - tailConfig.setOperatorNonChainedOutputs(outEdgesInOrder); chainedConfigs.values().forEach(StreamConfig::serializeAllConfigs); - headConfig.setAndSerializeTransitiveChainedTaskConfigs(chainedConfigs); - headConfig.setVertexNonChainedOutputs(outEdgesInOrder); + headConfig.setVertexNonChainedOutputs(allOutEdgesInOrder); headConfig.serializeAllConfigs(); return owner; @@ -262,4 +317,143 @@ public class StreamConfigChainer<OWNER> { public void setBufferTimeout(int bufferTimeout) { this.bufferTimeout = bufferTimeout; } + + /** Helper class to build operator node.
+ */ + public static class StreamConfigEdgeChainer<OWNER, IN, OUT> { + private final OutputTag<Void> placeHolderTag; + private StreamConfigChainer<OWNER> parent; + private OperatorID operatorID; + + private final TypeSerializer<IN> inputSerializer; + private final TypeSerializer<OUT> outputSerializer; + + private StreamOperatorFactory<OUT> operatorFactory; + + private Map<OutputTag<?>, Integer> nonChainedOutPuts; + private boolean createKeyedStateBackend; + + private StreamConfigEdgeChainer( + StreamConfigChainer<OWNER> parent, + TypeSerializer<IN> inputSerializer, + TypeSerializer<OUT> outputSerializer) { + this.parent = parent; + this.parent.setTailNonChainedOutputs = true; + + this.inputSerializer = inputSerializer; + this.outputSerializer = outputSerializer; + this.placeHolderTag = + new OutputTag<>("FLINK_PLACEHOLDER", BasicTypeInfo.VOID_TYPE_INFO); + this.nonChainedOutPuts = new HashMap<>(4); + } + + public StreamConfigEdgeChainer<OWNER, IN, OUT> setOperatorID(OperatorID operatorID) { + this.operatorID = operatorID; + return this; + } + + public StreamConfigEdgeChainer<OWNER, IN, OUT> setOperatorFactory( + StreamOperatorFactory operatorFactory) { + this.operatorFactory = operatorFactory; + return this; + } + + public StreamConfigEdgeChainer<OWNER, IN, OUT> addNonChainedOutputsCount( + int nonChainedOutputsCount) { + return addNonChainedOutputsCount(placeHolderTag, nonChainedOutputsCount); + } + + public StreamConfigEdgeChainer<OWNER, IN, OUT> addNonChainedOutputsCount( + OutputTag<?> outputTag, int nonChainedOutputsCount) { + checkArgument(nonChainedOutputsCount >= 0 && outputTag != null); + this.nonChainedOutPuts.put(outputTag, nonChainedOutputsCount); + return this; + } + + public StreamConfigEdgeChainer<OWNER, IN, OUT> setCreateKeyedStateBackend( + boolean createKeyedStateBackend) { + this.createKeyedStateBackend = createKeyedStateBackend; + return this; + } + + public StreamConfigChainer<OWNER> build() { + parent.chainIndex++; + + StreamEdge streamEdge = +
new StreamEdge( + new StreamNode( + parent.tailConfig.getChainIndex(), + null, + null, + (StreamOperator<?>) null, + null, + null), + new StreamNode( + parent.chainIndex, + null, + null, + (StreamOperator<?>) null, + null, + null), + 0, + null, + null); + streamEdge.setBufferTimeout(parent.bufferTimeout); + parent.tailConfig.setChainedOutputs(Collections.singletonList(streamEdge)); + parent.tailConfig = new StreamConfig(new Configuration()); + parent.tailConfig.setStreamOperatorFactory(checkNotNull(operatorFactory)); + parent.tailConfig.setOperatorID(operatorID == null ? new OperatorID() : operatorID); + parent.tailConfig.setupNetworkInputs(inputSerializer); + parent.tailConfig.setTypeSerializerOut(outputSerializer); + if (createKeyedStateBackend) { + // used to test multiple stateful operators chained in a single task. + parent.tailConfig.setStateKeySerializer(inputSerializer); + parent.tailConfig.setStateBackendUsesManagedMemory(true); + parent.tailConfig.setManagedMemoryFractionOperatorOfUseCase( + ManagedMemoryUseCase.STATE_BACKEND, 1.0); + } + if (!nonChainedOutPuts.isEmpty()) { + List<NonChainedOutput> nonChainedOutputs = + createNonChainedOutputs(nonChainedOutPuts, streamEdge); + + parent.tailConfig.setVertexNonChainedOutputs(nonChainedOutputs); + parent.tailConfig.setOperatorNonChainedOutputs(nonChainedOutputs); + parent.chainedConfigs.values().forEach(StreamConfig::serializeAllConfigs); + parent.tailConfig.setNumberOfOutputs(nonChainedOutputs.size()); + parent.outEdgesInOrder.add(nonChainedOutputs); + parent.setTailNonChainedOutputs = false; + } + parent.tailConfig.setChainIndex(parent.chainIndex); + parent.tailConfig.serializeAllConfigs(); + + parent.chainedConfigs.put(parent.chainIndex, parent.tailConfig); + return parent; + } + + private List<NonChainedOutput> createNonChainedOutputs( + Map<OutputTag<?>, Integer> nonChainedOutputsCount, StreamEdge streamEdge) { + List<NonChainedOutput> nonChainedOutputs = new ArrayList<>(); +
nonChainedOutputsCount.forEach( + (outputTag, value) -> { + for (int i = 0; i < value; i++) { + nonChainedOutputs.add( + new NonChainedOutput( + true, + streamEdge.getTargetId(), + 1, + 1, + 100, + false, + new IntermediateDataSetID(), + placeHolderTag.equals(outputTag) ? null : outputTag, + new BroadcastPartitioner<>(), + ResultPartitionType.PIPELINED_BOUNDED)); + if (!placeHolderTag.equals(outputTag)) { + parent.tailConfig.setTypeSerializerSideOut( + outputTag, outputSerializer); + } + } + }); + return nonChainedOutputs; + } + } }