SAMZA-1080 : Initial Standalone JobCoordinator and StreamProcessor API

This patch contains changes associated with the Standalone StreamProcessor,
where there is no coordination. This will work for load-balanced consumers like
the new Kafka consumer and for statically partitioned cases.

Additionally, we have introduced TaskFactory for StreamTask and AsyncStreamTask.

Author: navina <nav...@apache.org>

Reviewers: xinyuiscool,fredji97

Closes #44 from navina/Noop-JC


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/2a3a5ac7
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/2a3a5ac7
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/2a3a5ac7

Branch: refs/heads/master
Commit: 2a3a5ac7f21dc4213ff6ec96e11a798bff096d04
Parents: 154cda2
Author: navina <nav...@apache.org>
Authored: Mon Jan 30 18:30:02 2017 -0800
Committer: navina <nav...@apache.org>
Committed: Mon Jan 30 18:30:02 2017 -0800

----------------------------------------------------------------------
 NOTICE                                          |   1 +
 .../samza/task/AsyncStreamTaskFactory.java      |  28 ++
 .../apache/samza/task/StreamTaskFactory.java    |  31 ++
 .../samza/config/JobCoordinatorConfig.java      |  40 ++
 .../org/apache/samza/config/TaskConfigJava.java |  25 ++
 .../apache/samza/container/RunLoopFactory.java  |  20 +-
 .../AllSspToSingleTaskGrouperFactory.java       |  69 ++++
 .../task/SingleContainerGrouperFactory.java     |  56 +++
 .../samza/coordinator/JobCoordinator.java       |  72 ++++
 .../coordinator/JobCoordinatorFactory.java      |  35 ++
 .../processor/SamzaContainerController.java     | 152 ++++++++
 .../apache/samza/processor/StreamProcessor.java | 173 +++++++++
 .../standalone/StandaloneJobCoordinator.java    | 148 ++++++++
 .../StandaloneJobCoordinatorFactory.java        |  31 ++
 .../org/apache/samza/task/AsyncRunLoop.java     |  37 +-
 .../org/apache/samza/config/JobConfig.scala     |   3 +
 .../org/apache/samza/container/RunLoop.scala    |  18 +-
 .../apache/samza/container/SamzaContainer.scala | 178 ++++++---
 .../apache/samza/container/TaskInstance.scala   |  21 +-
 .../samza/coordinator/JobCoordinator.scala      | 360 ------------------
 .../samza/coordinator/JobModelManager.scala     | 362 +++++++++++++++++++
 .../samza/job/local/ThreadJobFactory.scala      |  23 +-
 .../org/apache/samza/task/TestAsyncRunLoop.java |  20 +-
 .../apache/samza/container/TestRunLoop.scala    |  15 +-
 .../samza/container/TestSamzaContainer.scala    |   6 +-
 .../samza/container/TestTaskInstance.scala      |   6 +-
 .../test/StandaloneIntegrationTestHarness.java  |  84 +++++
 .../apache/samza/test/StandaloneTestUtils.java  | 111 ++++++
 .../test/processor/IdentityStreamTask.java      |  72 ++++
 .../test/processor/TestStreamProcessor.java     | 226 ++++++++++++
 .../AbstractIntegrationTestHarness.scala        |  60 +++
 .../AbstractKafkaServerTestHarness.scala        | 123 +++++++
 .../harness/AbstractZookeeperTestHarness.scala  |  72 ++++
 33 files changed, 2185 insertions(+), 493 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/2a3a5ac7/NOTICE
