[STORM-1961] Use fields grouping for state query and refactor StreamBuilder
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/dc19597c Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/dc19597c Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/dc19597c Branch: refs/heads/master Commit: dc19597c290ca0177cf9a06c81f13a60a3afedd2 Parents: 3a10865 Author: Arun Mahadevan <ar...@apache.org> Authored: Thu Dec 29 15:16:34 2016 +0530 Committer: Arun Mahadevan <ar...@apache.org> Committed: Fri Jan 13 01:20:44 2017 +0530 ---------------------------------------------------------------------- .../starter/streams/StateQueryExample.java | 2 +- .../src/jvm/org/apache/storm/streams/Node.java | 2 +- .../org/apache/storm/streams/PairStream.java | 8 +- .../jvm/org/apache/storm/streams/Stream.java | 16 ++- .../org/apache/storm/streams/StreamBuilder.java | 113 +++++++++---------- 5 files changed, 63 insertions(+), 78 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/dc19597c/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StateQueryExample.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StateQueryExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StateQueryExample.java index 2f0a4a3..6d6a4b3 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StateQueryExample.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StateQueryExample.java @@ -53,7 +53,7 @@ import java.util.Map; public class StateQueryExample { public static void main(String[] args) throws Exception { StreamBuilder builder = new StreamBuilder(); - StreamState<String, Long> ss = builder.newStream(new TestWordSpout(), new ValueMapper<String>(0)) + StreamState<String, Long> ss = builder.newStream(new TestWordSpout(), new ValueMapper<String>(0), 2) 
.mapToPair(w -> Pair.of(w, 1)) .updateStateByKey(0L, (count, val) -> count + 1); http://git-wip-us.apache.org/repos/asf/storm/blob/dc19597c/storm-core/src/jvm/org/apache/storm/streams/Node.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/streams/Node.java b/storm-core/src/jvm/org/apache/storm/streams/Node.java index 3507f50..d21dee9 100644 --- a/storm-core/src/jvm/org/apache/storm/streams/Node.java +++ b/storm-core/src/jvm/org/apache/storm/streams/Node.java @@ -91,7 +91,7 @@ abstract class Node implements Serializable { this.componentId = componentId; } - Integer getParallelism() { + int getParallelism() { return parallelism; } http://git-wip-us.apache.org/repos/asf/storm/blob/dc19597c/storm-core/src/jvm/org/apache/storm/streams/PairStream.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/streams/PairStream.java b/storm-core/src/jvm/org/apache/storm/streams/PairStream.java index 964cdba..69e6c37 100644 --- a/storm-core/src/jvm/org/apache/storm/streams/PairStream.java +++ b/storm-core/src/jvm/org/apache/storm/streams/PairStream.java @@ -420,13 +420,7 @@ public class PairStream<K, V> extends Stream<Pair<K, V>> { } private PairStream<K, V> partitionBy(Fields fields) { - return partitionBy(fields, node.parallelism); - } - - private PairStream<K, V> partitionBy(Fields fields, int parallelism) { - return new PairStream<>( - streamBuilder, - addNode(node, new PartitionNode(stream, node.getOutputFields(), GroupingInfo.fields(fields)), parallelism)); + return toPairStream(partitionBy(fields, node.parallelism)); } private PairStream<K, V> toPairStream(Stream<Pair<K, V>> stream) { http://git-wip-us.apache.org/repos/asf/storm/blob/dc19597c/storm-core/src/jvm/org/apache/storm/streams/Stream.java ---------------------------------------------------------------------- diff --git 
a/storm-core/src/jvm/org/apache/storm/streams/Stream.java b/storm-core/src/jvm/org/apache/storm/streams/Stream.java index d553390..ef03ae3 100644 --- a/storm-core/src/jvm/org/apache/storm/streams/Stream.java +++ b/storm-core/src/jvm/org/apache/storm/streams/Stream.java @@ -370,9 +370,9 @@ public class Stream<T> { * @return the result stream */ public <V> PairStream<T, V> stateQuery(StreamState<T, V> streamState) { - // need all grouping for state query since the state is per-task - Node node = all().addProcessorNode(new StateQueryProcessor<>(streamState), KEY_VALUE); - return new PairStream<>(streamBuilder, node); + // need field grouping for state query so that the query is routed to the correct task + Node newNode = partitionBy(VALUE, node.getParallelism()).addProcessorNode(new StateQueryProcessor<>(streamState), KEY_VALUE); + return new PairStream<>(streamBuilder, newNode); } Node getNode() { @@ -435,12 +435,10 @@ public class Stream<T> { return new Stream<>(streamBuilder, partitionNode); } - private Stream<T> all() { - if (node.getParallelism() == 1) { - return this; - } - Node partitionNode = addNode(new PartitionNode(stream, node.getOutputFields(), GroupingInfo.all())); - return new Stream<>(streamBuilder, partitionNode); + protected Stream<T> partitionBy(Fields fields, int parallelism) { + return new Stream<>( + streamBuilder, + addNode(node, new PartitionNode(stream, node.getOutputFields(), GroupingInfo.fields(fields)), parallelism)); } private boolean shouldPartition() { http://git-wip-us.apache.org/repos/asf/storm/blob/dc19597c/storm-core/src/jvm/org/apache/storm/streams/StreamBuilder.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/streams/StreamBuilder.java b/storm-core/src/jvm/org/apache/storm/streams/StreamBuilder.java index 7dff25d..0bf02be 100644 --- a/storm-core/src/jvm/org/apache/storm/streams/StreamBuilder.java +++ 
b/storm-core/src/jvm/org/apache/storm/streams/StreamBuilder.java @@ -17,10 +17,7 @@ */ package org.apache.storm.streams; -import com.google.common.base.Function; -import com.google.common.base.Predicate; import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.Collections2; import com.google.common.collect.HashBasedTable; import com.google.common.collect.Multimap; import com.google.common.collect.Table; @@ -28,7 +25,6 @@ import org.apache.storm.generated.StormTopology; import org.apache.storm.streams.operations.IdentityFunction; import org.apache.storm.streams.operations.mappers.PairValueMapper; import org.apache.storm.streams.operations.mappers.TupleValueMapper; -import org.apache.storm.streams.processors.JoinProcessor; import org.apache.storm.streams.processors.MapProcessor; import org.apache.storm.streams.processors.Processor; import org.apache.storm.streams.processors.StateQueryProcessor; @@ -48,6 +44,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -56,6 +53,7 @@ import java.util.List; import java.util.Map; import java.util.PriorityQueue; import java.util.Set; +import java.util.stream.Collectors; /** * A builder for constructing a {@link StormTopology} via storm streams api (DSL) @@ -261,9 +259,9 @@ public class StreamBuilder { p.put(SpoutNode.class, 0); p.put(UpdateStateByKeyProcessor.class, 1); p.put(ProcessorNode.class, 2); - p.put(StateQueryProcessor.class, 3); - p.put(PartitionNode.class, 4); - p.put(WindowNode.class, 5); + p.put(PartitionNode.class, 3); + p.put(WindowNode.class, 4); + p.put(StateQueryProcessor.class, 5); p.put(SinkNode.class, 6); } @Override @@ -384,30 +382,36 @@ public class StreamBuilder { return nodes; } + private Collection<List<ProcessorNode>> parallelismGroups(List<ProcessorNode> processorNodes) { + return 
processorNodes.stream().collect(Collectors.groupingBy(Node::getParallelism)).values(); + } + private void processCurGroup(TopologyBuilder topologyBuilder) { - if (curGroup.isEmpty()) { - return; + if (!curGroup.isEmpty()) { + parallelismGroups(curGroup).forEach(g -> doProcessCurGroup(topologyBuilder, g)); + curGroup.clear(); } + } + private void doProcessCurGroup(TopologyBuilder topologyBuilder, List<ProcessorNode> group) { String boltId = UniqueIdGen.getInstance().getUniqueBoltId(); - for (ProcessorNode processorNode : curGroup) { + for (ProcessorNode processorNode : group) { processorNode.setComponentId(boltId); processorNode.setWindowedParentStreams(getWindowedParentStreams(processorNode)); } - final Set<ProcessorNode> initialProcessors = initialProcessors(curGroup); + final Set<ProcessorNode> initialProcessors = initialProcessors(group); Set<Window<?, ?>> windowParams = getWindowParams(initialProcessors); if (windowParams.isEmpty()) { - if (hasStatefulProcessor(curGroup)) { - addStatefulBolt(topologyBuilder, boltId, initialProcessors); + if (hasStatefulProcessor(group)) { + addStatefulBolt(topologyBuilder, boltId, initialProcessors, group); } else { - addBolt(topologyBuilder, boltId, initialProcessors); + addBolt(topologyBuilder, boltId, initialProcessors, group); } } else if (windowParams.size() == 1) { - addWindowedBolt(topologyBuilder, boltId, initialProcessors, windowParams.iterator().next()); + addWindowedBolt(topologyBuilder, boltId, initialProcessors, windowParams.iterator().next(), group); } else { - throw new IllegalStateException("More than one window config for current group " + curGroup); + throw new IllegalStateException("More than one window config for current group " + group); } - curGroup.clear(); } private boolean hasStatefulProcessor(List<ProcessorNode> processorNodes) { @@ -419,16 +423,11 @@ public class StreamBuilder { return false; } - private int getParallelism() { - Set<Integer> parallelisms = new 
HashSet<>(Collections2.transform(curGroup, new Function<ProcessorNode, Integer>() { - @Override - public Integer apply(ProcessorNode input) { - return input.getParallelism(); - } - })); + private int getParallelism(List<ProcessorNode> group) { + Set<Integer> parallelisms = group.stream().map(Node::getParallelism).collect(Collectors.toSet()); if (parallelisms.size() > 1) { - throw new IllegalStateException("Current group does not have same parallelism " + curGroup); + throw new IllegalStateException("Current group does not have same parallelism " + group); } return parallelisms.isEmpty() ? 1 : parallelisms.iterator().next(); @@ -446,16 +445,7 @@ public class StreamBuilder { } } - Set<Window<?, ?>> windowParams = new HashSet<>(); - if (!windowNodes.isEmpty()) { - windowParams.addAll(new HashSet<>(Collections2.transform(windowNodes, new Function<WindowNode, Window<?, ?>>() { - @Override - public Window<?, ?> apply(WindowNode input) { - return input.getWindowParams(); - } - }))); - } - return windowParams; + return windowNodes.stream().map(WindowNode::getWindowParams).collect(Collectors.toSet()); } private void addSpout(TopologyBuilder topologyBuilder, SpoutNode spout) { @@ -481,39 +471,41 @@ public class StreamBuilder { private StreamBolt addBolt(TopologyBuilder topologyBuilder, String boltId, - Set<ProcessorNode> initialProcessors) { - ProcessorBolt bolt = new ProcessorBolt(boltId, graph, curGroup); - BoltDeclarer boltDeclarer = topologyBuilder.setBolt(boltId, bolt, getParallelism()); - bolt.setStreamToInitialProcessors(wireBolt(curGroup, boltDeclarer, initialProcessors)); + Set<ProcessorNode> initialProcessors, + List<ProcessorNode> group) { + ProcessorBolt bolt = new ProcessorBolt(boltId, graph, group); + BoltDeclarer boltDeclarer = topologyBuilder.setBolt(boltId, bolt, getParallelism(group)); + bolt.setStreamToInitialProcessors(wireBolt(group, boltDeclarer, initialProcessors)); streamBolts.put(bolt, boltDeclarer); return bolt; } private StreamBolt 
addStatefulBolt(TopologyBuilder topologyBuilder, String boltId, - Set<ProcessorNode> initialProcessors) { - StateQueryProcessor<?, ?> stateQueryProcessor = getStateQueryProcessor(); + Set<ProcessorNode> initialProcessors, + List<ProcessorNode> group) { + StateQueryProcessor<?, ?> stateQueryProcessor = getStateQueryProcessor(group); StatefulProcessorBolt<?, ?> bolt; if (stateQueryProcessor == null) { - bolt = new StatefulProcessorBolt<>(boltId, graph, curGroup); - BoltDeclarer boltDeclarer = topologyBuilder.setBolt(boltId, bolt, getParallelism()); - bolt.setStreamToInitialProcessors(wireBolt(curGroup, boltDeclarer, initialProcessors)); + bolt = new StatefulProcessorBolt<>(boltId, graph, group); + BoltDeclarer boltDeclarer = topologyBuilder.setBolt(boltId, bolt, getParallelism(group)); + bolt.setStreamToInitialProcessors(wireBolt(group, boltDeclarer, initialProcessors)); streamBolts.put(bolt, boltDeclarer); } else { // state query is added to the existing stateful bolt ProcessorNode updateStateNode = stateQueryProcessor.getStreamState().getUpdateStateNode(); bolt = findStatefulProcessorBolt(updateStateNode); - for (ProcessorNode node : curGroup) { + for (ProcessorNode node : group) { node.setComponentId(bolt.getId()); } - bolt.addNodes(curGroup); + bolt.addNodes(group); bolt.addStreamToInitialProcessors(wireBolt(bolt.getNodes(), streamBolts.get(bolt), initialProcessors)); } return bolt; } - private StateQueryProcessor<?, ?> getStateQueryProcessor() { - for (ProcessorNode node : curGroup) { + private StateQueryProcessor<?, ?> getStateQueryProcessor(List<ProcessorNode> group) { + for (ProcessorNode node : group) { if (node.getProcessor() instanceof StateQueryProcessor) { return (StateQueryProcessor<?, ?>) node.getProcessor(); } @@ -524,10 +516,11 @@ public class StreamBuilder { private StreamBolt addWindowedBolt(TopologyBuilder topologyBuilder, String boltId, Set<ProcessorNode> initialProcessors, - Window<?, ?> windowParam) { - WindowedProcessorBolt bolt = new 
WindowedProcessorBolt(boltId, graph, curGroup, windowParam); - BoltDeclarer boltDeclarer = topologyBuilder.setBolt(boltId, bolt, getParallelism()); - bolt.setStreamToInitialProcessors(wireBolt(curGroup, boltDeclarer, initialProcessors)); + Window<?, ?> windowParam, + List<ProcessorNode> group) { + WindowedProcessorBolt bolt = new WindowedProcessorBolt(boltId, graph, group, windowParam); + BoltDeclarer boltDeclarer = topologyBuilder.setBolt(boltId, bolt, getParallelism(group)); + bolt.setStreamToInitialProcessors(wireBolt(group, boltDeclarer, initialProcessors)); streamBolts.put(bolt, boltDeclarer); return bolt; } @@ -554,17 +547,17 @@ public class StreamBuilder { return res; } - private Multimap<String, ProcessorNode> wireBolt(List<ProcessorNode> curGroup, + private Multimap<String, ProcessorNode> wireBolt(List<ProcessorNode> group, BoltDeclarer boltDeclarer, Set<ProcessorNode> initialProcessors) { - LOG.debug("Wiring bolt with boltDeclarer {}, curGroup {}, initialProcessors {}, nodeGroupingInfo {}", - boltDeclarer, curGroup, initialProcessors, nodeGroupingInfo); + LOG.debug("Wiring bolt with boltDeclarer {}, group {}, initialProcessors {}, nodeGroupingInfo {}", + boltDeclarer, group, initialProcessors, nodeGroupingInfo); Multimap<String, ProcessorNode> streamToInitialProcessor = ArrayListMultimap.create(); - Set<ProcessorNode> curSet = new HashSet<>(curGroup); + Set<ProcessorNode> curSet = new HashSet<>(group); for (ProcessorNode curNode : initialProcessors) { for (Node parent : parentNodes(curNode)) { if (curSet.contains(parent)) { - LOG.debug("Parent {} of curNode {} is in curGroup {}", parent, curNode, curGroup); + LOG.debug("Parent {} of curNode {} is in group {}", parent, curNode, group); } else { for (String stream : curNode.getParentStreams(parent)) { declareGrouping(boltDeclarer, parent, stream, nodeGroupingInfo.get(parent, stream)); @@ -592,10 +585,10 @@ public class StreamBuilder { } } - private Set<ProcessorNode> initialProcessors(List<ProcessorNode> 
curGroup) { + private Set<ProcessorNode> initialProcessors(List<ProcessorNode> group) { Set<ProcessorNode> nodes = new HashSet<>(); - Set<ProcessorNode> curSet = new HashSet<>(curGroup); - for (ProcessorNode node : curGroup) { + Set<ProcessorNode> curSet = new HashSet<>(group); + for (ProcessorNode node : group) { for (Node parent : parentNodes(node)) { if (!(parent instanceof ProcessorNode) || !curSet.contains(parent)) { nodes.add(node);