SAMZA-1789: unify ApplicationDescriptor and ApplicationRunner for high- and 
low-level APIs in YARN and standalone environment

This is the initial PR for SEP-13. Highlighted changes:
- Define StreamApplication and TaskApplication with 
describe(ApplicationDescriptor) API to define processing logic of a Stream 
application
   - the objects instantiated and registered to ApplicationDescriptor in 
describe() method should be serializable
- Define ApplicationRunner to have mandatory constructor parameter of 
ApplicationDescriptor
- Define ProcessorLifecycleListenerFactory to allow users to inject local logic 
and instantiate local objects in the processors in an application
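
The describe()/runner relationship listed above can be sketched in isolation. 
This is a minimal, self-contained illustration only: Descriptor, App, 
StreamDescriptor, Runner and withInput are hypothetical stand-ins, not the 
Samza classes added by this commit, and a plain Map stands in for Config.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class DescribeSketch {

  // Hypothetical stand-in for an application descriptor; not the Samza interface.
  interface Descriptor<S extends Descriptor<S>> {
    Map<String, String> getConfig();
  }

  // Hypothetical stand-in for a user application: all logic is declared in describe().
  interface App<S extends Descriptor<S>> {
    void describe(S appDesc);
  }

  // Records what the application declares; withInput is an invented method.
  static final class StreamDescriptor implements Descriptor<StreamDescriptor> {
    private final Map<String, String> config;
    private final List<String> inputs = new ArrayList<>();

    StreamDescriptor(Map<String, String> config) {
      this.config = config;
    }

    @Override
    public Map<String, String> getConfig() {
      return config;
    }

    StreamDescriptor withInput(String streamId) {
      inputs.add(streamId);
      return this;
    }
  }

  // The runner needs the application up front and calls describe() exactly once,
  // so everything to run is known before run-time objects are created.
  static final class Runner {
    private final StreamDescriptor descriptor;

    Runner(App<StreamDescriptor> app, Map<String, String> config) {
      this.descriptor = new StreamDescriptor(config);
      app.describe(descriptor);
    }

    List<String> describedInputs() {
      return Collections.unmodifiableList(descriptor.inputs);
    }
  }

  public static void main(String[] args) {
    App<StreamDescriptor> app = appDesc -> appDesc.withInput("pageViewEvent");
    Runner runner = new Runner(app, Collections.singletonMap("job.name", "sketch"));
    System.out.println(runner.describedInputs()); // prints [pageViewEvent]
  }
}
```

In the sketch the runner only reads what describe() registered, which is why 
objects registered there need to be safe to hand to another process.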

Author: Yi Pan (Data Infrastructure) <nickpa...@gmail.com>
Author: Yi Pan (Data Infrastructure) <yi...@yipan-mn1.linkedin.biz>
Author: Yi Pan (Data Infrastructure) <yi...@yipan-ld2.linkedin.biz>
Author: Prateek Maheshwari <pmaheshw...@linkedin.com>
Author: Prateek Maheshwari <prate...@utexas.edu>
Author: prateekm <prate...@utexas.edu>

Reviewers: Prateek Maheshwari <pmaheshw...@apache.org>, Cameron Lee 
<ca...@linkedin.com>

Closes #606 from nickpan47/app-runtime-with-processor-callbacks and squashes 
the following commits:

3e60d44a [Yi Pan (Data Infrastructure)] SAMZA-1789: final revision on 
ApplicationDescriptor and ApplicationRunner APIs
bdb5b0fc [Yi Pan (Data Infrastructure)] SAMZA-1789: ApplicationRunner and 
ApplicationDescriptor final revision
66af5b70 [Yi Pan (Data Infrastructure)] SAMZA-1789: addressing Cameron's review 
comments.
ec4bb1dc [Yi Pan (Data Infrastructure)] SAMZA-1789: merge with fix for 
SAMZA-1836
9c89c63d [Yi Pan (Data Infrastructure)] Merge branch 'master' into 
app-runtime-with-processor-callbacks
91fcd73a [Yi Pan (Data Infrastructure)] Merge branch 'master' into 
app-runtime-with-processor-callbacks
34ffda8a [Yi Pan (Data Infrastructure)] SAMZA-1789: disabling tests due to 
SAMZA-1836
02076c85 [Yi Pan (Data Infrastructure)] SAMZA-1789: fixed the modifier for the 
mandatory constructor of ApplicationRunner; Disabled three tests due to wrong 
configure for test systems
222abf21 [Yi Pan (Data Infrastructure)] SAMZA-1789: added a constructor to 
StreamProcessor to take a StreamProcessorListenerFactory
7a73992a [Yi Pan (Data Infrastructure)] SAMZA-1789: fixing checkstyle and 
javadoc errors
9997b98b [Yi Pan (Data Infrastructure)] SAMZA-1789: renamed all 
ApplicationDescriptor classes with full-spelling of Application
f4b3d43a [Yi Pan (Data Infrastructure)] SAMZA-1789: Fixing TaskApplication 
examples and some checkstyle errors
f2969f8d [Yi Pan (Data Infrastructure)] SAMZA-1789: fixed ApplicationDescriptor 
to use InputDescriptor and OutputDescriptor; addressed Prateek's comments.
f04404cc [Yi Pan (Data Infrastructure)] SAMZA-1789: move createStreams out of 
the loop in prepareJobs
33753f72 [Yi Pan (Data Infrastructure)] Merge branch 'master' into 
app-runtime-with-processor-callbacks
12c09af0 [Yi Pan (Data Infrastructure)] SAMZA-1789: Fix a merging error (with 
SAMZA-1813)
a072118d [Yi Pan (Data Infrastructure)] Merge branch 'master' into 
app-runtime-with-processor-callbacks
e7af6932 [Yi Pan (Data Infrastructure)] Merge branch 'master' into 
app-runtime-with-processor-callbacks
8d4d3ffd [Yi Pan (Data Infrastructure)] Merge with master
055bd91e [Yi Pan (Data Infrastructure)] SAMZA-1789: fix unit test with 
ThreadJobFactory
247dcff4 [Yi Pan (Data Infrastructure)] Merge branch 'master' into 
app-runtime-with-processor-callbacks
1621c4d0 [Yi Pan (Data Infrastructure)] SAMZA-1789: a few more fixes to address 
Cameron's reviews
6e446fe6 [Yi Pan (Data Infrastructure)] SAMZA-1789: address Cameron's review 
comments.
4382d45d [Yi Pan (Data Infrastructure)] Merge branch 'master' into 
app-runtime-with-processor-callbacks
3b2f04d5 [Yi Pan (Data Infrastructure)] SAMZA-1789: moved all impl classes from 
samza-api to samza-core.
db96da83 [Yi Pan (Data Infrastructure)] SAMZA-1789: WIP - revision to address 
review feedbacks.
01433717 [Yi Pan (Data Infrastructure)] Merge branch 'master' into 
app-runtime-with-processor-callbacks
a82708bb [Yi Pan (Data Infrastructure)] SAMZA-1789: unify ApplicationDescriptor 
and ApplicationRunner for high- and low-level APIs in YARN and standalone 
environment
c4bb0dce [Yi Pan (Data Infrastructure)] Merge branch 'master' into 
app-runtime-with-processor-callbacks
f20cdcda [Yi Pan (Data Infrastructure)] WIP: adding unit tests. Pending update 
on StreamProcessorLifecycleListener, LocalContainerRunner, and 
SamzaContainerListener
973eb526 [Yi Pan (Data Infrastructure)] WIP: compiles, still working on 
LocalContainerRunner refactor
fb1bc49e [Yi Pan (Data Infrastructure)] Merge branch 'master' into 
app-spec-with-app-runtime-Jul-16-18
30a4e5f0 [Yi Pan (Data Infrastructure)] WIP: application runner refactor - 
proto-type for SEP-13
95577b74 [Yi Pan (Data Infrastructure)] WIP: trying to figure out the two 
interface classes for spec: a) spec builder in init(); b) spec reader in all 
other lifecycle methods
42782d81 [Yi Pan (Data Infrastructure)] Merge branch 
'prateek-remove-app-runner-stream-spec' into app-spec-with-app-runtime-Jul-16-18
d43e9231 [Yi Pan (Data Infrastructure)] WIP: proto-type with 
ApplicationRunnable and no ApplicationRunner exposed to user
f1cb8f0e [Yi Pan (Data Infrastructure)] Merge branch 'master' into 
single-app-api-May-21-18
7e71dc7e [Yi Pan (Data Infrastructure)] Merge with master
85619301 [Prateek Maheshwari] Merge branch 'master' into stream-spec-cleanup
7d7aa508 [Prateek Maheshwari] Updated with Cameron and Daniel's feedback.
8e6fc2da [prateekm] Remove all usages of StreamSpec and ApplicationRunner from 
the operator spec and impl layers.


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/abf49eaa
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/abf49eaa
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/abf49eaa

Branch: refs/heads/master
Commit: abf49eaaa59c258255e7436ae323d999e6cba51b
Parents: b0b2922
Author: Yi Pan (Data Infrastructure) <nickpa...@gmail.com>
Authored: Thu Sep 6 23:35:59 2018 -0700
Committer: Yi Pan (Data Infrastructure) <nickpa...@gmail.com>
Committed: Thu Sep 6 23:35:59 2018 -0700

----------------------------------------------------------------------
 .../application/ApplicationDescriptor.java      |  80 +++
 .../samza/application/SamzaApplication.java     |  40 ++
 .../samza/application/StreamApplication.java    |  75 +--
 .../StreamApplicationDescriptor.java            | 113 ++++
 .../samza/application/TaskApplication.java      |  86 +++
 .../application/TaskApplicationDescriptor.java  |  64 ++
 .../java/org/apache/samza/config/Config.java    |   3 +-
 .../samza/metrics/MetricsReporterFactory.java   |   5 +-
 .../apache/samza/operators/MessageStream.java   |   9 +-
 .../org/apache/samza/operators/StreamGraph.java | 120 ----
 .../operators/functions/ClosableFunction.java   |   7 +-
 .../operators/functions/InitableFunction.java   |   6 +-
 .../operators/functions/StreamExpander.java     |  16 +-
 .../apache/samza/runtime/ApplicationRunner.java |  92 +--
 .../samza/runtime/ApplicationRunners.java       |  82 +++
 .../apache/samza/runtime/ProcessorContext.java  |  31 +
 .../runtime/ProcessorLifecycleListener.java     |  55 ++
 .../ProcessorLifecycleListenerFactory.java      |  40 ++
 .../samza/task/AsyncStreamTaskFactory.java      |  10 +-
 .../apache/samza/task/StreamTaskFactory.java    |   6 +-
 .../java/org/apache/samza/task/TaskFactory.java |  38 ++
 .../samza/runtime/TestApplicationRunners.java   |  88 +++
 .../application/ApplicationDescriptorImpl.java  | 179 ++++++
 .../application/ApplicationDescriptorUtil.java  |  51 ++
 .../samza/application/ApplicationUtil.java      |  63 ++
 .../application/LegacyTaskApplication.java      |  37 ++
 .../StreamApplicationDescriptorImpl.java        | 381 ++++++++++++
 .../TaskApplicationDescriptorImpl.java          | 129 ++++
 .../samza/container/SamzaContainerListener.java |  22 +-
 .../samza/execution/ExecutionPlanner.java       |   7 +-
 .../org/apache/samza/execution/JobGraph.java    |   6 -
 .../org/apache/samza/execution/JobPlanner.java  | 188 ++++++
 .../apache/samza/execution/LocalJobPlanner.java | 134 +++++
 .../samza/execution/RemoteJobPlanner.java       |  96 +++
 .../samza/operators/MessageStreamImpl.java      |  57 +-
 .../samza/operators/OperatorSpecGraph.java      |  26 +-
 .../apache/samza/operators/StreamGraphSpec.java | 336 -----------
 .../samza/operators/spec/OperatorSpec.java      |   2 +-
 .../stream/IntermediateMessageStreamImpl.java   |   6 +-
 .../apache/samza/processor/StreamProcessor.java | 122 ++--
 .../StreamProcessorLifecycleListener.java       |  49 --
 .../runtime/AbstractApplicationRunner.java      | 135 -----
 .../samza/runtime/ApplicationRunnerMain.java    |  42 +-
 .../samza/runtime/LocalApplicationRunner.java   | 355 ++++-------
 .../samza/runtime/LocalContainerRunner.java     |  56 +-
 .../samza/runtime/RemoteApplicationRunner.java  | 123 ++--
 .../apache/samza/task/StreamOperatorTask.java   |   5 +-
 .../org/apache/samza/task/TaskFactoryUtil.java  | 137 ++---
 .../apache/samza/container/SamzaContainer.scala |  16 +-
 .../scala/org/apache/samza/job/JobRunner.scala  |   2 -
 .../samza/job/local/ThreadJobFactory.scala      |  48 +-
 .../application/MockStreamApplication.java      |  29 +
 .../samza/application/TestApplicationUtil.java  |  96 +++
 .../TestStreamApplicationDescriptorImpl.java    | 584 +++++++++++++++++++
 .../TestTaskApplicationDescriptorImpl.java      | 144 +++++
 .../samza/execution/TestExecutionPlanner.java   | 192 +++---
 .../execution/TestJobGraphJsonGenerator.java    | 120 ++--
 .../org/apache/samza/execution/TestJobNode.java |  53 +-
 .../samza/execution/TestLocalJobPlanner.java    | 211 +++++++
 .../samza/execution/TestRemoteJobPlanner.java   |  88 +++
 .../samza/operators/TestJoinOperator.java       | 103 ++--
 .../samza/operators/TestMessageStreamImpl.java  |  29 +-
 .../samza/operators/TestOperatorSpecGraph.java  |  19 +-
 .../samza/operators/TestStreamGraphSpec.java    | 506 ----------------
 .../operators/impl/TestOperatorImplGraph.java   | 190 +++---
 .../operators/impl/TestWindowOperator.java      | 147 ++---
 .../spec/TestPartitionByOperatorSpec.java       |  70 ++-
 .../samza/processor/TestStreamProcessor.java    | 139 +++--
 .../runtime/TestApplicationRunnerMain.java      |  47 +-
 .../runtime/TestLocalApplicationRunner.java     | 311 +++-------
 .../runtime/TestRemoteApplicationRunner.java    |  35 +-
 .../apache/samza/task/MockAsyncStreamTask.java  |  31 +
 .../org/apache/samza/task/MockStreamTask.java   |  31 +
 .../apache/samza/task/TestTaskFactoryUtil.java  | 215 ++-----
 .../samza/testUtils/TestAsyncStreamTask.java    |  35 --
 .../samza/testUtils/TestStreamApplication.java  |  33 --
 .../apache/samza/testUtils/TestStreamTask.java  |  34 --
 .../samza/container/TestSamzaContainer.scala    |  76 ++-
 .../samza/sql/runner/SamzaSqlApplication.java   |  13 +-
 .../sql/runner/SamzaSqlApplicationRunner.java   |  53 +-
 .../samza/sql/translator/JoinTranslator.java    |   2 +-
 .../samza/sql/translator/QueryTranslator.java   |  27 +-
 .../samza/sql/translator/ScanTranslator.java    |   8 +-
 .../samza/sql/translator/TranslatorContext.java |  19 +-
 .../apache/samza/sql/e2e/TestSamzaSqlTable.java |   8 +-
 .../runner/TestSamzaSqlApplicationRunner.java   |   2 -
 .../sql/translator/TestFilterTranslator.java    |   6 +-
 .../sql/translator/TestJoinTranslator.java      |  16 +-
 .../sql/translator/TestProjectTranslator.java   |  14 +-
 .../sql/translator/TestQueryTranslator.java     | 162 +++--
 .../example/AppWithGlobalConfigExample.java     |  25 +-
 .../apache/samza/example/BroadcastExample.java  |  22 +-
 .../samza/example/KeyValueStoreExample.java     |  19 +-
 .../org/apache/samza/example/MergeExample.java  |  18 +-
 .../samza/example/OrderShipmentJoinExample.java |  19 +-
 .../samza/example/PageViewCounterExample.java   |  15 +-
 .../samza/example/RepartitionExample.java       |  19 +-
 .../samza/example/TaskApplicationExample.java   |  77 +++
 .../org/apache/samza/example/WindowExample.java |  18 +-
 .../samza/system/mock/MockSystemConsumer.java   |   4 +-
 .../apache/samza/test/framework/TestRunner.java |  41 +-
 .../integration/LocalApplicationRunnerMain.java |  21 +-
 .../TestStandaloneIntegrationApplication.java   |   9 +-
 .../processor/TestZkStreamProcessorBase.java    |  20 +-
 .../EndOfStreamIntegrationTest.java             |  37 +-
 .../WatermarkIntegrationTest.java               |  62 +-
 .../test/framework/BroadcastAssertApp.java      |   7 +-
 .../StreamApplicationIntegrationTest.java       |   9 +-
 ...StreamApplicationIntegrationTestHarness.java |  42 +-
 .../samza/test/framework/TestTimerApp.java      |   7 +-
 .../apache/samza/test/framework/TimerTest.java  |  18 +-
 .../test/operator/RepartitionJoinWindowApp.java |  25 +-
 .../test/operator/RepartitionWindowApp.java     |  20 +-
 .../samza/test/operator/SessionWindowApp.java   |  17 +-
 .../operator/TestRepartitionJoinWindowApp.java  |  30 +-
 .../test/operator/TestRepartitionWindowApp.java |  10 +-
 .../samza/test/operator/TumblingWindowApp.java  |  16 +-
 .../test/processor/TestStreamApplication.java   |  82 +--
 .../test/processor/TestStreamProcessor.java     |  18 +-
 .../processor/TestZkLocalApplicationRunner.java | 317 +++++-----
 .../apache/samza/test/table/TestLocalTable.java |  39 +-
 .../table/TestLocalTableWithSideInputs.java     |  13 +-
 .../samza/test/table/TestRemoteTable.java       |  27 +-
 .../benchmark/SystemConsumerWithSamzaBench.java |  14 +-
 124 files changed, 5280 insertions(+), 3631 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-api/src/main/java/org/apache/samza/application/ApplicationDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/application/ApplicationDescriptor.java b/samza-api/src/main/java/org/apache/samza/application/ApplicationDescriptor.java
new file mode 100644
index 0000000..178fdee
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/application/ApplicationDescriptor.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.application;
+
+import java.util.Map;
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsReporterFactory;
+import org.apache.samza.operators.ContextManager;
+import org.apache.samza.runtime.ProcessorLifecycleListenerFactory;
+
+
+/**
+ * The interface class to describe the configuration, input and output streams, and processing logic in a {@link SamzaApplication}.
+ * <p>
+ * Sub-classes {@link StreamApplicationDescriptor} and {@link TaskApplicationDescriptor} are specific interfaces for applications
+ * written in high-level {@link StreamApplication} and low-level {@link TaskApplication} APIs, respectively.
+ *
+ * @param <S> sub-class of user application descriptor.
+ */
+@InterfaceStability.Evolving
+public interface ApplicationDescriptor<S extends ApplicationDescriptor> {
+
+  /**
+   * Get the {@link Config} of the application
+   * @return config of the application
+   */
+  Config getConfig();
+
+  /**
+   * Sets the {@link ContextManager} for this application.
+   * <p>
+   * Setting the {@link ContextManager} is optional. The provided {@link ContextManager} can be used to build the shared
+   * context between the operator functions within a task instance
+   *
+   * TODO: this should be replaced by the shared context factory when SAMZA-1714 is fixed.
+
+   * @param contextManager the {@link ContextManager} to use for the application
+   * @return type {@code S} of {@link ApplicationDescriptor} with {@code contextManager} set as its {@link ContextManager}
+   */
+  S withContextManager(ContextManager contextManager);
+
+  /**
+   * Sets the {@link ProcessorLifecycleListenerFactory} for this application.
+   *
+   * <p>Setting a {@link ProcessorLifecycleListenerFactory} is optional to a user application. It allows users to
+   * plug in optional code to be invoked in different stages before/after the main processing logic is started/stopped in
+   * the application.
+   *
+   * @param listenerFactory the user implemented {@link ProcessorLifecycleListenerFactory} that creates lifecycle listener
+   *                        with callback methods before and after the start/stop of each StreamProcessor in the application
+   * @return type {@code S} of {@link ApplicationDescriptor} with {@code listenerFactory} set as its {@link ProcessorLifecycleListenerFactory}
+   */
+  S withProcessorLifecycleListenerFactory(ProcessorLifecycleListenerFactory listenerFactory);
+
+  /**
+   * Sets a set of customized {@link MetricsReporterFactory}s in the application
+   *
+   * @param reporterFactories the map of customized {@link MetricsReporterFactory}s to be used
+   * @return type {@code S} of {@link ApplicationDescriptor} with {@code reporterFactories}
+   */
+  S withMetricsReporterFactories(Map<String, MetricsReporterFactory> reporterFactories);
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-api/src/main/java/org/apache/samza/application/SamzaApplication.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/application/SamzaApplication.java b/samza-api/src/main/java/org/apache/samza/application/SamzaApplication.java
new file mode 100644
index 0000000..7606be8
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/application/SamzaApplication.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.application;
+
+import org.apache.samza.annotation.InterfaceStability;
+
+
+/**
+ * The base interface for all user-implemented applications in Samza.
+ * <p>
+ * The main processing logic of the user application should be implemented in {@link SamzaApplication#describe(ApplicationDescriptor)}
+ * method. Sub-classes {@link StreamApplication} and {@link TaskApplication} are specific interfaces for applications
+ * written in high-level DAG and low-level task APIs, respectively.
+ */
+@InterfaceStability.Evolving
+public interface SamzaApplication<S extends ApplicationDescriptor> {
+
+  /**
+   * Describes the user processing logic via {@link ApplicationDescriptor}
+   *
+   * @param appDesc the {@link ApplicationDescriptor} object to describe user application logic
+   */
+  void describe(S appDesc);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-api/src/main/java/org/apache/samza/application/StreamApplication.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/application/StreamApplication.java b/samza-api/src/main/java/org/apache/samza/application/StreamApplication.java
index 0b2142b..a83cb37 100644
--- a/samza-api/src/main/java/org/apache/samza/application/StreamApplication.java
+++ b/samza-api/src/main/java/org/apache/samza/application/StreamApplication.java
@@ -19,30 +19,27 @@
 package org.apache.samza.application;
 
 import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.ContextManager;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.functions.InitableFunction;
-import org.apache.samza.task.StreamTask;
-import org.apache.samza.task.TaskContext;
 
 /**
- * Describes and initializes the transforms for processing message streams and generating results.
+ * Describes and initializes the transforms for processing message streams and generating results in high-level API.
  * <p>
  * The following example removes page views older than 1 hour from the input stream:
  * <pre>{@code
- * public class PageViewCounter implements StreamApplication {
- *   public void init(StreamGraph graph, Config config) {
- *     MessageStream<PageViewEvent> pageViewEvents =
- *       graph.getInputStream("pageViewEvents", (k, m) -> (PageViewEvent) m);
- *     OutputStream<String, PageViewEvent, PageViewEvent> recentPageViewEvents =
- *       graph.getOutputStream("recentPageViewEvents", m -> m.memberId, m -> m);
+ * public class PageViewFilter implements StreamApplication {
+ *   public void describe(StreamAppDescriptor appDesc) {
+ *     KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking");
+ *     KafkaInputDescriptor<PageViewEvent> inputStreamDescriptor =
+ *         trackingSystem.getInputDescriptor("pageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
+ *
+ *     KafkaOutputDescriptor<PageViewEvent>> outputStreamDescriptor =
+ *         trackingSystem.getOutputDescriptor("recentPageViewEvent", new JsonSerdeV2<>(PageViewEvent.class)));
+ *
+ *     MessageStream<PageViewEvent> pageViewEvents = appDesc.getInputStream(inputStreamDescriptor);
+ *     OutputStream<PageViewEvent> recentPageViewEvents = appDesc.getOutputStream(outputStreamDescriptor);
  *
  *     pageViewEvents
  *       .filter(m -> m.getCreationTime() > System.currentTimeMillis() - Duration.ofHours(1).toMillis())
- *       .sendTo(filteredPageViewEvents);
+ *       .sendTo(recentPageViewEvents);
  *   }
  * }
  * }</pre>
@@ -52,46 +49,28 @@ import org.apache.samza.task.TaskContext;
  *   public static void main(String[] args) {
  *     CommandLine cmdLine = new CommandLine();
  *     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
- *     PageViewCounter app = new PageViewCounter();
- *     LocalApplicationRunner runner = new LocalApplicationRunner(config);
- *     runner.run(app);
+ *     PageViewFilter app = new PageViewFilter();
+ *     ApplicationRunner runner = ApplicationRunners.getApplicationRunner(app, config);
+ *     runner.run();
  *     runner.waitForFinish();
  *   }
  * }</pre>
  *
  * <p>
- * Implementation Notes: Currently StreamApplications are wrapped in a {@link StreamTask} during execution.
- * A new StreamApplication instance will be created and initialized with a user-defined {@link StreamGraph}
- * when planning the execution. The {@link StreamGraph} and the functions implemented for transforms are required to
- * be serializable. The execution planner will generate a serialized DAG which will be deserialized in each {@link StreamTask}
- * instance used for processing incoming messages. Execution is synchronous and thread-safe within each {@link StreamTask}.
+ * Implementation Notes: Currently {@link StreamApplication}s are wrapped in a {@link org.apache.samza.task.StreamTask}
+ * during execution. The execution planner will generate a serialized DAG which will be deserialized in each
+ * {@link org.apache.samza.task.StreamTask} instance used for processing incoming messages. Execution is synchronous
+ * and thread-safe within each {@link org.apache.samza.task.StreamTask}.
  *
  * <p>
+ * A {@link StreamApplication} implementation must have a proper fully-qualified class name and a default constructor
+ * with no parameters to ensure successful instantiation in both local and remote environments.
  * Functions implemented for transforms in StreamApplications ({@link org.apache.samza.operators.functions.MapFunction},
  * {@link org.apache.samza.operators.functions.FilterFunction} for e.g.) are initable and closable. They are initialized
- * before messages are delivered to them and closed after their execution when the {@link StreamTask} instance is closed.
- * See {@link InitableFunction} and {@link org.apache.samza.operators.functions.ClosableFunction}.
+ * before messages are delivered to them and closed after their execution when the {@link org.apache.samza.task.StreamTask}
+ * instance is closed. See {@link org.apache.samza.operators.functions.InitableFunction} and {@link org.apache.samza.operators.functions.ClosableFunction}.
+ * Function implementations are required to be {@link java.io.Serializable}.
  */
-@InterfaceStability.Unstable
-public interface StreamApplication {
-
-  /**
-   * Describes and initializes the transforms for processing message streams and generating results.
-   * <p>
-   * The {@link StreamGraph} provides access to input and output streams. Input {@link MessageStream}s can be
-   * transformed into other {@link MessageStream}s or sent to an {@link OutputStream} using the {@link MessageStream}
-   * operators.
-   * <p>
-   * Most operators accept custom functions for doing the transformations. These functions are {@link InitableFunction}s
-   * and are provided the {@link Config} and {@link TaskContext} during their own initialization. The config and the
-   * context can be used, for example, to create custom metrics or access durable state stores.
-   * <p>
-   * A shared context between {@link InitableFunction}s for different operators within a task instance can be set
-   * up by providing a {@link ContextManager} using {@link StreamGraph#withContextManager}.
-   *
-   * @param graph the {@link StreamGraph} to get input/output streams from
-   * @param config the configuration for the application
-   */
-  void init(StreamGraph graph, Config config);
-
+@InterfaceStability.Evolving
+public interface StreamApplication extends SamzaApplication<StreamApplicationDescriptor> {
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-api/src/main/java/org/apache/samza/application/StreamApplicationDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/application/StreamApplicationDescriptor.java b/samza-api/src/main/java/org/apache/samza/application/StreamApplicationDescriptor.java
new file mode 100644
index 0000000..5a3de49
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/application/StreamApplicationDescriptor.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.application;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.TableDescriptor;
+import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
+import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor;
+import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
+import org.apache.samza.table.Table;
+
+
+/**
+ * The interface class to describe a {@link SamzaApplication} in high-level API in Samza.
+ */
+@InterfaceStability.Evolving
+public interface StreamApplicationDescriptor extends ApplicationDescriptor<StreamApplicationDescriptor> {
+
+  /**
+   * Sets the default SystemDescriptor to use for intermediate streams. This is equivalent to setting
+   * {@code job.default.system} and its properties in configuration.
+   * <p>
+   * If the default system descriptor is set, it must be set <b>before</b> creating any input/output/intermediate streams.
+   * <p>
+   * If an input/output stream is created with a stream-level Serde, they will be used, else the serde specified
+   * for the {@code job.default.system} in configuration will be used.
+   * <p>
+   * Providing an incompatible message type for the intermediate streams that use the default serde will result in
+   * {@link ClassCastException}s at runtime.
+   *
+   * @param defaultSystemDescriptor the default system descriptor to use
+   * @return type {@code S} of {@link ApplicationDescriptor} with {@code defaultSystemDescriptor} set as its default system
+   */
+  StreamApplicationDescriptor withDefaultSystem(SystemDescriptor<?> defaultSystemDescriptor);
+
+  /**
+   * Gets the input {@link MessageStream} corresponding to the {@code inputDescriptor}.
+   * <p>
+   * A {@code MessageStream<KV<K, V>}, obtained by calling this method with a descriptor with a {@code KVSerde<K, V>},
+   * can receive messages of type {@code KV<K, V>}. An input {@code MessageStream<M>}, obtained using a descriptor with
+   * any other {@code Serde<M>}, can receive messages of type M - the key in the incoming message is ignored.
+   * <p>
+   * A {@code KVSerde<NoOpSerde, NoOpSerde>} or {@code NoOpSerde} may be used for the descriptor if the
+   * {@code SystemConsumer} deserializes the incoming messages itself, and no further deserialization is required from
+   * the framework.
+   * <p>
+   * Multiple invocations of this method with the same {@code inputDescriptor} will throw an
+   * {@link IllegalStateException}.
+   *
+   * @param inputDescriptor the descriptor for the stream
+   * @param <M> the type of messages in the input {@link MessageStream}
+   * @return the input {@link MessageStream}
+   * @throws IllegalStateException when invoked multiple times with the same {@code inputDescriptor}
+   */
+  <M> MessageStream<M> getInputStream(InputDescriptor<M, ?> inputDescriptor);
+
+  /**
+   * Gets the {@link OutputStream} corresponding to the {@code outputDescriptor}.
+   * <p>
+   * An {@code OutputStream<KV<K, V>>}, obtained by calling this method with a descriptor with a {@code KVSerde<K, V>},
+   * can send messages of type {@code KV<K, V>}. An {@code OutputStream<M>}, obtained using a descriptor with any
+   * other {@code Serde<M>}, can send messages of type M without a key.
+   * <p>
+   * A {@code KVSerde<NoOpSerde, NoOpSerde>} or {@code NoOpSerde} may be used for the descriptor if the
+   * {@code SystemProducer} serializes the outgoing messages itself, and no prior serialization is required from
+   * the framework.
+   * <p>
+   * When sending messages to an {@code OutputStream<KV<K, V>>}, messages are partitioned using their serialized key.
+   * When sending messages to any other {@code OutputStream<M>}, messages are partitioned using a null partition key.
+   * <p>
+   * Multiple invocations of this method with the same {@code outputDescriptor} will throw an
+   * {@link IllegalStateException}.
+   *
+   * @param outputDescriptor the descriptor for the stream
+   * @param <M> the type of messages in the {@link OutputStream}
+   * @return the {@link OutputStream}
+   * @throws IllegalStateException when invoked multiple times with the same {@code outputDescriptor}
+   */
+  <M> OutputStream<M> getOutputStream(OutputDescriptor<M, ?> outputDescriptor);
+
+  /**
+   * Gets the {@link Table} corresponding to the {@link TableDescriptor}.
+   * <p>
+   * Multiple invocations of this method with the same {@link TableDescriptor} 
will throw an
+   * {@link IllegalStateException}.
+   *
+   * @param tableDescriptor the {@link TableDescriptor}
+   * @param <K> the type of the key
+   * @param <V> the type of the value
+   * @return the {@link Table} corresponding to the {@code tableDescriptor}
+   * @throws IllegalStateException when invoked multiple times with the same 
{@link TableDescriptor}
+   */
+  <K, V> Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> tableDescriptor);
+}
\ No newline at end of file
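
Reviewer note: the Javadoc above states that getInputStream, getOutputStream and getTable each throw an IllegalStateException when called twice with the same descriptor. A minimal sketch of that contract follows, assuming descriptors are identified by a string id. DescriptorRegistrySketch and its methods are hypothetical stand-ins for illustration and are not Samza classes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a descriptor registry; not Samza's implementation.
final class DescriptorRegistrySketch {
  private final Map<String, Object> streamsById = new HashMap<>();

  // Registers a stream id once; a second call with the same id fails fast.
  public Object getInputStream(String streamId) {
    if (streamsById.containsKey(streamId)) {
      throw new IllegalStateException(
          "getInputStream called more than once for: " + streamId);
    }
    Object stream = new Object(); // placeholder for a MessageStream
    streamsById.put(streamId, stream);
    return stream;
  }

  // Number of distinct stream ids registered so far.
  public int registeredCount() {
    return streamsById.size();
  }
}
```

Failing on the second call, rather than returning the cached stream, surfaces accidental double registration while the application is being described instead of at runtime.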

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-api/src/main/java/org/apache/samza/application/TaskApplication.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/application/TaskApplication.java 
b/samza-api/src/main/java/org/apache/samza/application/TaskApplication.java
new file mode 100644
index 0000000..424634d
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/application/TaskApplication.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.application;
+
+import org.apache.samza.annotation.InterfaceStability;
+
+
+/**
+ * Describes and initializes the transforms for processing message streams and 
generating results using the low-level API. Your
+ * application is expected to implement this interface.
+ * <p>
+ * The following example removes page views older than 1 hour from the input 
stream:
+ * <pre>{@code
+ * public class PageViewFilter implements TaskApplication {
+ *   public void describe(TaskApplicationDescriptor appDesc) {
+ *     KafkaSystemDescriptor trackingSystem = new 
KafkaSystemDescriptor(PageViewTask.SYSTEM);
+ *     KafkaInputDescriptor<PageViewEvent> inputStreamDescriptor =
+ *         trackingSystem.getInputDescriptor(PageViewTask.TASK_INPUT, new 
JsonSerdeV2<>(PageViewEvent.class));
+ *
+ *     KafkaOutputDescriptor<PageViewEvent> outputStreamDescriptor =
+ *         trackingSystem.getOutputDescriptor(PageViewTask.TASK_OUTPUT, new 
JsonSerdeV2<>(PageViewEvent.class));
+ *
+ *     appDesc.addInputStream(inputStreamDescriptor);
+ *     appDesc.addOutputStream(outputStreamDescriptor);
+ *     appDesc.setTaskFactory((StreamTaskFactory) () -> new PageViewTask());
+ *   }
+ * }
+ *
+ * public class PageViewTask implements StreamTask {
+ *   final static String TASK_INPUT = "pageViewEvents";
+ *   final static String TASK_OUTPUT = "recentPageViewEvents";
+ *   final static String SYSTEM = "kafka";
+ *
+ *   public void process(IncomingMessageEnvelope message, MessageCollector 
collector,
+ *       TaskCoordinator coordinator) {
+ *     PageViewEvent m = (PageViewEvent) message.getValue();
+ *     if (m.getCreationTime() > System.currentTimeMillis() - 
Duration.ofHours(1).toMillis()) {
+ *       collector.send(new OutgoingMessageEnvelope(new SystemStream(SYSTEM, 
TASK_OUTPUT),
+ *           message.getKey(), message.getKey(), m));
+ *     }
+ *   }
+ * }
+ * }</pre>
+ *
+ * <p>
+ * The example above can be run using an ApplicationRunner:
+ * <pre>{@code
+ *   public static void main(String[] args) {
+ *     CommandLine cmdLine = new CommandLine();
+ *     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+ *     PageViewFilter app = new PageViewFilter();
+ *     ApplicationRunner runner = ApplicationRunners.getApplicationRunner(app, 
config);
+ *     runner.run();
+ *     runner.waitForFinish();
+ *   }
+ * }</pre>
+ *
+ * <p>
+ * Implementation Notes: {@link TaskApplication} allows users to instantiate 
{@link org.apache.samza.task.StreamTask} or
+ * {@link org.apache.samza.task.AsyncStreamTask} when describing the 
processing logic. A new {@link TaskApplicationDescriptor}
+ * instance will be created and described by the user-defined {@link 
TaskApplication} when planning the execution.
+ * {@link org.apache.samza.task.TaskFactory} is required to be serializable.
+ *
+ * <p>
+ * The user-implemented {@link TaskApplication} class must be a class with a 
proper fully-qualified class name and
+ * a default constructor with no parameters to ensure successful instantiation 
in both local and remote environments.
+ */
+@InterfaceStability.Evolving
+public interface TaskApplication extends 
SamzaApplication<TaskApplicationDescriptor> {
+}
\ No newline at end of file
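
Reviewer note: the PageViewTask example in the Javadoc above keeps an event only when its creation time is later than "now minus one hour". The sketch below isolates that comparison so the boundary is explicit. RecentEventFilterSketch and isRecent are hypothetical names introduced here, and the clock value is passed in as a parameter (an assumption made for testability; the Javadoc example calls System.currentTimeMillis() directly).

```java
import java.time.Duration;

// Hypothetical helper that mirrors the comparison used in the PageViewTask example.
final class RecentEventFilterSketch {
  private RecentEventFilterSketch() { }

  // True when the event was created strictly less than `window` before `nowMillis`.
  public static boolean isRecent(long creationTimeMillis, long nowMillis, Duration window) {
    return creationTimeMillis > nowMillis - window.toMillis();
  }
}
```

Because the comparison is strict, an event that is exactly one hour old is dropped along with older ones.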

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-api/src/main/java/org/apache/samza/application/TaskApplicationDescriptor.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/application/TaskApplicationDescriptor.java
 
b/samza-api/src/main/java/org/apache/samza/application/TaskApplicationDescriptor.java
new file mode 100644
index 0000000..0226bb5
--- /dev/null
+++ 
b/samza-api/src/main/java/org/apache/samza/application/TaskApplicationDescriptor.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.application;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.operators.TableDescriptor;
+import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
+import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor;
+import org.apache.samza.task.TaskFactory;
+
+
+/**
+ * The interface to describe a {@link SamzaApplication} that uses low-level 
API tasks for processing.
+ */
+@InterfaceStability.Evolving
+public interface TaskApplicationDescriptor extends 
ApplicationDescriptor<TaskApplicationDescriptor> {
+
+  /**
+   * Sets the {@link TaskFactory} for the user application. The {@link 
TaskFactory#createInstance()} creates a task instance
+   * that implements the main processing logic of the user application.
+   *
+   * @param factory the {@link TaskFactory} including the low-level task 
processing logic. The only allowed task factory
+   *                classes are {@link 
org.apache.samza.task.StreamTaskFactory} and {@link 
org.apache.samza.task.AsyncStreamTaskFactory}.
+   */
+  void setTaskFactory(TaskFactory factory);
+
+  /**
+   * Adds the input stream to the application.
+   *
+   * @param isd the {@link InputDescriptor}
+   */
+  void addInputStream(InputDescriptor isd);
+
+  /**
+   * Adds the output stream to the application.
+   *
+   * @param osd the {@link OutputDescriptor} of the output stream
+   */
+  void addOutputStream(OutputDescriptor osd);
+
+  /**
+   * Adds the {@link TableDescriptor} used in the application.
+   *
+   * @param table the {@link TableDescriptor}
+   */
+  void addTable(TableDescriptor table);
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-api/src/main/java/org/apache/samza/config/Config.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/config/Config.java 
b/samza-api/src/main/java/org/apache/samza/config/Config.java
index d4164c6..68f085c 100644
--- a/samza-api/src/main/java/org/apache/samza/config/Config.java
+++ b/samza-api/src/main/java/org/apache/samza/config/Config.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.config;
 
+import java.io.Serializable;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Arrays;
@@ -32,7 +33,7 @@ import java.util.regex.Pattern;
 /**
  * Store and retrieve named, typed values as configuration for classes 
implementing this interface.
  */
-public abstract class Config implements Map<String, String> {
+public abstract class Config implements Map<String, String>, Serializable {
   public static final String SENSITIVE_PREFIX = "sensitive.";
   public static final String SENSITIVE_MASK = "********";
 

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-api/src/main/java/org/apache/samza/metrics/MetricsReporterFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/metrics/MetricsReporterFactory.java 
b/samza-api/src/main/java/org/apache/samza/metrics/MetricsReporterFactory.java
index 7807222..b9934e5 100644
--- 
a/samza-api/src/main/java/org/apache/samza/metrics/MetricsReporterFactory.java
+++ 
b/samza-api/src/main/java/org/apache/samza/metrics/MetricsReporterFactory.java
@@ -19,11 +19,12 @@
 
 package org.apache.samza.metrics;
 
+import java.io.Serializable;
 import org.apache.samza.config.Config;
 
 /**
  * Build a {@link org.apache.samza.metrics.MetricsReporter}
  */
-public interface MetricsReporterFactory {
-  MetricsReporter getMetricsReporter(String name, String containerName, Config 
config);
+public interface MetricsReporterFactory extends Serializable {
+  MetricsReporter getMetricsReporter(String name, String processorId, Config 
config);
 }
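
Reviewer note: the two hunks above make Config and MetricsReporterFactory Serializable, which matches the commit message's requirement that objects registered in describe() be serializable. A minimal sketch of what that enables is a Java serialization round trip. SerializableRoundTripSketch and SimpleConfig below are hypothetical stand-ins written for illustration; they are not Samza classes and make no claim about how Samza itself serializes these objects.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

final class SerializableRoundTripSketch {
  private SerializableRoundTripSketch() { }

  // Hypothetical stand-in for a serializable configuration holder.
  public static final class SimpleConfig implements Serializable {
    private static final long serialVersionUID = 1L;
    private final HashMap<String, String> values;

    public SimpleConfig(Map<String, String> values) {
      this.values = new HashMap<>(values);
    }

    public String get(String key) {
      return values.get(key);
    }
  }

  // Serializes the value to bytes and reads it back, returning the copy.
  public static <T extends Serializable> T roundTrip(T value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(value);
      }
      try (ObjectInputStream in =
               new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        @SuppressWarnings("unchecked")
        T copy = (T) in.readObject();
        return copy;
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("Value is not serializable", e);
    }
  }
}
```

A round trip like this in a unit test is one way to catch a non-serializable field early, before the object has to be shipped to a remote process.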

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java 
b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
index e3a61c4..a7935d3 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
@@ -39,11 +39,12 @@ import org.apache.samza.table.Table;
 /**
  * A stream of messages that can be transformed into another {@link 
MessageStream}.
  * <p>
- * A {@link MessageStream} corresponding to an input stream can be obtained 
using {@link StreamGraph#getInputStream}.
+ * A {@link MessageStream} corresponding to an input stream can be obtained 
using
+ * {@link 
org.apache.samza.application.StreamApplicationDescriptor#getInputStream}.
  *
  * @param <M> the type of messages in this stream
  */
-@InterfaceStability.Unstable
+@InterfaceStability.Evolving
 public interface MessageStream<M> {
 
   /**
@@ -213,8 +214,8 @@ public interface MessageStream<M> {
 
   /**
    * Re-partitions this {@link MessageStream} using keys from the {@code 
keyExtractor} by creating a new
-   * intermediate stream on the default system provided via {@link 
StreamGraph#setDefaultSystem}. This intermediate
-   * stream is both an output and input to the job.
+   * intermediate stream on the default system provided via {@link 
org.apache.samza.application.StreamApplicationDescriptor#withDefaultSystem}.
+   * This intermediate stream is both an output and input to the job.
    * <p>
    * Uses the provided {@link KVSerde} for serialization of keys and values. 
If the provided {@code serde} is null,
    * uses the key and message serde configured for the job's default system.

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java 
b/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
deleted file mode 100644
index ec6e4b7..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
-import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor;
-import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
-import org.apache.samza.table.Table;
-
-
-/**
- * Provides access to {@link MessageStream}s and {@link OutputStream}s used 
to describe application logic.
- */
-@InterfaceStability.Unstable
-public interface StreamGraph {
-
-  /**
-   * Sets the default SystemDescriptor to use for intermediate streams. This 
is equivalent to setting
-   * {@code job.default.system} and its properties in configuration.
-   * <p>
-   * If the default system descriptor is set, it must be set <b>before</b> 
creating any intermediate streams.
-   * <p>
-   * If the intermediate stream is created with a stream-level Serde, they 
will be used, else the serde specified
-   * for the {@code job.default.system} in configuration will be used.
-   * <p>
-   * Providing an incompatible message type for the intermediate streams that 
use the default serde will result in
-   * {@link ClassCastException}s at runtime.
-   *
-   * @param defaultSystemDescriptor the default system descriptor to use
-   */
-  void setDefaultSystem(SystemDescriptor<?> defaultSystemDescriptor);
-
-  /**
-   * Gets the input {@link MessageStream} corresponding to the {@code 
inputDescriptor}.
-   * <p>
-   * A {@code MessageStream<KV<K, V>}, obtained by calling this method with a 
descriptor with a {@code KVSerde<K, V>},
-   * can receive messages of type {@code KV<K, V>}. An input {@code 
MessageStream<M>}, obtained using a descriptor with
-   * any other {@code Serde<M>}, can receive messages of type M - the key in 
the incoming message is ignored.
-   * <p>
-   * A {@code KVSerde<NoOpSerde, NoOpSerde>} or {@code NoOpSerde} may be used 
for the descriptor if the
-   * {@code SystemConsumer} deserializes the incoming messages itself, and no 
further deserialization is required from
-   * the framework.
-   * <p>
-   * Multiple invocations of this method with the same {@code inputDescriptor} 
will throw an
-   * {@link IllegalStateException}.
-   *
-   * @param inputDescriptor the descriptor for the stream
-   * @param <M> the type of messages in the input {@link MessageStream}
-   * @return the input {@link MessageStream}
-   * @throws IllegalStateException when invoked multiple times with the same 
{@code inputDescriptor}
-   */
-  <M> MessageStream<M> getInputStream(InputDescriptor<M, ?> inputDescriptor);
-
-  /**
-   * Gets the {@link OutputStream} corresponding to the {@code 
outputDescriptor}.
-   * <p>
-   * An {@code OutputStream<KV<K, V>>}, obtained by calling this method with a 
descriptor with a {@code KVSerde<K, V>},
-   * can send messages of type {@code KV<K, V>}. An {@code OutputStream<M>}, 
obtained using a descriptor with any
-   * other {@code Serde<M>}, can send messages of type M without a key.
-   * <p>
-   * A {@code KVSerde<NoOpSerde, NoOpSerde>} or {@code NoOpSerde} may be used 
for the descriptor if the
-   * {@code SystemProducer} serializes the outgoing messages itself, and no 
prior serialization is required from
-   * the framework.
-   * <p>
-   * When sending messages to an {@code OutputStream<KV<K, V>>}, messages are 
partitioned using their serialized key.
-   * When sending messages to any other {@code OutputStream<M>}, messages are 
partitioned using a null partition key.
-   * <p>
-   * Multiple invocations of this method with the same {@code 
outputDescriptor} will throw an
-   * {@link IllegalStateException}.
-   *
-   * @param outputDescriptor the descriptor for the stream
-   * @param <M> the type of messages in the {@link OutputStream}
-   * @return the {@link OutputStream}
-   * @throws IllegalStateException when invoked multiple times with the same 
{@code outputDescriptor}
-   */
-  <M> OutputStream<M> getOutputStream(OutputDescriptor<M, ?> outputDescriptor);
-
-  /**
-   * Gets the {@link Table} corresponding to the {@link TableDescriptor}.
-   * <p>
-   * Multiple invocations of this method with the same {@link TableDescriptor} 
will throw an
-   * {@link IllegalStateException}.
-   *
-   * @param tableDescriptor the {@link TableDescriptor}
-   * @param <K> the type of the key
-   * @param <V> the type of the value
-   * @return the {@link Table} corresponding to the {@code tableDescriptor}
-   * @throws IllegalStateException when invoked multiple times with the same 
{@link TableDescriptor}
-   */
-  <K, V> Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> tableDescriptor);
-
-  /**
-   * Sets the {@link ContextManager} for this {@link StreamGraph}.
-   * <p>
-   * The provided {@link ContextManager} can be used to setup shared context 
between the operator functions
-   * within a task instance
-   *
-   * @param contextManager the {@link ContextManager} to use for the {@link 
StreamGraph}
-   * @return the {@link StreamGraph} with {@code contextManager} set as its 
{@link ContextManager}
-   */
-  StreamGraph withContextManager(ContextManager contextManager);
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-api/src/main/java/org/apache/samza/operators/functions/ClosableFunction.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/functions/ClosableFunction.java
 
b/samza-api/src/main/java/org/apache/samza/operators/functions/ClosableFunction.java
index faf9fc5..12823cc 100644
--- 
a/samza-api/src/main/java/org/apache/samza/operators/functions/ClosableFunction.java
+++ 
b/samza-api/src/main/java/org/apache/samza/operators/functions/ClosableFunction.java
@@ -21,17 +21,18 @@ package org.apache.samza.operators.functions;
 
 import org.apache.samza.annotation.InterfaceStability;
 
+
 /**
  * A function that can be closed after its execution.
  *
  * <p> Implement {@link #close()} to free resources used during the execution 
of the function, clean up state etc.
  *
  * <p> Order of finalization: {@link ClosableFunction}s are closed in the 
reverse topological order of operators in the
- * {@link org.apache.samza.operators.StreamGraph}. For any two operators A and 
B in the graph, if operator B consumes results
- * from operator A, then operator B is guaranteed to be closed before operator 
A.
+ * {@link org.apache.samza.application.StreamApplicationDescriptor}. For any 
two operators A and B in the graph, if operator B
+ * consumes results from operator A, then operator B is guaranteed to be 
closed before operator A.
  *
  */
-@InterfaceStability.Unstable
+@InterfaceStability.Evolving
 public interface ClosableFunction {
   /**
    * Frees any resource acquired by the operators in {@link InitableFunction}

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java
 
b/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java
index 6651819..8a5d83b 100644
--- 
a/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java
+++ 
b/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java
@@ -27,11 +27,11 @@ import org.apache.samza.task.TaskContext;
  * A function that can be initialized before execution.
  *
  * <p> Order of initialization: {@link InitableFunction}s are invoked in the 
topological order of operators in the
- * {@link org.apache.samza.operators.StreamGraph}. For any two operators A and 
B in the graph, if operator B consumes results
- * from operator A, then operator A is guaranteed to be initialized before 
operator B.
+ * {@link org.apache.samza.application.StreamApplicationDescriptor}. For any 
two operators A and B in the graph, if operator B
+ * consumes results from operator A, then operator A is guaranteed to be 
initialized before operator B.
  *
  */
-@InterfaceStability.Unstable
+@InterfaceStability.Evolving
 public interface InitableFunction {
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-api/src/main/java/org/apache/samza/operators/functions/StreamExpander.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/functions/StreamExpander.java
 
b/samza-api/src/main/java/org/apache/samza/operators/functions/StreamExpander.java
index 085a98d..7bbf601 100644
--- 
a/samza-api/src/main/java/org/apache/samza/operators/functions/StreamExpander.java
+++ 
b/samza-api/src/main/java/org/apache/samza/operators/functions/StreamExpander.java
@@ -19,13 +19,13 @@
 package org.apache.samza.operators.functions;
 
 import java.io.Serializable;
+import org.apache.samza.application.StreamApplicationDescriptor;
 import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
 
 /**
- * Expands the provided {@link InputDescriptor} to a sub-DAG of one or more 
operators on the {@link StreamGraph},
- * and returns a new {@link MessageStream} with the combined results. Called 
when {@link StreamGraph#getInputStream}
+ * Expands the provided {@link InputDescriptor} to a sub-DAG of one or more 
operators on the {@link StreamApplicationDescriptor},
+ * and returns a new {@link MessageStream} with the combined results. Called 
when {@link StreamApplicationDescriptor#getInputStream}
  * is being used to get a {@link MessageStream} using an {@link 
InputDescriptor} from an expanding system descriptor.
  * <p>
  * This is provided by default by expanding system descriptor implementations 
and can not be overridden
@@ -36,23 +36,23 @@ import 
org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
 public interface StreamExpander<OM> extends Serializable {
 
   /**
-   * Expands the provided {@link InputDescriptor} to a sub-DAG of one or more 
operators on the {@link StreamGraph},
+   * Expands the provided {@link InputDescriptor} to a sub-DAG of one or more 
operators on the {@link StreamApplicationDescriptor},
    * and returns a new {@link MessageStream} with the combined results. Called 
when the {@link InputDescriptor}
-   * is being used to get an {@link MessageStream} using {@link 
StreamGraph#getInputStream}.
+   * is being used to get an {@link MessageStream} using {@link 
StreamApplicationDescriptor#getInputStream}.
    * <p>
    * Notes for system implementers:
    * <p>
    * Take care to avoid infinite recursion in the implementation; e.g., by 
ensuring that it doesn't call
-   * {@link StreamGraph#getInputStream} with an {@link InputDescriptor} from 
an expanding system descriptor
+   * {@link StreamApplicationDescriptor#getInputStream} with an {@link 
InputDescriptor} from an expanding system descriptor
    * (like this one) again.
    * <p>
    * It's the {@link StreamExpander}'s responsibility to propagate any 
properties, including serde, from the
    * user-provided {@link InputDescriptor} to the expanded input descriptors.
    *
-   * @param streamGraph the {@link StreamGraph} to register the expanded 
sub-DAG of operators on
+   * @param streamAppDesc the {@link StreamApplicationDescriptor} to register 
the expanded sub-DAG of operators on
    * @param inputDescriptor the {@link InputDescriptor} to be expanded
    * @return the {@link MessageStream} containing the combined results of the 
sub-DAG of operators
    */
-  MessageStream<OM> apply(StreamGraph streamGraph, InputDescriptor 
inputDescriptor);
+  MessageStream<OM> apply(StreamApplicationDescriptor streamAppDesc, 
InputDescriptor inputDescriptor);
 
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java 
b/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
index 45abb5d..59543c0 100644
--- a/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
+++ b/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
@@ -20,98 +20,43 @@ package org.apache.samza.runtime;
 
 import java.time.Duration;
 import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.application.StreamApplication;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.ConfigException;
 import org.apache.samza.job.ApplicationStatus;
 
-import java.lang.reflect.Constructor;
-
 
 /**
- * The primary means of managing execution of the {@link 
org.apache.samza.application.StreamApplication} at runtime.
+ * The primary means of managing execution of the {@link 
org.apache.samza.application.SamzaApplication} at runtime.
+ *
+ * <p>
+ * Implementation Notes: implementations of {@link ApplicationRunner} must have 
a public constructor of the form
+ * {@code ApplicationRunner(SamzaApplication, Config)}.
  */
-@InterfaceStability.Unstable
-public abstract class ApplicationRunner {
-
-  private static final String RUNNER_CONFIG = "app.runner.class";
-  private static final String DEFAULT_RUNNER_CLASS = 
"org.apache.samza.runtime.RemoteApplicationRunner";
-
-  protected final Config config;
-
-  /**
-   * Static method to load the {@link ApplicationRunner}
-   *
-   * @param config  configuration passed in to initialize the Samza processes
-   * @return  the configure-driven {@link ApplicationRunner} to run the 
user-defined stream applications
-   */
-  public static ApplicationRunner fromConfig(Config config) {
-    try {
-      Class<?> runnerClass = Class.forName(config.get(RUNNER_CONFIG, 
DEFAULT_RUNNER_CLASS));
-      if (ApplicationRunner.class.isAssignableFrom(runnerClass)) {
-        Constructor<?> constructor = runnerClass.getConstructor(Config.class); 
// *sigh*
-        return (ApplicationRunner) constructor.newInstance(config);
-      }
-    } catch (Exception e) {
-      throw new ConfigException(String.format("Problem in loading 
ApplicationRunner class %s", config.get(
-          RUNNER_CONFIG)), e);
-    }
-    throw new ConfigException(String.format(
-        "Class %s does not extend ApplicationRunner properly",
-        config.get(RUNNER_CONFIG)));
-  }
-
-
-  public ApplicationRunner(Config config) {
-    if (config == null) {
-      throw new NullPointerException("Parameter 'config' cannot be null.");
-    }
-
-    this.config = config;
-  }
-
-  /**
-   * Deploy and run the Samza jobs to execute {@link 
org.apache.samza.task.StreamTask}.
-   * It is non-blocking so it doesn't wait for the application running.
-   * This method assumes you task.class is specified in the configs.
-   *
-   * NOTE. this interface will most likely change in the future.
-   */
-  @InterfaceStability.Evolving
-  public abstract void runTask();
-
+@InterfaceStability.Evolving
+public interface ApplicationRunner {
 
   /**
-   * Deploy and run the Samza jobs to execute {@link StreamApplication}.
+   * Deploy and run the Samza jobs to execute {@link 
org.apache.samza.application.SamzaApplication}.
    * It is non-blocking so it doesn't wait for the application running.
-   *
-   * @param streamApp  the user-defined {@link StreamApplication} object
    */
-  public abstract void run(StreamApplication streamApp);
+  void run();
 
   /**
-   * Kill the Samza jobs represented by {@link StreamApplication}
+   * Kill the Samza jobs represented by {@link 
org.apache.samza.application.SamzaApplication}.
    * It is non-blocking so it doesn't wait for the application stopping.
-   *
-   * @param streamApp  the user-defined {@link StreamApplication} object
    */
-  public abstract void kill(StreamApplication streamApp);
+  void kill();
 
   /**
-   * Get the collective status of the Samza jobs represented by {@link 
StreamApplication}.
-   * Returns {@link ApplicationRunner} running if all jobs are running.
+   * Get the collective status of the Samza jobs represented by {@link 
org.apache.samza.application.SamzaApplication}.
+   * Returns an {@link ApplicationStatus} object.
    *
-   * @param streamApp  the user-defined {@link StreamApplication} object
-   * @return the status of the application
+   * @return the current status of an instance of {@link 
org.apache.samza.application.SamzaApplication}
    */
-  public abstract ApplicationStatus status(StreamApplication streamApp);
+  ApplicationStatus status();
 
   /**
    * Waits until the application finishes.
    */
-  public void waitForFinish() {
-    throw new UnsupportedOperationException(getClass().getName() + " does not 
support waitForFinish.");
-  }
+  void waitForFinish();
 
   /**
    * Waits for {@code timeout} duration for the application to finish.
@@ -120,7 +65,6 @@ public abstract class ApplicationRunner {
    * @return true - application finished before timeout
    *         false - otherwise
    */
-  public boolean waitForFinish(Duration timeout) {
-    throw new UnsupportedOperationException(getClass().getName() + " does not support timed waitForFinish.");
-  }
+  boolean waitForFinish(Duration timeout);
+
 }
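
The Javadoc above describes run() as non-blocking and waitForFinish(Duration) as returning true only when the application finishes before the timeout. A minimal sketch of that contract follows. LatchRunner is a hypothetical stand-in built on a CountDownLatch, not a Samza class, and it says nothing about how the real runners are implemented.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a runner; not a Samza class.
class LatchRunner {
  private final Runnable job;
  private final CountDownLatch finished = new CountDownLatch(1);

  LatchRunner(Runnable job) {
    this.job = job;
  }

  // Non-blocking: starts the job on a background thread and returns at once.
  void run() {
    Thread worker = new Thread(() -> {
      try {
        job.run();
      } finally {
        // Signal completion whether the job succeeded or threw.
        finished.countDown();
      }
    });
    worker.setDaemon(true);
    worker.start();
  }

  // Blocks for at most the given timeout; true only if the job finished in time.
  boolean waitForFinish(Duration timeout) {
    try {
      return finished.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

A caller would invoke run() first and then waitForFinish(...) with whatever bound suits the deployment; a runner that was never started simply times out and returns false.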

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunners.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunners.java b/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunners.java
new file mode 100644
index 0000000..cd1d06b
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunners.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.runtime;
+
+import java.lang.reflect.Constructor;
+import org.apache.samza.application.SamzaApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
+
+
+/**
+ * Creates {@link ApplicationRunner} instances based on configuration and user-implemented {@link SamzaApplication}
+ *
+ * <p> This class is usually used in main() function to create an instance of {@link ApplicationRunner}, as in the example
+ * below:
+ * <pre>{@code
+ *   public static void main(String[] args) {
+ *     CommandLine cmdLine = new CommandLine();
+ *     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+ *     PageViewCounter app = new PageViewCounter();
+ *     ApplicationRunner runner = ApplicationRunners.getApplicationRunner(app, config);
+ *     runner.run();
+ *     runner.waitForFinish();
+ *   }
+ * }</pre>
+ */
+public class ApplicationRunners {
+
+  private static final String APP_RUNNER_CFG = "app.runner.class";
+  private static final String DEFAULT_APP_RUNNER = "org.apache.samza.runtime.RemoteApplicationRunner";
+
+  private ApplicationRunners() {
+
+  }
+
+  /**
+   * Get the {@link ApplicationRunner} that runs the {@code userApp}
+   *
+   * @param userApp the user application object
+   * @param config the configuration for this application
+   * @return the {@link ApplicationRunner} object that will run the {@code userApp}
+   */
+  public static final ApplicationRunner getApplicationRunner(SamzaApplication userApp, Config config) {
+    String appRunnerClassName = getAppRunnerClass(config);
+    try {
+      Class<?> runnerClass = Class.forName(appRunnerClassName);
+      if (!ApplicationRunner.class.isAssignableFrom(runnerClass)) {
+        throw new ConfigException(
+            String.format("Class %s does not extend ApplicationRunner properly", appRunnerClassName));
+      }
+      Constructor<?> constructor = runnerClass.getConstructor(SamzaApplication.class, Config.class); // *sigh*
+      return (ApplicationRunner) constructor.newInstance(userApp, config);
+    } catch (ConfigException ce) {
+      // this is thrown due to invalid app.runner.class configuration
+      throw ce;
+    } catch (Exception e) {
+      // other types of exception during class loading and construction of new instance
+      throw new ConfigException(String.format("Could not load ApplicationRunner class %s", appRunnerClassName), e);
+    }
+  }
+
+  private static String getAppRunnerClass(Config config) {
+    return config.getOrDefault(APP_RUNNER_CFG, DEFAULT_APP_RUNNER);
+  }
+
+}
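
The factory above resolves the runner class from configuration and calls a two-argument constructor through reflection. A self-contained sketch of that lookup follows. App, Runner, LocalRunner and RunnerFactory are hypothetical stand-ins, with a plain Map in place of Config and IllegalArgumentException in place of ConfigException, so the block compiles without Samza on the classpath.

```java
import java.lang.reflect.Constructor;
import java.util.Map;

// Hypothetical stand-ins for SamzaApplication and ApplicationRunner.
interface App { }

interface Runner {
  App app();
}

// A runner exposing the public (App, Map) constructor that the factory looks up.
class LocalRunner implements Runner {
  private final App app;

  public LocalRunner(App app, Map<String, String> config) {
    this.app = app;
  }

  @Override
  public App app() {
    return app;
  }
}

class RunnerFactory {
  static final String RUNNER_KEY = "app.runner.class";

  static Runner create(App app, Map<String, String> config, String defaultRunner) {
    String className = config.getOrDefault(RUNNER_KEY, defaultRunner);
    Class<?> runnerClass;
    try {
      runnerClass = Class.forName(className);
    } catch (ClassNotFoundException e) {
      throw new IllegalArgumentException("Could not load runner class " + className, e);
    }
    // Reject classes that are loadable but are not runners at all.
    if (!Runner.class.isAssignableFrom(runnerClass)) {
      throw new IllegalArgumentException("Class " + className + " does not implement Runner");
    }
    try {
      // The constructor signature is the implicit contract every runner must honor.
      Constructor<?> ctor = runnerClass.getConstructor(App.class, Map.class);
      return (Runner) ctor.newInstance(app, config);
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException("Could not construct runner " + className, e);
    }
  }
}
```

Because the constructor contract is enforced only at runtime, a runner that lacks the expected signature fails during construction rather than at compile time.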

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-api/src/main/java/org/apache/samza/runtime/ProcessorContext.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/runtime/ProcessorContext.java b/samza-api/src/main/java/org/apache/samza/runtime/ProcessorContext.java
new file mode 100644
index 0000000..4e9f5ba
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/runtime/ProcessorContext.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.runtime;
+
+import org.apache.samza.annotation.InterfaceStability;
+
+
+/**
+ * The context for a StreamProcessor. A placeholder class for the general context for Samza application.
+ *
+ * TODO: pending change with SAMZA-1714
+ */
+@InterfaceStability.Unstable
+public interface ProcessorContext {
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-api/src/main/java/org/apache/samza/runtime/ProcessorLifecycleListener.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/runtime/ProcessorLifecycleListener.java b/samza-api/src/main/java/org/apache/samza/runtime/ProcessorLifecycleListener.java
new file mode 100644
index 0000000..f4e49ed
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/runtime/ProcessorLifecycleListener.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.runtime;
+
+import org.apache.samza.annotation.InterfaceStability;
+
+
+/**
+ * This interface defines methods that are invoked in different stages of StreamProcessor's lifecycle in local
+ * process (i.e. as a standalone process, or a container process in YARN NodeManager).
+ *
+ * <p>
+ * User can implement this interface to instantiate/release shared objects in the local process.
+ */
+@InterfaceStability.Evolving
+public interface ProcessorLifecycleListener {
+  /**
+   * User defined initialization before a StreamProcessor is started
+   */
+  default void beforeStart() {}
+
+  /**
+   * User defined callback after a StreamProcessor is started
+   *
+   */
+  default void afterStart() {}
+
+  /**
+   * User defined callback after a StreamProcessor is stopped successfully
+   */
+  default void afterStop() {}
+
+  /**
+   * User defined callback after a StreamProcessor is stopped with failure
+   *
+   * @param t the error causing the stop of the StreamProcessor.
+   */
+  default void afterFailure(Throwable t) {}
+}
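
Since every callback in the interface above has a no-op default, an implementation overrides only the stages it cares about. A minimal sketch of that usage follows. LifecycleListener, RecordingListener and ProcessorHarness are hypothetical stand-ins, and the order in which the harness fires the callbacks is an assumption made for illustration, not the documented behavior of StreamProcessor.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mirror of the callback interface, with no-op defaults.
interface LifecycleListener {
  default void beforeStart() { }

  default void afterStart() { }

  default void afterStop() { }

  default void afterFailure(Throwable t) { }
}

// Overrides three of the four callbacks; afterStart keeps its no-op default.
class RecordingListener implements LifecycleListener {
  final List<String> events = new ArrayList<>();

  @Override
  public void beforeStart() {
    events.add("beforeStart");
  }

  @Override
  public void afterStop() {
    events.add("afterStop");
  }

  @Override
  public void afterFailure(Throwable t) {
    events.add("afterFailure:" + t.getMessage());
  }
}

// Assumed driver: fires the callbacks around a unit of work.
class ProcessorHarness {
  static void runOnce(LifecycleListener listener, Runnable work) {
    listener.beforeStart();
    listener.afterStart();
    try {
      work.run();
    } catch (RuntimeException e) {
      // Failure path: report the cause and skip afterStop.
      listener.afterFailure(e);
      return;
    }
    listener.afterStop();
  }
}
```

In this sketch a listener would typically acquire a shared resource in beforeStart() and release it in both afterStop() and afterFailure(), since exactly one of the two is reached.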

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-api/src/main/java/org/apache/samza/runtime/ProcessorLifecycleListenerFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/runtime/ProcessorLifecycleListenerFactory.java b/samza-api/src/main/java/org/apache/samza/runtime/ProcessorLifecycleListenerFactory.java
new file mode 100644
index 0000000..eec7b70
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/runtime/ProcessorLifecycleListenerFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.runtime;
+
+import java.io.Serializable;
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.Config;
+
+
+/**
+ * This interface class defines the factory method to create an instance of {@link ProcessorLifecycleListener}.
+ */
+@InterfaceStability.Evolving
+public interface ProcessorLifecycleListenerFactory extends Serializable {
+  /**
+   * Create an instance of {@link ProcessorLifecycleListener} for the StreamProcessor
+   *
+   * @param pContext the context of the corresponding StreamProcessor. Note that {@link ProcessorContext} is just a
+   *                 placeholder before we have a proper implementation of general context in SAMZA-1714
+   * @param config the configuration of the corresponding StreamProcessor
+   * @return the {@link ProcessorLifecycleListener} callback object for the StreamProcessor
+   */
+  ProcessorLifecycleListener createInstance(ProcessorContext pContext, Config config);
+}
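
The factory extends Serializable, in line with the commit message's requirement that objects registered in describe() be serializable. A sketch of what that buys follows: a factory written as a lambda can be written to bytes and restored before it creates any listener. Listener, ListenerFactory and FactoryShipping are hypothetical stand-ins, and the in-memory round trip only imitates a hand-off between processes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-ins for ProcessorLifecycleListener and its factory.
interface Listener {
  String name();
}

interface ListenerFactory extends Serializable {
  Listener create(String processorId);
}

class FactoryShipping {
  // The lambda captures only a String, so the factory itself stays serializable.
  static ListenerFactory prefixed(String prefix) {
    return processorId -> () -> prefix + "-" + processorId;
  }

  // Serializes the factory to bytes and reads it back, as a remote hand-off might.
  static ListenerFactory roundTrip(ListenerFactory factory) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(factory);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (ListenerFactory) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("Factory could not be serialized", e);
    }
  }
}
```

Only the factory crosses the serialization boundary here. The listener it creates is built afterwards, so it may hold local, non-serializable resources.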

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-api/src/main/java/org/apache/samza/task/AsyncStreamTaskFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/task/AsyncStreamTaskFactory.java b/samza-api/src/main/java/org/apache/samza/task/AsyncStreamTaskFactory.java
index e5ce9c4..1879ce8 100644
--- a/samza-api/src/main/java/org/apache/samza/task/AsyncStreamTaskFactory.java
+++ b/samza-api/src/main/java/org/apache/samza/task/AsyncStreamTaskFactory.java
@@ -19,10 +19,14 @@
 
 package org.apache.samza.task;
 
+import org.apache.samza.annotation.InterfaceStability;
+
+
 /**
  * Build {@link AsyncStreamTask} instances.
- * Implementations should return a new instance for each {@link #createInstance()} invocation.
+ * <p>
+ * Implementations should return a new instance of {@link AsyncStreamTask} for each {@link #createInstance()} invocation.
  */
-public interface AsyncStreamTaskFactory {
-  AsyncStreamTask createInstance();
+@InterfaceStability.Stable
+public interface AsyncStreamTaskFactory extends TaskFactory<AsyncStreamTask> {
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-api/src/main/java/org/apache/samza/task/StreamTaskFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/task/StreamTaskFactory.java b/samza-api/src/main/java/org/apache/samza/task/StreamTaskFactory.java
index 52adef6..8588a0d 100644
--- a/samza-api/src/main/java/org/apache/samza/task/StreamTaskFactory.java
+++ b/samza-api/src/main/java/org/apache/samza/task/StreamTaskFactory.java
@@ -23,9 +23,9 @@ import org.apache.samza.annotation.InterfaceStability;
 
 /**
  * Build {@link StreamTask} instances.
- * Implementations should return a new instance for each {@link #createInstance()} invocation.
+ * <p>
+ * Implementations should return a new instance of {@link StreamTask} for each {@link #createInstance()} invocation.
  */
 @InterfaceStability.Stable
-public interface StreamTaskFactory {
-  StreamTask createInstance();
+public interface StreamTaskFactory extends TaskFactory<StreamTask> {
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-api/src/main/java/org/apache/samza/task/TaskFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/task/TaskFactory.java b/samza-api/src/main/java/org/apache/samza/task/TaskFactory.java
new file mode 100644
index 0000000..8443d20
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/task/TaskFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.task;
+
+import java.io.Serializable;
+import org.apache.samza.annotation.InterfaceStability;
+
+
+/**
+ * The interface for all task factories (i.e. {@link StreamTaskFactory} and {@link AsyncStreamTaskFactory}
+ *
+ * @param <T> the type of task instances
+ */
+@InterfaceStability.Stable
+public interface TaskFactory<T> extends Serializable {
+  /**
+   * Create instance of task
+   *
+   * @return task of type T
+   */
+  T createInstance();
+}
\ No newline at end of file
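
With both task factories now sharing the generic TaskFactory parent, a single code path can accept either kind and branch on what it produces. A rough sketch of that idea follows. SyncTask, AsyncTask, Factory and TaskFactoryDemo are hypothetical stand-ins; how Samza itself dispatches on the factory type is not shown in this diff.

```java
import java.io.Serializable;

// Hypothetical stand-ins for StreamTask and AsyncStreamTask.
interface SyncTask {
  void process(String message);
}

interface AsyncTask {
  void processAsync(String message, Runnable callback);
}

// Common generic parent, mirroring the shape of TaskFactory<T>.
interface Factory<T> extends Serializable {
  T createInstance();
}

interface SyncTaskFactory extends Factory<SyncTask> { }

interface AsyncTaskFactory extends Factory<AsyncTask> { }

class TaskFactoryDemo {
  // Accepts any factory through the shared parent and inspects the created task.
  static String describe(Factory<?> factory) {
    Object task = factory.createInstance();
    if (task instanceof SyncTask) {
      return "sync";
    }
    if (task instanceof AsyncTask) {
      return "async";
    }
    throw new IllegalArgumentException("Unsupported task type: " + task.getClass().getName());
  }
}
```

Both sub-interfaces still have a single abstract method, so they remain usable as lambda targets, for example `SyncTaskFactory f = () -> message -> { };`.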

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-api/src/test/java/org/apache/samza/runtime/TestApplicationRunners.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/runtime/TestApplicationRunners.java b/samza-api/src/test/java/org/apache/samza/runtime/TestApplicationRunners.java
new file mode 100644
index 0000000..5829cf7
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/runtime/TestApplicationRunners.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.runtime;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.application.SamzaApplication;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.job.ApplicationStatus;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+
+/**
+ * Unit test for {@link ApplicationRunners}
+ */
+public class TestApplicationRunners {
+
+  @Test
+  public void testGetAppRunner() {
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put("app.runner.class", MockApplicationRunner.class.getName());
+    Config config = new MapConfig(configMap);
+    StreamApplication app = mock(StreamApplication.class);
+    ApplicationRunner appRunner = ApplicationRunners.getApplicationRunner(app, config);
+    assertTrue(appRunner instanceof MockApplicationRunner);
+  }
+
+  /**
+   * Test class for {@link ApplicationRunners} unit test
+   */
+  public static class MockApplicationRunner implements ApplicationRunner {
+    private final SamzaApplication userApp;
+    private final Config config;
+
+    public MockApplicationRunner(SamzaApplication userApp, Config config) {
+      this.userApp = userApp;
+      this.config = config;
+    }
+
+    @Override
+    public void run() {
+
+    }
+
+    @Override
+    public void kill() {
+
+    }
+
+    @Override
+    public ApplicationStatus status() {
+      return null;
+    }
+
+    @Override
+    public void waitForFinish() {
+
+    }
+
+    @Override
+    public boolean waitForFinish(Duration timeout) {
+      return false;
+    }
+
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorImpl.java b/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorImpl.java
new file mode 100644
index 0000000..9679136
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorImpl.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.application;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsReporterFactory;
+import org.apache.samza.operators.ContextManager;
+import org.apache.samza.operators.TableDescriptor;
+import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
+import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor;
+import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
+import org.apache.samza.runtime.ProcessorLifecycleListener;
+import org.apache.samza.runtime.ProcessorLifecycleListenerFactory;
+import org.apache.samza.task.TaskContext;
+
+
+/**
+ * This is the base class that implements interface {@link ApplicationDescriptor}.
+ * <p>
+ * This base class contains the common objects that are used by both high-level and low-level API applications, such as
+ * {@link Config}, {@link ContextManager}, and {@link ProcessorLifecycleListenerFactory}.
+ *
+ * @param <S> the type of {@link ApplicationDescriptor} interface this implements. It has to be either
+ *            {@link StreamApplicationDescriptor} or {@link TaskApplicationDescriptor}
+ */
+public abstract class ApplicationDescriptorImpl<S extends ApplicationDescriptor>
+    implements ApplicationDescriptor<S> {
+
+  final Config config;
+  private final Class<? extends SamzaApplication> appClass;
+  private final Map<String, MetricsReporterFactory> reporterFactories = new LinkedHashMap<>();
+
+  // Default to no-op functions in ContextManager
+  // TODO: this should be replaced by shared context factory defined in SAMZA-1714
+  ContextManager contextManager = new ContextManager() {
+    @Override
+    public void init(Config config, TaskContext context) {
+    }
+
+    @Override
+    public void close() {
+    }
+  };
+
+  // Default to no-op  ProcessorLifecycleListenerFactory
+  ProcessorLifecycleListenerFactory listenerFactory = (pcontext, cfg) -> new ProcessorLifecycleListener() { };
+
+  ApplicationDescriptorImpl(SamzaApplication app, Config config) {
+    this.config = config;
+    this.appClass = app.getClass();
+  }
+
+  @Override
+  public Config getConfig() {
+    return config;
+  }
+
+  @Override
+  public S withContextManager(ContextManager contextManager) {
+    this.contextManager = contextManager;
+    return (S) this;
+  }
+
+  @Override
+  public S withProcessorLifecycleListenerFactory(ProcessorLifecycleListenerFactory listenerFactory) {
+    this.listenerFactory = listenerFactory;
+    return (S) this;
+  }
+
+  @Override
+  public S withMetricsReporterFactories(Map<String, MetricsReporterFactory> reporterFactories) {
+    this.reporterFactories.clear();
+    this.reporterFactories.putAll(reporterFactories);
+    return (S) this;
+  }
+
+  /**
+   * Get the application class
+   *
+   * @return an implementation of {@link SamzaApplication}
+   */
+  public Class<? extends SamzaApplication> getAppClass() {
+    return appClass;
+  }
+
+  /**
+   * Get the {@link ContextManager} associated with this application
+   *
+   * @return the {@link ContextManager} for this application
+   */
+  public ContextManager getContextManager() {
+    return contextManager;
+  }
+
+  /**
+   * Get the {@link ProcessorLifecycleListenerFactory} associated with this application
+   *
+   * @return the {@link ProcessorLifecycleListenerFactory} in this application
+   */
+  public ProcessorLifecycleListenerFactory getProcessorLifecycleListenerFactory() {
+    return listenerFactory;
+  }
+
+  /**
+   * Get the {@link MetricsReporterFactory}s used in the application
+   *
+   * @return the map of {@link MetricsReporterFactory}s
+   */
+  public Map<String, MetricsReporterFactory> getMetricsReporterFactories() {
+    return Collections.unmodifiableMap(reporterFactories);
+  }
+
+  /**
+   * Get the default {@link SystemDescriptor} in this application
+   *
+   * @return the default {@link SystemDescriptor}
+   */
+  public Optional<SystemDescriptor> getDefaultSystemDescriptor() {
+    // default is not set
+    return Optional.empty();
+  }
+
+  /**
+   * Get all the {@link InputDescriptor}s to this application
+   *
+   * @return an immutable map of streamId to {@link InputDescriptor}
+   */
+  public abstract Map<String, InputDescriptor> getInputDescriptors();
+
+  /**
+   * Get all the {@link OutputDescriptor}s from this application
+   *
+   * @return an immutable map of streamId to {@link OutputDescriptor}
+   */
+  public abstract Map<String, OutputDescriptor> getOutputDescriptors();
+
+  /**
+   * Get all the broadcast streamIds from this application
+   *
+   * @return an immutable set of streamIds
+   */
+  public abstract Set<String> getBroadcastStreams();
+
+  /**
+   * Get all the {@link TableDescriptor}s in this application
+   *
+   * @return an immutable set of {@link TableDescriptor}s
+   */
+  public abstract Set<TableDescriptor> getTableDescriptors();
+
+  /**
+   * Get all the unique {@link SystemDescriptor}s in this application
+   *
+   * @return an immutable set of {@link SystemDescriptor}s
+   */
+  public abstract Set<SystemDescriptor> getSystemDescriptors();
+
+}
\ No newline at end of file
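
The with*() methods above return (S) this so that a call chain keeps the concrete descriptor type, while the reporter map is copied on the way in and exposed read-only on the way out. A compact sketch of that self-typed fluent pattern follows. Descriptor, DescriptorBase, StreamDescriptor and StreamDescriptorImpl are hypothetical stand-ins, and String values replace MetricsReporterFactory to keep the block self-contained.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical self-typed descriptor interface: S is the concrete descriptor type.
interface Descriptor<S extends Descriptor<S>> {
  S withReporters(Map<String, String> reporters);
}

abstract class DescriptorBase<S extends Descriptor<S>> implements Descriptor<S> {
  private final Map<String, String> reporters = new LinkedHashMap<>();

  @Override
  @SuppressWarnings("unchecked")
  public S withReporters(Map<String, String> newReporters) {
    // Copy the entries so later changes to the caller's map do not leak in.
    reporters.clear();
    reporters.putAll(newReporters);
    return (S) this;
  }

  // Read-only view: callers cannot mutate the descriptor's internal state.
  public Map<String, String> getReporters() {
    return Collections.unmodifiableMap(reporters);
  }
}

interface StreamDescriptor extends Descriptor<StreamDescriptor> {
  StreamDescriptor withInput(String streamId);
}

class StreamDescriptorImpl extends DescriptorBase<StreamDescriptor> implements StreamDescriptor {
  String input;

  @Override
  public StreamDescriptor withInput(String streamId) {
    this.input = streamId;
    return this;
  }
}
```

The unchecked cast is safe only as long as each subclass binds S to an interface it actually implements, which matches the Javadoc's constraint that S be one of the two descriptor interfaces.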
