Repository: samza Updated Branches: refs/heads/master 003ad1068 -> e904e70cb
SAMZA-1860: Modularize Join input validation in ExecutionPlanner This change breaks down the validation of partition counts of input and intermediate streams participating in Join operations into 3 separate steps: 1. Grouping `InputOperatorSpec`s by the `JoinOperatorSpec`s of the Join operations they participate in 2. Replacing `InputOperatorSpec`s with their corresponding `StreamEdge`s 3. Verifying/Inferring partition counts of input/intermediate streams This change covers stream-stream Joins only. Author: Ahmed Abdul Hamid <ahabd...@ahabdulh-mn1.linkedin.biz> Author: Ahmed Abdul Hamid <ahabdulha...@linkedin.com> Reviewers: Jagadish<jagad...@apache.org>, Bharath <bkumarasubraman...@linkedin.com> Closes #637 from ahmedahamid/dev/ahabdulh/modularize-exec-planner Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/e904e70c Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/e904e70c Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/e904e70c Branch: refs/heads/master Commit: e904e70cb4ad649fe64d6572bb98805812eb1851 Parents: 003ad10 Author: Ahmed Abdul Hamid <ahabd...@ahabdulh-mn1.linkedin.biz> Authored: Tue Sep 25 20:03:14 2018 -0700 Committer: Jagadish <jvenkatra...@linkedin.com> Committed: Tue Sep 25 20:03:14 2018 -0700 ---------------------------------------------------------------------- .../documentation/versioned/jobs/logging.md | 4 +- docs/startup/download/index.md | 20 +- .../samza/execution/ExecutionPlanner.java | 304 ++++++++++++------- .../execution/OperatorSpecGraphAnalyzer.java | 96 ++++++ .../samza/execution/TestExecutionPlanner.java | 11 +- 5 files changed, 301 insertions(+), 134 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/e904e70c/docs/learn/documentation/versioned/jobs/logging.md ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/versioned/jobs/logging.md b/docs/learn/documentation/versioned/jobs/logging.md index 8cae27b..ac2d664 100644 --- a/docs/learn/documentation/versioned/jobs/logging.md +++ b/docs/learn/documentation/versioned/jobs/logging.md @@ -27,7 +27,7 @@ The [hello-samza](/startup/hello-samza/{{site.version}}) project shows how to us {% highlight xml %} <dependency> - <groupId>org.slf4j</groupId> + <setId>org.slf4j</setId> <artifactId>slf4j-log4j12</artifactId> <scope>runtime</scope> <version>1.6.2</version> @@ -101,7 +101,7 @@ Sometimes it's desirable to change the Log4J log level from `INFO` to `DEBUG` at {% highlight xml %} <dependency> - <groupId>org.apache.samza</groupId> + <setId>org.apache.samza</setId> <artifactId>samza-log4j</artifactId> <scope>runtime</scope> <version>${samza.version}</version> http://git-wip-us.apache.org/repos/asf/samza/blob/e904e70c/docs/startup/download/index.md ---------------------------------------------------------------------- diff --git a/docs/startup/download/index.md b/docs/startup/download/index.md index 4fd0898..1f1477d 100644 --- a/docs/startup/download/index.md +++ b/docs/startup/download/index.md @@ -59,18 +59,18 @@ A Maven-based Samza project can pull in all required dependencies Samza dependen {% highlight xml %} <dependency> - <groupId>org.apache.samza</groupId> + <setId>org.apache.samza</setId> <artifactId>samza-api</artifactId> <version>0.14.1</version> </dependency> <dependency> - <groupId>org.apache.samza</groupId> + <setId>org.apache.samza</setId> <artifactId>samza-core_2.11</artifactId> <version>0.14.1</version> <scope>runtime</scope> </dependency> <dependency> - <groupId>org.apache.samza</groupId> + <setId>org.apache.samza</setId> <artifactId>samza-shell</artifactId> <classifier>dist</classifier> <type>tgz</type> @@ -78,31 +78,31 @@ A Maven-based Samza project can pull in all required dependencies Samza dependen <scope>runtime</scope> </dependency> <dependency> - <groupId>org.apache.samza</groupId> + <setId>org.apache.samza</setId> <artifactId>samza-yarn_2.11</artifactId> <version>0.14.1</version> <scope>runtime</scope> </dependency> <dependency> - <groupId>org.apache.samza</groupId> + <setId>org.apache.samza</setId> <artifactId>samza-kv_2.11</artifactId> <version>0.14.1</version> <scope>runtime</scope> </dependency> <dependency> - <groupId>org.apache.samza</groupId> + <setId>org.apache.samza</setId> <artifactId>samza-kv-rocksdb_2.11</artifactId> <version>0.14.1</version> <scope>runtime</scope> </dependency> <dependency> - <groupId>org.apache.samza</groupId> + <setId>org.apache.samza</setId> <artifactId>samza-kv-inmemory_2.11</artifactId> <version>0.14.1</version> <scope>runtime</scope> </dependency> <dependency> - <groupId>org.apache.samza</groupId> + <setId>org.apache.samza</setId> <artifactId>samza-kafka_2.11</artifactId> <version>0.14.1</version> <scope>runtime</scope> @@ -113,7 +113,7 @@ Samza versions less than 0.12 should use artifacts with scala version 2.10 as su {% highlight xml %} <dependency> - <groupId>org.apache.samza</groupId> + <setId>org.apache.samza</setId> <artifactId>samza-yarn_2.10</artifactId> <version>0.11.0</version> </dependency> @@ -123,7 +123,7 @@ Samza versions less than 0.9 should include this additional dependency. {% highlight xml %} <dependency> - <groupId>org.apache.samza</groupId> + <setId>org.apache.samza</setId> <artifactId>samza-serializers_2.10</artifactId> <version>0.8.1</version> </dependency> http://git-wip-us.apache.org/repos/asf/samza/blob/e904e70c/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java index 810f424..46aef8d 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java +++ b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java @@ -22,12 +22,14 @@ package org.apache.samza.execution; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; import com.google.common.collect.Sets; +import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedList; +import java.util.List; import java.util.Map; -import java.util.Queue; import java.util.Set; import org.apache.samza.SamzaException; import org.apache.samza.config.ApplicationConfig; @@ -36,13 +38,14 @@ import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; import org.apache.samza.config.StreamConfig; import org.apache.samza.operators.OperatorSpecGraph; +import org.apache.samza.operators.spec.InputOperatorSpec; import org.apache.samza.operators.spec.JoinOperatorSpec; -import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.system.StreamSpec; import org.apache.samza.table.TableSpec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.samza.execution.ExecutionPlanner.StreamEdgeSet.StreamEdgeSetCategory; import static org.apache.samza.util.StreamUtil.*; @@ -66,17 +69,25 @@ public class ExecutionPlanner { this.streamConfig = new StreamConfig(config); } - public ExecutionPlan plan(OperatorSpecGraph specGraph) { + public ExecutionPlan plan(OperatorSpecGraph opSpecGraph) { validateConfig(); - // create physical job graph based on stream graph - JobGraph jobGraph = createJobGraph(specGraph); + // Create physical job graph based on stream graph + JobGraph jobGraph = createJobGraph(opSpecGraph); - // fetch the external streams partition info - fetchInputAndOutputStreamPartitions(jobGraph, streamManager); + // Fetch the external streams partition info + fetchInputAndOutputStreamPartitions(jobGraph); - // figure out the partitions for internal streams - calculatePartitions(jobGraph); + // Verify agreement in partition count between all joined input/intermediate streams + validateJoinInputStreamPartitions(jobGraph); + + if (!jobGraph.getIntermediateStreamEdges().isEmpty()) { + // Set partition count of intermediate streams not participating in joins + setIntermediateStreamPartitions(jobGraph); + + // Validate partition counts were assigned for all intermediate streams + validateIntermediateStreamPartitions(jobGraph); + } return jobGraph; } @@ -92,21 +103,21 @@ public class ExecutionPlanner { } /** - * Create the physical graph from {@link OperatorSpecGraph} + * Creates the physical graph from {@link OperatorSpecGraph} */ - /* package private */ JobGraph createJobGraph(OperatorSpecGraph specGraph) { - JobGraph jobGraph = new JobGraph(config, specGraph); + /* package private */ JobGraph createJobGraph(OperatorSpecGraph opSpecGraph) { + JobGraph jobGraph = new JobGraph(config, opSpecGraph); // Source streams contain both input and intermediate streams. - Set<StreamSpec> sourceStreams = getStreamSpecs(specGraph.getInputOperators().keySet(), streamConfig); + Set<StreamSpec> sourceStreams = getStreamSpecs(opSpecGraph.getInputOperators().keySet(), streamConfig); // Sink streams contain both output and intermediate streams. - Set<StreamSpec> sinkStreams = getStreamSpecs(specGraph.getOutputStreams().keySet(), streamConfig); + Set<StreamSpec> sinkStreams = getStreamSpecs(opSpecGraph.getOutputStreams().keySet(), streamConfig); Set<StreamSpec> intermediateStreams = Sets.intersection(sourceStreams, sinkStreams); Set<StreamSpec> inputStreams = Sets.difference(sourceStreams, intermediateStreams); Set<StreamSpec> outputStreams = Sets.difference(sinkStreams, intermediateStreams); - Set<TableSpec> tables = specGraph.getTables().keySet(); + Set<TableSpec> tables = opSpecGraph.getTables().keySet(); // For this phase, we have a single job node for the whole dag String jobName = config.get(JobConfig.JOB_NAME()); @@ -131,25 +142,9 @@ public class ExecutionPlanner { } /** - * Figure out the number of partitions of all streams + * Fetches the partitions of input/output streams and update the corresponding StreamEdges. */ - /* package private */ void calculatePartitions(JobGraph jobGraph) { - // calculate the partitions for the input streams of join operators - calculateJoinInputPartitions(jobGraph, streamConfig); - - // calculate the partitions for the rest of intermediate streams - calculateIntermediateStreamPartitions(jobGraph, config); - - // validate all the partitions are assigned - validateIntermediateStreamPartitions(jobGraph); - } - - /** - * Fetch the partitions of input/output streams and update the corresponding StreamEdges. - * @param jobGraph {@link JobGraph} - * @param streamManager the {@link StreamManager} to interface with the streams. - */ - /* package private */ static void fetchInputAndOutputStreamPartitions(JobGraph jobGraph, StreamManager streamManager) { + /* package private */ void fetchInputAndOutputStreamPartitions(JobGraph jobGraph) { Set<StreamEdge> existingStreams = new HashSet<>(); existingStreams.addAll(jobGraph.getInputStreams()); existingStreams.addAll(jobGraph.getOutputStreams()); @@ -188,98 +183,81 @@ public class ExecutionPlanner { } /** - * Calculate the partitions for the input streams of join operators + * Validates agreement in partition count between input/intermediate streams participating in join operations. */ - /* package private */ static void calculateJoinInputPartitions(JobGraph jobGraph, StreamConfig streamConfig) { - // mapping from a source stream to all join specs reachable from it - Multimap<JoinOperatorSpec, StreamEdge> joinSpecToStreamEdges = HashMultimap.create(); - // reverse mapping of the above - Multimap<StreamEdge, JoinOperatorSpec> streamEdgeToJoinSpecs = HashMultimap.create(); - // A queue of joins with known input partitions - Queue<JoinOperatorSpec> joinQ = new LinkedList<>(); - // The visited set keeps track of the join specs that have been already inserted in the queue before - Set<JoinOperatorSpec> visited = new HashSet<>(); - - jobGraph.getSpecGraph().getInputOperators().forEach((streamId, inputOperatorSpec) -> { - StreamEdge streamEdge = jobGraph.getOrCreateStreamEdge(getStreamSpec(streamId, streamConfig)); - // Traverses the StreamGraph to find and update mappings for all Joins reachable from this input StreamEdge - findReachableJoins(inputOperatorSpec, streamEdge, joinSpecToStreamEdges, streamEdgeToJoinSpecs, joinQ, visited); - }); - - // At this point, joinQ contains joinSpecs where at least one of the input stream edge partitions is known. - while (!joinQ.isEmpty()) { - JoinOperatorSpec join = joinQ.poll(); - int partitions = StreamEdge.PARTITIONS_UNKNOWN; - // loop through the input streams to the join and find the partition count - for (StreamEdge edge : joinSpecToStreamEdges.get(join)) { - int edgePartitions = edge.getPartitionCount(); - if (edgePartitions != StreamEdge.PARTITIONS_UNKNOWN) { - if (partitions == StreamEdge.PARTITIONS_UNKNOWN) { - //if the partition is not assigned - partitions = edgePartitions; - log.info("Inferred the partition count {} for the join operator {} from {}." - , new Object[] {partitions, join.getOpId(), edge.getName()}); - } else if (partitions != edgePartitions) { - throw new SamzaException(String.format( - "Unable to resolve input partitions of stream %s for the join %s. Expected: %d, Actual: %d", - edge.getName(), join.getOpId(), partitions, edgePartitions)); - } - } - } - - // assign the partition count for intermediate streams - for (StreamEdge edge : joinSpecToStreamEdges.get(join)) { - if (edge.getPartitionCount() <= 0) { - log.info("Set the partition count to {} for input stream {} to the join {}.", - new Object[] {partitions, edge.getName(), join.getOpId()}); - edge.setPartitionCount(partitions); - - // find other joins can be inferred by setting this edge - for (JoinOperatorSpec op : streamEdgeToJoinSpecs.get(edge)) { - if (!visited.contains(op)) { - joinQ.add(op); - visited.add(op); - } - } - } - } + private void validateJoinInputStreamPartitions(JobGraph jobGraph) { + // Group input operator specs (input/intermediate streams) by the joins they participate in. + Multimap<JoinOperatorSpec, InputOperatorSpec> joinOpSpecToInputOpSpecs = + OperatorSpecGraphAnalyzer.getJoinToInputOperatorSpecs(jobGraph.getSpecGraph()); + + // Convert every group of input operator specs into a group of corresponding stream edges. + List<StreamEdgeSet> streamEdgeSets = new ArrayList<>(); + for (JoinOperatorSpec joinOpSpec : joinOpSpecToInputOpSpecs.keySet()) { + Collection<InputOperatorSpec> joinedInputOpSpecs = joinOpSpecToInputOpSpecs.get(joinOpSpec); + StreamEdgeSet streamEdgeSet = getStreamEdgeSet(joinOpSpec.getOpId(), joinedInputOpSpecs, jobGraph); + streamEdgeSets.add(streamEdgeSet); } + + /* + * Sort the stream edge groups by their category so they appear in this order: + * 1. groups composed exclusively of stream edges with set partition counts + * 2. groups composed of a mix of stream edges with set/unset partition counts + * 3. groups composed exclusively of stream edges with unset partition counts + * + * This guarantees that we process the most constrained stream edge groups first, + * which is crucial for intermediate stream edges that are members of multiple + * stream edge groups. For instance, if we have the following groups of stream + * edges (partition counts in parentheses, question marks for intermediate streams): + * + * a. e1 (16), e2 (16) + * b. e2 (16), e3 (?) + * c. e3 (?), e4 (?) + * + * processing them in the above order (most constrained first) is guaranteed to + * yield correct assignment of partition counts of e3 and e4 in a single scan. + */ + Collections.sort(streamEdgeSets, Comparator.comparingInt(e -> e.getCategory().getSortOrder())); + + // Verify agreement between joined input/intermediate streams. + // This may involve setting partition counts of intermediate stream edges. + streamEdgeSets.forEach(ExecutionPlanner::validateAndAssignStreamEdgeSetPartitions); } /** - * This function traverses the {@link OperatorSpec} graph to find and update mappings for all Joins reachable - * from this input {@link StreamEdge}. - * @param operatorSpec the {@link OperatorSpec} to traverse - * @param sourceStreamEdge source {@link StreamEdge} - * @param joinSpecToStreamEdges mapping from join spec to its source {@link StreamEdge}s - * @param streamEdgeToJoinSpecs mapping from source {@link StreamEdge} to the join specs that consumes it - * @param joinQ queue that contains joinSpecs where at least one of the input stream edge partitions is known. + * Creates a {@link StreamEdgeSet} whose Id is {@code setId}, and {@link StreamEdge}s + * correspond to the provided {@code inputOpSpecs}. */ - private static void findReachableJoins(OperatorSpec operatorSpec, StreamEdge sourceStreamEdge, - Multimap<JoinOperatorSpec, StreamEdge> joinSpecToStreamEdges, - Multimap<StreamEdge, JoinOperatorSpec> streamEdgeToJoinSpecs, - Queue<JoinOperatorSpec> joinQ, Set<JoinOperatorSpec> visited) { - - if (operatorSpec instanceof JoinOperatorSpec) { - JoinOperatorSpec joinOperatorSpec = (JoinOperatorSpec) operatorSpec; - joinSpecToStreamEdges.put(joinOperatorSpec, sourceStreamEdge); - streamEdgeToJoinSpecs.put(sourceStreamEdge, joinOperatorSpec); - - if (!visited.contains(joinOperatorSpec) && sourceStreamEdge.getPartitionCount() > 0) { - // put the joins with known input partitions into the queue and mark as visited - joinQ.add(joinOperatorSpec); - visited.add(joinOperatorSpec); + private StreamEdgeSet getStreamEdgeSet(String setId, Iterable<InputOperatorSpec> inputOpSpecs, + JobGraph jobGraph) { + + int countStreamEdgeWithSetPartitions = 0; + Set<StreamEdge> streamEdges = new HashSet<>(); + + for (InputOperatorSpec inputOpSpec : inputOpSpecs) { + StreamEdge streamEdge = jobGraph.getOrCreateStreamEdge(getStreamSpec(inputOpSpec.getStreamId(), streamConfig)); + if (streamEdge.getPartitionCount() != StreamEdge.PARTITIONS_UNKNOWN) { + ++countStreamEdgeWithSetPartitions; } + streamEdges.add(streamEdge); } - Collection<OperatorSpec> registeredOperatorSpecs = operatorSpec.getRegisteredOperatorSpecs(); - for (OperatorSpec registeredOpSpec : registeredOperatorSpecs) { - findReachableJoins(registeredOpSpec, sourceStreamEdge, joinSpecToStreamEdges, streamEdgeToJoinSpecs, joinQ, - visited); + // Determine category of stream group based on stream partition counts. + StreamEdgeSetCategory category; + if (countStreamEdgeWithSetPartitions == 0) { + category = StreamEdgeSetCategory.NO_PARTITION_COUNT_SET; + } else if (countStreamEdgeWithSetPartitions == streamEdges.size()) { + category = StreamEdgeSetCategory.ALL_PARTITION_COUNT_SET; + } else { + category = StreamEdgeSetCategory.SOME_PARTITION_COUNT_SET; } + + return new StreamEdgeSet(setId, streamEdges, category); } - private static void calculateIntermediateStreamPartitions(JobGraph jobGraph, Config config) { + /** + * Sets partition count of intermediate streams which have not been assigned partition counts. + */ + private void setIntermediateStreamPartitions(JobGraph jobGraph) { final String defaultPartitionsConfigProperty = JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(); int partitions = config.getInt(defaultPartitionsConfigProperty, StreamEdge.PARTITIONS_UNKNOWN); if (partitions == StreamEdge.PARTITIONS_UNKNOWN) { @@ -315,6 +293,9 @@ public class ExecutionPlanner { } } + /** + * Ensures all intermediate streams have been assigned partition counts. + */ private static void validateIntermediateStreamPartitions(JobGraph jobGraph) { for (StreamEdge edge : jobGraph.getIntermediateStreamEdges()) { if (edge.getPartitionCount() <= 0) { @@ -323,7 +304,102 @@ public class ExecutionPlanner { } } + /** + * Ensures that all streams in the supplied {@link StreamEdgeSet} agree in partition count. + * This may include setting partition counts of intermediate streams in this set that do not + * have their partition counts set. + */ + private static void validateAndAssignStreamEdgeSetPartitions(StreamEdgeSet streamEdgeSet) { + Set<StreamEdge> streamEdges = streamEdgeSet.getStreamEdges(); + StreamEdge firstStreamEdgeWithSetPartitions = + streamEdges.stream() + .filter(streamEdge -> streamEdge.getPartitionCount() != StreamEdge.PARTITIONS_UNKNOWN) + .findFirst() + .orElse(null); + + // This group consists exclusively of intermediate streams with unknown partition counts. + // We cannot do any validation/computation of partition counts of such streams right here, + // but they are tackled later in the ExecutionPlanner. + if (firstStreamEdgeWithSetPartitions == null) { + return; + } + + // Make sure all other stream edges in this group have the same partition count. + int partitions = firstStreamEdgeWithSetPartitions.getPartitionCount(); + for (StreamEdge streamEdge : streamEdges) { + int streamPartitions = streamEdge.getPartitionCount(); + if (streamPartitions == StreamEdge.PARTITIONS_UNKNOWN) { + streamEdge.setPartitionCount(partitions); + log.info("Inferred the partition count {} for the join operator {} from {}." + , new Object[] {partitions, streamEdgeSet.getSetId(), firstStreamEdgeWithSetPartitions.getName()}); + } else if (streamPartitions != partitions) { + throw new SamzaException(String.format( + "Unable to resolve input partitions of stream %s for the join %s. Expected: %d, Actual: %d", + streamEdge.getName(), streamEdgeSet.getSetId(), partitions, streamPartitions)); + } + } + } + /* package private */ static int maxPartitions(Collection<StreamEdge> edges) { return edges.stream().mapToInt(StreamEdge::getPartitionCount).max().orElse(StreamEdge.PARTITIONS_UNKNOWN); } + + /** + * Represents a set of {@link StreamEdge}s. + */ + /* package private */ static class StreamEdgeSet { + + /** + * Indicates whether all stream edges in this group have their partition counts assigned. + */ + public enum StreamEdgeSetCategory { + /** + * All stream edges in this group have their partition counts assigned. + */ + ALL_PARTITION_COUNT_SET(0), + + /** + * Only some stream edges in this group have their partition counts assigned. + */ + SOME_PARTITION_COUNT_SET(1), + + /** + * No stream edge in this group is assigned a partition count. + */ + NO_PARTITION_COUNT_SET(2); + + + private final int sortOrder; + + StreamEdgeSetCategory(int sortOrder) { + this.sortOrder = sortOrder; + } + + public int getSortOrder() { + return sortOrder; + } + } + + private final String setId; + private final Set<StreamEdge> streamEdges; + private final StreamEdgeSetCategory category; + + public StreamEdgeSet(String setId, Set<StreamEdge> streamEdges, StreamEdgeSetCategory category) { + this.setId = setId; + this.streamEdges = streamEdges; + this.category = category; + } + + public Set<StreamEdge> getStreamEdges() { + return streamEdges; + } + + public String getSetId() { + return setId; + } + + public StreamEdgeSetCategory getCategory() { + return category; + } + } } http://git-wip-us.apache.org/repos/asf/samza/blob/e904e70c/samza-core/src/main/java/org/apache/samza/execution/OperatorSpecGraphAnalyzer.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/execution/OperatorSpecGraphAnalyzer.java b/samza-core/src/main/java/org/apache/samza/execution/OperatorSpecGraphAnalyzer.java new file mode 100644 index 0000000..aa1dff9 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/execution/OperatorSpecGraphAnalyzer.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.execution; + +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.function.Consumer; +import java.util.function.Function; +import org.apache.samza.operators.OperatorSpecGraph; +import org.apache.samza.operators.spec.InputOperatorSpec; +import org.apache.samza.operators.spec.JoinOperatorSpec; +import org.apache.samza.operators.spec.OperatorSpec; + + +/** + * A utility class that encapsulates the logic for traversing an {@link OperatorSpecGraph} and building + * associations between related {@link OperatorSpec}s. + */ +/* package private */ class OperatorSpecGraphAnalyzer { + + /** + * Returns a grouping of {@link InputOperatorSpec}s by the joins, i.e. {@link JoinOperatorSpec}s, they participate in. + */ + public static Multimap<JoinOperatorSpec, InputOperatorSpec> getJoinToInputOperatorSpecs( + OperatorSpecGraph operatorSpecGraph) { + + Multimap<JoinOperatorSpec, InputOperatorSpec> joinOpSpecToInputOpSpecs = HashMultimap.create(); + + // Traverse graph starting from every input operator spec, observing connectivity between input operator specs + // and Join operator specs. + Iterable<InputOperatorSpec> inputOpSpecs = operatorSpecGraph.getInputOperators().values(); + for (InputOperatorSpec inputOpSpec : inputOpSpecs) { + // Observe all join operator specs reachable from this input operator spec. + JoinOperatorSpecVisitor joinOperatorSpecVisitor = new JoinOperatorSpecVisitor(); + traverse(inputOpSpec, joinOperatorSpecVisitor, opSpec -> opSpec.getRegisteredOperatorSpecs()); + + // Associate every encountered join operator spec with this input operator spec. + for (JoinOperatorSpec joinOpSpec : joinOperatorSpecVisitor.getJoinOperatorSpecs()) { + joinOpSpecToInputOpSpecs.put(joinOpSpec, inputOpSpec); + } + } + + return joinOpSpecToInputOpSpecs; + } + + /** + * Traverses {@link OperatorSpec}s starting from {@code startOpSpec}, invoking {@code visitor} with every encountered + * {@link OperatorSpec}, and using {@code getNextOpSpecs} to determine the set of {@link OperatorSpec}s to visit next. + */ + private static void traverse(OperatorSpec startOpSpec, Consumer<OperatorSpec> visitor, + Function<OperatorSpec, Collection<OperatorSpec>> getNextOpSpecs) { + visitor.accept(startOpSpec); + for (OperatorSpec nextOpSpec : getNextOpSpecs.apply(startOpSpec)) { + traverse(nextOpSpec, visitor, getNextOpSpecs); + } + } + + /** + * An {@link OperatorSpecGraph} visitor that records all {@link JoinOperatorSpec}s encountered in the graph. + */ + private static class JoinOperatorSpecVisitor implements Consumer<OperatorSpec> { + private Set<JoinOperatorSpec> joinOpSpecs = new HashSet<>(); + + @Override + public void accept(OperatorSpec operatorSpec) { + if (operatorSpec instanceof JoinOperatorSpec) { + joinOpSpecs.add((JoinOperatorSpec) operatorSpec); + } + } + + public Set<JoinOperatorSpec> getJoinOperatorSpecs() { + return Collections.unmodifiableSet(joinOpSpecs); + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/e904e70c/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java index ad6b386..779d299 100644 --- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java +++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java @@ -33,7 +33,6 @@ import org.apache.samza.application.StreamApplicationDescriptorImpl; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; -import org.apache.samza.config.StreamConfig; import org.apache.samza.config.TaskConfig; import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; @@ -281,7 +280,7 @@ public class TestExecutionPlanner { StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithJoin(); JobGraph jobGraph = planner.createJobGraph(graphSpec.getOperatorSpecGraph()); - ExecutionPlanner.fetchInputAndOutputStreamPartitions(jobGraph, streamManager); + planner.fetchInputAndOutputStreamPartitions(jobGraph); assertTrue(jobGraph.getOrCreateStreamEdge(input1Spec).getPartitionCount() == 64); assertTrue(jobGraph.getOrCreateStreamEdge(input2Spec).getPartitionCount() == 16); assertTrue(jobGraph.getOrCreateStreamEdge(input3Spec).getPartitionCount() == 32); @@ -297,10 +296,7 @@ public class TestExecutionPlanner { public void testCalculateJoinInputPartitions() { ExecutionPlanner planner = new ExecutionPlanner(config, streamManager); StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithJoin(); - JobGraph jobGraph = planner.createJobGraph(graphSpec.getOperatorSpecGraph()); - - ExecutionPlanner.fetchInputAndOutputStreamPartitions(jobGraph, streamManager); - ExecutionPlanner.calculateJoinInputPartitions(jobGraph, new StreamConfig(config)); + JobGraph jobGraph = (JobGraph) planner.plan(graphSpec.getOperatorSpecGraph()); // the partitions should be the same as input1 jobGraph.getIntermediateStreams().forEach(edge -> { @@ -324,8 +320,7 @@ public class TestExecutionPlanner { ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager); StreamApplicationDescriptorImpl graphSpec = createSimpleGraph(); - JobGraph jobGraph = planner.createJobGraph(graphSpec.getOperatorSpecGraph()); - planner.calculatePartitions(jobGraph); + JobGraph jobGraph = (JobGraph) planner.plan(graphSpec.getOperatorSpecGraph()); // the partitions should be the same as input1 jobGraph.getIntermediateStreams().forEach(edge -> {