Repository: samza
Updated Branches:
  refs/heads/master 479103b25 -> 02192052f


SAMZA-1838: Make some minor improvements to ExecutionPlanner

prateekm vjagadish1989, please take a look whenever you get a chance. This is an initial set of mostly minor changes.

Author: Ahmed Abdul Hamid <ahabd...@ahabdulh-mn1.linkedin.biz>

Reviewers: Jagadish <jagad...@apache.org>

Closes #623 from ahmedahamid/dev/ahabdulh/execution-planner-stylistic-improv


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/02192052
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/02192052
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/02192052

Branch: refs/heads/master
Commit: 02192052f44a27bd3cdbb3dee1eff94f416cf666
Parents: 479103b
Author: Ahmed Abdul Hamid <ahabd...@ahabdulh-mn1.linkedin.biz>
Authored: Fri Sep 14 10:51:55 2018 -0700
Committer: Jagadish <jvenkatra...@linkedin.com>
Committed: Fri Sep 14 10:51:55 2018 -0700

----------------------------------------------------------------------
 .../samza/execution/ExecutionPlanner.java       | 178 +++++++++++--------
 .../org/apache/samza/execution/JobGraph.java    |  82 ++++-----
 .../samza/execution/JobGraphJsonGenerator.java  |   4 +-
 .../samza/execution/TestExecutionPlanner.java   | 100 ++++++-----
 .../apache/samza/execution/TestJobGraph.java    |  38 ++--
 5 files changed, 219 insertions(+), 183 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/02192052/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
index ea892fe..810f424 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
@@ -21,6 +21,7 @@ package org.apache.samza.execution;
 
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -38,7 +39,6 @@ import org.apache.samza.operators.OperatorSpecGraph;
 import org.apache.samza.operators.spec.JoinOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.system.StreamSpec;
-import org.apache.samza.system.SystemStream;
 import org.apache.samza.table.TableSpec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,14 +54,16 @@ import static org.apache.samza.util.StreamUtil.*;
 public class ExecutionPlanner {
  private static final Logger log = LoggerFactory.getLogger(ExecutionPlanner.class);
 
-  static final int MAX_INFERRED_PARTITIONS = 256;
+  /* package private */ static final int MAX_INFERRED_PARTITIONS = 256;
 
   private final Config config;
+  private final StreamConfig streamConfig;
   private final StreamManager streamManager;
 
   public ExecutionPlanner(Config config, StreamManager streamManager) {
     this.config = config;
     this.streamManager = streamManager;
+    this.streamConfig = new StreamConfig(config);
   }
 
   public ExecutionPlan plan(OperatorSpecGraph specGraph) {
@@ -71,12 +73,10 @@ public class ExecutionPlanner {
     JobGraph jobGraph = createJobGraph(specGraph);
 
     // fetch the external streams partition info
-    updateExistingPartitions(jobGraph, streamManager);
+    fetchInputAndOutputStreamPartitions(jobGraph, streamManager);
 
-    if (!jobGraph.getIntermediateStreamEdges().isEmpty()) {
-      // figure out the partitions for internal streams
-      calculatePartitions(jobGraph);
-    }
+    // figure out the partitions for internal streams
+    calculatePartitions(jobGraph);
 
     return jobGraph;
   }
@@ -85,9 +85,9 @@ public class ExecutionPlanner {
     ApplicationConfig appConfig = new ApplicationConfig(config);
     ClusterManagerConfig clusterConfig = new ClusterManagerConfig(config);
     // currently we don't support host-affinity in batch mode
-    if (appConfig.getAppMode() == ApplicationConfig.ApplicationMode.BATCH
-        && clusterConfig.getHostAffinityEnabled()) {
-      throw new SamzaException("Host affinity is not supported in batch mode. 
Please configure job.host-affinity.enabled=false.");
+    if (appConfig.getAppMode() == ApplicationConfig.ApplicationMode.BATCH && 
clusterConfig.getHostAffinityEnabled()) {
+      throw new SamzaException(String.format("Host affinity is not supported 
in batch mode. Please configure %s=false.",
+          ClusterManagerConfig.CLUSTER_MANAGER_HOST_AFFINITY_ENABLED));
     }
   }
 
@@ -96,30 +96,33 @@ public class ExecutionPlanner {
    */
   /* package private */ JobGraph createJobGraph(OperatorSpecGraph specGraph) {
     JobGraph jobGraph = new JobGraph(config, specGraph);
-    StreamConfig streamConfig = new StreamConfig(config);
+
+    // Source streams contain both input and intermediate streams.
     Set<StreamSpec> sourceStreams = getStreamSpecs(specGraph.getInputOperators().keySet(), streamConfig);
+    // Sink streams contain both output and intermediate streams.
     Set<StreamSpec> sinkStreams = getStreamSpecs(specGraph.getOutputStreams().keySet(), streamConfig);
-    Set<StreamSpec> intStreams = new HashSet<>(sourceStreams);
-    Set<TableSpec> tables = new HashSet<>(specGraph.getTables().keySet());
-    intStreams.retainAll(sinkStreams);
-    sourceStreams.removeAll(intStreams);
-    sinkStreams.removeAll(intStreams);
+
+    Set<StreamSpec> intermediateStreams = Sets.intersection(sourceStreams, sinkStreams);
+    Set<StreamSpec> inputStreams = Sets.difference(sourceStreams, intermediateStreams);
+    Set<StreamSpec> outputStreams = Sets.difference(sinkStreams, intermediateStreams);
+
+    Set<TableSpec> tables = specGraph.getTables().keySet();
 
     // For this phase, we have a single job node for the whole dag
     String jobName = config.get(JobConfig.JOB_NAME());
     String jobId = config.get(JobConfig.JOB_ID(), "1");
     JobNode node = jobGraph.getOrCreateJobNode(jobName, jobId);
 
-    // add sources
-    sourceStreams.forEach(spec -> jobGraph.addSource(spec, node));
+    // Add input streams
+    inputStreams.forEach(spec -> jobGraph.addInputStream(spec, node));
 
-    // add sinks
-    sinkStreams.forEach(spec -> jobGraph.addSink(spec, node));
+    // Add output streams
+    outputStreams.forEach(spec -> jobGraph.addOutputStream(spec, node));
 
-    // add intermediate streams
-    intStreams.forEach(spec -> jobGraph.addIntermediateStream(spec, node, node));
+    // Add intermediate streams
+    intermediateStreams.forEach(spec -> jobGraph.addIntermediateStream(spec, node, node));
 
-    // add tables
+    // Add tables
     tables.forEach(spec -> jobGraph.addTable(spec, node));
 
     jobGraph.validate();
@@ -132,71 +135,80 @@ public class ExecutionPlanner {
    */
   /* package private */ void calculatePartitions(JobGraph jobGraph) {
     // calculate the partitions for the input streams of join operators
-    calculateJoinInputPartitions(jobGraph, config);
+    calculateJoinInputPartitions(jobGraph, streamConfig);
 
     // calculate the partitions for the rest of intermediate streams
-    calculateIntStreamPartitions(jobGraph, config);
+    calculateIntermediateStreamPartitions(jobGraph, config);
 
     // validate all the partitions are assigned
-    validatePartitions(jobGraph);
+    validateIntermediateStreamPartitions(jobGraph);
   }
 
   /**
-   * Fetch the partitions of source/sink streams and update the StreamEdges.
+   * Fetch the partitions of input/output streams and update the corresponding StreamEdges.
    * @param jobGraph {@link JobGraph}
    * @param streamManager the {@link StreamManager} to interface with the streams.
    */
-  /* package private */ static void updateExistingPartitions(JobGraph jobGraph, StreamManager streamManager) {
+  /* package private */ static void fetchInputAndOutputStreamPartitions(JobGraph jobGraph, StreamManager streamManager) {
     Set<StreamEdge> existingStreams = new HashSet<>();
-    existingStreams.addAll(jobGraph.getSources());
-    existingStreams.addAll(jobGraph.getSinks());
+    existingStreams.addAll(jobGraph.getInputStreams());
+    existingStreams.addAll(jobGraph.getOutputStreams());
 
+    // System to StreamEdges
     Multimap<String, StreamEdge> systemToStreamEdges = HashMultimap.create();
-    // group the StreamEdge(s) based on the system name
-    existingStreams.forEach(streamEdge -> {
-        SystemStream systemStream = streamEdge.getSystemStream();
-        systemToStreamEdges.put(systemStream.getSystem(), streamEdge);
-      });
-    for (Map.Entry<String, Collection<StreamEdge>> entry : systemToStreamEdges.asMap().entrySet()) {
-      String systemName = entry.getKey();
-      Collection<StreamEdge> streamEdges = entry.getValue();
+
+    // Group StreamEdges by system
+    for (StreamEdge streamEdge : existingStreams) {
+      String system = streamEdge.getSystemStream().getSystem();
+      systemToStreamEdges.put(system, streamEdge);
+    }
+
+    // Fetch partition count for every set of StreamEdges belonging to a particular system.
+    for (String system : systemToStreamEdges.keySet()) {
+      Collection<StreamEdge> streamEdges = systemToStreamEdges.get(system);
+
+      // Map every stream to its corresponding StreamEdge so we can retrieve a StreamEdge given its stream.
       Map<String, StreamEdge> streamToStreamEdge = new HashMap<>();
-      // create the stream name to StreamEdge mapping for this system
-      streamEdges.forEach(streamEdge -> streamToStreamEdge.put(streamEdge.getSystemStream().getStream(), streamEdge));
-      // retrieve the partition counts for the streams in this system
-      Map<String, Integer> streamToPartitionCount = streamManager.getStreamPartitionCounts(systemName, streamToStreamEdge.keySet());
-      // set the partitions of a stream to its StreamEdge
-      streamToPartitionCount.forEach((stream, partitionCount) -> {
-          streamToStreamEdge.get(stream).setPartitionCount(partitionCount);
-          log.info("Partition count is {} for stream {}", partitionCount, 
stream);
-        });
+      for (StreamEdge streamEdge : streamEdges) {
+        streamToStreamEdge.put(streamEdge.getSystemStream().getStream(), 
streamEdge);
+      }
+
+      // Retrieve partition count for every set of streams.
+      Set<String> streams = streamToStreamEdge.keySet();
+      Map<String, Integer> streamToPartitionCount = streamManager.getStreamPartitionCounts(system, streams);
+
+      // Retrieve StreamEdge corresponding to every stream and set partition count on it.
+      for (Map.Entry<String, Integer> entry : streamToPartitionCount.entrySet()) {
+        String stream = entry.getKey();
+        Integer partitionCount = entry.getValue();
+        streamToStreamEdge.get(stream).setPartitionCount(partitionCount);
+        log.info("Fetched partition count value {} for stream {}", 
partitionCount, stream);
+      }
     }
   }
 
   /**
    * Calculate the partitions for the input streams of join operators
    */
-  /* package private */ static void calculateJoinInputPartitions(JobGraph jobGraph, Config config) {
+  /* package private */ static void calculateJoinInputPartitions(JobGraph jobGraph, StreamConfig streamConfig) {
     // mapping from a source stream to all join specs reachable from it
-    Multimap<OperatorSpec, StreamEdge> joinSpecToStreamEdges = HashMultimap.create();
+    Multimap<JoinOperatorSpec, StreamEdge> joinSpecToStreamEdges = HashMultimap.create();
     // reverse mapping of the above
-    Multimap<StreamEdge, OperatorSpec> streamEdgeToJoinSpecs = HashMultimap.create();
+    Multimap<StreamEdge, JoinOperatorSpec> streamEdgeToJoinSpecs = HashMultimap.create();
     // A queue of joins with known input partitions
-    Queue<OperatorSpec> joinQ = new LinkedList<>();
+    Queue<JoinOperatorSpec> joinQ = new LinkedList<>();
    // The visited set keeps track of the join specs that have been already inserted in the queue before
-    Set<OperatorSpec> visited = new HashSet<>();
+    Set<JoinOperatorSpec> visited = new HashSet<>();
 
-    StreamConfig streamConfig = new StreamConfig(config);
-
-    jobGraph.getSpecGraph().getInputOperators().forEach((key, value) -> {
-        StreamEdge streamEdge = jobGraph.getOrCreateStreamEdge(getStreamSpec(key, streamConfig));
+    jobGraph.getSpecGraph().getInputOperators().forEach((streamId, inputOperatorSpec) -> {
+        StreamEdge streamEdge = jobGraph.getOrCreateStreamEdge(getStreamSpec(streamId, streamConfig));
         // Traverses the StreamGraph to find and update mappings for all Joins reachable from this input StreamEdge
-        findReachableJoins(value, streamEdge, joinSpecToStreamEdges, streamEdgeToJoinSpecs, joinQ, visited);
+        findReachableJoins(inputOperatorSpec, streamEdge, joinSpecToStreamEdges, streamEdgeToJoinSpecs, joinQ, visited);
       });
 
    // At this point, joinQ contains joinSpecs where at least one of the input stream edge partitions is known.
     while (!joinQ.isEmpty()) {
-      OperatorSpec join = joinQ.poll();
+      JoinOperatorSpec join = joinQ.poll();
       int partitions = StreamEdge.PARTITIONS_UNKNOWN;
      // loop through the input streams to the join and find the partition count
       for (StreamEdge edge : joinSpecToStreamEdges.get(join)) {
@@ -223,7 +235,7 @@ public class ExecutionPlanner {
           edge.setPartitionCount(partitions);
 
           // find other joins can be inferred by setting this edge
-          for (OperatorSpec op : streamEdgeToJoinSpecs.get(edge)) {
+          for (JoinOperatorSpec op : streamEdgeToJoinSpecs.get(edge)) {
             if (!visited.contains(op)) {
               joinQ.add(op);
               visited.add(op);
@@ -244,17 +256,19 @@ public class ExecutionPlanner {
   * @param joinQ queue that contains joinSpecs where at least one of the input stream edge partitions is known.
    */
  private static void findReachableJoins(OperatorSpec operatorSpec, StreamEdge sourceStreamEdge,
-      Multimap<OperatorSpec, StreamEdge> joinSpecToStreamEdges,
-      Multimap<StreamEdge, OperatorSpec> streamEdgeToJoinSpecs,
-      Queue<OperatorSpec> joinQ, Set<OperatorSpec> visited) {
+      Multimap<JoinOperatorSpec, StreamEdge> joinSpecToStreamEdges,
+      Multimap<StreamEdge, JoinOperatorSpec> streamEdgeToJoinSpecs,
+      Queue<JoinOperatorSpec> joinQ, Set<JoinOperatorSpec> visited) {
+
     if (operatorSpec instanceof JoinOperatorSpec) {
-      joinSpecToStreamEdges.put(operatorSpec, sourceStreamEdge);
-      streamEdgeToJoinSpecs.put(sourceStreamEdge, operatorSpec);
+      JoinOperatorSpec joinOperatorSpec = (JoinOperatorSpec) operatorSpec;
+      joinSpecToStreamEdges.put(joinOperatorSpec, sourceStreamEdge);
+      streamEdgeToJoinSpecs.put(sourceStreamEdge, joinOperatorSpec);
 
-      if (!visited.contains(operatorSpec) && sourceStreamEdge.getPartitionCount() > 0) {
+      if (!visited.contains(joinOperatorSpec) && sourceStreamEdge.getPartitionCount() > 0) {
         // put the joins with known input partitions into the queue and mark as visited
-        joinQ.add(operatorSpec);
-        visited.add(operatorSpec);
+        joinQ.add(joinOperatorSpec);
+        visited.add(joinOperatorSpec);
       }
     }
 
@@ -265,15 +279,16 @@ public class ExecutionPlanner {
     }
   }
 
-  private static void calculateIntStreamPartitions(JobGraph jobGraph, Config config) {
-    int partitions = config.getInt(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(), StreamEdge.PARTITIONS_UNKNOWN);
-    if (partitions < 0) {
+  private static void calculateIntermediateStreamPartitions(JobGraph jobGraph, Config config) {
+    final String defaultPartitionsConfigProperty = JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS();
+    int partitions = config.getInt(defaultPartitionsConfigProperty, StreamEdge.PARTITIONS_UNKNOWN);
+    if (partitions == StreamEdge.PARTITIONS_UNKNOWN) {
       // use the following simple algo to figure out the partitions
      // partition = MAX(MAX(Input topic partitions), MAX(Output topic partitions))
       // partition will be further bounded by MAX_INFERRED_PARTITIONS.
      // This is important when running in hadoop where an HDFS input can have lots of files (partitions).
-      int maxInPartitions = maxPartition(jobGraph.getSources());
-      int maxOutPartitions = maxPartition(jobGraph.getSinks());
+      int maxInPartitions = maxPartitions(jobGraph.getInputStreams());
+      int maxOutPartitions = maxPartitions(jobGraph.getOutputStreams());
       partitions = Math.max(maxInPartitions, maxOutPartitions);
 
       if (partitions > MAX_INFERRED_PARTITIONS) {
@@ -281,7 +296,17 @@ public class ExecutionPlanner {
         log.warn(String.format("Inferred intermediate stream partition count 
%d is greater than the max %d. Using the max.",
             partitions, MAX_INFERRED_PARTITIONS));
       }
+    } else {
+      // Reject any zero or other negative values explicitly specified in config.
+      if (partitions <= 0) {
+        throw new SamzaException(String.format("Invalid value %d specified for config property %s", partitions,
+            defaultPartitionsConfigProperty));
+      }
+
+      log.info("Using partition count value {} specified for config property 
{}", partitions,
+          defaultPartitionsConfigProperty);
     }
+
     for (StreamEdge edge : jobGraph.getIntermediateStreamEdges()) {
       if (edge.getPartitionCount() <= 0) {
         log.info("Set the partition count for intermediate stream {} to {}.", 
edge.getName(), partitions);
@@ -290,16 +315,15 @@ public class ExecutionPlanner {
     }
   }
 
-  private static void validatePartitions(JobGraph jobGraph) {
+  private static void validateIntermediateStreamPartitions(JobGraph jobGraph) {
     for (StreamEdge edge : jobGraph.getIntermediateStreamEdges()) {
       if (edge.getPartitionCount() <= 0) {
-        throw new SamzaException(String.format("Failure to assign the 
partitions to Stream %s", edge.getName()));
+        throw new SamzaException(String.format("Failed to assign valid 
partition count to Stream %s", edge.getName()));
       }
     }
   }
 
-  /* package private */ static int maxPartition(Collection<StreamEdge> edges) {
-    return edges.stream().map(StreamEdge::getPartitionCount).reduce(Integer::max).orElse(StreamEdge.PARTITIONS_UNKNOWN);
+  /* package private */ static int maxPartitions(Collection<StreamEdge> edges) {
+    return edges.stream().mapToInt(StreamEdge::getPartitionCount).max().orElse(StreamEdge.PARTITIONS_UNKNOWN);
   }
-
 }
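
For reference, the partition inference rule that calculateIntermediateStreamPartitions documents above can be condensed into a small standalone sketch. This is illustrative only; the PartitionInferenceSketch class and inferIntermediatePartitions method are made-up names, not code from this commit. The rule: take the maximum partition count across all input and output streams, then cap it at MAX_INFERRED_PARTITIONS.

import java.util.Collection;

class PartitionInferenceSketch {
  static final int MAX_INFERRED_PARTITIONS = 256;

  // inputCounts/outputCounts stand in for the partition counts carried by input/output StreamEdges.
  static int inferIntermediatePartitions(Collection<Integer> inputCounts, Collection<Integer> outputCounts) {
    // partition = MAX(MAX(input partitions), MAX(output partitions))
    int maxIn = inputCounts.stream().mapToInt(Integer::intValue).max().orElse(-1);
    int maxOut = outputCounts.stream().mapToInt(Integer::intValue).max().orElse(-1);
    int partitions = Math.max(maxIn, maxOut);
    // Bound the inferred value; important when an HDFS input contributes many partitions.
    return Math.min(partitions, MAX_INFERRED_PARTITIONS);
  }
}

Under these assumptions, inputs of 64 and 32 partitions together with an 8-partition output would yield 64 partitions for the intermediate streams.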

http://git-wip-us.apache.org/repos/asf/samza/blob/02192052/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
index f49e6db..5b19095 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
@@ -54,8 +54,8 @@ import org.slf4j.LoggerFactory;
 
   private final Map<String, JobNode> nodes = new HashMap<>();
   private final Map<String, StreamEdge> edges = new HashMap<>();
-  private final Set<StreamEdge> sources = new HashSet<>();
-  private final Set<StreamEdge> sinks = new HashSet<>();
+  private final Set<StreamEdge> inputStreams = new HashSet<>();
+  private final Set<StreamEdge> outputStreams = new HashSet<>();
   private final Set<StreamEdge> intermediateStreams = new HashSet<>();
   private final Set<TableSpec> tables = new HashSet<>();
   private final Config config;
@@ -115,26 +115,26 @@ import org.slf4j.LoggerFactory;
 
   /**
    * Add a source stream to a {@link JobNode}
-   * @param input source stream
-   * @param node the job node that consumes from the source
+   * @param streamSpec input stream
+   * @param node the job node that consumes from the streamSpec
    */
-  void addSource(StreamSpec input, JobNode node) {
-    StreamEdge edge = getOrCreateStreamEdge(input);
+  void addInputStream(StreamSpec streamSpec, JobNode node) {
+    StreamEdge edge = getOrCreateStreamEdge(streamSpec);
     edge.addTargetNode(node);
     node.addInEdge(edge);
-    sources.add(edge);
+    inputStreams.add(edge);
   }
 
   /**
-   * Add a sink stream to a {@link JobNode}
-   * @param output sink stream
-   * @param node the job node that outputs to the sink
+   * Add an output stream to a {@link JobNode}
+   * @param streamSpec output stream
+   * @param node the job node that outputs to the output stream
    */
-  void addSink(StreamSpec output, JobNode node) {
-    StreamEdge edge = getOrCreateStreamEdge(output);
+  void addOutputStream(StreamSpec streamSpec, JobNode node) {
+    StreamEdge edge = getOrCreateStreamEdge(streamSpec);
     edge.addSourceNode(node);
     node.addOutEdge(edge);
-    sinks.add(edge);
+    outputStreams.add(edge);
   }
 
   /**
@@ -204,19 +204,19 @@ import org.slf4j.LoggerFactory;
   }
 
   /**
-   * Returns the source streams in the graph
+   * Returns the input streams in the graph
    * @return unmodifiable set of {@link StreamEdge}
    */
-  Set<StreamEdge> getSources() {
-    return Collections.unmodifiableSet(sources);
+  Set<StreamEdge> getInputStreams() {
+    return Collections.unmodifiableSet(inputStreams);
   }
 
   /**
-   * Return the sink streams in the graph
+   * Return the output streams in the graph
    * @return unmodifiable set of {@link StreamEdge}
    */
-  Set<StreamEdge> getSinks() {
-    return Collections.unmodifiableSet(sinks);
+  Set<StreamEdge> getOutputStreams() {
+    return Collections.unmodifiableSet(outputStreams);
   }
 
   /**
@@ -236,22 +236,22 @@ import org.slf4j.LoggerFactory;
   }
 
   /**
-   * Validate the graph has the correct topology, meaning the sources are coming from external streams,
-   * sinks are going to external streams, and the nodes are connected with intermediate streams.
-   * Also validate all the nodes are reachable from the sources.
+   * Validate the graph has the correct topology, meaning the input streams are coming from external streams,
+   * output streams are going to external streams, and the nodes are connected with intermediate streams.
+   * Also validate all the nodes are reachable from the input streams.
    */
   void validate() {
-    validateSources();
-    validateSinks();
+    validateInputStreams();
+    validateOutputStreams();
     validateInternalStreams();
     validateReachability();
   }
 
   /**
-   * Validate the sources should have indegree being 0 and outdegree greater than 0
+   * Validate the input streams should have indegree being 0 and outdegree greater than 0
    */
-  private void validateSources() {
-    sources.forEach(edge -> {
+  private void validateInputStreams() {
+    inputStreams.forEach(edge -> {
         if (!edge.getSourceNodes().isEmpty()) {
           throw new IllegalArgumentException(
               String.format("Source stream %s should not have producers.", 
edge.getName()));
@@ -264,10 +264,10 @@ import org.slf4j.LoggerFactory;
   }
 
   /**
-   * Validate the sinks should have outdegree being 0 and indegree greater than 0
+   * Validate the output streams should have outdegree being 0 and indegree greater than 0
    */
-  private void validateSinks() {
-    sinks.forEach(edge -> {
+  private void validateOutputStreams() {
+    outputStreams.forEach(edge -> {
         if (!edge.getTargetNodes().isEmpty()) {
           throw new IllegalArgumentException(
               String.format("Sink stream %s should not have consumers", 
edge.getName()));
@@ -284,8 +284,8 @@ import org.slf4j.LoggerFactory;
    */
   private void validateInternalStreams() {
     Set<StreamEdge> internalEdges = new HashSet<>(edges.values());
-    internalEdges.removeAll(sources);
-    internalEdges.removeAll(sinks);
+    internalEdges.removeAll(inputStreams);
+    internalEdges.removeAll(outputStreams);
 
     internalEdges.forEach(edge -> {
        if (edge.getSourceNodes().isEmpty() || edge.getTargetNodes().isEmpty()) {
@@ -296,10 +296,10 @@ import org.slf4j.LoggerFactory;
   }
 
   /**
-   * Validate all nodes are reachable by sources.
+   * Validate all nodes are reachable by input streams.
    */
   private void validateReachability() {
-    // validate all nodes are reachable from the sources
+    // validate all nodes are reachable from the input streams
     final Set<JobNode> reachable = findReachable();
     if (reachable.size() != nodes.size()) {
       Set<JobNode> unreachable = new HashSet<>(nodes.values());
@@ -317,8 +317,8 @@ import org.slf4j.LoggerFactory;
     Queue<JobNode> queue = new ArrayDeque<>();
     Set<JobNode> visited = new HashSet<>();
 
-    sources.forEach(source -> {
-        List<JobNode> next = source.getTargetNodes();
+    inputStreams.forEach(input -> {
+        List<JobNode> next = input.getTargetNodes();
         queue.addAll(next);
         visited.addAll(next);
       });
@@ -353,11 +353,11 @@ import org.slf4j.LoggerFactory;
     pnodes.forEach(node -> {
         String nid = node.getId();
         //only count the degrees of intermediate streams
-        long degree = node.getInEdges().stream().filter(e -> !sources.contains(e)).count();
+        long degree = node.getInEdges().stream().filter(e -> !inputStreams.contains(e)).count();
         indegree.put(nid, degree);
 
         if (degree == 0L) {
-          // start from the nodes that has no intermediate input streams, so it only consumes from sources
+          // start from the nodes that has no intermediate input streams, so it only consumes from input streams
           q.add(node);
           visited.add(node);
         }
@@ -410,9 +410,9 @@ import org.slf4j.LoggerFactory;
           q.add(minNode);
           visited.add(minNode);
         } else {
-          // all the remaining nodes should be reachable from sources
-          // start from sources again to find the next node that hasn't been visited
-          JobNode nextNode = sources.stream().flatMap(source -> source.getTargetNodes().stream())
+          // all the remaining nodes should be reachable from input streams
+          // start from input streams again to find the next node that hasn't been visited
+          JobNode nextNode = inputStreams.stream().flatMap(input -> input.getTargetNodes().stream())
               .filter(node -> !visited.contains(node))
               .findAny().get();
           q.add(nextNode);
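
As a rough sketch of the reachability rule that the renamed validateReachability/findReachable methods enforce over the inputStreams set (simplified types; this is not the actual JobGraph API): run a BFS from the job nodes fed by input streams and verify that every job node gets visited. The ReachabilitySketch class and its names below are illustrative assumptions.

import java.util.ArrayDeque;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

class ReachabilitySketch {
  // Nodes are plain strings; outEdges maps a node to the nodes it feeds via intermediate streams.
  static boolean allNodesReachable(Collection<String> allNodes,
      Collection<String> inputStreamTargets, Map<String, List<String>> outEdges) {
    Queue<String> queue = new ArrayDeque<>(inputStreamTargets);
    Set<String> visited = new HashSet<>(inputStreamTargets);
    while (!queue.isEmpty()) {
      String node = queue.poll();
      for (String next : outEdges.getOrDefault(node, List.of())) {
        if (visited.add(next)) {
          queue.add(next);
        }
      }
    }
    return visited.containsAll(allNodes);
  }
}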

http://git-wip-us.apache.org/repos/asf/samza/blob/02192052/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
index 3a8d5c9..91453d2 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
@@ -134,8 +134,8 @@ import org.codehaus.jackson.map.ObjectMapper;
     jobGraphJson.sinkStreams = new HashMap<>();
     jobGraphJson.intermediateStreams = new HashMap<>();
     jobGraphJson.tables = new HashMap<>();
-    jobGraph.getSources().forEach(e -> buildStreamEdgeJson(e, jobGraphJson.sourceStreams));
-    jobGraph.getSinks().forEach(e -> buildStreamEdgeJson(e, jobGraphJson.sinkStreams));
+    jobGraph.getInputStreams().forEach(e -> buildStreamEdgeJson(e, jobGraphJson.sourceStreams));
+    jobGraph.getOutputStreams().forEach(e -> buildStreamEdgeJson(e, jobGraphJson.sinkStreams));
     jobGraph.getIntermediateStreamEdges().forEach(e -> buildStreamEdgeJson(e, jobGraphJson.intermediateStreams));
     jobGraph.getTables().forEach(t -> buildTableJson(t, jobGraphJson.tables));
 

http://git-wip-us.apache.org/repos/asf/samza/blob/02192052/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
index 61cf6c5..c089225 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
@@ -28,10 +28,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
 import org.apache.samza.application.StreamApplicationDescriptorImpl;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.StreamConfig;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
@@ -72,7 +74,6 @@ public class TestExecutionPlanner {
   private GenericInputDescriptor<KV<Object, Object>> input2Descriptor;
   private StreamSpec input3Spec;
   private GenericInputDescriptor<KV<Object, Object>> input3Descriptor;
-  private StreamSpec input4Spec;
   private GenericInputDescriptor<KV<Object, Object>> input4Descriptor;
   private StreamSpec output1Spec;
   private GenericOutputDescriptor<KV<Object, Object>> output1Descriptor;
@@ -168,44 +169,49 @@ public class TestExecutionPlanner {
  private StreamApplicationDescriptorImpl createStreamGraphWithJoinAndWindow() {
 
     return new StreamApplicationDescriptorImpl(appDesc -> {
-        MessageStream<KV<Object, Object>> messageStream1 =
-            appDesc.getInputStream(input1Descriptor)
-                .map(m -> m);
+        MessageStream<KV<Object, Object>> messageStream1 = appDesc.getInputStream(input1Descriptor).map(m -> m);
         MessageStream<KV<Object, Object>> messageStream2 =
-            appDesc.getInputStream(input2Descriptor)
-                .partitionBy(m -> m.key, m -> m.value, "p1")
-                .filter(m -> true);
+          appDesc.getInputStream(input2Descriptor).partitionBy(m -> m.key, m -> m.value, "p1").filter(m -> true);
         MessageStream<KV<Object, Object>> messageStream3 =
-            appDesc.getInputStream(input3Descriptor)
-                .filter(m -> true)
-                .partitionBy(m -> m.key, m -> m.value, "p2")
-                .map(m -> m);
+          appDesc.getInputStream(input3Descriptor).filter(m -> true).partitionBy(m -> m.key, m -> m.value, "p2").map(m -> m);
         OutputStream<KV<Object, Object>> output1 = appDesc.getOutputStream(output1Descriptor);
         OutputStream<KV<Object, Object>> output2 = appDesc.getOutputStream(output2Descriptor);
 
         messageStream1.map(m -> m)
-            .filter(m->true)
-            .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(8), mock(Serde.class), mock(Serde.class)), "w1");
+          .filter(m -> true)
+          .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(8), mock(Serde.class), mock(Serde.class)), "w1");
 
         messageStream2.map(m -> m)
-            .filter(m->true)
-            .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(16), mock(Serde.class), mock(Serde.class)), "w2");
+          .filter(m -> true)
+          .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(16), mock(Serde.class), mock(Serde.class)), "w2");
+
+        messageStream1.join(messageStream2, (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
+          mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(1600), "j1").sendTo(output1);
+        messageStream3.join(messageStream2, (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
+          mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(100), "j2").sendTo(output2);
+        messageStream3.join(messageStream2, (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
+          mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(252), "j3").sendTo(output2);
+      }, config);
+  }
+
+  private StreamApplicationDescriptorImpl createStreamGraphWithInvalidJoin() {
+    /**
+     *   input1 (64) --
+     *                 |
+     *                join -> output1 (8)
+     *                 |
+     *   input3 (32) --
+     */
+    return new StreamApplicationDescriptorImpl(appDesc -> {
+        MessageStream<KV<Object, Object>> messageStream1 = appDesc.getInputStream(input1Descriptor);
+        MessageStream<KV<Object, Object>> messageStream3 = appDesc.getInputStream(input3Descriptor);
+        OutputStream<KV<Object, Object>> output1 = appDesc.getOutputStream(output1Descriptor);
 
         messageStream1
-            .join(messageStream2,
-                (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
-                mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(1600), "j1")
-            .sendTo(output1);
-        messageStream3
-            .join(messageStream2,
-                (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
-                mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(100), "j2")
-            .sendTo(output2);
-        messageStream3
-            .join(messageStream2,
-                (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
-                mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(252), "j3")
-            .sendTo(output2);
+          .join(messageStream3,
+              (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
+              mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(2), "j1")
+          .sendTo(output1);
       }, config);
   }
 
@@ -225,7 +231,6 @@ public class TestExecutionPlanner {
     input1Spec = new StreamSpec("input1", "input1", "system1");
     input2Spec = new StreamSpec("input2", "input2", "system2");
     input3Spec = new StreamSpec("input3", "input3", "system2");
-    input4Spec = new StreamSpec("input4", "input4", "system1");
 
     output1Spec = new StreamSpec("output1", "output1", "system1");
     output2Spec = new StreamSpec("output2", "output2", "system2");
@@ -265,8 +270,8 @@ public class TestExecutionPlanner {
     StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithJoin();
 
    JobGraph jobGraph = planner.createJobGraph(graphSpec.getOperatorSpecGraph());
-    assertTrue(jobGraph.getSources().size() == 3);
-    assertTrue(jobGraph.getSinks().size() == 2);
+    assertTrue(jobGraph.getInputStreams().size() == 3);
+    assertTrue(jobGraph.getOutputStreams().size() == 2);
     assertTrue(jobGraph.getIntermediateStreams().size() == 2); // two streams generated by partitionBy
   }
 
@@ -276,7 +281,7 @@ public class TestExecutionPlanner {
     StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithJoin();
    JobGraph jobGraph = planner.createJobGraph(graphSpec.getOperatorSpecGraph());
 
-    ExecutionPlanner.updateExistingPartitions(jobGraph, streamManager);
+    ExecutionPlanner.fetchInputAndOutputStreamPartitions(jobGraph, streamManager);
     assertTrue(jobGraph.getOrCreateStreamEdge(input1Spec).getPartitionCount() == 64);
     assertTrue(jobGraph.getOrCreateStreamEdge(input2Spec).getPartitionCount() == 16);
     assertTrue(jobGraph.getOrCreateStreamEdge(input3Spec).getPartitionCount() == 32);
@@ -294,8 +299,8 @@ public class TestExecutionPlanner {
     StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithJoin();
    JobGraph jobGraph = planner.createJobGraph(graphSpec.getOperatorSpecGraph());
 
-    ExecutionPlanner.updateExistingPartitions(jobGraph, streamManager);
-    ExecutionPlanner.calculateJoinInputPartitions(jobGraph, config);
+    ExecutionPlanner.fetchInputAndOutputStreamPartitions(jobGraph, streamManager);
+    ExecutionPlanner.calculateJoinInputPartitions(jobGraph, new StreamConfig(config));
 
     // the partitions should be the same as input1
     jobGraph.getIntermediateStreams().forEach(edge -> {
@@ -303,6 +308,14 @@ public class TestExecutionPlanner {
       });
   }
 
+  @Test(expected = SamzaException.class)
+  public void testRejectsInvalidJoin() {
+    ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
+    StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithInvalidJoin();
+
+    planner.plan(graphSpec.getOperatorSpecGraph());
+  }
+
   @Test
   public void testDefaultPartitions() {
     Map<String, String> map = new HashMap<>(config);
@@ -321,7 +334,7 @@ public class TestExecutionPlanner {
   }
 
   @Test
-  public void testTriggerIntervalForJoins() throws Exception {
+  public void testTriggerIntervalForJoins() {
     Map<String, String> map = new HashMap<>(config);
    map.put(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(), String.valueOf(DEFAULT_PARTITIONS));
     Config cfg = new MapConfig(map);
@@ -336,7 +349,7 @@ public class TestExecutionPlanner {
   }
 
   @Test
-  public void testTriggerIntervalForWindowsAndJoins() throws Exception {
+  public void testTriggerIntervalForWindowsAndJoins() {
     Map<String, String> map = new HashMap<>(config);
    map.put(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(), String.valueOf(DEFAULT_PARTITIONS));
     Config cfg = new MapConfig(map);
@@ -352,7 +365,7 @@ public class TestExecutionPlanner {
   }
 
   @Test
-  public void testTriggerIntervalWithInvalidWindowMs() throws Exception {
+  public void testTriggerIntervalWithInvalidWindowMs() {
     Map<String, String> map = new HashMap<>(config);
     map.put(TaskConfig.WINDOW_MS(), "-1");
    map.put(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(), String.valueOf(DEFAULT_PARTITIONS));
@@ -368,9 +381,8 @@ public class TestExecutionPlanner {
     assertEquals("4", jobConfigs.get(0).get(TaskConfig.WINDOW_MS()));
   }
 
-
   @Test
-  public void testTriggerIntervalForStatelessOperators() throws Exception {
+  public void testTriggerIntervalForStatelessOperators() {
     Map<String, String> map = new HashMap<>(config);
    map.put(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(), String.valueOf(DEFAULT_PARTITIONS));
     Config cfg = new MapConfig(map);
@@ -384,7 +396,7 @@ public class TestExecutionPlanner {
   }
 
   @Test
-  public void testTriggerIntervalWhenWindowMsIsConfigured() throws Exception {
+  public void testTriggerIntervalWhenWindowMsIsConfigured() {
     Map<String, String> map = new HashMap<>(config);
     map.put(TaskConfig.WINDOW_MS(), "2000");
    map.put(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(), String.valueOf(DEFAULT_PARTITIONS));
@@ -399,7 +411,7 @@ public class TestExecutionPlanner {
   }
 
   @Test
-  public void testCalculateIntStreamPartitions() throws Exception {
+  public void testCalculateIntStreamPartitions() {
     ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
     StreamApplicationDescriptorImpl graphSpec = createSimpleGraph();
    JobGraph jobGraph = (JobGraph) planner.plan(graphSpec.getOperatorSpecGraph());
@@ -423,10 +435,10 @@ public class TestExecutionPlanner {
     edge.setPartitionCount(16);
     edges.add(edge);
 
-    assertEquals(32, ExecutionPlanner.maxPartition(edges));
+    assertEquals(32, ExecutionPlanner.maxPartitions(edges));
 
     edges = Collections.emptyList();
-    assertEquals(StreamEdge.PARTITIONS_UNKNOWN, ExecutionPlanner.maxPartition(edges));
+    assertEquals(StreamEdge.PARTITIONS_UNKNOWN, ExecutionPlanner.maxPartitions(edges));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/samza/blob/02192052/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java
index 73452d8..ed35d67 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java
@@ -74,9 +74,9 @@ public class TestJobGraph {
     JobNode n10 = graph1.getOrCreateJobNode("10", "1");
     JobNode n11 = graph1.getOrCreateJobNode("11", "1");
 
-    graph1.addSource(genStream(), n5);
-    graph1.addSource(genStream(), n7);
-    graph1.addSource(genStream(), n3);
+    graph1.addInputStream(genStream(), n5);
+    graph1.addInputStream(genStream(), n7);
+    graph1.addInputStream(genStream(), n3);
     graph1.addIntermediateStream(genStream(), n5, n11);
     graph1.addIntermediateStream(genStream(), n7, n11);
     graph1.addIntermediateStream(genStream(), n7, n8);
@@ -85,9 +85,9 @@ public class TestJobGraph {
     graph1.addIntermediateStream(genStream(), n11, n9);
     graph1.addIntermediateStream(genStream(), n8, n9);
     graph1.addIntermediateStream(genStream(), n11, n10);
-    graph1.addSink(genStream(), n2);
-    graph1.addSink(genStream(), n9);
-    graph1.addSink(genStream(), n10);
+    graph1.addOutputStream(genStream(), n2);
+    graph1.addOutputStream(genStream(), n9);
+    graph1.addOutputStream(genStream(), n10);
   }
 
   /**
@@ -108,7 +108,7 @@ public class TestJobGraph {
     JobNode n6 = graph2.getOrCreateJobNode("6", "1");
     JobNode n7 = graph2.getOrCreateJobNode("7", "1");
 
-    graph2.addSource(genStream(), n1);
+    graph2.addInputStream(genStream(), n1);
     graph2.addIntermediateStream(genStream(), n1, n2);
     graph2.addIntermediateStream(genStream(), n2, n3);
     graph2.addIntermediateStream(genStream(), n3, n4);
@@ -117,7 +117,7 @@ public class TestJobGraph {
     graph2.addIntermediateStream(genStream(), n6, n2);
     graph2.addIntermediateStream(genStream(), n5, n5);
     graph2.addIntermediateStream(genStream(), n5, n7);
-    graph2.addSink(genStream(), n7);
+    graph2.addOutputStream(genStream(), n7);
   }
 
   /**
@@ -132,7 +132,7 @@ public class TestJobGraph {
     JobNode n1 = graph3.getOrCreateJobNode("1", "1");
     JobNode n2 = graph3.getOrCreateJobNode("2", "1");
 
-    graph3.addSource(genStream(), n1);
+    graph3.addInputStream(genStream(), n1);
     graph3.addIntermediateStream(genStream(), n1, n1);
     graph3.addIntermediateStream(genStream(), n1, n2);
     graph3.addIntermediateStream(genStream(), n2, n2);
@@ -149,7 +149,7 @@ public class TestJobGraph {
 
     JobNode n1 = graph4.getOrCreateJobNode("1", "1");
 
-    graph4.addSource(genStream(), n1);
+    graph4.addInputStream(genStream(), n1);
     graph4.addIntermediateStream(genStream(), n1, n1);
   }
 
@@ -180,12 +180,12 @@ public class TestJobGraph {
     StreamSpec s1 = genStream();
     StreamSpec s2 = genStream();
     StreamSpec s3 = genStream();
-    graph.addSource(s1, n1);
-    graph.addSource(s2, n1);
-    graph.addSource(s3, n2);
-    graph.addSource(s3, n3);
+    graph.addInputStream(s1, n1);
+    graph.addInputStream(s2, n1);
+    graph.addInputStream(s3, n2);
+    graph.addInputStream(s3, n3);
 
-    assertTrue(graph.getSources().size() == 3);
+    assertTrue(graph.getInputStreams().size() == 3);
 
     assertTrue(graph.getOrCreateJobNode("1", "1").getInEdges().size() == 2);
     assertTrue(graph.getOrCreateJobNode("2", "1").getInEdges().size() == 1);
@@ -214,11 +214,11 @@ public class TestJobGraph {
     StreamSpec s1 = genStream();
     StreamSpec s2 = genStream();
     StreamSpec s3 = genStream();
-    graph.addSink(s1, n1);
-    graph.addSink(s2, n2);
-    graph.addSink(s3, n2);
+    graph.addOutputStream(s1, n1);
+    graph.addOutputStream(s2, n2);
+    graph.addOutputStream(s3, n2);
 
-    assertTrue(graph.getSinks().size() == 3);
+    assertTrue(graph.getOutputStreams().size() == 3);
     assertTrue(graph.getOrCreateJobNode("1", "1").getOutEdges().size() == 1);
     assertTrue(graph.getOrCreateJobNode("2", "1").getOutEdges().size() == 2);
 
