Repository: samza
Updated Branches:
  refs/heads/master 003ad1068 -> e904e70cb


SAMZA-1860: Modularize Join input validation in ExecutionPlanner

This change breaks down the validation of partition counts of input and
intermediate streams participating in Join operations into three separate
steps:
1. Grouping `InputOperatorSpec`s by the `JoinOperatorSpec`s of the Join
operations they participate in
2. Replacing `InputOperatorSpec`s with their corresponding `StreamEdge`s
3. Verifying/inferring partition counts of input/intermediate streams

This change covers stream-stream Joins only.
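
For illustration, here is a minimal, self-contained sketch of how the three
steps compose. Every name in it (JoinOp, InputOp, Edge, validateJoins) is a
hypothetical stand-in, not one of the actual Samza classes in the diff below:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class JoinValidationSketch {
  static final int PARTITIONS_UNKNOWN = -1;

  record InputOp(String streamId) {}
  record JoinOp(String opId) {}

  static class Edge {
    final String name;
    int partitions;
    Edge(String name, int partitions) { this.name = name; this.partitions = partitions; }
  }

  // Step 1 (grouping inputs by the joins they feed) is assumed done upstream;
  // this method covers steps 2 and 3.
  static void validateJoins(Map<JoinOp, List<InputOp>> joinToInputs,
      Map<String, Edge> edgeByStreamId) {
    for (Map.Entry<JoinOp, List<InputOp>> entry : joinToInputs.entrySet()) {
      // Step 2: replace each InputOp with its corresponding Edge.
      List<Edge> edges = new ArrayList<>();
      for (InputOp in : entry.getValue()) {
        edges.add(edgeByStreamId.get(in.streamId()));
      }
      // Step 3: verify that known partition counts agree...
      int expected = PARTITIONS_UNKNOWN;
      for (Edge edge : edges) {
        if (edge.partitions != PARTITIONS_UNKNOWN) {
          if (expected == PARTITIONS_UNKNOWN) {
            expected = edge.partitions;
          } else if (edge.partitions != expected) {
            throw new IllegalStateException(String.format(
                "Join %s: stream %s has %d partitions, expected %d",
                entry.getKey().opId(), edge.name, edge.partitions, expected));
          }
        }
      }
      // ...and infer unknown counts from a joined sibling.
      for (Edge edge : edges) {
        if (edge.partitions == PARTITIONS_UNKNOWN) {
          edge.partitions = expected;
        }
      }
    }
  }
}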

Author: Ahmed Abdul Hamid <ahabd...@ahabdulh-mn1.linkedin.biz>
Author: Ahmed Abdul Hamid <ahabdulha...@linkedin.com>

Reviewers: Jagadish <jagad...@apache.org>, Bharath <bkumarasubraman...@linkedin.com>

Closes #637 from ahmedahamid/dev/ahabdulh/modularize-exec-planner


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/e904e70c
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/e904e70c
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/e904e70c

Branch: refs/heads/master
Commit: e904e70cb4ad649fe64d6572bb98805812eb1851
Parents: 003ad10
Author: Ahmed Abdul Hamid <ahabd...@ahabdulh-mn1.linkedin.biz>
Authored: Tue Sep 25 20:03:14 2018 -0700
Committer: Jagadish <jvenkatra...@linkedin.com>
Committed: Tue Sep 25 20:03:14 2018 -0700

----------------------------------------------------------------------
 .../documentation/versioned/jobs/logging.md     |   4 +-
 docs/startup/download/index.md                  |  20 +-
 .../samza/execution/ExecutionPlanner.java       | 304 ++++++++++++-------
 .../execution/OperatorSpecGraphAnalyzer.java    |  96 ++++++
 .../samza/execution/TestExecutionPlanner.java   |  11 +-
 5 files changed, 301 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/e904e70c/docs/learn/documentation/versioned/jobs/logging.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/logging.md 
b/docs/learn/documentation/versioned/jobs/logging.md
index 8cae27b..ac2d664 100644
--- a/docs/learn/documentation/versioned/jobs/logging.md
+++ b/docs/learn/documentation/versioned/jobs/logging.md
@@ -27,7 +27,7 @@ The [hello-samza](/startup/hello-samza/{{site.version}}) 
project shows how to us
 
 {% highlight xml %}
 <dependency>
-  <groupId>org.slf4j</groupId>
+  <groupId>org.slf4j</groupId>
   <artifactId>slf4j-log4j12</artifactId>
   <scope>runtime</scope>
   <version>1.6.2</version>
@@ -101,7 +101,7 @@ Sometimes it's desirable to change the Log4J log level from 
`INFO` to `DEBUG` at
 
 {% highlight xml %}
 <dependency>
-  <groupId>org.apache.samza</groupId>
+  <groupId>org.apache.samza</groupId>
   <artifactId>samza-log4j</artifactId>
   <scope>runtime</scope>
   <version>${samza.version}</version>

http://git-wip-us.apache.org/repos/asf/samza/blob/e904e70c/docs/startup/download/index.md
----------------------------------------------------------------------
diff --git a/docs/startup/download/index.md b/docs/startup/download/index.md
index 4fd0898..1f1477d 100644
--- a/docs/startup/download/index.md
+++ b/docs/startup/download/index.md
@@ -59,18 +59,18 @@ A Maven-based Samza project can pull in all required Samza dependen
 
 {% highlight xml %}
 <dependency>
-  <groupId>org.apache.samza</groupId>
+  <groupId>org.apache.samza</groupId>
   <artifactId>samza-api</artifactId>
   <version>0.14.1</version>
 </dependency>
 <dependency>
-  <groupId>org.apache.samza</groupId>
+  <groupId>org.apache.samza</groupId>
   <artifactId>samza-core_2.11</artifactId>
   <version>0.14.1</version>
   <scope>runtime</scope>
 </dependency>
 <dependency>
-  <groupId>org.apache.samza</groupId>
+  <groupId>org.apache.samza</groupId>
   <artifactId>samza-shell</artifactId>
   <classifier>dist</classifier>
   <type>tgz</type>
@@ -78,31 +78,31 @@ A Maven-based Samza project can pull in all required Samza dependen
   <scope>runtime</scope>
 </dependency>
 <dependency>
-  <groupId>org.apache.samza</groupId>
+  <groupId>org.apache.samza</groupId>
   <artifactId>samza-yarn_2.11</artifactId>
   <version>0.14.1</version>
   <scope>runtime</scope>
 </dependency>
 <dependency>
-  <groupId>org.apache.samza</groupId>
+  <groupId>org.apache.samza</groupId>
   <artifactId>samza-kv_2.11</artifactId>
   <version>0.14.1</version>
   <scope>runtime</scope>
 </dependency>
 <dependency>
-  <groupId>org.apache.samza</groupId>
+  <groupId>org.apache.samza</groupId>
   <artifactId>samza-kv-rocksdb_2.11</artifactId>
   <version>0.14.1</version>
   <scope>runtime</scope>
 </dependency>
 <dependency>
-  <groupId>org.apache.samza</groupId>
+  <groupId>org.apache.samza</groupId>
   <artifactId>samza-kv-inmemory_2.11</artifactId>
   <version>0.14.1</version>
   <scope>runtime</scope>
 </dependency>
 <dependency>
-  <groupId>org.apache.samza</groupId>
+  <groupId>org.apache.samza</groupId>
   <artifactId>samza-kafka_2.11</artifactId>
   <version>0.14.1</version>
   <scope>runtime</scope>
@@ -113,7 +113,7 @@ Samza versions less than 0.12 should use artifacts with 
scala version 2.10 as su
 
 {% highlight xml %}
 <dependency>
-  <groupId>org.apache.samza</groupId>
+  <groupId>org.apache.samza</groupId>
   <artifactId>samza-yarn_2.10</artifactId>
   <version>0.11.0</version>
 </dependency>
@@ -123,7 +123,7 @@ Samza versions less than 0.9 should include this additional 
dependency.
 
 {% highlight xml %}
 <dependency>
-  <groupId>org.apache.samza</groupId>
+  <groupId>org.apache.samza</groupId>
   <artifactId>samza-serializers_2.10</artifactId>
   <version>0.8.1</version>
 </dependency>

http://git-wip-us.apache.org/repos/asf/samza/blob/e904e70c/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java 
b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
index 810f424..46aef8d 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
@@ -22,12 +22,14 @@ package org.apache.samza.execution;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.Set;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.ApplicationConfig;
@@ -36,13 +38,14 @@ import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.StreamConfig;
 import org.apache.samza.operators.OperatorSpecGraph;
+import org.apache.samza.operators.spec.InputOperatorSpec;
 import org.apache.samza.operators.spec.JoinOperatorSpec;
-import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.table.TableSpec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static 
org.apache.samza.execution.ExecutionPlanner.StreamEdgeSet.StreamEdgeSetCategory;
 import static org.apache.samza.util.StreamUtil.*;
 
 
@@ -66,17 +69,25 @@ public class ExecutionPlanner {
     this.streamConfig = new StreamConfig(config);
   }
 
-  public ExecutionPlan plan(OperatorSpecGraph specGraph) {
+  public ExecutionPlan plan(OperatorSpecGraph opSpecGraph) {
     validateConfig();
 
-    // create physical job graph based on stream graph
-    JobGraph jobGraph = createJobGraph(specGraph);
+    // Create physical job graph based on stream graph
+    JobGraph jobGraph = createJobGraph(opSpecGraph);
 
-    // fetch the external streams partition info
-    fetchInputAndOutputStreamPartitions(jobGraph, streamManager);
+    // Fetch the external streams partition info
+    fetchInputAndOutputStreamPartitions(jobGraph);
 
-    // figure out the partitions for internal streams
-    calculatePartitions(jobGraph);
+    // Verify agreement in partition count between all joined 
input/intermediate streams
+    validateJoinInputStreamPartitions(jobGraph);
+
+    if (!jobGraph.getIntermediateStreamEdges().isEmpty()) {
+      // Set partition count of intermediate streams not participating in joins
+      setIntermediateStreamPartitions(jobGraph);
+
+      // Validate partition counts were assigned for all intermediate streams
+      validateIntermediateStreamPartitions(jobGraph);
+    }
 
     return jobGraph;
   }
@@ -92,21 +103,21 @@ public class ExecutionPlanner {
   }
 
   /**
-   * Create the physical graph from {@link OperatorSpecGraph}
+   * Creates the physical graph from {@link OperatorSpecGraph}
    */
-  /* package private */ JobGraph createJobGraph(OperatorSpecGraph specGraph) {
-    JobGraph jobGraph = new JobGraph(config, specGraph);
+  /* package private */ JobGraph createJobGraph(OperatorSpecGraph opSpecGraph) 
{
+    JobGraph jobGraph = new JobGraph(config, opSpecGraph);
 
     // Source streams contain both input and intermediate streams.
-    Set<StreamSpec> sourceStreams = 
getStreamSpecs(specGraph.getInputOperators().keySet(), streamConfig);
+    Set<StreamSpec> sourceStreams = 
getStreamSpecs(opSpecGraph.getInputOperators().keySet(), streamConfig);
     // Sink streams contain both output and intermediate streams.
-    Set<StreamSpec> sinkStreams = 
getStreamSpecs(specGraph.getOutputStreams().keySet(), streamConfig);
+    Set<StreamSpec> sinkStreams = 
getStreamSpecs(opSpecGraph.getOutputStreams().keySet(), streamConfig);
 
     Set<StreamSpec> intermediateStreams = Sets.intersection(sourceStreams, 
sinkStreams);
     Set<StreamSpec> inputStreams = Sets.difference(sourceStreams, 
intermediateStreams);
     Set<StreamSpec> outputStreams = Sets.difference(sinkStreams, 
intermediateStreams);
 
-    Set<TableSpec> tables = specGraph.getTables().keySet();
+    Set<TableSpec> tables = opSpecGraph.getTables().keySet();
 
     // For this phase, we have a single job node for the whole dag
     String jobName = config.get(JobConfig.JOB_NAME());
@@ -131,25 +142,9 @@ public class ExecutionPlanner {
   }
 
   /**
-   * Figure out the number of partitions of all streams
+   * Fetches the partitions of input/output streams and updates the corresponding StreamEdges.
    */
-  /* package private */ void calculatePartitions(JobGraph jobGraph) {
-    // calculate the partitions for the input streams of join operators
-    calculateJoinInputPartitions(jobGraph, streamConfig);
-
-    // calculate the partitions for the rest of intermediate streams
-    calculateIntermediateStreamPartitions(jobGraph, config);
-
-    // validate all the partitions are assigned
-    validateIntermediateStreamPartitions(jobGraph);
-  }
-
-  /**
-   * Fetch the partitions of input/output streams and update the corresponding 
StreamEdges.
-   * @param jobGraph {@link JobGraph}
-   * @param streamManager the {@link StreamManager} to interface with the 
streams.
-   */
-  /* package private */ static void 
fetchInputAndOutputStreamPartitions(JobGraph jobGraph, StreamManager 
streamManager) {
+  /* package private */ void fetchInputAndOutputStreamPartitions(JobGraph 
jobGraph) {
     Set<StreamEdge> existingStreams = new HashSet<>();
     existingStreams.addAll(jobGraph.getInputStreams());
     existingStreams.addAll(jobGraph.getOutputStreams());
@@ -188,98 +183,81 @@ public class ExecutionPlanner {
   }
 
   /**
-   * Calculate the partitions for the input streams of join operators
+   * Validates agreement in partition count between input/intermediate streams 
participating in join operations.
    */
-  /* package private */ static void calculateJoinInputPartitions(JobGraph 
jobGraph, StreamConfig streamConfig) {
-    // mapping from a source stream to all join specs reachable from it
-    Multimap<JoinOperatorSpec, StreamEdge> joinSpecToStreamEdges = 
HashMultimap.create();
-    // reverse mapping of the above
-    Multimap<StreamEdge, JoinOperatorSpec> streamEdgeToJoinSpecs = 
HashMultimap.create();
-    // A queue of joins with known input partitions
-    Queue<JoinOperatorSpec> joinQ = new LinkedList<>();
-    // The visited set keeps track of the join specs that have been already 
inserted in the queue before
-    Set<JoinOperatorSpec> visited = new HashSet<>();
-
-    jobGraph.getSpecGraph().getInputOperators().forEach((streamId, 
inputOperatorSpec) -> {
-        StreamEdge streamEdge = 
jobGraph.getOrCreateStreamEdge(getStreamSpec(streamId, streamConfig));
-        // Traverses the StreamGraph to find and update mappings for all Joins 
reachable from this input StreamEdge
-        findReachableJoins(inputOperatorSpec, streamEdge, 
joinSpecToStreamEdges, streamEdgeToJoinSpecs, joinQ, visited);
-      });
-
-    // At this point, joinQ contains joinSpecs where at least one of the input 
stream edge partitions is known.
-    while (!joinQ.isEmpty()) {
-      JoinOperatorSpec join = joinQ.poll();
-      int partitions = StreamEdge.PARTITIONS_UNKNOWN;
-      // loop through the input streams to the join and find the partition 
count
-      for (StreamEdge edge : joinSpecToStreamEdges.get(join)) {
-        int edgePartitions = edge.getPartitionCount();
-        if (edgePartitions != StreamEdge.PARTITIONS_UNKNOWN) {
-          if (partitions == StreamEdge.PARTITIONS_UNKNOWN) {
-            //if the partition is not assigned
-            partitions = edgePartitions;
-            log.info("Inferred the partition count {} for the join operator {} 
from {}."
-                , new Object[] {partitions, join.getOpId(), edge.getName()});
-          } else if (partitions != edgePartitions) {
-            throw  new SamzaException(String.format(
-                "Unable to resolve input partitions of stream %s for the join 
%s. Expected: %d, Actual: %d",
-                edge.getName(), join.getOpId(), partitions, edgePartitions));
-          }
-        }
-      }
-
-      // assign the partition count for intermediate streams
-      for (StreamEdge edge : joinSpecToStreamEdges.get(join)) {
-        if (edge.getPartitionCount() <= 0) {
-          log.info("Set the partition count to {} for input stream {} to the 
join {}.",
-              new Object[] {partitions, edge.getName(), join.getOpId()});
-          edge.setPartitionCount(partitions);
-
-          // find other joins can be inferred by setting this edge
-          for (JoinOperatorSpec op : streamEdgeToJoinSpecs.get(edge)) {
-            if (!visited.contains(op)) {
-              joinQ.add(op);
-              visited.add(op);
-            }
-          }
-        }
-      }
+  private void validateJoinInputStreamPartitions(JobGraph jobGraph) {
+    // Group input operator specs (input/intermediate streams) by the joins 
they participate in.
+    Multimap<JoinOperatorSpec, InputOperatorSpec> joinOpSpecToInputOpSpecs =
+        
OperatorSpecGraphAnalyzer.getJoinToInputOperatorSpecs(jobGraph.getSpecGraph());
+
+    // Convert every group of input operator specs into a group of 
corresponding stream edges.
+    List<StreamEdgeSet> streamEdgeSets = new ArrayList<>();
+    for (JoinOperatorSpec joinOpSpec : joinOpSpecToInputOpSpecs.keySet()) {
+      Collection<InputOperatorSpec> joinedInputOpSpecs = 
joinOpSpecToInputOpSpecs.get(joinOpSpec);
+      StreamEdgeSet streamEdgeSet = getStreamEdgeSet(joinOpSpec.getOpId(), 
joinedInputOpSpecs, jobGraph);
+      streamEdgeSets.add(streamEdgeSet);
     }
+
+    /*
+     * Sort the stream edge groups by their category so they appear in this 
order:
+     *   1. groups composed exclusively of stream edges with set partition 
counts
+     *   2. groups composed of a mix of stream edges with set/unset partition
counts
+     *   3. groups composed exclusively of stream edges with unset partition 
counts
+     *
+     *   This guarantees that we process the most constrained stream edge 
groups first,
+     *   which is crucial for intermediate stream edges that are members of 
multiple
+     *   stream edge groups. For instance, if we have the following groups of 
stream
+     *   edges (partition counts in parentheses, question marks for 
intermediate streams):
+     *
+     *      a. e1 (16), e2 (16)
+     *      b. e2 (16), e3 (?)
+     *      c. e3 (?), e4 (?)
+     *
+     *   processing them in the above order (most constrained first) is 
guaranteed to
+     *   yield correct assignment of partition counts of e3 and e4 in a single 
scan.
+     */
+    Collections.sort(streamEdgeSets, Comparator.comparingInt(e -> 
e.getCategory().getSortOrder()));
+
+    // Verify agreement between joined input/intermediate streams.
+    // This may involve setting partition counts of intermediate stream edges.
+    
streamEdgeSets.forEach(ExecutionPlanner::validateAndAssignStreamEdgeSetPartitions);
   }
 
   /**
-   * This function traverses the {@link OperatorSpec} graph to find and update 
mappings for all Joins reachable
-   * from this input {@link StreamEdge}.
-   * @param operatorSpec the {@link OperatorSpec} to traverse
-   * @param sourceStreamEdge source {@link StreamEdge}
-   * @param joinSpecToStreamEdges mapping from join spec to its source {@link 
StreamEdge}s
-   * @param streamEdgeToJoinSpecs mapping from source {@link StreamEdge} to 
the join specs that consumes it
-   * @param joinQ queue that contains joinSpecs where at least one of the 
input stream edge partitions is known.
+   * Creates a {@link StreamEdgeSet} whose Id is {@code setId} and whose
+   * {@link StreamEdge}s correspond to the provided {@code inputOpSpecs}.
    */
-  private static void findReachableJoins(OperatorSpec operatorSpec, StreamEdge 
sourceStreamEdge,
-      Multimap<JoinOperatorSpec, StreamEdge> joinSpecToStreamEdges,
-      Multimap<StreamEdge, JoinOperatorSpec> streamEdgeToJoinSpecs,
-      Queue<JoinOperatorSpec> joinQ, Set<JoinOperatorSpec> visited) {
-
-    if (operatorSpec instanceof JoinOperatorSpec) {
-      JoinOperatorSpec joinOperatorSpec = (JoinOperatorSpec) operatorSpec;
-      joinSpecToStreamEdges.put(joinOperatorSpec, sourceStreamEdge);
-      streamEdgeToJoinSpecs.put(sourceStreamEdge, joinOperatorSpec);
-
-      if (!visited.contains(joinOperatorSpec) && 
sourceStreamEdge.getPartitionCount() > 0) {
-        // put the joins with known input partitions into the queue and mark 
as visited
-        joinQ.add(joinOperatorSpec);
-        visited.add(joinOperatorSpec);
+  private StreamEdgeSet getStreamEdgeSet(String setId, 
Iterable<InputOperatorSpec> inputOpSpecs,
+      JobGraph jobGraph) {
+
+    int countStreamEdgeWithSetPartitions = 0;
+    Set<StreamEdge> streamEdges = new HashSet<>();
+
+    for (InputOperatorSpec inputOpSpec : inputOpSpecs) {
+      StreamEdge streamEdge = 
jobGraph.getOrCreateStreamEdge(getStreamSpec(inputOpSpec.getStreamId(), 
streamConfig));
+      if (streamEdge.getPartitionCount() != StreamEdge.PARTITIONS_UNKNOWN) {
+        ++countStreamEdgeWithSetPartitions;
       }
+      streamEdges.add(streamEdge);
     }
 
-    Collection<OperatorSpec> registeredOperatorSpecs = 
operatorSpec.getRegisteredOperatorSpecs();
-    for (OperatorSpec registeredOpSpec : registeredOperatorSpecs) {
-      findReachableJoins(registeredOpSpec, sourceStreamEdge, 
joinSpecToStreamEdges, streamEdgeToJoinSpecs, joinQ,
-          visited);
+    // Determine category of stream group based on stream partition counts.
+    StreamEdgeSetCategory category;
+    if (countStreamEdgeWithSetPartitions == 0) {
+      category = StreamEdgeSetCategory.NO_PARTITION_COUNT_SET;
+    } else if (countStreamEdgeWithSetPartitions == streamEdges.size()) {
+      category = StreamEdgeSetCategory.ALL_PARTITION_COUNT_SET;
+    } else {
+      category = StreamEdgeSetCategory.SOME_PARTITION_COUNT_SET;
     }
+
+    return new StreamEdgeSet(setId, streamEdges, category);
   }
 
-  private static void calculateIntermediateStreamPartitions(JobGraph jobGraph, 
Config config) {
+  /**
+   * Sets partition count of intermediate streams which have not been assigned 
partition counts.
+   */
+  private void setIntermediateStreamPartitions(JobGraph jobGraph) {
     final String defaultPartitionsConfigProperty = 
JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS();
     int partitions = config.getInt(defaultPartitionsConfigProperty, 
StreamEdge.PARTITIONS_UNKNOWN);
     if (partitions == StreamEdge.PARTITIONS_UNKNOWN) {
@@ -315,6 +293,9 @@ public class ExecutionPlanner {
     }
   }
 
+  /**
+   * Ensures all intermediate streams have been assigned partition counts.
+   */
   private static void validateIntermediateStreamPartitions(JobGraph jobGraph) {
     for (StreamEdge edge : jobGraph.getIntermediateStreamEdges()) {
       if (edge.getPartitionCount() <= 0) {
@@ -323,7 +304,102 @@ public class ExecutionPlanner {
     }
   }
 
+  /**
+   * Ensures that all streams in the supplied {@link StreamEdgeSet} agree in 
partition count.
+   * This may include setting partition counts of intermediate streams in this 
set that do not
+   * have their partition counts set.
+   */
+  private static void validateAndAssignStreamEdgeSetPartitions(StreamEdgeSet 
streamEdgeSet) {
+    Set<StreamEdge> streamEdges = streamEdgeSet.getStreamEdges();
+    StreamEdge firstStreamEdgeWithSetPartitions =
+        streamEdges.stream()
+            .filter(streamEdge -> streamEdge.getPartitionCount() != 
StreamEdge.PARTITIONS_UNKNOWN)
+            .findFirst()
+            .orElse(null);
+
+    // This group consists exclusively of intermediate streams with unknown 
partition counts.
+    // We cannot do any validation/computation of partition counts of such 
streams right here,
+    // but they are tackled later in the ExecutionPlanner.
+    if (firstStreamEdgeWithSetPartitions == null) {
+      return;
+    }
+
+    // Make sure all other stream edges in this group have the same partition 
count.
+    int partitions = firstStreamEdgeWithSetPartitions.getPartitionCount();
+    for (StreamEdge streamEdge : streamEdges) {
+      int streamPartitions = streamEdge.getPartitionCount();
+      if (streamPartitions == StreamEdge.PARTITIONS_UNKNOWN) {
+        streamEdge.setPartitionCount(partitions);
+        log.info("Inferred the partition count {} for the join operator {} 
from {}."
+            , new Object[] {partitions, streamEdgeSet.getSetId(), 
firstStreamEdgeWithSetPartitions.getName()});
+      } else if (streamPartitions != partitions) {
+            throw new SamzaException(String.format(
+            "Unable to resolve input partitions of stream %s for the join %s. 
Expected: %d, Actual: %d",
+            streamEdge.getName(), streamEdgeSet.getSetId(), partitions, 
streamPartitions));
+      }
+    }
+  }
+
   /* package private */ static int maxPartitions(Collection<StreamEdge> edges) 
{
     return 
edges.stream().mapToInt(StreamEdge::getPartitionCount).max().orElse(StreamEdge.PARTITIONS_UNKNOWN);
   }
+
+  /**
+   * Represents a set of {@link StreamEdge}s.
+   */
+  /* package private */ static class StreamEdgeSet {
+
+    /**
+     * Indicates the extent to which stream edges in this group have their
+     * partition counts assigned.
+     */
+    public enum StreamEdgeSetCategory {
+      /**
+       * All stream edges in this group have their partition counts assigned.
+       */
+      ALL_PARTITION_COUNT_SET(0),
+
+      /**
+       * Only some stream edges in this group have their partition counts 
assigned.
+       */
+      SOME_PARTITION_COUNT_SET(1),
+
+      /**
+       * No stream edge in this group is assigned a partition count.
+       */
+      NO_PARTITION_COUNT_SET(2);
+
+
+      private final int sortOrder;
+
+      StreamEdgeSetCategory(int sortOrder) {
+        this.sortOrder = sortOrder;
+      }
+
+      public int getSortOrder() {
+        return sortOrder;
+      }
+    }
+
+    private final String setId;
+    private final Set<StreamEdge> streamEdges;
+    private final StreamEdgeSetCategory category;
+
+    public StreamEdgeSet(String setId, Set<StreamEdge> streamEdges, 
StreamEdgeSetCategory category) {
+      this.setId = setId;
+      this.streamEdges = streamEdges;
+      this.category = category;
+    }
+
+    public Set<StreamEdge> getStreamEdges() {
+      return streamEdges;
+    }
+
+    public String getSetId() {
+      return setId;
+    }
+
+    public StreamEdgeSetCategory getCategory() {
+      return category;
+    }
+  }
 }
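
To make the most-constrained-first ordering in the comment above concrete,
here is a self-contained sketch reproducing the e1..e4 example. Edge and
rank() are simplified stand-ins for StreamEdge and StreamEdgeSetCategory,
not the actual Samza code:

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class SortOrderSketch {
  static final int PARTITIONS_UNKNOWN = -1;

  static class Edge {
    final String name;
    int partitions;
    Edge(String name, int partitions) { this.name = name; this.partitions = partitions; }
  }

  // Rank mirrors StreamEdgeSetCategory: 0 = all set, 1 = some set, 2 = none set.
  static int rank(List<Edge> group) {
    long set = group.stream().filter(e -> e.partitions != PARTITIONS_UNKNOWN).count();
    return set == group.size() ? 0 : (set > 0 ? 1 : 2);
  }

  public static void main(String[] args) {
    Edge e1 = new Edge("e1", 16), e2 = new Edge("e2", 16);
    Edge e3 = new Edge("e3", PARTITIONS_UNKNOWN), e4 = new Edge("e4", PARTITIONS_UNKNOWN);
    List<List<Edge>> groups = new ArrayList<>();
    groups.add(List.of(e3, e4)); // group c: no partition count set
    groups.add(List.of(e2, e3)); // group b: some partition counts set
    groups.add(List.of(e1, e2)); // group a: all partition counts set

    // Most constrained groups first: a, then b, then c.
    groups.sort(Comparator.comparingInt(SortOrderSketch::rank));

    for (List<Edge> group : groups) {
      int known = group.stream().mapToInt(e -> e.partitions)
          .filter(p -> p != PARTITIONS_UNKNOWN).findFirst().orElse(PARTITIONS_UNKNOWN);
      if (known == PARTITIONS_UNKNOWN) {
        continue; // all edges still unknown; the real planner assigns these later from config
      }
      for (Edge e : group) {
        if (e.partitions == PARTITIONS_UNKNOWN) {
          e.partitions = known;
        }
      }
    }
    // One pass suffices: group b sets e3 = 16, then group c sets e4 = 16.
    System.out.printf("e3=%d, e4=%d%n", e3.partitions, e4.partitions);
  }
}

Had group c been processed first, no partition count would have been known
within it, and e4 would have needed a second pass; sorting by category rules
that out.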

http://git-wip-us.apache.org/repos/asf/samza/blob/e904e70c/samza-core/src/main/java/org/apache/samza/execution/OperatorSpecGraphAnalyzer.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/execution/OperatorSpecGraphAnalyzer.java
 
b/samza-core/src/main/java/org/apache/samza/execution/OperatorSpecGraphAnalyzer.java
new file mode 100644
index 0000000..aa1dff9
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/execution/OperatorSpecGraphAnalyzer.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.execution;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.samza.operators.OperatorSpecGraph;
+import org.apache.samza.operators.spec.InputOperatorSpec;
+import org.apache.samza.operators.spec.JoinOperatorSpec;
+import org.apache.samza.operators.spec.OperatorSpec;
+
+
+/**
+ * A utility class that encapsulates the logic for traversing an {@link 
OperatorSpecGraph} and building
+ * associations between related {@link OperatorSpec}s.
+ */
+/* package private */ class OperatorSpecGraphAnalyzer {
+
+  /**
+   * Returns a grouping of {@link InputOperatorSpec}s by the joins, i.e. 
{@link JoinOperatorSpec}s, they participate in.
+   */
+  public static Multimap<JoinOperatorSpec, InputOperatorSpec> 
getJoinToInputOperatorSpecs(
+      OperatorSpecGraph operatorSpecGraph) {
+
+    Multimap<JoinOperatorSpec, InputOperatorSpec> joinOpSpecToInputOpSpecs = 
HashMultimap.create();
+
+    // Traverse graph starting from every input operator spec, observing 
connectivity between input operator specs
+    // and Join operator specs.
+    Iterable<InputOperatorSpec> inputOpSpecs = 
operatorSpecGraph.getInputOperators().values();
+    for (InputOperatorSpec inputOpSpec : inputOpSpecs) {
+      // Observe all join operator specs reachable from this input operator 
spec.
+      JoinOperatorSpecVisitor joinOperatorSpecVisitor = new 
JoinOperatorSpecVisitor();
+      traverse(inputOpSpec, joinOperatorSpecVisitor, opSpec -> 
opSpec.getRegisteredOperatorSpecs());
+
+      // Associate every encountered join operator spec with this input 
operator spec.
+      for (JoinOperatorSpec joinOpSpec : 
joinOperatorSpecVisitor.getJoinOperatorSpecs()) {
+        joinOpSpecToInputOpSpecs.put(joinOpSpec, inputOpSpec);
+      }
+    }
+
+    return joinOpSpecToInputOpSpecs;
+  }
+
+  /**
+   * Traverses {@link OperatorSpec}s starting from {@code startOpSpec}, 
invoking {@code visitor} with every encountered
+   * {@link OperatorSpec}, and using {@code getNextOpSpecs} to determine the 
set of {@link OperatorSpec}s to visit next.
+   */
+  private static void traverse(OperatorSpec startOpSpec, 
Consumer<OperatorSpec> visitor,
+      Function<OperatorSpec, Collection<OperatorSpec>> getNextOpSpecs) {
+    visitor.accept(startOpSpec);
+    for (OperatorSpec nextOpSpec : getNextOpSpecs.apply(startOpSpec)) {
+      traverse(nextOpSpec, visitor, getNextOpSpecs);
+    }
+  }
+
+  /**
+   * An {@link OperatorSpecGraph} visitor that records all {@link 
JoinOperatorSpec}s encountered in the graph.
+   */
+  private static class JoinOperatorSpecVisitor implements 
Consumer<OperatorSpec> {
+    private Set<JoinOperatorSpec> joinOpSpecs = new HashSet<>();
+
+    @Override
+    public void accept(OperatorSpec operatorSpec) {
+      if (operatorSpec instanceof JoinOperatorSpec) {
+        joinOpSpecs.add((JoinOperatorSpec) operatorSpec);
+      }
+    }
+
+    public Set<JoinOperatorSpec> getJoinOperatorSpecs() {
+      return Collections.unmodifiableSet(joinOpSpecs);
+    }
+  }
+}
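
Since traverse() above is a plain visitor-driven depth-first walk, the same
shape can collect any operator type. A usage sketch over a toy Node type
(hypothetical, not the Samza API); like the original, it keeps no visited
set and therefore assumes an acyclic graph:

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;

class TraverseSketch {
  static class Node {
    final String name;
    final List<Node> next = new ArrayList<>();
    Node(String name) { this.name = name; }
  }

  // Same shape as OperatorSpecGraphAnalyzer.traverse(), made generic over T.
  static <T> void traverse(T start, Consumer<T> visitor, Function<T, Collection<T>> getNext) {
    visitor.accept(start);
    for (T next : getNext.apply(start)) {
      traverse(next, visitor, getNext);
    }
  }

  public static void main(String[] args) {
    Node input = new Node("input");
    Node map = new Node("map");
    Node join = new Node("join");
    input.next.add(map);
    map.next.add(join);

    // Visitor that records every "join" node reachable from the input.
    Set<String> joins = new HashSet<>();
    traverse(input, n -> { if ("join".equals(n.name)) joins.add(n.name); }, n -> n.next);
    System.out.println(joins); // prints [join]
  }
}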

http://git-wip-us.apache.org/repos/asf/samza/blob/e904e70c/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java 
b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
index ad6b386..779d299 100644
--- 
a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
+++ 
b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
@@ -33,7 +33,6 @@ import 
org.apache.samza.application.StreamApplicationDescriptorImpl;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.StreamConfig;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
@@ -281,7 +280,7 @@ public class TestExecutionPlanner {
     StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithJoin();
     JobGraph jobGraph = 
planner.createJobGraph(graphSpec.getOperatorSpecGraph());
 
-    ExecutionPlanner.fetchInputAndOutputStreamPartitions(jobGraph, 
streamManager);
+    planner.fetchInputAndOutputStreamPartitions(jobGraph);
     assertTrue(jobGraph.getOrCreateStreamEdge(input1Spec).getPartitionCount() 
== 64);
     assertTrue(jobGraph.getOrCreateStreamEdge(input2Spec).getPartitionCount() 
== 16);
     assertTrue(jobGraph.getOrCreateStreamEdge(input3Spec).getPartitionCount() 
== 32);
@@ -297,10 +296,7 @@ public class TestExecutionPlanner {
   public void testCalculateJoinInputPartitions() {
     ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
     StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithJoin();
-    JobGraph jobGraph = 
planner.createJobGraph(graphSpec.getOperatorSpecGraph());
-
-    ExecutionPlanner.fetchInputAndOutputStreamPartitions(jobGraph, 
streamManager);
-    ExecutionPlanner.calculateJoinInputPartitions(jobGraph, new 
StreamConfig(config));
+    JobGraph jobGraph = (JobGraph) 
planner.plan(graphSpec.getOperatorSpecGraph());
 
     // the partitions should be the same as input1
     jobGraph.getIntermediateStreams().forEach(edge -> {
@@ -324,8 +320,7 @@ public class TestExecutionPlanner {
 
     ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager);
     StreamApplicationDescriptorImpl graphSpec = createSimpleGraph();
-    JobGraph jobGraph = 
planner.createJobGraph(graphSpec.getOperatorSpecGraph());
-    planner.calculatePartitions(jobGraph);
+    JobGraph jobGraph = (JobGraph) 
planner.plan(graphSpec.getOperatorSpecGraph());
 
     // the partitions should be the same as input1
     jobGraph.getIntermediateStreams().forEach(edge -> {
