[hotfix] Cleanup routing of records in OperatorChain
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/28c6254e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/28c6254e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/28c6254e Branch: refs/heads/tableOnCalcite Commit: 28c6254ee385fe746e868a81b2207bf66b552174 Parents: e9c83ea Author: Stephan Ewen <se...@apache.org> Authored: Mon Feb 8 16:14:00 2016 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Mon Feb 8 20:36:35 2016 +0100 ---------------------------------------------------------------------- .../BroadcastOutputSelectorWrapper.java | 45 ------- .../api/collector/selector/DirectedOutput.java | 130 +++++++++++++++++++ .../selector/DirectedOutputSelectorWrapper.java | 97 -------------- .../selector/OutputSelectorWrapper.java | 9 +- .../selector/OutputSelectorWrapperFactory.java | 33 ----- .../flink/streaming/api/graph/StreamConfig.java | 20 +-- .../flink/streaming/api/graph/StreamNode.java | 10 +- .../api/graph/StreamingJobGraphGenerator.java | 2 +- .../streaming/runtime/io/CollectorWrapper.java | 61 --------- .../streaming/runtime/tasks/OperatorChain.java | 84 ++++++++++-- .../flink/streaming/api/OutputSplitterTest.java | 2 +- .../runtime/tasks/StreamTaskTestHarness.java | 9 +- 12 files changed, 225 insertions(+), 277 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/28c6254e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java deleted file mode 100644 index 7034b11..0000000 --- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.collector.selector; - -import java.util.ArrayList; - -import org.apache.flink.streaming.api.graph.StreamEdge; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.util.Collector; - -public class BroadcastOutputSelectorWrapper<OUT> implements OutputSelectorWrapper<OUT> { - - private static final long serialVersionUID = 1L; - - private final ArrayList<Collector<StreamRecord<OUT>>> outputs; - - public BroadcastOutputSelectorWrapper() { - outputs = new ArrayList<Collector<StreamRecord<OUT>>>(); - } - - @Override - public void addCollector(Collector<StreamRecord<OUT>> output, StreamEdge edge) { - outputs.add(output); - } - - @Override - public Iterable<Collector<StreamRecord<OUT>>> getSelectedOutputs(OUT record) { - return outputs; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/28c6254e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java 
---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java new file mode 100644 index 0000000..52c50b3 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.collector.selector; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.graph.StreamEdge; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + + +public class DirectedOutput<OUT> implements Output<StreamRecord<OUT>> { + + private final OutputSelector<OUT>[] outputSelectors; + + private final Output<StreamRecord<OUT>>[] selectAllOutputs; + + private final HashMap<String, Output<StreamRecord<OUT>>[]> outputMap; + + private final Output<StreamRecord<OUT>>[] allOutputs; + + + @SuppressWarnings({"unchecked", "rawtypes"}) + public DirectedOutput( + List<OutputSelector<OUT>> outputSelectors, + List<Tuple2<Output<StreamRecord<OUT>>, StreamEdge>> outputs) + { + this.outputSelectors = outputSelectors.toArray(new OutputSelector[outputSelectors.size()]); + + this.allOutputs = new Output[outputs.size()]; + for (int i = 0; i < outputs.size(); i++) { + allOutputs[i] = outputs.get(i).f0; + } + + + HashSet<Output<StreamRecord<OUT>>> selectAllOutputs = new HashSet<Output<StreamRecord<OUT>>>(); + HashMap<String, ArrayList<Output<StreamRecord<OUT>>>> outputMap = new HashMap<String, ArrayList<Output<StreamRecord<OUT>>>>(); + + for (Tuple2<Output<StreamRecord<OUT>>, StreamEdge> outputPair : outputs) { + final Output<StreamRecord<OUT>> output = outputPair.f0; + final StreamEdge edge = outputPair.f1; + + List<String> selectedNames = edge.getSelectedNames(); + + if (selectedNames.isEmpty()) { + selectAllOutputs.add(output); + } + else { + for (String selectedName : selectedNames) { + if (!outputMap.containsKey(selectedName)) { + outputMap.put(selectedName, new 
ArrayList<Output<StreamRecord<OUT>>>()); + outputMap.get(selectedName).add(output); + } + else { + if (!outputMap.get(selectedName).contains(output)) { + outputMap.get(selectedName).add(output); + } + } + } + } + } + + this.selectAllOutputs = selectAllOutputs.toArray(new Output[selectAllOutputs.size()]); + + this.outputMap = new HashMap<>(); + for (Map.Entry<String, ArrayList<Output<StreamRecord<OUT>>>> entry : outputMap.entrySet()) { + Output<StreamRecord<OUT>>[] arr = entry.getValue().toArray(new Output[entry.getValue().size()]); + this.outputMap.put(entry.getKey(), arr); + } + } + + + @Override + public void emitWatermark(Watermark mark) { + for (Output<StreamRecord<OUT>> out : allOutputs) { + out.emitWatermark(mark); + } + } + + @Override + public void collect(StreamRecord<OUT> record) { + Set<Output<StreamRecord<OUT>>> selectedOutputs = new HashSet<Output<StreamRecord<OUT>>>(selectAllOutputs.length); + Collections.addAll(selectedOutputs, selectAllOutputs); + + for (OutputSelector<OUT> outputSelector : outputSelectors) { + Iterable<String> outputNames = outputSelector.select(record.getValue()); + + for (String outputName : outputNames) { + Output<StreamRecord<OUT>>[] outputList = outputMap.get(outputName); + if (outputList != null) { + Collections.addAll(selectedOutputs, outputList); + } + } + } + + for (Output<StreamRecord<OUT>> out : selectedOutputs) { + out.collect(record); + } + } + + @Override + public void close() { + for (Output<StreamRecord<OUT>> out : allOutputs) { + out.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/28c6254e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java deleted file mode 100644 index 84558fc..0000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.api.collector.selector; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -import org.apache.flink.streaming.api.graph.StreamEdge; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.util.Collector; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class DirectedOutputSelectorWrapper<OUT> implements OutputSelectorWrapper<OUT> { - - private static final long serialVersionUID = 1L; - - private static final Logger LOG = LoggerFactory.getLogger(DirectedOutputSelectorWrapper.class); - - private List<OutputSelector<OUT>> outputSelectors; - - private HashMap<String, ArrayList<Collector<StreamRecord<OUT>>>> outputMap; - private HashSet<Collector<StreamRecord<OUT>>> selectAllOutputs; - - public DirectedOutputSelectorWrapper(List<OutputSelector<OUT>> outputSelectors) { - this.outputSelectors = outputSelectors; - this.selectAllOutputs = new HashSet<Collector<StreamRecord<OUT>>>(); - this.outputMap = new HashMap<String, ArrayList<Collector<StreamRecord<OUT>>>>(); - } - - @Override - public void addCollector(Collector<StreamRecord<OUT>> output, StreamEdge edge) { - List<String> selectedNames = edge.getSelectedNames(); - - if (selectedNames.isEmpty()) { - selectAllOutputs.add(output); - } - else { - for (String selectedName : selectedNames) { - if (!outputMap.containsKey(selectedName)) { - outputMap.put(selectedName, new ArrayList<Collector<StreamRecord<OUT>>>()); - outputMap.get(selectedName).add(output); - } - else { - if (!outputMap.get(selectedName).contains(output)) { - outputMap.get(selectedName).add(output); - } - } - } - } - } - - @Override - public Iterable<Collector<StreamRecord<OUT>>> getSelectedOutputs(OUT record) { - Set<Collector<StreamRecord<OUT>>> selectedOutputs = new HashSet<Collector<StreamRecord<OUT>>>(selectAllOutputs); - - for (OutputSelector<OUT> outputSelector : 
outputSelectors) { - Iterable<String> outputNames = outputSelector.select(record); - - for (String outputName : outputNames) { - List<Collector<StreamRecord<OUT>>> outputList = outputMap.get(outputName); - - try { - selectedOutputs.addAll(outputList); - } catch (NullPointerException e) { - if (LOG.isErrorEnabled()) { - String format = String.format( - "Cannot emit because no output is selected with the name: %s", - outputName); - LOG.error(format); - } - } - } - } - - return selectedOutputs; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/28c6254e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java index f25c995..971e42b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java @@ -19,14 +19,7 @@ package org.apache.flink.streaming.api.collector.selector; import java.io.Serializable; -import org.apache.flink.streaming.api.graph.StreamEdge; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.util.Collector; - public interface OutputSelectorWrapper<OUT> extends Serializable { - public void addCollector(Collector<StreamRecord<OUT>> output, StreamEdge edge); - - public Iterable<Collector<StreamRecord<OUT>>> getSelectedOutputs(OUT record); - + void sendOutputs(OUT record); } http://git-wip-us.apache.org/repos/asf/flink/blob/28c6254e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapperFactory.java 
---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapperFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapperFactory.java deleted file mode 100644 index dca2ede..0000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapperFactory.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.api.collector.selector; - -import java.util.List; - -public class OutputSelectorWrapperFactory { - - @SuppressWarnings({ "rawtypes", "unchecked" }) - public static OutputSelectorWrapper<?> create(List<OutputSelector<?>> outputSelectors) { - if (outputSelectors.size() == 0) { - return new BroadcastOutputSelectorWrapper(); - } else { - return new DirectedOutputSelectorWrapper(outputSelectors); - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/28c6254e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java index 7a07c79..311b7fb 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.graph; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -29,7 +30,7 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.util.ClassLoaderUtil; import org.apache.flink.streaming.api.CheckpointingMode; -import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper; +import org.apache.flink.streaming.api.collector.selector.OutputSelector; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.streaming.runtime.tasks.StreamTaskException; @@ -38,7 +39,7 @@ import org.apache.flink.util.InstantiationUtil; public 
class StreamConfig implements Serializable { private static final long serialVersionUID = 1L; - + // ------------------------------------------------------------------------ // Config Keys // ------------------------------------------------------------------------ @@ -191,19 +192,22 @@ public class StreamConfig implements Serializable { } } - public void setOutputSelectorWrapper(OutputSelectorWrapper<?> outputSelectorWrapper) { + public void setOutputSelectors(List<OutputSelector<?>> outputSelectors) { try { - InstantiationUtil.writeObjectToConfig(outputSelectorWrapper, this.config, OUTPUT_SELECTOR_WRAPPER); + InstantiationUtil.writeObjectToConfig(outputSelectors, this.config, OUTPUT_SELECTOR_WRAPPER); } catch (IOException e) { - throw new StreamTaskException("Cannot serialize OutputSelectorWrapper.", e); + throw new StreamTaskException("Could not serialize output selectors", e); } } - public <T> OutputSelectorWrapper<T> getOutputSelectorWrapper(ClassLoader cl) { + public <T> List<OutputSelector<T>> getOutputSelectors(ClassLoader userCodeClassloader) { try { - return InstantiationUtil.readObjectFromConfig(this.config, OUTPUT_SELECTOR_WRAPPER, cl); + List<OutputSelector<T>> selectors = + InstantiationUtil.readObjectFromConfig(this.config, OUTPUT_SELECTOR_WRAPPER, userCodeClassloader); + return selectors == null ? 
Collections.<OutputSelector<T>>emptyList() : selectors; + } catch (Exception e) { - throw new StreamTaskException("Cannot deserialize and instantiate OutputSelectorWrapper.", e); + throw new StreamTaskException("Could not read output selectors", e); } } http://git-wip-us.apache.org/repos/asf/flink/blob/28c6254e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java index 0a612f3..3e06037 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java @@ -26,15 +26,11 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.streaming.api.collector.selector.OutputSelector; -import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper; -import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapperFactory; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.operators.StreamOperator; /** - * Class representing the operators in the streaming programs, with all their - * properties. - * + * Class representing the operators in the streaming programs, with all their properties. 
*/ public class StreamNode implements Serializable { @@ -168,10 +164,6 @@ public class StreamNode implements Serializable { return outputSelectors; } - public OutputSelectorWrapper<?> getOutputSelectorWrapper() { - return OutputSelectorWrapperFactory.create(getOutputSelectors()); - } - public void addOutputSelector(OutputSelector<?> outputSelector) { this.outputSelectors.add(outputSelector); } http://git-wip-us.apache.org/repos/asf/flink/blob/28c6254e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index c0d2856..c810e47 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -310,7 +310,7 @@ public class StreamingJobGraphGenerator { config.setTypeSerializerOut(vertex.getTypeSerializerOut()); config.setStreamOperator(vertex.getOperator()); - config.setOutputSelectorWrapper(vertex.getOutputSelectorWrapper()); + config.setOutputSelectors(vertex.getOutputSelectors()); config.setNumberOfOutputs(nonChainableOutputs.size()); config.setNonChainedOutputs(nonChainableOutputs); http://git-wip-us.apache.org/repos/asf/flink/blob/28c6254e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CollectorWrapper.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CollectorWrapper.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CollectorWrapper.java deleted file mode 100644 index 01e997d..0000000 --- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CollectorWrapper.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.runtime.io; - -import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper; -import org.apache.flink.streaming.api.graph.StreamEdge; -import org.apache.flink.streaming.api.operators.Output; -import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.util.Collector; - -import java.util.ArrayList; - -public class CollectorWrapper<OUT> implements Output<StreamRecord<OUT>> { - - private OutputSelectorWrapper<OUT> outputSelectorWrapper; - - private ArrayList<Output<StreamRecord<OUT>>> allOutputs; - - public CollectorWrapper(OutputSelectorWrapper<OUT> outputSelectorWrapper) { - this.outputSelectorWrapper = outputSelectorWrapper; - allOutputs = new ArrayList<Output<StreamRecord<OUT>>>(); - } - - public void addCollector(Output<StreamRecord<OUT>> output, StreamEdge edge) { - outputSelectorWrapper.addCollector(output, edge); - allOutputs.add(output); - } - - @Override - public void 
collect(StreamRecord<OUT> record) { - for (Collector<StreamRecord<OUT>> output : outputSelectorWrapper.getSelectedOutputs(record.getValue())) { - output.collect(record); - } - } - - @Override - public void emitWatermark(Watermark mark) { - for (Output<?> output : allOutputs) { - output.emitWatermark(mark); - } - } - - @Override - public void close() {} -} http://git-wip-us.apache.org/repos/asf/flink/blob/28c6254e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java index 125279c..5313bc9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java @@ -24,15 +24,16 @@ import java.util.List; import java.util.Map; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.plugable.SerializationDelegate; +import org.apache.flink.streaming.api.collector.selector.DirectedOutput; +import org.apache.flink.streaming.api.collector.selector.OutputSelector; import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.io.CollectorWrapper; import org.apache.flink.streaming.runtime.io.RecordWriterOutput; -import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper; import org.apache.flink.streaming.api.graph.StreamConfig; 
import org.apache.flink.streaming.api.graph.StreamEdge; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; @@ -45,6 +46,13 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * The {@code OperatorChain} contains all operators that are executed as one chain within a single + * {@link StreamTask}. + * + * @param <OUT> The type of elements accepted by the chain, i.e., the input type of the chain's + * head operator. + */ public class OperatorChain<OUT> { private static final Logger LOG = LoggerFactory.getLogger(OperatorChain.class); @@ -182,15 +190,14 @@ public class OperatorChain<OUT> { Map<StreamEdge, RecordWriterOutput<?>> streamOutputs, List<StreamOperator<?>> allOperators) { - // We create a wrapper that will encapsulate the chained operators and network outputs - OutputSelectorWrapper<T> outputSelectorWrapper = operatorConfig.getOutputSelectorWrapper(userCodeClassloader); - CollectorWrapper<T> wrapper = new CollectorWrapper<T>(outputSelectorWrapper); - + List<Tuple2<Output<StreamRecord<T>>, StreamEdge>> allOutputs = new ArrayList<>(4); + // create collectors for the network outputs for (StreamEdge outputEdge : operatorConfig.getNonChainedOutputs(userCodeClassloader)) { @SuppressWarnings("unchecked") RecordWriterOutput<T> output = (RecordWriterOutput<T>) streamOutputs.get(outputEdge); - wrapper.addCollector(output, outputEdge); + + allOutputs.add(new Tuple2<Output<StreamRecord<T>>, StreamEdge>(output, outputEdge)); } // Create collectors for the chained outputs @@ -200,9 +207,37 @@ public class OperatorChain<OUT> { Output<StreamRecord<T>> output = createChainedOperator( containingTask, chainedOpConfig, chainedConfigs, userCodeClassloader, streamOutputs, allOperators); - wrapper.addCollector(output, outputEdge); + + allOutputs.add(new Tuple2<>(output, outputEdge)); + } + + // if there are multiple outputs, or the outputs are directed, we need to + // wrap them
as one output + + List<OutputSelector<T>> selectors = operatorConfig.getOutputSelectors(userCodeClassloader); + + if (selectors == null || selectors.isEmpty()) { + // simple path, no selector necessary + if (allOutputs.size() == 1) { + return allOutputs.get(0).f0; + } + else { + // send to N outputs. Note that this includes the special case + // of sending to zero outputs + @SuppressWarnings({"unchecked", "rawtypes"}) + Output<StreamRecord<T>>[] asArray = new Output[allOutputs.size()]; + for (int i = 0; i < allOutputs.size(); i++) { + asArray[i] = allOutputs.get(i).f0; + } + + return new BroadcastingOutputCollector<T>(asArray); + } + } + else { + // selector present, more complex routing necessary + return new DirectedOutput<T>(selectors, allOutputs); + } - return wrapper; } private static <IN, OUT> Output<StreamRecord<IN>> createChainedOperator( @@ -309,7 +344,6 @@ public class OperatorChain<OUT> { @Override public void collect(StreamRecord<T> record) { try { - StreamRecord<T> copy = new StreamRecord<>(serializer.copy(record.getValue()), record.getTimestamp()); operator.setKeyContextElement1(copy); @@ -320,4 +354,34 @@ public class OperatorChain<OUT> { } } } + + private static final class BroadcastingOutputCollector<T> implements Output<StreamRecord<T>> { + + private final Output<StreamRecord<T>>[] outputs; + + public BroadcastingOutputCollector(Output<StreamRecord<T>>[] outputs) { + this.outputs = outputs; + } + + @Override + public void emitWatermark(Watermark mark) { + for (Output<StreamRecord<T>> output : outputs) { + output.emitWatermark(mark); + } + } + + @Override + public void collect(StreamRecord<T> record) { + for (Output<StreamRecord<T>> output : outputs) { + output.collect(record); + } + } + + @Override + public void close() { + for (Output<StreamRecord<T>> output : outputs) { + output.close(); + } + } + } }
http://git-wip-us.apache.org/repos/asf/flink/blob/28c6254e/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java index 8525d37..5126d11 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java @@ -28,7 +28,7 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; import org.apache.flink.streaming.util.TestListResultSink; -import org.apache.flink.streaming.util.TestStreamEnvironment; + import org.junit.Test; public class OutputSplitterTest extends StreamingMultipleProgramsTestBase { http://git-wip-us.apache.org/repos/asf/flink/blob/28c6254e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java index 2cca3ff..e32b304 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.api.common.ExecutionConfig; @@ -26,7 +27,6 @@ import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleIn import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; -import org.apache.flink.streaming.api.collector.selector.BroadcastOutputSelectorWrapper; import org.apache.flink.streaming.api.collector.selector.OutputSelector; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.graph.StreamEdge; @@ -40,11 +40,11 @@ import org.apache.flink.util.InstantiationUtil; import org.junit.Assert; import java.io.IOException; +import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; - /** * Test harness for testing a {@link StreamTask}. * @@ -91,6 +91,7 @@ public class StreamTaskTestHarness<OUT> { // input related methods only need to be implemented once, in generic form protected int numInputGates; protected int numInputChannelsPerGate; + @SuppressWarnings("rawtypes") protected StreamTestSingleInputGate[] inputGates; @@ -128,7 +129,7 @@ public class StreamTaskTestHarness<OUT> { mockEnv.addOutput(outputList, outputStreamRecordSerializer); - streamConfig.setOutputSelectorWrapper(new BroadcastOutputSelectorWrapper<Object>()); + streamConfig.setOutputSelectors(Collections.<OutputSelector<?>>emptyList()); streamConfig.setNumberOfOutputs(1); StreamOperator<OUT> dummyOperator = new AbstractStreamOperator<OUT>() {