[STORM-1961] Fields grouping for state query and refactored StreamBuilder

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/dc19597c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/dc19597c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/dc19597c

Branch: refs/heads/master
Commit: dc19597c290ca0177cf9a06c81f13a60a3afedd2
Parents: 3a10865
Author: Arun Mahadevan <ar...@apache.org>
Authored: Thu Dec 29 15:16:34 2016 +0530
Committer: Arun Mahadevan <ar...@apache.org>
Committed: Fri Jan 13 01:20:44 2017 +0530

----------------------------------------------------------------------
 .../starter/streams/StateQueryExample.java      |   2 +-
 .../src/jvm/org/apache/storm/streams/Node.java  |   2 +-
 .../org/apache/storm/streams/PairStream.java    |   8 +-
 .../jvm/org/apache/storm/streams/Stream.java    |  16 ++-
 .../org/apache/storm/streams/StreamBuilder.java | 113 +++++++++----------
 5 files changed, 63 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/dc19597c/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StateQueryExample.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StateQueryExample.java
 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StateQueryExample.java
index 2f0a4a3..6d6a4b3 100644
--- 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StateQueryExample.java
+++ 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StateQueryExample.java
@@ -53,7 +53,7 @@ import java.util.Map;
 public class StateQueryExample {
     public static void main(String[] args) throws Exception {
         StreamBuilder builder = new StreamBuilder();
-        StreamState<String, Long> ss = builder.newStream(new TestWordSpout(), 
new ValueMapper<String>(0))
+        StreamState<String, Long> ss = builder.newStream(new TestWordSpout(), 
new ValueMapper<String>(0), 2)
                 .mapToPair(w -> Pair.of(w, 1))
                 .updateStateByKey(0L, (count, val) -> count + 1);
 

http://git-wip-us.apache.org/repos/asf/storm/blob/dc19597c/storm-core/src/jvm/org/apache/storm/streams/Node.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/Node.java 
b/storm-core/src/jvm/org/apache/storm/streams/Node.java
index 3507f50..d21dee9 100644
--- a/storm-core/src/jvm/org/apache/storm/streams/Node.java
+++ b/storm-core/src/jvm/org/apache/storm/streams/Node.java
@@ -91,7 +91,7 @@ abstract class Node implements Serializable {
         this.componentId = componentId;
     }
 
-    Integer getParallelism() {
+    int getParallelism() {
         return parallelism;
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/dc19597c/storm-core/src/jvm/org/apache/storm/streams/PairStream.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/PairStream.java 
b/storm-core/src/jvm/org/apache/storm/streams/PairStream.java
index 964cdba..69e6c37 100644
--- a/storm-core/src/jvm/org/apache/storm/streams/PairStream.java
+++ b/storm-core/src/jvm/org/apache/storm/streams/PairStream.java
@@ -420,13 +420,7 @@ public class PairStream<K, V> extends Stream<Pair<K, V>> {
     }
 
     private PairStream<K, V> partitionBy(Fields fields) {
-        return partitionBy(fields, node.parallelism);
-    }
-
-    private PairStream<K, V> partitionBy(Fields fields, int parallelism) {
-        return new PairStream<>(
-                streamBuilder,
-                addNode(node, new PartitionNode(stream, 
node.getOutputFields(), GroupingInfo.fields(fields)), parallelism));
+        return toPairStream(partitionBy(fields, node.parallelism));
     }
 
     private PairStream<K, V> toPairStream(Stream<Pair<K, V>> stream) {

http://git-wip-us.apache.org/repos/asf/storm/blob/dc19597c/storm-core/src/jvm/org/apache/storm/streams/Stream.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/Stream.java 
b/storm-core/src/jvm/org/apache/storm/streams/Stream.java
index d553390..ef03ae3 100644
--- a/storm-core/src/jvm/org/apache/storm/streams/Stream.java
+++ b/storm-core/src/jvm/org/apache/storm/streams/Stream.java
@@ -370,9 +370,9 @@ public class Stream<T> {
      * @return the result stream
      */
     public <V> PairStream<T, V> stateQuery(StreamState<T, V> streamState) {
-        // need all grouping for state query since the state is per-task
-        Node node = all().addProcessorNode(new 
StateQueryProcessor<>(streamState), KEY_VALUE);
-        return new PairStream<>(streamBuilder, node);
+        // need field grouping for state query so that the query is routed to 
the correct task
+        Node newNode = partitionBy(VALUE, 
node.getParallelism()).addProcessorNode(new StateQueryProcessor<>(streamState), 
KEY_VALUE);
+        return new PairStream<>(streamBuilder, newNode);
     }
 
     Node getNode() {
@@ -435,12 +435,10 @@ public class Stream<T> {
         return new Stream<>(streamBuilder, partitionNode);
     }
 
-    private Stream<T> all() {
-        if (node.getParallelism() == 1) {
-            return this;
-        }
-        Node partitionNode = addNode(new PartitionNode(stream, 
node.getOutputFields(), GroupingInfo.all()));
-        return new Stream<>(streamBuilder, partitionNode);
+    protected Stream<T> partitionBy(Fields fields, int parallelism) {
+        return new Stream<>(
+                streamBuilder,
+                addNode(node, new PartitionNode(stream, 
node.getOutputFields(), GroupingInfo.fields(fields)), parallelism));
     }
 
     private boolean shouldPartition() {

http://git-wip-us.apache.org/repos/asf/storm/blob/dc19597c/storm-core/src/jvm/org/apache/storm/streams/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/StreamBuilder.java 
b/storm-core/src/jvm/org/apache/storm/streams/StreamBuilder.java
index 7dff25d..0bf02be 100644
--- a/storm-core/src/jvm/org/apache/storm/streams/StreamBuilder.java
+++ b/storm-core/src/jvm/org/apache/storm/streams/StreamBuilder.java
@@ -17,10 +17,7 @@
  */
 package org.apache.storm.streams;
 
-import com.google.common.base.Function;
-import com.google.common.base.Predicate;
 import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Collections2;
 import com.google.common.collect.HashBasedTable;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Table;
@@ -28,7 +25,6 @@ import org.apache.storm.generated.StormTopology;
 import org.apache.storm.streams.operations.IdentityFunction;
 import org.apache.storm.streams.operations.mappers.PairValueMapper;
 import org.apache.storm.streams.operations.mappers.TupleValueMapper;
-import org.apache.storm.streams.processors.JoinProcessor;
 import org.apache.storm.streams.processors.MapProcessor;
 import org.apache.storm.streams.processors.Processor;
 import org.apache.storm.streams.processors.StateQueryProcessor;
@@ -48,6 +44,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -56,6 +53,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * A builder for constructing a {@link StormTopology} via storm streams api 
(DSL)
@@ -261,9 +259,9 @@ public class StreamBuilder {
                 p.put(SpoutNode.class, 0);
                 p.put(UpdateStateByKeyProcessor.class, 1);
                 p.put(ProcessorNode.class, 2);
-                p.put(StateQueryProcessor.class, 3);
-                p.put(PartitionNode.class, 4);
-                p.put(WindowNode.class, 5);
+                p.put(PartitionNode.class, 3);
+                p.put(WindowNode.class, 4);
+                p.put(StateQueryProcessor.class, 5);
                 p.put(SinkNode.class, 6);
             }
             @Override
@@ -384,30 +382,36 @@ public class StreamBuilder {
         return nodes;
     }
 
+    private Collection<List<ProcessorNode>> 
parallelismGroups(List<ProcessorNode> processorNodes) {
+        return 
processorNodes.stream().collect(Collectors.groupingBy(Node::getParallelism)).values();
+    }
+
     private void processCurGroup(TopologyBuilder topologyBuilder) {
-        if (curGroup.isEmpty()) {
-            return;
+        if (!curGroup.isEmpty()) {
+            parallelismGroups(curGroup).forEach(g -> 
doProcessCurGroup(topologyBuilder, g));
+            curGroup.clear();
         }
+    }
 
+    private void doProcessCurGroup(TopologyBuilder topologyBuilder, 
List<ProcessorNode> group) {
         String boltId = UniqueIdGen.getInstance().getUniqueBoltId();
-        for (ProcessorNode processorNode : curGroup) {
+        for (ProcessorNode processorNode : group) {
             processorNode.setComponentId(boltId);
             
processorNode.setWindowedParentStreams(getWindowedParentStreams(processorNode));
         }
-        final Set<ProcessorNode> initialProcessors = 
initialProcessors(curGroup);
+        final Set<ProcessorNode> initialProcessors = initialProcessors(group);
         Set<Window<?, ?>> windowParams = getWindowParams(initialProcessors);
         if (windowParams.isEmpty()) {
-            if (hasStatefulProcessor(curGroup)) {
-                addStatefulBolt(topologyBuilder, boltId, initialProcessors);
+            if (hasStatefulProcessor(group)) {
+                addStatefulBolt(topologyBuilder, boltId, initialProcessors, 
group);
             } else {
-                addBolt(topologyBuilder, boltId, initialProcessors);
+                addBolt(topologyBuilder, boltId, initialProcessors, group);
             }
         } else if (windowParams.size() == 1) {
-            addWindowedBolt(topologyBuilder, boltId, initialProcessors, 
windowParams.iterator().next());
+            addWindowedBolt(topologyBuilder, boltId, initialProcessors, 
windowParams.iterator().next(), group);
         } else {
-            throw new IllegalStateException("More than one window config for 
current group " + curGroup);
+            throw new IllegalStateException("More than one window config for 
current group " + group);
         }
-        curGroup.clear();
     }
 
     private boolean hasStatefulProcessor(List<ProcessorNode> processorNodes) {
@@ -419,16 +423,11 @@ public class StreamBuilder {
         return false;
     }
 
-    private int getParallelism() {
-        Set<Integer> parallelisms = new 
HashSet<>(Collections2.transform(curGroup, new Function<ProcessorNode, 
Integer>() {
-            @Override
-            public Integer apply(ProcessorNode input) {
-                return input.getParallelism();
-            }
-        }));
+    private int getParallelism(List<ProcessorNode> group) {
+        Set<Integer> parallelisms = 
group.stream().map(Node::getParallelism).collect(Collectors.toSet());
 
         if (parallelisms.size() > 1) {
-            throw new IllegalStateException("Current group does not have same 
parallelism " + curGroup);
+            throw new IllegalStateException("Current group does not have same 
parallelism " + group);
         }
 
         return parallelisms.isEmpty() ? 1 : parallelisms.iterator().next();
@@ -446,16 +445,7 @@ public class StreamBuilder {
             }
         }
 
-        Set<Window<?, ?>> windowParams = new HashSet<>();
-        if (!windowNodes.isEmpty()) {
-            windowParams.addAll(new 
HashSet<>(Collections2.transform(windowNodes, new Function<WindowNode, 
Window<?, ?>>() {
-                @Override
-                public Window<?, ?> apply(WindowNode input) {
-                    return input.getWindowParams();
-                }
-            })));
-        }
-        return windowParams;
+        return 
windowNodes.stream().map(WindowNode::getWindowParams).collect(Collectors.toSet());
     }
 
     private void addSpout(TopologyBuilder topologyBuilder, SpoutNode spout) {
@@ -481,39 +471,41 @@ public class StreamBuilder {
 
     private StreamBolt addBolt(TopologyBuilder topologyBuilder,
                                String boltId,
-                               Set<ProcessorNode> initialProcessors) {
-        ProcessorBolt bolt = new ProcessorBolt(boltId, graph, curGroup);
-        BoltDeclarer boltDeclarer = topologyBuilder.setBolt(boltId, bolt, 
getParallelism());
-        bolt.setStreamToInitialProcessors(wireBolt(curGroup, boltDeclarer, 
initialProcessors));
+                               Set<ProcessorNode> initialProcessors,
+                               List<ProcessorNode> group) {
+        ProcessorBolt bolt = new ProcessorBolt(boltId, graph, group);
+        BoltDeclarer boltDeclarer = topologyBuilder.setBolt(boltId, bolt, 
getParallelism(group));
+        bolt.setStreamToInitialProcessors(wireBolt(group, boltDeclarer, 
initialProcessors));
         streamBolts.put(bolt, boltDeclarer);
         return bolt;
     }
 
     private StreamBolt addStatefulBolt(TopologyBuilder topologyBuilder,
                                        String boltId,
-                                       Set<ProcessorNode> initialProcessors) {
-        StateQueryProcessor<?, ?> stateQueryProcessor = 
getStateQueryProcessor();
+                                       Set<ProcessorNode> initialProcessors,
+                                       List<ProcessorNode> group) {
+        StateQueryProcessor<?, ?> stateQueryProcessor = 
getStateQueryProcessor(group);
         StatefulProcessorBolt<?, ?> bolt;
         if (stateQueryProcessor == null) {
-            bolt = new StatefulProcessorBolt<>(boltId, graph, curGroup);
-            BoltDeclarer boltDeclarer = topologyBuilder.setBolt(boltId, bolt, 
getParallelism());
-            bolt.setStreamToInitialProcessors(wireBolt(curGroup, boltDeclarer, 
initialProcessors));
+            bolt = new StatefulProcessorBolt<>(boltId, graph, group);
+            BoltDeclarer boltDeclarer = topologyBuilder.setBolt(boltId, bolt, 
getParallelism(group));
+            bolt.setStreamToInitialProcessors(wireBolt(group, boltDeclarer, 
initialProcessors));
             streamBolts.put(bolt, boltDeclarer);
         } else {
             // state query is added to the existing stateful bolt
             ProcessorNode updateStateNode = 
stateQueryProcessor.getStreamState().getUpdateStateNode();
             bolt = findStatefulProcessorBolt(updateStateNode);
-            for (ProcessorNode node : curGroup) {
+            for (ProcessorNode node : group) {
                 node.setComponentId(bolt.getId());
             }
-            bolt.addNodes(curGroup);
+            bolt.addNodes(group);
             bolt.addStreamToInitialProcessors(wireBolt(bolt.getNodes(), 
streamBolts.get(bolt), initialProcessors));
         }
         return bolt;
     }
 
-    private StateQueryProcessor<?, ?> getStateQueryProcessor() {
-        for (ProcessorNode node : curGroup) {
+    private StateQueryProcessor<?, ?> 
getStateQueryProcessor(List<ProcessorNode> group) {
+        for (ProcessorNode node : group) {
             if (node.getProcessor() instanceof StateQueryProcessor) {
                 return (StateQueryProcessor<?, ?>) node.getProcessor();
             }
@@ -524,10 +516,11 @@ public class StreamBuilder {
     private StreamBolt addWindowedBolt(TopologyBuilder topologyBuilder,
                                        String boltId,
                                        Set<ProcessorNode> initialProcessors,
-                                       Window<?, ?> windowParam) {
-        WindowedProcessorBolt bolt = new WindowedProcessorBolt(boltId, graph, 
curGroup, windowParam);
-        BoltDeclarer boltDeclarer = topologyBuilder.setBolt(boltId, bolt, 
getParallelism());
-        bolt.setStreamToInitialProcessors(wireBolt(curGroup, boltDeclarer, 
initialProcessors));
+                                       Window<?, ?> windowParam,
+                                       List<ProcessorNode> group) {
+        WindowedProcessorBolt bolt = new WindowedProcessorBolt(boltId, graph, 
group, windowParam);
+        BoltDeclarer boltDeclarer = topologyBuilder.setBolt(boltId, bolt, 
getParallelism(group));
+        bolt.setStreamToInitialProcessors(wireBolt(group, boltDeclarer, 
initialProcessors));
         streamBolts.put(bolt, boltDeclarer);
         return bolt;
     }
@@ -554,17 +547,17 @@ public class StreamBuilder {
         return res;
     }
 
-    private Multimap<String, ProcessorNode> wireBolt(List<ProcessorNode> 
curGroup,
+    private Multimap<String, ProcessorNode> wireBolt(List<ProcessorNode> group,
                                                      BoltDeclarer boltDeclarer,
                                                      Set<ProcessorNode> 
initialProcessors) {
-        LOG.debug("Wiring bolt with boltDeclarer {}, curGroup {}, 
initialProcessors {}, nodeGroupingInfo {}",
-                boltDeclarer, curGroup, initialProcessors, nodeGroupingInfo);
+        LOG.debug("Wiring bolt with boltDeclarer {}, group {}, 
initialProcessors {}, nodeGroupingInfo {}",
+                boltDeclarer, group, initialProcessors, nodeGroupingInfo);
         Multimap<String, ProcessorNode> streamToInitialProcessor = 
ArrayListMultimap.create();
-        Set<ProcessorNode> curSet = new HashSet<>(curGroup);
+        Set<ProcessorNode> curSet = new HashSet<>(group);
         for (ProcessorNode curNode : initialProcessors) {
             for (Node parent : parentNodes(curNode)) {
                 if (curSet.contains(parent)) {
-                    LOG.debug("Parent {} of curNode {} is in curGroup {}", 
parent, curNode, curGroup);
+                    LOG.debug("Parent {} of curNode {} is in group {}", 
parent, curNode, group);
                 } else {
                     for (String stream : curNode.getParentStreams(parent)) {
                         declareGrouping(boltDeclarer, parent, stream, 
nodeGroupingInfo.get(parent, stream));
@@ -592,10 +585,10 @@ public class StreamBuilder {
         }
     }
 
-    private Set<ProcessorNode> initialProcessors(List<ProcessorNode> curGroup) 
{
+    private Set<ProcessorNode> initialProcessors(List<ProcessorNode> group) {
         Set<ProcessorNode> nodes = new HashSet<>();
-        Set<ProcessorNode> curSet = new HashSet<>(curGroup);
-        for (ProcessorNode node : curGroup) {
+        Set<ProcessorNode> curSet = new HashSet<>(group);
+        for (ProcessorNode node : group) {
             for (Node parent : parentNodes(node)) {
                 if (!(parent instanceof ProcessorNode) || 
!curSet.contains(parent)) {
                     nodes.add(node);

Reply via email to