----------------------------------------------------------------------
diff --git a/NOTICE b/NOTICE
index 3352dda..2ee5fdc 100644
--- a/NOTICE
+++ b/NOTICE
@@ -3,3 +3,4 @@ Copyright 2014 The Apache Software Foundation
 
 This product includes software developed at The Apache Software
 Foundation (http://www.apache.org/).
+

http://git-wip-us.apache.org/repos/asf/samza/blob/2a3a5ac7/samza-api/src/main/java/org/apache/samza/task/AsyncStreamTaskFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/task/AsyncStreamTaskFactory.java 
b/samza-api/src/main/java/org/apache/samza/task/AsyncStreamTaskFactory.java
new file mode 100644
index 0000000..e5ce9c4
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/task/AsyncStreamTaskFactory.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.task;
+
+/**
+ * Build {@link AsyncStreamTask} instances.
+ * Implementations should return a new instance for each {@link #createInstance()} invocation.
+ */
+public interface AsyncStreamTaskFactory {
+  AsyncStreamTask createInstance();
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2a3a5ac7/samza-api/src/main/java/org/apache/samza/task/StreamTaskFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/task/StreamTaskFactory.java 
b/samza-api/src/main/java/org/apache/samza/task/StreamTaskFactory.java
new file mode 100644
index 0000000..52adef6
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/task/StreamTaskFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.task;
+
+import org.apache.samza.annotation.InterfaceStability;
+
+/**
+ * Build {@link StreamTask} instances.
+ * Implementations should return a new instance for each {@link #createInstance()} invocation.
+ */
+@InterfaceStability.Stable
+public interface StreamTaskFactory {
+  StreamTask createInstance();
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2a3a5ac7/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java 
b/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
new file mode 100644
index 0000000..946a308
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config;
+
+import com.google.common.base.Strings;
+
+public class JobCoordinatorConfig extends MapConfig {
+  public static final String JOB_COORDINATOR_FACTORY = 
"job-coordinator.factory";
+
+  public JobCoordinatorConfig(Config config) {
+    super(config);
+  }
+
+  public String getJobCoordinatorFactoryClassName() {
+    String jobCoordinatorFactoryClassName = get(JOB_COORDINATOR_FACTORY);
+    if (Strings.isNullOrEmpty(jobCoordinatorFactoryClassName)) {
+      throw new ConfigException(
+          String.format("Missing config - %s. Cannot start StreamProcessor!", 
JOB_COORDINATOR_FACTORY));
+    }
+
+    return jobCoordinatorFactoryClassName;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2a3a5ac7/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java 
b/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
index 648fe58..e6db74a 100644
--- a/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
+++ b/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
@@ -35,6 +35,10 @@ import scala.collection.JavaConversions;
 
 
 public class TaskConfigJava extends MapConfig {
+  // Task Configs
+  private static final String TASK_SHUTDOWN_MS = "task.shutdown.ms";
+  public static final long DEFAULT_TASK_SHUTDOWN_MS = 5000L;
+
   // broadcast streams consumed by all tasks. e.g. kafka.foo#1
   public static final String BROADCAST_INPUT_STREAMS = "task.broadcast.inputs";
   private static final String BROADCAST_STREAM_PATTERN = "^[\\d]+$";
@@ -117,4 +121,25 @@ public class TaskConfigJava extends MapConfig {
 
     return Collections.unmodifiableSet(allInputSS);
   }
+
+  /**
+   * Returns a value indicating how long to wait for the tasks to shut down.
+   * If the value is not defined in the config or if it does not parse correctly, we return the default value -
+   * {@value #DEFAULT_TASK_SHUTDOWN_MS}
+   *
+   * @return Long value indicating how long to wait for all the tasks to shut down
+   */
+  public long getShutdownMs() {
+    String shutdownMs = get(TASK_SHUTDOWN_MS);
+    try {
+      return Long.parseLong(shutdownMs);
+    } catch (NumberFormatException nfe) {
+      LOGGER.warn(String.format(
+          "Unable to parse user-configure value for %s - %s. Using default 
value %d",
+          TASK_SHUTDOWN_MS,
+          shutdownMs,
+          DEFAULT_TASK_SHUTDOWN_MS));
+      return DEFAULT_TASK_SHUTDOWN_MS;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2a3a5ac7/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java 
b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
index 23a68cb..32ab47a 100644
--- a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
@@ -19,20 +19,19 @@
 
 package org.apache.samza.container;
 
-import java.util.concurrent.ExecutorService;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.TaskConfig;
-import org.apache.samza.util.HighResolutionClock;
 import org.apache.samza.system.SystemConsumers;
 import org.apache.samza.task.AsyncRunLoop;
-import org.apache.samza.task.AsyncStreamTask;
-import org.apache.samza.task.StreamTask;
+import org.apache.samza.util.HighResolutionClock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.collection.JavaConversions;
 import scala.runtime.AbstractFunction0;
 import scala.runtime.AbstractFunction1;
 
+import java.util.concurrent.ExecutorService;
+
 import static org.apache.samza.util.Util.asScalaClock;
 
 /**
@@ -46,7 +45,7 @@ public class RunLoopFactory {
   private static final long DEFAULT_COMMIT_MS = 60000L;
   private static final long DEFAULT_CALLBACK_TIMEOUT_MS = -1L;
 
-  public static Runnable 
createRunLoop(scala.collection.immutable.Map<TaskName, TaskInstance<?>> 
taskInstances,
+  public static Runnable 
createRunLoop(scala.collection.immutable.Map<TaskName, TaskInstance> 
taskInstances,
       SystemConsumers consumerMultiplexer,
       ExecutorService threadPool,
       long maxThrottlingDelayMs,
@@ -62,9 +61,9 @@ public class RunLoopFactory {
 
     log.info("Got commit milliseconds: " + taskCommitMs);
 
-    int asyncTaskCount = taskInstances.values().count(new 
AbstractFunction1<TaskInstance<?>, Object>() {
+    int asyncTaskCount = taskInstances.values().count(new 
AbstractFunction1<TaskInstance, Object>() {
       @Override
-      public Boolean apply(TaskInstance<?> t) {
+      public Boolean apply(TaskInstance t) {
         return t.isAsyncTask();
       }
     });
@@ -77,9 +76,8 @@ public class RunLoopFactory {
     if (asyncTaskCount == 0) {
       log.info("Run loop in single thread mode.");
 
-      scala.collection.immutable.Map<TaskName, TaskInstance<StreamTask>> 
streamTaskInstances = (scala.collection.immutable.Map) taskInstances;
       return new RunLoop(
-        streamTaskInstances,
+        taskInstances,
         consumerMultiplexer,
         containerMetrics,
         maxThrottlingDelayMs,
@@ -95,12 +93,10 @@ public class RunLoopFactory {
 
       log.info("Got callback timeout: " + callbackTimeout);
 
-      scala.collection.immutable.Map<TaskName, TaskInstance<AsyncStreamTask>> 
asyncStreamTaskInstances = (scala.collection.immutable.Map) taskInstances;
-
       log.info("Run loop in asynchronous mode.");
 
       return new AsyncRunLoop(
-        JavaConversions.mapAsJavaMap(asyncStreamTaskInstances),
+        JavaConversions.mapAsJavaMap(taskInstances),
         threadPool,
         consumerMultiplexer,
         taskMaxConcurrency,

http://git-wip-us.apache.org/repos/asf/samza/blob/2a3a5ac7/samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java
 
b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java
new file mode 100644
index 0000000..2d22977
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container.grouper.stream;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * AllSspToSingleTaskGrouper, as the name suggests, assigns all partitions to be consumed by a single TaskInstance.
+ * This is useful when using load-balanced consumers like the new Kafka consumer, where Samza doesn't control the
+ * partitions being consumed by a task. Hence, it is assumed that there is only 1 task that processes all messages,
+ * irrespective of which partition they belong to.
+ * This also implies that container and tasks are synonymous when this grouper is used. Task names have to be globally
+ * unique within a given job.
+ *
+ * Note: This grouper does not take in broadcast streams yet.
+ */
+class AllSspToSingleTaskGrouper implements SystemStreamPartitionGrouper {
+  private final int containerId;
+
+  public AllSspToSingleTaskGrouper(int containerId) {
+    this.containerId = containerId;
+  }
+
+  @Override
+  public Map<TaskName, Set<SystemStreamPartition>> group(final 
Set<SystemStreamPartition> ssps) {
+    if (ssps == null) {
+      throw new SamzaException("ssp set cannot be null!");
+    }
+    if (ssps.size() == 0) {
+      throw new SamzaException("Cannot process stream task with no input 
system stream partitions");
+    }
+
+    final TaskName taskName = new TaskName(String.format("Task-%s", 
String.valueOf(containerId)));
+
+    return Collections.singletonMap(taskName, ssps);
+  }
+}
+
+public class AllSspToSingleTaskGrouperFactory implements 
SystemStreamPartitionGrouperFactory {
+  @Override
+  public SystemStreamPartitionGrouper getSystemStreamPartitionGrouper(Config 
config) {
+    return new 
AllSspToSingleTaskGrouper(config.getInt(JobConfig.PROCESSOR_ID()));
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/2a3a5ac7/samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java
 
b/samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java
new file mode 100644
index 0000000..980f2a9
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container.grouper.task;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.TaskModel;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class SingleContainerGrouperFactory implements TaskNameGrouperFactory {
+  @Override
+  public TaskNameGrouper build(Config config) {
+    return new SingleContainerGrouper(config.getInt(JobConfig.PROCESSOR_ID()));
+  }
+}
+
+class SingleContainerGrouper implements TaskNameGrouper {
+  private final int containerId;
+
+  SingleContainerGrouper(int containerId) {
+    this.containerId = containerId;
+  }
+
+  @Override
+  public Set<ContainerModel> group(Set<TaskModel> taskModels) {
+    Map<TaskName, TaskModel> taskNameTaskModelMap = new HashMap<>();
+    for (TaskModel taskModel: taskModels) {
+      taskNameTaskModelMap.put(taskModel.getTaskName(), taskModel);
+    }
+    ContainerModel containerModel = new ContainerModel(containerId, 
taskNameTaskModelMap);
+    return Collections.singleton(containerModel);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2a3a5ac7/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java 
b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java
new file mode 100644
index 0000000..252e56b
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.job.model.JobModel;
+
+/**
+ *  A JobCoordinator is a pluggable module in each process that provides the 
JobModel and the ID to the StreamProcessor.
+ *  In some cases, ID assignment is completely config driven, while in other 
cases, ID assignment may require
+ *  coordination with JobCoordinators of other StreamProcessors.
+ *  */
+@InterfaceStability.Evolving
+public interface JobCoordinator {
+  /**
+   * Starts the JobCoordinator which involves one or more of the following:
+   * * LeaderElector Module initialization, if any
+   * * If leader, generate JobModel. Else, read JobModel
+   */
+  void start();
+
+  /**
+   * Cleanly shutting down the JobCoordinator involves:
+   * * Shutting down the Container
+   * * Shutting down the LeaderElection module (TBD: details depending on 
leader or not)
+   */
+  void stop();
+
+  /**
+   * Waits for a specified amount of time for the JobCoordinator to fully 
start-up, which means it should be ready to
+   * process messages.
+   * In a Standalone use-case, it may be sufficient to wait for the container 
to start-up.
+   * In a ZK based Standalone use-case, it also includes registration with ZK, 
initialization of the
+   * leader elector module, container start-up etc.
+   *
+   * @param timeoutMs Maximum time to wait, in milliseconds
+   * @return {@code true}, if the JobCoordinator is started within the 
specified wait time and {@code false} if the
+   * waiting time elapsed
+   * @throws InterruptedException if the current thread is interrupted while 
waiting for the JobCoordinator to start-up
+   */
+  boolean awaitStart(long timeoutMs) throws InterruptedException;
+  /**
+   * Returns the logical ID assigned to the processor
+   * It is up to the user to ensure that different instances of 
StreamProcessor within a job have unique processor ID.
+   * @return integer representing the logical processor ID
+   */
+  int getProcessorId();
+
+  /**
+   * Returns the current JobModel
+   * The implementation of the JobCoordinator in the leader needs to know how 
to read the config and generate JobModel
+   * In case of a non-leader, the JobCoordinator should simply fetch the 
jobmodel
+   * @return instance of JobModel that describes the partition distribution 
among the processors (and hence, tasks)
+   */
+  JobModel getJobModel();
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2a3a5ac7/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
 
b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
new file mode 100644
index 0000000..3da70e0
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.Config;
+import org.apache.samza.processor.SamzaContainerController;
+
+@InterfaceStability.Evolving
+public interface JobCoordinatorFactory {
+  /**
+   * @param processorId Unique identifier for the processor
+   * @param config Configs relevant for the JobCoordinator TODO: Separate JC 
related configs into a "JobCoordinatorConfig"
+   * @param containerController Controller interface for starting and stopping 
container. In future, it may simply
+   *                            pause the container and add/remove tasks
+   * @return An instance of JobCoordinator
+   */
+  JobCoordinator getJobCoordinator(int processorId, Config config, 
SamzaContainerController containerController);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/2a3a5ac7/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
 
b/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
new file mode 100644
index 0000000..d448d30
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.processor;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.samza.config.ClusterManagerConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.TaskConfigJava;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.container.SamzaContainer;
+import org.apache.samza.container.SamzaContainer$;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.metrics.JmxServer;
+import org.apache.samza.metrics.MetricsReporter;
+import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class SamzaContainerController {
+  private static final Logger log = 
LoggerFactory.getLogger(SamzaContainerController.class);
+
+  private final ExecutorService executorService;
+  private volatile SamzaContainer container;
+  private final Map<String, MetricsReporter> metricsReporterMap;
+  private final Object taskFactory;
+  private final long containerShutdownMs;
+
+  // Internal Member Variables
+  private Future containerFuture;
+
+  /**
+   * Creates an instance of a controller for instantiating, starting and/or 
stopping {@link SamzaContainer}
+   * Requests to execute a container are submitted to the {@link 
ExecutorService}
+   *
+   * @param taskFactory         Factory that will be used to create instances of {@link org.apache.samza.task.StreamTask} or
+   *                            {@link org.apache.samza.task.AsyncStreamTask}
+   * @param containerShutdownMs How long the Samza container should wait for 
an orderly shutdown of task instances
+   * @param metricsReporterMap  Map of metric reporter name and {@link 
MetricsReporter} instance
+   */
+  public SamzaContainerController(
+      Object taskFactory,
+      long containerShutdownMs,
+      String processorId,
+      Map<String, MetricsReporter> metricsReporterMap) {
+    this.executorService = Executors.newSingleThreadExecutor(new 
ThreadFactoryBuilder()
+        .setNameFormat("p" + processorId + "-container-thread-%d").build());
+    this.taskFactory = taskFactory;
+    this.metricsReporterMap = metricsReporterMap;
+    if (containerShutdownMs == -1) {
+      this.containerShutdownMs = TaskConfigJava.DEFAULT_TASK_SHUTDOWN_MS;
+    } else {
+      this.containerShutdownMs = containerShutdownMs;
+    }
+  }
+
+  /**
+   * Instantiates a container and submits to the executor. This method does 
not actually wait for the container to
+   * fully start-up. For such a behavior, see {@link #awaitStart(long)}
+   * <p>
+   * <b>Note:</b> <i>This method does not stop a currently running container, 
if any. It is left up to the caller to
+   * ensure that the container has been stopped with stopContainer before 
invoking this method.</i>
+   *
+   * @param containerModel               {@link ContainerModel} instance to 
use for the current run of the Container
+   * @param config                       Complete configuration map used by 
the Samza job
+   * @param maxChangelogStreamPartitions Max number of partitions expected in 
the changelog streams
+   *                                     TODO: Try to get rid of 
maxChangelogStreamPartitions from method arguments
+   */
+  public void startContainer(ContainerModel containerModel, Config config, int 
maxChangelogStreamPartitions) {
+    LocalityManager localityManager = null;
+    if (new ClusterManagerConfig(config).getHostAffinityEnabled()) {
+      localityManager = 
SamzaContainer$.MODULE$.getLocalityManager(containerModel.getContainerId(), 
config);
+    }
+    log.info("About to create container: " + containerModel.getContainerId());
+    container = SamzaContainer$.MODULE$.apply(
+        containerModel.getContainerId(),
+        containerModel,
+        config,
+        maxChangelogStreamPartitions,
+        localityManager,
+        new JmxServer(),
+        Util.<String, MetricsReporter>javaMapAsScalaMap(metricsReporterMap),
+        taskFactory);
+    log.info("About to start container: " + containerModel.getContainerId());
+    containerFuture = executorService.submit(() -> container.run());
+  }
+
+  /**
+   * Method waits for a specified amount of time for the container to fully 
start-up, which consists of class-loading
+   * all the components and starting message processing
+   *
+   * @param timeoutMs Maximum time to wait, in milliseconds
+   * @return {@code true}, if the container started within the specified wait 
time and {@code false} if the waiting
+   * time elapsed
+   * @throws InterruptedException if the current thread is interrupted while 
waiting for container to start-up
+   */
+  public boolean awaitStart(long timeoutMs) throws InterruptedException {
+    return container.awaitStart(timeoutMs);
+  }
+
+  /**
+   * Stops a running container, if any. Invoking this method multiple times 
does not have any side-effects.
+   */
+  public void stopContainer() {
+    if (container == null) {
+      log.warn("Shutdown before a container was created.");
+      return;
+    }
+
+    container.shutdown();
+    try {
+      if (containerFuture != null)
+        containerFuture.get(containerShutdownMs, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException | ExecutionException e) {
+      log.error("Ran into problems while trying to stop the container in the 
processor!", e);
+    } catch (TimeoutException e) {
+      log.warn("Got Timeout Exception while trying to stop the container in 
the processor! The processor may not shutdown properly", e);
+    }
+  }
+
+  /**
+   * Shuts down the controller by first stopping any running container and then 
shutting down the {@link ExecutorService}
+   */
+  public void shutdown() {
+    stopContainer();
+    executorService.shutdown();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2a3a5ac7/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java 
b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
new file mode 100644
index 0000000..5e90c56
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.processor;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.TaskConfigJava;
+import org.apache.samza.coordinator.JobCoordinator;
+import org.apache.samza.coordinator.JobCoordinatorFactory;
+import org.apache.samza.metrics.MetricsReporter;
+import org.apache.samza.task.AsyncStreamTaskFactory;
+import org.apache.samza.task.StreamTaskFactory;
+import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * StreamProcessor can be embedded in any application or executed in a 
distributed environment (aka cluster) as an
+ * independent process.
+ * <p>
+ * <b>Usage Example:</b>
+ * <pre>
+ * StreamProcessor processor = new StreamProcessor(1, config, customMetricsReporters);
+ * processor.start();
+ * try {
+ *  boolean status = processor.awaitStart(TIMEOUT_MS);    // Optional - 
blocking call
+ *  if (!status) {
+ *    // Timed out
+ *  }
+ *  ...
+ * } catch (InterruptedException ie) {
+ *   ...
+ * } finally {
+ *   processor.stop();
+ * }
+ * </pre>
+ * Note: A single JVM can create multiple StreamProcessor instances. It is 
safe to create StreamProcessor instances in
+ * multiple threads.
+ */
+@InterfaceStability.Evolving
+public class StreamProcessor {
+  private static final Logger log = 
LoggerFactory.getLogger(StreamProcessor.class);
+  /**
+   * processor.id is equivalent to containerId in samza. It is a logical 
identifier used by Samza for a processor.
+   * In a distributed environment, this logical identifier is mapped to a 
physical identifier of the resource. For
+   * example, Yarn provides a "containerId" for every resource it allocates.
+   * In an embedded environment, this identifier is provided by the user by 
directly using the StreamProcessor API.
+   * <p>
+   * <b>Note:</b>This identifier has to be unique across the instances of 
StreamProcessors.
+   */
+  private static final String PROCESSOR_ID = "processor.id";
+  private final int processorId;
+  private final JobCoordinator jobCoordinator;
+
+  /**
+   * Create an instance of StreamProcessor that encapsulates a JobCoordinator 
and Samza Container
+   * <p>
+   * JobCoordinator controls how the various StreamProcessor instances 
belonging to a job coordinate. It is also
+   * responsible for generating and updating the JobModel.
+   * When StreamProcessor starts, it starts the JobCoordinator and brings up a 
SamzaContainer based on the JobModel.
+   * SamzaContainer is executed using an ExecutorService.
+   * <p>
+   * <b>Note:</b> Lifecycle of the ExecutorService is fully managed by the 
StreamProcessor, and NOT exposed to the user
+   *
+   * @param processorId            Unique identifier for a processor within 
the job. It has the same semantics as
+   *                               "containerId" in Samza
+   * @param config                 Instance of config object - contains all 
configuration required for processing
+   * @param customMetricsReporters Map of custom MetricReporter instances that 
are to be injected in the Samza job
+   * @param asyncStreamTaskFactory The {@link AsyncStreamTaskFactory} to be 
used for creating task instances.
+   */
+  public StreamProcessor(int processorId, Config config, Map<String, 
MetricsReporter> customMetricsReporters,
+                         AsyncStreamTaskFactory asyncStreamTaskFactory) {
+    this(processorId, config, customMetricsReporters, (Object) 
asyncStreamTaskFactory);
+  }
+
+  /**
+   * Same as {@link #StreamProcessor(int, Config, Map, 
AsyncStreamTaskFactory)}, except task instances are created
+   * using the provided {@link StreamTaskFactory}.
+   */
+  public StreamProcessor(int processorId, Config config, Map<String, 
MetricsReporter> customMetricsReporters,
+                         StreamTaskFactory streamTaskFactory) {
+    this(processorId, config, customMetricsReporters, (Object) 
streamTaskFactory);
+  }
+
+  /**
+   * Same as {@link #StreamProcessor(int, Config, Map, 
AsyncStreamTaskFactory)}, except task instances are created
+   * using the "task.class" configuration instead of a task factory.
+   */
+  public StreamProcessor(int processorId, Config config, Map<String, 
MetricsReporter> customMetricsReporters) {
+    this(processorId, config, customMetricsReporters, (Object) null);
+  }
+
+  private StreamProcessor(int processorId, Config config, Map<String, 
MetricsReporter> customMetricsReporters,
+                          Object taskFactory) {
+    this.processorId = processorId;
+
+    Map<String, String> updatedConfigMap = new HashMap<>();
+    updatedConfigMap.putAll(config);
+    updatedConfigMap.put(PROCESSOR_ID, String.valueOf(this.processorId));
+    Config updatedConfig = new MapConfig(updatedConfigMap);
+
+
+    SamzaContainerController containerController = new 
SamzaContainerController(
+        taskFactory,
+        new TaskConfigJava(updatedConfig).getShutdownMs(),
+        String.valueOf(processorId),
+        customMetricsReporters);
+
+    this.jobCoordinator = Util.
+        <JobCoordinatorFactory>getObj(
+            new JobCoordinatorConfig(updatedConfig)
+                .getJobCoordinatorFactoryClassName())
+        .getJobCoordinator(processorId, updatedConfig, containerController);
+  }
+
+  /**
+   * StreamProcessor Lifecycle: start()
+   * <ul>
+   * <li>Starts the JobCoordinator and fetches the JobModel</li>
+   * <li>jobCoordinator.start returns after starting the container using 
ContainerModel </li>
+   * </ul>
+   * When start() returns, it only guarantees that the container is 
initialized and submitted by the controller to
+   * execute
+   */
+  public void start() {
+    jobCoordinator.start();
+  }
+
+  /**
+   * Method that allows the user to wait for a specified amount of time for 
the container to initialize and start
+   * processing messages
+   *
+   * @param timeoutMs Maximum time to wait, in milliseconds
+   * @return {@code true}, if the container started within the specified wait 
time and {@code false} if the waiting time
+   * elapsed
+   * @throws InterruptedException if the current thread is interrupted while 
waiting for container to start-up
+   */
+  public boolean awaitStart(long timeoutMs) throws InterruptedException {
+    return jobCoordinator.awaitStart(timeoutMs);
+  }
+
+  /**
+   * StreamProcessor Lifecycle: stop()
+   * <ul>
+   * <li>Stops the SamzaContainer execution</li>
+   * <li>Stops the JobCoordinator</li>
+   * </ul>
+   */
+  public void stop() {
+    jobCoordinator.stop();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2a3a5ac7/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
 
b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
new file mode 100644
index 0000000..1401725
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.standalone;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JavaSystemConfig;
+import org.apache.samza.coordinator.JobCoordinator;
+import org.apache.samza.coordinator.JobModelManager;
+import org.apache.samza.coordinator.JobModelManager$;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.processor.SamzaContainerController;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.util.SystemClock;
+import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Standalone Job Coordinator does not implement any leader elector module or 
cluster manager
+ *
+ * It generates the JobModel using the Config passed into the constructor.
+ *
+ * Since the standalone JobCoordinator does not perform partition management, 
it allows two kinds of partition
+ * distribution mechanisms:
+ * <ul>
+ *   <li>
+ *     Consumer-managed Partition Distribution - For example, using the kafka 
consumer which also handles partition
+ *   load balancing across its consumers. In such a case, all input 
SystemStreamPartition(s) can be grouped to the same
+ *   task instance using {@link 
org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory} and 
the
+ *   task can be added to a single container using
+ *   {@link 
org.apache.samza.container.grouper.task.SingleContainerGrouperFactory}.
+ *   </li>
+ *   <li>
+ *     User-defined Fixed Partition Distribution - For example, the 
application may always run a fixed number of
+ *   processors and use a static distribution of partitions that doesn't 
change. This can be achieved by adding custom
+ *   {@link 
org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouper} and
+ *   {@link org.apache.samza.container.grouper.task.TaskNameGrouper}.
+ *   </li>
+ * </ul>
+ * */
+public class StandaloneJobCoordinator implements JobCoordinator {
+  private static final Logger log = 
LoggerFactory.getLogger(StandaloneJobCoordinator.class);
+  private final int processorId;
+  private final Config config;
+  private final JobModelManager jobModelManager;
+  private final SamzaContainerController containerController;
+
+  @VisibleForTesting
+  StandaloneJobCoordinator(
+      int processorId,
+      Config config,
+      SamzaContainerController containerController,
+      JobModelManager jobModelManager) {
+    this.processorId = processorId;
+    this.config = config;
+    this.containerController = containerController;
+    this.jobModelManager = jobModelManager;
+  }
+
+  public StandaloneJobCoordinator(int processorId, Config config, 
SamzaContainerController containerController) {
+    this.processorId = processorId;
+    this.config = config;
+    this.containerController = containerController;
+
+    JavaSystemConfig systemConfig = new JavaSystemConfig(this.config);
+    Map<String, SystemAdmin> systemAdmins = new HashMap<>();
+    for (String systemName: systemConfig.getSystemNames()) {
+      String systemFactoryClassName = 
systemConfig.getSystemFactory(systemName);
+      if (systemFactoryClassName == null) {
+        log.error(String.format("A stream uses system %s, which is missing 
from the configuration.", systemName));
+        throw new SamzaException(String.format("A stream uses system %s, which 
is missing from the configuration.", systemName));
+      }
+      SystemFactory systemFactory = 
Util.<SystemFactory>getObj(systemFactoryClassName);
+      systemAdmins.put(systemName, systemFactory.getAdmin(systemName, 
this.config));
+    }
+
+    StreamMetadataCache streamMetadataCache = new 
StreamMetadataCache(Util.<String, SystemAdmin>javaMapAsScalaMap(systemAdmins), 
5000, SystemClock.instance());
+
+    /** TODO:
+     * Locality Manager seems to be required in JC for reading locality info 
and grouping tasks intelligently and also,
+     * in SamzaContainer for writing locality info to the coordinator stream. 
This closely couples together
+     * TaskNameGrouper with the LocalityManager! Hence, groupers should be a 
property of the jobcoordinator
+     * (job.coordinator.task.grouper, instead of 
task.systemstreampartition.grouper)
+     */
+    this.jobModelManager = 
JobModelManager$.MODULE$.getJobCoordinator(this.config, null, null, 
streamMetadataCache, null);
+  }
+
+  @Override
+  public void start() {
+    // Fetch the JobModel and start the container assigned to this processor
+    JobModel jobModel = getJobModel();
+    containerController.startContainer(
+        jobModel.getContainers().get(processorId),
+        jobModel.getConfig(),
+        jobModel.maxChangeLogStreamPartitions);
+  }
+
+  @Override
+  public void stop() {
+    // Stop any running container and shut down the container controller
+    containerController.shutdown();
+  }
+
+  /**
+   * Waits for a specified amount of time for the JobCoordinator to fully 
start-up, which means it should be ready to
+   * process messages. In a Standalone use-case, it may be sufficient to wait 
for the container to start-up. In case of
+   * ZK based Standalone use-case, it also includes registration with ZK, the 
initialization of leader elector module etc.
+   *
+   * @param timeoutMs Maximum time to wait, in milliseconds
+   */
+  @Override
+  public boolean awaitStart(long timeoutMs) throws InterruptedException {
+    return containerController.awaitStart(timeoutMs);
+  }
+
+  @Override
+  public int getProcessorId() {
+    return this.processorId;
+  }
+
+  @Override
+  public JobModel getJobModel() {
+    return jobModelManager.jobModel();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2a3a5ac7/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java
 
b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java
new file mode 100644
index 0000000..7ca85c0
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.standalone;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.coordinator.JobCoordinator;
+import org.apache.samza.coordinator.JobCoordinatorFactory;
+import org.apache.samza.processor.SamzaContainerController;
+
+public class StandaloneJobCoordinatorFactory  implements JobCoordinatorFactory 
{
+  @Override
+  public JobCoordinator getJobCoordinator(int processorId, Config config, 
SamzaContainerController containerController) {
+    return new StandaloneJobCoordinator(processorId, config, 
containerController);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/2a3a5ac7/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java 
b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
index 5c5ea65..064402c 100644
--- a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
+++ b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
@@ -75,7 +75,7 @@ public class AsyncRunLoop implements Runnable, Throttleable {
   private volatile Throwable throwable = null;
   private final HighResolutionClock clock;
 
-  public AsyncRunLoop(Map<TaskName, TaskInstance<AsyncStreamTask>> 
taskInstances,
+  public AsyncRunLoop(Map<TaskName, TaskInstance> taskInstances,
       ExecutorService threadPool,
       SystemConsumers consumerMultiplexer,
       int maxConcurrency,
@@ -100,7 +100,7 @@ public class AsyncRunLoop implements Runnable, Throttleable 
{
     this.workerTimer = Executors.newSingleThreadScheduledExecutor();
     this.clock = clock;
     Map<TaskName, AsyncTaskWorker> workers = new HashMap<>();
-    for (TaskInstance<AsyncStreamTask> task : taskInstances.values()) {
+    for (TaskInstance task : taskInstances.values()) {
       workers.put(task.taskName(), new AsyncTaskWorker(task));
     }
     // Partions and tasks assigned to the container will not change during the 
run loop life time
@@ -112,14 +112,12 @@ public class AsyncRunLoop implements Runnable, 
Throttleable {
    * Returns mapping of the SystemStreamPartition to the AsyncTaskWorkers to 
efficiently route the envelopes
    */
   private static Map<SystemStreamPartition, List<AsyncTaskWorker>> 
getSspToAsyncTaskWorkerMap(
-      Map<TaskName, TaskInstance<AsyncStreamTask>> taskInstances, 
Map<TaskName, AsyncTaskWorker> taskWorkers) {
+      Map<TaskName, TaskInstance> taskInstances, Map<TaskName, 
AsyncTaskWorker> taskWorkers) {
     Map<SystemStreamPartition, List<AsyncTaskWorker>> sspToWorkerMap = new 
HashMap<>();
-    for (TaskInstance<AsyncStreamTask> task : taskInstances.values()) {
+    for (TaskInstance task : taskInstances.values()) {
       Set<SystemStreamPartition> ssps = 
JavaConversions.setAsJavaSet(task.systemStreamPartitions());
       for (SystemStreamPartition ssp : ssps) {
-        if (sspToWorkerMap.get(ssp) == null) {
-          sspToWorkerMap.put(ssp, new ArrayList<AsyncTaskWorker>());
-        }
+        sspToWorkerMap.putIfAbsent(ssp, new ArrayList<>());
         sspToWorkerMap.get(ssp).add(taskWorkers.get(task.taskName()));
       }
     }
@@ -202,7 +200,8 @@ public class AsyncRunLoop implements Runnable, Throttleable 
{
   private IncomingMessageEnvelope chooseEnvelope() {
     IncomingMessageEnvelope envelope = consumerMultiplexer.choose(false);
     if (envelope != null) {
-      log.trace("Choose envelope ssp {} offset {} for processing", 
envelope.getSystemStreamPartition(), envelope.getOffset());
+      log.trace("Choose envelope ssp {} offset {} for processing",
+          envelope.getSystemStreamPartition(), envelope.getOffset());
       containerMetrics.envelopes().inc();
     } else {
       log.trace("No envelope is available");
@@ -310,11 +309,11 @@ public class AsyncRunLoop implements Runnable, 
Throttleable {
    * will run the task asynchronously. It runs window and commit in the 
provided thread pool.
    */
   private class AsyncTaskWorker implements TaskCallbackListener {
-    private final TaskInstance<AsyncStreamTask> task;
+    private final TaskInstance task;
     private final TaskCallbackManager callbackManager;
     private volatile AsyncTaskState state;
 
-    AsyncTaskWorker(TaskInstance<AsyncStreamTask> task) {
+    AsyncTaskWorker(TaskInstance task) {
       this.task = task;
       this.callbackManager = new TaskCallbackManager(this, callbackTimer, 
callbackTimeoutMs, maxConcurrency, clock);
       Set<SystemStreamPartition> sspSet = getWorkingSSPSet(task);
@@ -351,12 +350,14 @@ public class AsyncRunLoop implements Runnable, 
Throttleable {
      * @param task
      * @return a Set of SSPs such that all SSPs are not at end of stream.
      */
-    private Set<SystemStreamPartition> 
getWorkingSSPSet(TaskInstance<AsyncStreamTask> task) {
+    private Set<SystemStreamPartition> getWorkingSSPSet(TaskInstance task) {
 
       Set<SystemStreamPartition> allPartitions = new 
HashSet<>(JavaConversions.setAsJavaSet(task.systemStreamPartitions()));
 
       // filter only those SSPs that are not at end of stream.
-      Set<SystemStreamPartition> workingSSPSet = 
allPartitions.stream().filter(ssp -> 
!consumerMultiplexer.isEndOfStream(ssp)).collect(Collectors.toSet());
+      Set<SystemStreamPartition> workingSSPSet = allPartitions.stream()
+          .filter(ssp -> !consumerMultiplexer.isEndOfStream(ssp))
+          .collect(Collectors.toSet());
       return workingSSPSet;
     }
 
@@ -511,15 +512,18 @@ public class AsyncRunLoop implements Runnable, 
Throttleable {
             state.doneProcess();
             TaskCallbackImpl callbackImpl = (TaskCallbackImpl) callback;
             containerMetrics.processNs().update(clock.nanoTime() - 
callbackImpl.timeCreatedNs);
-            log.trace("Got callback complete for task {}, ssp {}", 
callbackImpl.taskName, callbackImpl.envelope.getSystemStreamPartition());
+            log.trace("Got callback complete for task {}, ssp {}",
+                callbackImpl.taskName, 
callbackImpl.envelope.getSystemStreamPartition());
 
             TaskCallbackImpl callbackToUpdate = 
callbackManager.updateCallback(callbackImpl);
             if (callbackToUpdate != null) {
               IncomingMessageEnvelope envelope = callbackToUpdate.envelope;
-              log.trace("Update offset for ssp {}, offset {}", 
envelope.getSystemStreamPartition(), envelope.getOffset());
+              log.trace("Update offset for ssp {}, offset {}",
+                  envelope.getSystemStreamPartition(), envelope.getOffset());
 
               // update offset
-              task.offsetManager().update(task.taskName(), 
envelope.getSystemStreamPartition(), envelope.getOffset());
+              task.offsetManager().update(task.taskName(),
+                  envelope.getSystemStreamPartition(), envelope.getOffset());
 
               // update coordinator
               coordinatorRequests.update(callbackToUpdate.coordinator);
@@ -695,7 +699,8 @@ public class AsyncRunLoop implements Runnable, Throttleable 
{
       PendingEnvelope pendingEnvelope = pendingEnvelopeQueue.remove();
       int queueSize = pendingEnvelopeQueue.size();
       taskMetrics.pendingMessages().set(queueSize);
-      log.trace("fetch envelope ssp {} offset {} to process.", 
pendingEnvelope.envelope.getSystemStreamPartition(), 
pendingEnvelope.envelope.getOffset());
+      log.trace("fetch envelope ssp {} offset {} to process.",
+          pendingEnvelope.envelope.getSystemStreamPartition(), 
pendingEnvelope.envelope.getOffset());
       log.debug("Task {} pending envelopes count is {} after fetching.", 
taskName, queueSize);
 
       if (pendingEnvelope.markProcessed()) {

http://git-wip-us.apache.org/repos/asf/samza/blob/2a3a5ac7/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 
b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index 13b72fa..b64e406 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -75,6 +75,9 @@ object JobConfig {
   val DEFAULT_MONITOR_PARTITION_CHANGE_FREQUENCY_MS = 300000
   val JOB_SECURITY_MANAGER_FACTORY = "job.security.manager.factory"
 
+  // Processor Config Constants
+  val PROCESSOR_ID = "processor.id"
+
   implicit def Config2Job(config: Config) = new JobConfig(config)
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/2a3a5ac7/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 
b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
index 7df7d88..b1ab1e0 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
@@ -22,7 +22,6 @@ package org.apache.samza.container
 import org.apache.samza.task.CoordinatorRequests
 import org.apache.samza.system.{IncomingMessageEnvelope, SystemConsumers, 
SystemStreamPartition}
 import org.apache.samza.task.ReadableCoordinator
-import org.apache.samza.task.StreamTask
 import org.apache.samza.util.{Logging, Throttleable, ThrottlingExecutor, 
TimerUtils}
 
 import scala.collection.JavaConversions._
@@ -37,7 +36,7 @@ import scala.collection.JavaConversions._
  * be done when.
  */
 class RunLoop (
-  val taskInstances: Map[TaskName, TaskInstance[StreamTask]],
+  val taskInstances: Map[TaskName, TaskInstance],
   val consumerMultiplexer: SystemConsumers,
   val metrics: SamzaContainerMetrics,
   val maxThrottlingDelayMs: Long,
@@ -57,13 +56,16 @@ class RunLoop (
   // Keep a mapping of SystemStreamPartition to TaskInstance to efficiently 
route them.
   val systemStreamPartitionToTaskInstances = 
getSystemStreamPartitionToTaskInstancesMapping
 
-  def getSystemStreamPartitionToTaskInstancesMapping: 
Map[SystemStreamPartition, List[TaskInstance[StreamTask]]] = {
-    // We could just pass in the SystemStreamPartitionMap during construction, 
but it's safer and cleaner to derive the information directly
-    def getSystemStreamPartitionToTaskInstance(taskInstance: 
TaskInstance[StreamTask]) = taskInstance.systemStreamPartitions.map(_ -> 
taskInstance).toMap
+  def getSystemStreamPartitionToTaskInstancesMapping: 
Map[SystemStreamPartition, List[TaskInstance]] = {
+    // We could just pass in the SystemStreamPartitionMap during construction,
+    // but it's safer and cleaner to derive the information directly
+    def getSystemStreamPartitionToTaskInstance(taskInstance: TaskInstance) =
+      taskInstance.systemStreamPartitions.map(_ -> taskInstance).toMap
 
-    taskInstances.values.map { getSystemStreamPartitionToTaskInstance 
}.flatten.groupBy(_._1).map {
-      case (ssp, ssp2taskInstance) => ssp -> ssp2taskInstance.map(_._2).toList
-    }
+    taskInstances.values
+      .flatMap(getSystemStreamPartitionToTaskInstance)
+      .groupBy(_._1)
+      .map { case (ssp, ssp2taskInstance) => ssp -> 
ssp2taskInstance.map(_._2).toList }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/2a3a5ac7/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index e49da57..c3308bf 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -22,9 +22,7 @@ package org.apache.samza.container
 import java.io.File
 import java.nio.file.Path
 import java.util
-import java.util.concurrent.ExecutorService
-import java.util.concurrent.Executors
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{CountDownLatch, ExecutorService, Executors, 
TimeUnit}
 import java.lang.Thread.UncaughtExceptionHandler
 import java.net.{URL, UnknownHostException}
 import org.apache.samza.SamzaException
@@ -32,7 +30,7 @@ import org.apache.samza.checkpoint.{CheckpointListener, 
CheckpointManagerFactory
 import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.config.MetricsConfig.Config2Metrics
 import org.apache.samza.config.SerializerConfig.Config2Serializer
-import org.apache.samza.config.ShellCommandConfig
+import org.apache.samza.config.{Config, ShellCommandConfig}
 import org.apache.samza.config.StorageConfig.Config2Storage
 import org.apache.samza.config.StreamConfig.Config2Stream
 import org.apache.samza.config.SystemConfig.Config2System
@@ -69,7 +67,9 @@ import 
org.apache.samza.system.chooser.RoundRobinChooserFactory
 import org.apache.samza.task.AsyncRunLoop
 import org.apache.samza.task.AsyncStreamTask
 import org.apache.samza.task.AsyncStreamTaskAdapter
+import org.apache.samza.task.AsyncStreamTaskFactory
 import org.apache.samza.task.StreamTask
+import org.apache.samza.task.StreamTaskFactory
 import org.apache.samza.task.TaskInstanceCollector
 import org.apache.samza.util.HighResolutionClock
 import org.apache.samza.util.ExponentialSleepStrategy
@@ -112,7 +112,14 @@ object SamzaContainer extends Logging {
 
     try {
       jmxServer = newJmxServer()
-      SamzaContainer(containerModel, jobModel, jmxServer).run
+      val containerModel = jobModel.getContainers.get(containerId.toInt)
+      SamzaContainer(
+        containerId.toInt,
+        containerModel,
+        config,
+        jobModel.maxChangeLogStreamPartitions,
+        getLocalityManager(containerId, config),
+        jmxServer).run
     } finally {
       if (jmxServer != null) {
         jmxServer.stop
@@ -120,6 +127,17 @@ object SamzaContainer extends Logging {
     }
   }
 
+  /**
+   * Builds the LocalityManager for the given container.
+   *
+   * A metrics registry named after the container is created and wrapped in a
+   * SamzaContainerMetrics; that registry is handed to the coordinator stream
+   * system producer which backs the returned LocalityManager.
+   *
+   * @param containerId numeric id used to derive the container name
+   * @param config      config passed to the coordinator stream system factory
+   * @return a LocalityManager wrapping a newly created coordinator stream producer
+   */
+  def getLocalityManager(containerId: Int, config: Config): LocalityManager = {
+    val containerName = getSamzaContainerName(containerId)
+    val registryMap = new MetricsRegistryMap(containerName)
+    val streamSystemFactory = new CoordinatorStreamSystemFactory()
+    val metricsRegistry = new SamzaContainerMetrics(containerName, registryMap).registry
+    val producer = streamSystemFactory.getCoordinatorStreamSystemProducer(config, metricsRegistry)
+    new LocalityManager(producer)
+  }
+
   /**
    * Fetches config, task:SSP assignments, and task:changelog partition
    * assignments, and returns objects to be used for SamzaContainer's
@@ -136,10 +154,20 @@ object SamzaContainer extends Logging {
         classOf[JobModel])
   }
 
-  def apply(containerModel: ContainerModel, jobModel: JobModel, jmxServer: 
JmxServer) = {
-    val config = jobModel.getConfig
-    val containerId = containerModel.getContainerId
-    val containerName = "samza-container-%s" format containerId
+  /**
+   * Returns the name of the container with the given id, i.e. the id
+   * prefixed with "samza-container-".
+   */
+  def getSamzaContainerName(containerId: Int): String =
+    "samza-container-%d".format(containerId)
+
+  def apply(
+    containerId: Int,
+    containerModel: ContainerModel,
+    config: Config,
+    maxChangeLogStreamPartitions: Int,
+    localityManager: LocalityManager,
+    jmxServer: JmxServer,
+    customReporters: Map[String, MetricsReporter] = Map[String, 
MetricsReporter](),
+    taskFactory: Object = null) = {
+    val containerName = getSamzaContainerName(containerId)
     val containerPID = Util.getContainerPID
 
     info("Setting up Samza container: %s" format containerName)
@@ -207,12 +235,6 @@ object SamzaContainer extends Logging {
 
     info("Got input stream metadata: %s" format inputStreamMetadata)
 
-    val taskClassName = config
-      .getTaskClass
-      .getOrElse(throw new SamzaException("No task class defined in 
configuration."))
-
-    info("Got stream task class: %s" format taskClassName)
-
     val consumers = inputSystems
       .map(systemName => {
         val systemFactory = systemFactories(systemName)
@@ -221,7 +243,7 @@ object SamzaContainer extends Logging {
           (systemName, systemFactory.getConsumer(systemName, config, 
samzaContainerMetrics.registry))
         } catch {
           case e: Exception =>
-            error("Failed to create a consumer for %s, so skipping." 
format(systemName), e)
+            error("Failed to create a consumer for %s, so skipping." format 
systemName, e)
             (systemName, null)
         }
       })
@@ -230,11 +252,6 @@ object SamzaContainer extends Logging {
 
     info("Got system consumers: %s" format consumers.keys)
 
-    val isAsyncTask = 
classOf[AsyncStreamTask].isAssignableFrom(Class.forName(taskClassName))
-    if (isAsyncTask) {
-      info("%s is AsyncStreamTask" format taskClassName)
-    }
-
     val producers = systemFactories
       .map {
         case (systemName, systemFactory) =>
@@ -242,12 +259,11 @@ object SamzaContainer extends Logging {
             (systemName, systemFactory.getProducer(systemName, config, 
samzaContainerMetrics.registry))
           } catch {
             case e: Exception =>
-              error("Failed to create a producer for %s, so skipping." 
format(systemName), e)
+              error("Failed to create a producer for %s, so skipping." format 
systemName, e)
               (systemName, null)
           }
       }
       .filter(_._2 != null)
-      .toMap
 
     info("Got system producers: %s" format producers.keys)
 
@@ -265,44 +281,48 @@ object SamzaContainer extends Logging {
     info("Got serdes: %s" format serdes.keys)
 
     /*
-     * A Helper function to build a Map[String, Serde] (systemName -> Serde) 
for systems defined in the config. This is useful to build both key and message 
serde maps.
+     * A Helper function to build a Map[String, Serde] (systemName -> Serde) 
for systems defined
+     * in the config. This is useful to build both key and message serde maps.
      */
     val buildSystemSerdeMap = (getSerdeName: (String) => Option[String]) => {
       systemNames
         .filter(systemName => getSerdeName(systemName).isDefined)
               .map(systemName => {
           val serdeName = getSerdeName(systemName).get
-          val serde = serdes.getOrElse(serdeName, throw new 
SamzaException("buildSystemSerdeMap: No class defined for serde: %s." format 
serdeName))
+          val serde = serdes.getOrElse(serdeName,
+            throw new SamzaException("buildSystemSerdeMap: No class defined 
for serde: %s." format serdeName))
           (systemName, serde)
         }).toMap
     }
 
     /*
-     * A Helper function to build a Map[SystemStream, Serde] for streams 
defined in the config. This is useful to build both key and message serde maps.
+     * A Helper function to build a Map[SystemStream, Serde] for streams 
defined in the config.
+     * This is useful to build both key and message serde maps.
      */
     val buildSystemStreamSerdeMap = (getSerdeName: (SystemStream) => 
Option[String]) => {
       (serdeStreams ++ inputSystemStreamPartitions)
         .filter(systemStream => getSerdeName(systemStream).isDefined)
         .map(systemStream => {
           val serdeName = getSerdeName(systemStream).get
-          val serde = serdes.getOrElse(serdeName, throw new 
SamzaException("buildSystemStreamSerdeMap: No class defined for serde: %s." 
format serdeName))
+          val serde = serdes.getOrElse(serdeName,
+            throw new SamzaException("buildSystemStreamSerdeMap: No class 
defined for serde: %s." format serdeName))
           (systemStream, serde)
         }).toMap
     }
 
-    val systemKeySerdes = buildSystemSerdeMap((systemName: String) => 
config.getSystemKeySerde(systemName))
+    val systemKeySerdes = buildSystemSerdeMap(systemName => 
config.getSystemKeySerde(systemName))
 
     debug("Got system key serdes: %s" format systemKeySerdes)
 
-    val systemMessageSerdes = buildSystemSerdeMap((systemName: String) => 
config.getSystemMsgSerde(systemName))
+    val systemMessageSerdes = buildSystemSerdeMap(systemName => 
config.getSystemMsgSerde(systemName))
 
     debug("Got system message serdes: %s" format systemMessageSerdes)
 
-    val systemStreamKeySerdes = buildSystemStreamSerdeMap((systemStream: 
SystemStream) => config.getStreamKeySerde(systemStream))
+    val systemStreamKeySerdes = buildSystemStreamSerdeMap(systemStream => 
config.getStreamKeySerde(systemStream))
 
     debug("Got system stream key serdes: %s" format systemStreamKeySerdes)
 
-    val systemStreamMessageSerdes = buildSystemStreamSerdeMap((systemStream: 
SystemStream) => config.getStreamMsgSerde(systemStream))
+    val systemStreamMessageSerdes = buildSystemStreamSerdeMap(systemStream => 
config.getStreamMsgSerde(systemStream))
 
     debug("Got system stream message serdes: %s" format 
systemStreamMessageSerdes)
 
@@ -349,15 +369,10 @@ object SamzaContainer extends Logging {
     }
     info("Got security manager: %s" format securityManager)
 
-    val coordinatorSystemProducer = new 
CoordinatorStreamSystemFactory().getCoordinatorStreamSystemProducer(config, 
samzaContainerMetrics.registry)
-    val localityManager = new LocalityManager(coordinatorSystemProducer)
-    val checkpointManager = config.getCheckpointManagerFactory() match {
-      case Some(checkpointFactoryClassName) if 
(!checkpointFactoryClassName.isEmpty) =>
-        Util
-          .getObj[CheckpointManagerFactory](checkpointFactoryClassName)
-          .getCheckpointManager(config, samzaContainerMetrics.registry)
-      case _ => null
-    }
+    val checkpointManager = config.getCheckpointManagerFactory()
+      .filterNot(_.isEmpty)
+      
.map(Util.getObj[CheckpointManagerFactory](_).getCheckpointManager(config, 
samzaContainerMetrics.registry))
+      .orNull
     info("Got checkpoint manager: %s" format checkpointManager)
 
     // create a map of consumers with callbacks to pass to the OffsetManager
@@ -415,8 +430,26 @@ object SamzaContainer extends Logging {
     val singleThreadMode = config.getSingleThreadMode
     info("Got single thread mode: " + singleThreadMode)
 
+    val taskClassName = config.getTaskClass.orNull
+    info("Got task class name: %s" format taskClassName)
+
+    if (taskClassName == null && taskFactory == null) {
+      throw new SamzaException("Either the task class name or the task factory 
instance is required.")
+    }
+
+    val isAsyncTask: Boolean =
+      if (taskFactory != null) {
+        taskFactory.isInstanceOf[AsyncStreamTaskFactory]
+      } else {
+        classOf[AsyncStreamTask].isAssignableFrom(Class.forName(taskClassName))
+      }
+
+    if (isAsyncTask) {
+      info("Got an AsyncStreamTask implementation.")
+    }
+
     if(singleThreadMode && isAsyncTask) {
-      throw new SamzaException("AsyncStreamTask %s cannot run on single thread 
mode." format taskClassName)
+      throw new SamzaException("AsyncStreamTask cannot run on single thread 
mode.")
     }
 
     val threadPoolSize = config.getThreadPoolSize
@@ -443,12 +476,23 @@ object SamzaContainer extends Logging {
     val storeWatchPaths = new util.HashSet[Path]()
     storeWatchPaths.add(defaultStoreBaseDir.toPath)
 
-    val taskInstances: Map[TaskName, TaskInstance[_]] = 
containerModel.getTasks.values.map(taskModel => {
+    val taskInstances: Map[TaskName, TaskInstance] = 
containerModel.getTasks.values.map(taskModel => {
       debug("Setting up task instance: %s" format taskModel)
 
       val taskName = taskModel.getTaskName
 
-      val taskObj = Class.forName(taskClassName).newInstance
+      val taskObj = if (taskFactory != null) {
+        debug("Using task factory to create task instance")
+        taskFactory match {
+          case tf: AsyncStreamTaskFactory => tf.createInstance()
+          case tf: StreamTaskFactory => tf.createInstance()
+          case _ =>
+            throw new SamzaException("taskFactory must be an instance of 
StreamTaskFactory or AsyncStreamTaskFactory")
+        }
+      } else {
+        debug("Using task class name: %s to create instance" format 
taskClassName)
+        Class.forName(taskClassName).newInstance
+      }
 
       val task = if (!singleThreadMode && !isAsyncTask)
         // Wrap the StreamTask into a AsyncStreamTask with the build-in thread 
pool
@@ -464,18 +508,21 @@ object SamzaContainer extends Logging {
         .map {
           case (storeName, changeLogSystemStream) =>
             val systemConsumer = systemFactories
-              .getOrElse(changeLogSystemStream.getSystem, throw new 
SamzaException("Changelog system %s for store %s does not exist in the config." 
format (changeLogSystemStream, storeName)))
+              .getOrElse(changeLogSystemStream.getSystem,
+                throw new SamzaException("Changelog system %s for store %s 
does not " +
+                  "exist in the config." format (changeLogSystemStream, 
storeName)))
               .getConsumer(changeLogSystemStream.getSystem, config, 
taskInstanceMetrics.registry)
             samzaContainerMetrics.addStoreRestorationGauge(taskName, storeName)
             (storeName, systemConsumer)
-        }.toMap
+        }
 
       info("Got store consumers: %s" format storeConsumers)
 
       var loggedStorageBaseDir: File = null
       if(System.getenv(ShellCommandConfig.ENV_LOGGED_STORE_BASE_DIR) != null) {
         val jobNameAndId = Util.getJobNameAndId(config)
-        loggedStorageBaseDir = new 
File(System.getenv(ShellCommandConfig.ENV_LOGGED_STORE_BASE_DIR) + 
File.separator + jobNameAndId._1 + "-" + jobNameAndId._2)
+        loggedStorageBaseDir = new 
File(System.getenv(ShellCommandConfig.ENV_LOGGED_STORE_BASE_DIR)
+          + File.separator + jobNameAndId._1 + "-" + jobNameAndId._2)
       } else {
         warn("No override was provided for logged store base directory. This 
disables local state re-use on " +
           "application restart. If you want to enable this feature, set 
LOGGED_STORE_BASE_DIR as an environment " +
@@ -496,11 +543,13 @@ object SamzaContainer extends Logging {
               null
             }
             val keySerde = config.getStorageKeySerde(storeName) match {
-              case Some(keySerde) => serdes.getOrElse(keySerde, throw new 
SamzaException("StorageKeySerde: No class defined for serde: %s." format 
keySerde))
+              case Some(keySerde) => serdes.getOrElse(keySerde,
+                throw new SamzaException("StorageKeySerde: No class defined 
for serde: %s." format keySerde))
               case _ => null
             }
             val msgSerde = config.getStorageMsgSerde(storeName) match {
-              case Some(msgSerde) => serdes.getOrElse(msgSerde, throw new 
SamzaException("StorageMsgSerde: No class defined for serde: %s." format 
msgSerde))
+              case Some(msgSerde) => serdes.getOrElse(msgSerde,
+                throw new SamzaException("StorageMsgSerde: No class defined 
for serde: %s." format msgSerde))
               case _ => null
             }
             val storeBaseDir = if(changeLogSystemStreamPartition != null) {
@@ -528,7 +577,7 @@ object SamzaContainer extends Logging {
         taskStores = taskStores,
         storeConsumers = storeConsumers,
         changeLogSystemStreams = changeLogSystemStreams,
-        jobModel.maxChangeLogStreamPartitions,
+        maxChangeLogStreamPartitions,
         streamMetadataCache = streamMetadataCache,
         storeBaseDir = defaultStoreBaseDir,
         loggedStoreBaseDir = loggedStorageBaseDir,
@@ -541,7 +590,7 @@ object SamzaContainer extends Logging {
 
       info("Retrieved SystemStreamPartitions " + systemStreamPartitions + " 
for " + taskName)
 
-      def createTaskInstance[T] (task: T ): TaskInstance[T] = new 
TaskInstance[T](
+      def createTaskInstance(task: Any): TaskInstance = new TaskInstance(
           task = task,
           taskName = taskName,
           config = config,
@@ -632,7 +681,7 @@ object SamzaContainer extends Logging {
 
 class SamzaContainer(
   containerContext: SamzaContainerContext,
-  taskInstances: Map[TaskName, TaskInstance[_]],
+  taskInstances: Map[TaskName, TaskInstance],
   runLoop: Runnable,
   consumerMultiplexer: SystemConsumers,
   producerMultiplexer: SystemProducers,
@@ -648,6 +697,17 @@ class SamzaContainer(
   taskThreadPool: ExecutorService = null) extends Runnable with Logging {
 
   val shutdownMs = containerContext.config.getShutdownMs.getOrElse(5000L)
+  private val runLoopStartLatch: CountDownLatch = new CountDownLatch(1)
+
+  /**
+   * Blocks until the run-loop start latch has been counted down or the
+   * timeout elapses.
+   *
+   * @param timeoutMs maximum time to wait, in milliseconds
+   * @return the result of the latch's timed await: true if the latch reached
+   *         zero, false if the timeout elapsed first
+   * @throws InterruptedException if the waiting thread is interrupted; the
+   *                              interruption is logged and then rethrown
+   */
+  def awaitStart(timeoutMs: Long): Boolean =
+    try runLoopStartLatch.await(timeoutMs, TimeUnit.MILLISECONDS)
+    catch {
+      case interrupted: InterruptedException =>
+        error("Interrupted while waiting for runloop to start!", interrupted)
+        throw interrupted
+    }
 
   def run {
     try {
@@ -664,8 +724,9 @@ class SamzaContainer(
       startConsumers
       startSecurityManger
 
-      info("Entering run loop.")
       addShutdownHook
+      runLoopStartLatch.countDown()
+      info("Entering run loop.")
       runLoop.run
     } catch {
       case e: Throwable =>
@@ -689,6 +750,13 @@ class SamzaContainer(
     }
   }
 
+  /**
+   * Asks the run loop to shut down.
+   *
+   * The constructor accepts any Runnable as the run loop, but only RunLoop
+   * and AsyncRunLoop expose a shutdown method. Any other value (including
+   * null) is rejected with a descriptive exception instead of an opaque
+   * scala.MatchError.
+   *
+   * @throws SamzaException if the run loop is neither a RunLoop nor an
+   *                        AsyncRunLoop
+   */
+  def shutdown(): Unit = {
+    runLoop match {
+      case runLoop: RunLoop => runLoop.shutdown
+      case asyncRunLoop: AsyncRunLoop => asyncRunLoop.shutdown()
+      case other =>
+        // No shutdown hook is known for this type; fail loudly so the caller
+        // does not assume the container is stopping.
+        throw new SamzaException("Cannot shut down run loop of unsupported type: %s" format other)
+    }
+  }
+
   def startDiskSpaceMonitor: Unit = {
     if (diskSpaceMonitor != null) {
       info("Starting disk space monitor")
@@ -746,9 +814,11 @@ class SamzaContainer(
         localityManager.writeContainerToHostMapping(containerContext.id, 
hostInet.getHostName, jmxUrl, jmxTunnelingUrl)
       } catch {
         case uhe: UnknownHostException =>
-          warn("Received UnknownHostException when persisting locality info 
for container %d: %s" format (containerContext.id, uhe.getMessage))  //No-op
+          warn("Received UnknownHostException when persisting locality info 
for container %d: " +
+            "%s" format (containerContext.id, uhe.getMessage))  //No-op
         case unknownException: Throwable =>
-          warn("Received an exception when persisting locality info for 
container %d: %s" format (containerContext.id, unknownException.getMessage))
+          warn("Received an exception when persisting locality info for 
container %d: " +
+            "%s" format (containerContext.id, unknownException.getMessage))
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/2a3a5ac7/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 
b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index 26a8f5f..e07fcf4 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -43,8 +43,8 @@ import org.apache.samza.util.Logging
 
 import scala.collection.JavaConversions._
 
-class TaskInstance[T](
-  task: T,
+class TaskInstance(
+  task: Any,
   val taskName: TaskName,
   config: Config,
   val metrics: TaskInstanceMetrics,
@@ -84,7 +84,8 @@ class TaskInstance[T](
   // store the (ssp -> if this ssp is catched up) mapping. "catched up"
   // means the same ssp in other taskInstances have the same offset as
   // the one here.
-  var ssp2catchedupMapping: 
scala.collection.mutable.Map[SystemStreamPartition, Boolean] = 
scala.collection.mutable.Map[SystemStreamPartition, Boolean]()
+  var ssp2catchedupMapping: 
scala.collection.mutable.Map[SystemStreamPartition, Boolean] =
+    scala.collection.mutable.Map[SystemStreamPartition, Boolean]()
   systemStreamPartitions.foreach(ssp2catchedupMapping += _ -> false)
 
   def registerMetrics {
@@ -140,7 +141,8 @@ class TaskInstance[T](
     })
   }
 
-  def process(envelope: IncomingMessageEnvelope, coordinator: 
ReadableCoordinator, callbackFactory: TaskCallbackFactory = null) {
+  def process(envelope: IncomingMessageEnvelope, coordinator: 
ReadableCoordinator,
+    callbackFactory: TaskCallbackFactory = null) {
     metrics.processes.inc
 
     if (!ssp2catchedupMapping.getOrElse(envelope.getSystemStreamPartition,
@@ -151,7 +153,8 @@ class TaskInstance[T](
     if (ssp2catchedupMapping(envelope.getSystemStreamPartition)) {
       metrics.messagesActuallyProcessed.inc
 
-      trace("Processing incoming message envelope for taskName and SSP: %s, 
%s" format (taskName, envelope.getSystemStreamPartition))
+      trace("Processing incoming message envelope for taskName and SSP: %s, %s"
+        format (taskName, envelope.getSystemStreamPartition))
 
       if (isAsyncTask) {
         exceptionHandler.maybeHandle {
@@ -163,7 +166,8 @@ class TaskInstance[T](
          task.asInstanceOf[StreamTask].process(envelope, collector, 
coordinator)
         }
 
-        trace("Updating offset map for taskName, SSP and offset: %s, %s, %s" 
format (taskName, envelope.getSystemStreamPartition, envelope.getOffset))
+        trace("Updating offset map for taskName, SSP and offset: %s, %s, %s"
+          format (taskName, envelope.getSystemStreamPartition, 
envelope.getOffset))
 
         offsetManager.update(taskName, envelope.getSystemStreamPartition, 
envelope.getOffset)
       }
@@ -173,7 +177,7 @@ class TaskInstance[T](
   def endOfStream(coordinator: ReadableCoordinator): Unit = {
     if (isEndOfStreamListenerTask) {
       exceptionHandler.maybeHandle {
-        task.asInstanceOf[EndOfStreamListenerTask].onEndOfStream(collector, 
coordinator);
+        task.asInstanceOf[EndOfStreamListenerTask].onEndOfStream(collector, 
coordinator)
       }
     }
   }
@@ -230,7 +234,8 @@ class TaskInstance[T](
 
   override def toString() = "TaskInstance for class %s and taskName %s." 
format (task.getClass.getName, taskName)
 
-  def toDetailedString() = "TaskInstance [taskName = %s, windowable=%s, 
closable=%s endofstreamlistener=%s]" format (taskName, isWindowableTask, 
isClosableTask, isEndOfStreamListenerTask)
+  def toDetailedString() = "TaskInstance [taskName = %s, windowable=%s, 
closable=%s endofstreamlistener=%s]" format
+    (taskName, isWindowableTask, isClosableTask, isEndOfStreamListenerTask)
 
   /**
    * From the envelope, check if this SSP has catched up with the starting 
offset of the SSP

Reply via email to