Repository: samza Updated Branches: refs/heads/samza-fluent-api-v1 8c1f56d6d -> 7340afa6b
Fixes to integrate execution env and api Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/7340afa6 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/7340afa6 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/7340afa6 Branch: refs/heads/samza-fluent-api-v1 Commit: 7340afa6b5fd3e55b4c72a3010806feae11f5996 Parents: 8c1f56d Author: Xinyu Liu <xi...@xiliu-ld.linkedin.biz> Authored: Tue Feb 28 10:15:18 2017 -0800 Committer: Xinyu Liu <xi...@xiliu-ld.linkedin.biz> Committed: Tue Feb 28 10:15:18 2017 -0800 ---------------------------------------------------------------------- .../apache/samza/system/ExecutionEnvironment.java | 2 +- .../apache/samza/operators/StreamGraphImpl.java | 4 ++-- .../samza/processorgraph/ExecutionPlanner.java | 18 +++++++++++------- .../scala/org/apache/samza/job/JobRunner.scala | 4 ++-- 4 files changed, 16 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/7340afa6/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java b/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java index ef46626..a45c004 100644 --- a/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java +++ b/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java @@ -32,7 +32,7 @@ import org.apache.samza.config.Config; public interface ExecutionEnvironment extends StreamProvider { String ENVIRONMENT_CONFIG = "job.execution.environment.class"; - String DEFAULT_ENVIRONMENT_CLASS = "org.apache.samza.system.StandaloneExecutionEnvironment"; + String DEFAULT_ENVIRONMENT_CLASS = "org.apache.samza.system.AbstractExecutionEnvironment"; /** * Static method to load the local standalone environment http://git-wip-us.apache.org/repos/asf/samza/blob/7340afa6/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java index 353f455..3e28380 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java @@ -208,8 +208,8 @@ public class StreamGraphImpl implements StreamGraph { */ public MessageStreamImpl getInputStream(SystemStream sstream) { for(MessageStream entry: this.inStreams.values()) { - if (((InputStreamImpl) entry).getSpec().getSystemName() == sstream.getSystem() && - ((InputStreamImpl) entry).getSpec().getPhysicalName() == sstream.getStream()) { + if (((InputStreamImpl) entry).getSpec().getSystemName().equals(sstream.getSystem()) && + ((InputStreamImpl) entry).getSpec().getPhysicalName().equals(sstream.getStream())) { return (MessageStreamImpl) entry; } } http://git-wip-us.apache.org/repos/asf/samza/blob/7340afa6/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java b/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java index 055f87c..56df480 100644 --- a/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java +++ b/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java @@ -58,11 +58,13 @@ public class ExecutionPlanner { // create physical processors based on stream graph ProcessorGraph processorGraph = splitStages(streamGraph); - // figure out the partition for internal streams - Multimap<String, StreamSpec> streams = calculatePartitions(streamGraph, processorGraph, sysAdmins); + if(!processorGraph.getInternalStreams().isEmpty()) { + // figure out the partition for internal streams + Multimap<String, StreamSpec> streams = calculatePartitions(streamGraph, processorGraph, sysAdmins); - // create the streams - createStreams(streams, sysAdmins); + // create the streams + createStreams(streams, sysAdmins); + } return processorGraph; } @@ -75,8 +77,8 @@ public class ExecutionPlanner { ProcessorGraph processorGraph = new ProcessorGraph(config); // TODO: remote the casting once we have the correct types in StreamGraph - Set<StreamSpec> sourceStreams = (Set) streamGraph.getInStreams().keySet(); - Set<StreamSpec> sinkStreams = (Set) streamGraph.getOutStreams().keySet(); + Set<StreamSpec> sourceStreams = new HashSet<>(streamGraph.getInStreams().keySet()); + Set<StreamSpec> sinkStreams = new HashSet<>(streamGraph.getOutStreams().keySet()); Set<StreamSpec> intStreams = new HashSet<>(sourceStreams); intStreams.retainAll(sinkStreams); sourceStreams.removeAll(intStreams); @@ -146,7 +148,9 @@ public class ExecutionPlanner { SystemAdmin systemAdmin = sysAdmins.get(systemName); Map<String, SystemStreamMetadata> metadata = systemAdmin.getSystemStreamMetadata(streamToEdge.keySet()); metadata.forEach((stream, data) -> { - streamToEdge.get(stream).setPartitions(data.getSystemStreamPartitionMetadata().size()); + int partitions = data.getSystemStreamPartitionMetadata().size(); + streamToEdge.get(stream).setPartitions(partitions); + log.info("Partition count is {} for stream {}", partitions, stream); }); } } http://git-wip-us.apache.org/repos/asf/samza/blob/7340afa6/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala index 61bfafb..1d81b33 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala @@ -72,9 +72,9 @@ object JobRunner extends Logging { // start execution env if it's defined val envClass: String = config.get(ExecutionEnvironment.ENVIRONMENT_CONFIG, "") if (!envClass.isEmpty) { - val env: ExecutionEnvironment = ClassLoaderHelper.fromClassName(envClass) + val env: ExecutionEnvironment = ExecutionEnvironment.fromConfig(config) val graphBuilder: StreamGraphBuilder = Class.forName(config.get(StreamGraphBuilder.BUILDER_CLASS_CONFIG)).newInstance.asInstanceOf[StreamGraphBuilder] - env.run(graphBuilder, config) + env.run(graphBuilder, rewriteConfig(config)) } else { new JobRunner(rewriteConfig(config)).run() }