Consolidating package names for System, Stream, Application and Table 
descriptors.

Everything in this PR is either:
1. A package name change and a corresponding file move.
2. Javadoc changes to use fully qualified names (FQNs) in {@link} tags to fix 
checkstyle complaints about unused imports, and corresponding re-wrapping to 
keep the Javadoc within the line width. No change in contents.
3. In a couple of places, changing method visibility to public with 
@VisibleForTesting annotations so that tests can access the methods 
(RemoteReadWriteTable.java, RemoteReadableTable.java).

Author: Prateek Maheshwari <pmaheshw...@apache.org>

Reviewers: Bharath Kumarasubramanian <bkuma...@linkedin.com>, Jagadish 
Venkatraman <vjagadish1...@gmail.com>, Yi Pan <nickpa...@gmail.com>

Closes #720 from prateekm/descriptor-package-cleanup


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/74675cea
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/74675cea
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/74675cea

Branch: refs/heads/master
Commit: 74675cea55d163bf18bf16c8619355009af2300c
Parents: 9a5094d
Author: Prateek Maheshwari <pmaheshw...@apache.org>
Authored: Fri Oct 12 18:34:34 2018 -0700
Committer: Prateek Maheshwari <pmaheshw...@apache.org>
Committed: Fri Oct 12 18:34:34 2018 -0700

----------------------------------------------------------------------
 docs/startup/quick-start/versioned/index.md     |   2 +-
 .../application/ApplicationDescriptor.java      |  94 ---
 .../samza/application/SamzaApplication.java     |   1 +
 .../samza/application/StreamApplication.java    |   1 +
 .../StreamApplicationDescriptor.java            | 107 ----
 .../samza/application/TaskApplication.java      |   1 +
 .../application/TaskApplicationDescriptor.java  |  64 --
 .../descriptors/ApplicationDescriptor.java      |  96 +++
 .../StreamApplicationDescriptor.java            | 107 ++++
 .../descriptors/TaskApplicationDescriptor.java  |  65 ++
 .../ApplicationContainerContextFactory.java     |   3 +-
 .../context/ApplicationTaskContextFactory.java  |   3 +-
 .../apache/samza/operators/MessageStream.java   |   5 +-
 .../apache/samza/operators/TableDescriptor.java |  64 --
 .../descriptors/GenericInputDescriptor.java     |  48 --
 .../descriptors/GenericOutputDescriptor.java    |  48 --
 .../descriptors/GenericSystemDescriptor.java    |  61 --
 .../base/stream/InputDescriptor.java            | 183 ------
 .../base/stream/OutputDescriptor.java           |  44 --
 .../base/stream/StreamDescriptor.java           | 136 -----
 .../ExpandingInputDescriptorProvider.java       |  44 --
 .../base/system/OutputDescriptorProvider.java   |  48 --
 .../system/SimpleInputDescriptorProvider.java   |  43 --
 .../base/system/SystemDescriptor.java           | 177 ------
 .../TransformingInputDescriptorProvider.java    |  44 --
 .../operators/functions/ClosableFunction.java   |  12 +-
 .../operators/functions/InitableFunction.java   |   9 +-
 .../operators/functions/InputTransformer.java   |  45 --
 .../operators/functions/StreamExpander.java     |  58 --
 .../ExpandingInputDescriptorProvider.java       |  43 ++
 .../descriptors/GenericInputDescriptor.java     |  47 ++
 .../descriptors/GenericOutputDescriptor.java    |  47 ++
 .../descriptors/GenericSystemDescriptor.java    |  58 ++
 .../system/descriptors/InputDescriptor.java     | 181 ++++++
 .../system/descriptors/InputTransformer.java    |  47 ++
 .../system/descriptors/OutputDescriptor.java    |  43 ++
 .../descriptors/OutputDescriptorProvider.java   |  47 ++
 .../SimpleInputDescriptorProvider.java          |  42 ++
 .../system/descriptors/StreamDescriptor.java    | 135 +++++
 .../system/descriptors/StreamExpander.java      |  57 ++
 .../system/descriptors/SystemDescriptor.java    | 175 ++++++
 .../TransformingInputDescriptorProvider.java    |  43 ++
 .../samza/table/TableDescriptorsProvider.java   |   2 +-
 .../org/apache/samza/table/TableProvider.java   |  61 --
 .../samza/table/TableProviderFactory.java       |  35 --
 .../java/org/apache/samza/table/TableSpec.java  |   5 +-
 .../table/descriptors/TableDescriptor.java      |  64 ++
 .../samza/table/descriptors/TableProvider.java  |  62 ++
 .../table/descriptors/TableProviderFactory.java |  36 ++
 .../TestExpandingInputDescriptor.java           |  59 --
 .../descriptors/TestGenericInputDescriptor.java | 123 ----
 .../TestGenericSystemDescriptor.java            |  63 --
 .../descriptors/TestSimpleInputDescriptor.java  |  63 --
 .../TestTransformingInputDescriptor.java        |  64 --
 .../ExampleExpandingInputDescriptor.java        |  30 -
 .../ExampleExpandingOutputDescriptor.java       |  29 -
 .../ExampleExpandingSystemDescriptor.java       |  49 --
 .../serde/ExampleSimpleInputDescriptor.java     |  30 -
 .../serde/ExampleSimpleOutputDescriptor.java    |  29 -
 .../serde/ExampleSimpleSystemDescriptor.java    |  43 --
 .../ExampleTransformingInputDescriptor.java     |  30 -
 .../ExampleTransformingOutputDescriptor.java    |  29 -
 .../ExampleTransformingSystemDescriptor.java    |  43 --
 .../TestExpandingInputDescriptor.java           |  59 ++
 .../descriptors/TestGenericInputDescriptor.java | 123 ++++
 .../TestGenericSystemDescriptor.java            |  63 ++
 .../descriptors/TestSimpleInputDescriptor.java  |  63 ++
 .../TestTransformingInputDescriptor.java        |  64 ++
 .../ExampleExpandingInputDescriptor.java        |  30 +
 .../ExampleExpandingOutputDescriptor.java       |  29 +
 .../ExampleExpandingSystemDescriptor.java       |  49 ++
 .../serde/ExampleSimpleInputDescriptor.java     |  30 +
 .../serde/ExampleSimpleOutputDescriptor.java    |  29 +
 .../serde/ExampleSimpleSystemDescriptor.java    |  43 ++
 .../ExampleTransformingInputDescriptor.java     |  30 +
 .../ExampleTransformingOutputDescriptor.java    |  29 +
 .../ExampleTransformingSystemDescriptor.java    |  43 ++
 .../eventhub/EventHubsInputDescriptor.java      | 121 ----
 .../eventhub/EventHubsOutputDescriptor.java     | 104 ----
 .../eventhub/EventHubsSystemDescriptor.java     | 217 -------
 .../descriptors/EventHubsInputDescriptor.java   | 122 ++++
 .../descriptors/EventHubsOutputDescriptor.java  | 105 ++++
 .../descriptors/EventHubsSystemDescriptor.java  | 219 +++++++
 .../eventhub/TestEventHubsInputDescriptor.java  |  91 ---
 .../eventhub/TestEventHubsOutputDescriptor.java |  88 ---
 .../eventhub/TestEventHubsSystemDescriptor.java | 112 ----
 .../TestEventHubsInputDescriptor.java           |  92 +++
 .../TestEventHubsOutputDescriptor.java          |  89 +++
 .../TestEventHubsSystemDescriptor.java          | 113 ++++
 .../application/ApplicationDescriptorImpl.java  | 298 ---------
 .../application/ApplicationDescriptorUtil.java  |  51 --
 .../application/LegacyTaskApplication.java      |   1 +
 .../StreamApplicationDescriptorImpl.java        | 366 -----------
 .../TaskApplicationDescriptorImpl.java          | 143 -----
 .../descriptors/ApplicationDescriptorImpl.java  | 300 +++++++++
 .../descriptors/ApplicationDescriptorUtil.java  |  54 ++
 .../StreamApplicationDescriptorImpl.java        | 367 +++++++++++
 .../TaskApplicationDescriptorImpl.java          | 144 +++++
 .../apache/samza/config/JavaTableConfig.java    |   4 +-
 .../samza/execution/ExecutionPlanner.java       |   6 +-
 .../org/apache/samza/execution/JobGraph.java    |   4 +-
 .../samza/execution/JobGraphJsonGenerator.java  |   4 +-
 .../org/apache/samza/execution/JobNode.java     |   4 +-
 .../org/apache/samza/execution/JobPlanner.java  |   4 +-
 .../apache/samza/execution/LocalJobPlanner.java |   4 +-
 .../samza/execution/RemoteJobPlanner.java       |   4 +-
 .../samza/operators/BaseTableDescriptor.java    | 110 ----
 .../samza/operators/MessageStreamImpl.java      |   2 +-
 .../samza/operators/OperatorSpecGraph.java      |   2 +-
 .../descriptors/DelegatingSystemDescriptor.java |  64 --
 .../samza/operators/impl/InputOperatorImpl.java |   2 +-
 .../samza/operators/spec/InputOperatorSpec.java |   2 +-
 .../samza/operators/spec/OperatorSpec.java      |   3 +-
 .../samza/operators/spec/OperatorSpecs.java     |   2 +-
 .../stream/IntermediateMessageStreamImpl.java   |   2 +-
 .../samza/runtime/LocalApplicationRunner.java   |   6 +-
 .../samza/runtime/LocalContainerRunner.java     |   6 +-
 .../samza/runtime/RemoteApplicationRunner.java  |   6 +-
 .../descriptors/DelegatingSystemDescriptor.java |  61 ++
 .../samza/table/TableConfigGenerator.java       |   6 +-
 .../org/apache/samza/table/TableManager.java    |   2 +
 .../table/caching/CachingTableDescriptor.java   | 164 -----
 .../table/caching/CachingTableProvider.java     | 104 ----
 .../caching/CachingTableProviderFactory.java    |  34 --
 .../descriptors/CachingTableDescriptor.java     | 166 +++++
 .../descriptors/CachingTableProvider.java       | 105 ++++
 .../CachingTableProviderFactory.java            |  34 ++
 .../guava/GuavaCacheTableDescriptor.java        |  75 ---
 .../caching/guava/GuavaCacheTableProvider.java  |  59 --
 .../guava/GuavaCacheTableProviderFactory.java   |  34 --
 .../descriptors/GuavaCacheTableDescriptor.java  |  75 +++
 .../descriptors/GuavaCacheTableProvider.java    |  60 ++
 .../GuavaCacheTableProviderFactory.java         |  34 ++
 .../descriptors/BaseHybridTableDescriptor.java  |  48 ++
 .../table/descriptors/BaseTableDescriptor.java  | 110 ++++
 .../table/hybrid/BaseHybridTableDescriptor.java |  50 --
 .../table/remote/RemoteReadWriteTable.java      |  15 +-
 .../samza/table/remote/RemoteReadableTable.java |  26 +-
 .../table/remote/RemoteTableDescriptor.java     | 275 ---------
 .../samza/table/remote/RemoteTableProvider.java | 200 ------
 .../remote/RemoteTableProviderFactory.java      |  38 --
 .../descriptors/RemoteTableDescriptor.java      | 278 +++++++++
 .../remote/descriptors/RemoteTableProvider.java | 202 +++++++
 .../descriptors/RemoteTableProviderFactory.java |  38 ++
 .../table/retry/RetriableReadFunction.java      |   2 +-
 .../table/retry/RetriableWriteFunction.java     |   2 +-
 .../samza/table/utils/BaseTableProvider.java    |  73 ---
 .../utils/descriptors/BaseTableProvider.java    |  73 +++
 .../apache/samza/task/StreamOperatorTask.java   |  11 +-
 .../org/apache/samza/task/TaskFactoryUtil.java  |   8 +-
 .../samza/job/local/ThreadJobFactory.scala      |   3 +-
 .../application/MockStreamApplication.java      |   2 +
 .../samza/application/TestApplicationUtil.java  |   2 +
 .../TestStreamApplicationDescriptorImpl.java    | 601 ------------------
 .../TestTaskApplicationDescriptorImpl.java      | 172 ------
 .../TestStreamApplicationDescriptorImpl.java    | 602 +++++++++++++++++++
 .../TestTaskApplicationDescriptorImpl.java      | 173 ++++++
 .../execution/ExecutionPlannerTestBase.java     |  10 +-
 .../samza/execution/TestExecutionPlanner.java   |  22 +-
 .../apache/samza/execution/TestJobGraph.java    |   2 +-
 .../execution/TestJobGraphJsonGenerator.java    |   8 +-
 .../TestJobNodeConfigurationGenerator.java      |  14 +-
 .../samza/execution/TestLocalJobPlanner.java    |   6 +-
 .../samza/execution/TestRemoteJobPlanner.java   |   6 +-
 .../samza/operators/TestJoinOperator.java       |   6 +-
 .../samza/operators/TestMessageStreamImpl.java  |   2 +-
 .../samza/operators/TestOperatorSpecGraph.java  |   2 +-
 .../operators/impl/TestOperatorImplGraph.java   |   8 +-
 .../operators/impl/TestWindowOperator.java      |   6 +-
 .../spec/TestPartitionByOperatorSpec.java       |   6 +-
 .../runtime/TestLocalApplicationRunner.java     |   6 +-
 .../apache/samza/table/TestTableManager.java    |   2 +
 .../samza/table/caching/TestCachingTable.java   |  10 +-
 .../table/remote/TestRemoteTableDescriptor.java | 236 --------
 .../descriptors/TestRemoteTableDescriptor.java  | 239 ++++++++
 .../apache/samza/task/TestTaskFactoryUtil.java  |   6 +-
 .../system/kafka/KafkaInputDescriptor.java      | 108 ----
 .../system/kafka/KafkaOutputDescriptor.java     |  39 --
 .../system/kafka/KafkaSystemDescriptor.java     | 245 --------
 .../kafka/descriptors/KafkaInputDescriptor.java | 108 ++++
 .../descriptors/KafkaOutputDescriptor.java      |  39 ++
 .../descriptors/KafkaSystemDescriptor.java      | 246 ++++++++
 .../system/kafka/TestKafkaInputDescriptor.java  |  66 --
 .../kafka/TestKafkaSystemAdminWithMock.java     |   1 -
 .../system/kafka/TestKafkaSystemDescriptor.java |  69 ---
 .../descriptors/TestKafkaInputDescriptor.java   |  64 ++
 .../descriptors/TestKafkaSystemDescriptor.java  |  70 +++
 .../kv/inmemory/InMemoryTableDescriptor.java    |  74 ---
 .../kv/inmemory/InMemoryTableProvider.java      |  70 ---
 .../inmemory/InMemoryTableProviderFactory.java  |  33 -
 .../descriptors/InMemoryTableDescriptor.java    |  74 +++
 .../descriptors/InMemoryTableProvider.java      |  71 +++
 .../InMemoryTableProviderFactory.java           |  33 +
 .../inmemory/TestInMemoryTableDescriptor.java   |  48 --
 .../kv/inmemory/TestInMemoryTableProvider.java  |  66 --
 .../TestInMemoryTableDescriptor.java            |  48 ++
 .../descriptors/TestInMemoryTableProvider.java  |  67 +++
 .../storage/kv/RocksDbTableDescriptor.java      | 339 -----------
 .../samza/storage/kv/RocksDbTableProvider.java  |  73 ---
 .../storage/kv/RocksDbTableProviderFactory.java |  31 -
 .../kv/descriptors/RocksDbTableDescriptor.java  | 339 +++++++++++
 .../kv/descriptors/RocksDbTableProvider.java    |  74 +++
 .../RocksDbTableProviderFactory.java            |  31 +
 .../storage/kv/TestRocksDbTableDescriptor.java  | 100 ---
 .../storage/kv/TestRocksDbTableProvider.java    |  67 ---
 .../descriptors/TestRocksDbTableDescriptor.java | 100 +++
 .../descriptors/TestRocksDbTableProvider.java   |  68 +++
 .../kv/BaseLocalStoreBackedTableDescriptor.java | 168 ------
 .../kv/BaseLocalStoreBackedTableProvider.java   | 147 -----
 .../BaseLocalStoreBackedTableDescriptor.java    | 168 ++++++
 .../BaseLocalStoreBackedTableProvider.java      | 149 +++++
 .../TestBaseLocalStoreBackedTableProvider.java  | 149 -----
 .../TestBaseLocalStoreBackedTableProvider.java  | 150 +++++
 .../sql/impl/ConfigBasedIOResolverFactory.java  |   4 +-
 .../samza/sql/interfaces/SqlIOConfig.java       |   2 +-
 .../samza/sql/runner/SamzaSqlApplication.java   |   2 +-
 .../samza/sql/translator/ModifyTranslator.java  |   8 +-
 .../samza/sql/translator/QueryTranslator.java   |  10 +-
 .../samza/sql/translator/ScanTranslator.java    |   6 +-
 .../samza/sql/translator/TranslatorContext.java |   4 +-
 .../sql/testutil/TestIOResolverFactory.java     |  12 +-
 .../sql/translator/TestFilterTranslator.java    |   2 +-
 .../sql/translator/TestJoinTranslator.java      |   6 +-
 .../sql/translator/TestProjectTranslator.java   |   2 +-
 .../sql/translator/TestQueryTranslator.java     |   2 +-
 .../sql/translator/TranslatorTestBase.java      |   5 +-
 .../example/AppWithGlobalConfigExample.java     |   8 +-
 .../apache/samza/example/BroadcastExample.java  |   8 +-
 .../samza/example/KeyValueStoreExample.java     |   8 +-
 .../org/apache/samza/example/MergeExample.java  |   8 +-
 .../samza/example/OrderShipmentJoinExample.java |   8 +-
 .../samza/example/PageViewCounterExample.java   |   8 +-
 .../samza/example/RepartitionExample.java       |   8 +-
 .../samza/example/TaskApplicationExample.java   |  12 +-
 .../org/apache/samza/example/WindowExample.java |   8 +-
 .../samza/test/framework/StreamAssert.java      |   6 +-
 .../apache/samza/test/framework/TestRunner.java |   6 +-
 .../system/InMemoryInputDescriptor.java         |  42 --
 .../system/InMemoryOutputDescriptor.java        |  46 --
 .../system/InMemorySystemDescriptor.java        | 109 ----
 .../descriptors/InMemoryInputDescriptor.java    |  42 ++
 .../descriptors/InMemoryOutputDescriptor.java   |  46 ++
 .../descriptors/InMemorySystemDescriptor.java   | 109 ++++
 .../TestStandaloneIntegrationApplication.java   |   8 +-
 .../EndOfStreamIntegrationTest.java             |   6 +-
 .../WatermarkIntegrationTest.java               |   6 +-
 .../AsyncStreamTaskIntegrationTest.java         |   6 +-
 .../test/framework/BroadcastAssertApp.java      |   6 +-
 .../test/framework/FaultInjectionTest.java      |   6 +-
 .../StreamApplicationIntegrationTest.java       |  19 +-
 .../framework/StreamTaskIntegrationTest.java    |  21 +-
 .../samza/test/framework/TestSchedulingApp.java |   6 +-
 .../test/operator/RepartitionJoinWindowApp.java |   6 +-
 .../test/operator/RepartitionWindowApp.java     |   8 +-
 .../samza/test/operator/SessionWindowApp.java   |   8 +-
 .../samza/test/operator/TumblingWindowApp.java  |   8 +-
 .../test/processor/TestStreamApplication.java   |   8 +-
 .../apache/samza/test/table/TestLocalTable.java |   8 +-
 .../table/TestLocalTableWithSideInputs.java     |  16 +-
 .../samza/test/table/TestRemoteTable.java       |  16 +-
 .../table/TestTableDescriptorsProvider.java     |  10 +-
 .../benchmark/SystemConsumerWithSamzaBench.java |   4 +-
 262 files changed, 8425 insertions(+), 8353 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/docs/startup/quick-start/versioned/index.md
----------------------------------------------------------------------
diff --git a/docs/startup/quick-start/versioned/index.md 
b/docs/startup/quick-start/versioned/index.md
index 44b8376..a046ee7 100644
--- a/docs/startup/quick-start/versioned/index.md
+++ b/docs/startup/quick-start/versioned/index.md
@@ -54,7 +54,7 @@ Now let’s write some code! The first step is to create your 
own Samza applicat
 package samzaapp;
 
 import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
 
 public class WordCount implements StreamApplication {
  @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/application/ApplicationDescriptor.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/application/ApplicationDescriptor.java
 
b/samza-api/src/main/java/org/apache/samza/application/ApplicationDescriptor.java
deleted file mode 100644
index e806aad..0000000
--- 
a/samza-api/src/main/java/org/apache/samza/application/ApplicationDescriptor.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.application;
-
-import java.util.Map;
-import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.config.Config;
-import org.apache.samza.context.ApplicationContainerContextFactory;
-import org.apache.samza.context.ApplicationTaskContextFactory;
-import org.apache.samza.metrics.MetricsReporterFactory;
-import org.apache.samza.runtime.ProcessorLifecycleListenerFactory;
-
-
-/**
- * The interface class to describe the configuration, input and output 
streams, and processing logic in a {@link SamzaApplication}.
- * <p>
- * Sub-classes {@link StreamApplicationDescriptor} and {@link 
TaskApplicationDescriptor} are specific interfaces for applications
- * written in high-level {@link StreamApplication} and low-level {@link 
TaskApplication} APIs, respectively.
- *
- * @param <S> sub-class of user application descriptor.
- */
-@InterfaceStability.Evolving
-public interface ApplicationDescriptor<S extends ApplicationDescriptor> {
-
-  /**
-   * Get the {@link Config} of the application
-   * @return config of the application
-   */
-  Config getConfig();
-
-  /**
-   * Sets the {@link ApplicationContainerContextFactory} for this application. 
Each task will be given access to a
-   * different instance of the {@link 
org.apache.samza.context.ApplicationContainerContext} that this creates. The
-   * context can be accessed through the {@link 
org.apache.samza.context.Context}.
-   * <p>
-   * Setting this is optional.
-   *
-   * @param factory the {@link ApplicationContainerContextFactory} for this 
application
-   * @return type {@code S} of {@link ApplicationDescriptor} with {@code 
factory} set as its
-   * {@link ApplicationContainerContextFactory}
-   */
-  S 
withApplicationContainerContextFactory(ApplicationContainerContextFactory<?> 
factory);
-
-  /**
-   * Sets the {@link ApplicationTaskContextFactory} for this application. Each 
task will be given access to a different
-   * instance of the {@link org.apache.samza.context.ApplicationTaskContext} 
that this creates. The context can be
-   * accessed through the {@link org.apache.samza.context.Context}.
-   * <p>
-   * Setting this is optional.
-   *
-   * @param factory the {@link ApplicationTaskContextFactory} for this 
application
-   * @return type {@code S} of {@link ApplicationDescriptor} with {@code 
factory} set as its
-   * {@link ApplicationTaskContextFactory}
-   */
-  S withApplicationTaskContextFactory(ApplicationTaskContextFactory<?> 
factory);
-
-  /**
-   * Sets the {@link ProcessorLifecycleListenerFactory} for this application.
-   *
-   * <p>Setting a {@link ProcessorLifecycleListenerFactory} is optional to a 
user application. It allows users to
-   * plug in optional code to be invoked in different stages before/after the 
main processing logic is started/stopped in
-   * the application.
-   *
-   * @param listenerFactory the user implemented {@link 
ProcessorLifecycleListenerFactory} that creates lifecycle listener
-   *                        with callback methods before and after the 
start/stop of each StreamProcessor in the application
-   * @return type {@code S} of {@link ApplicationDescriptor} with {@code 
listenerFactory} set as its {@link ProcessorLifecycleListenerFactory}
-   */
-  S withProcessorLifecycleListenerFactory(ProcessorLifecycleListenerFactory 
listenerFactory);
-
-  /**
-   * Sets a set of customized {@link MetricsReporterFactory}s in the 
application
-   *
-   * @param reporterFactories the map of customized {@link 
MetricsReporterFactory}s to be used
-   * @return type {@code S} of {@link ApplicationDescriptor} with {@code 
reporterFactories}
-   */
-  S withMetricsReporterFactories(Map<String, MetricsReporterFactory> 
reporterFactories);
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/application/SamzaApplication.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/application/SamzaApplication.java 
b/samza-api/src/main/java/org/apache/samza/application/SamzaApplication.java
index 7606be8..5423e2e 100644
--- a/samza-api/src/main/java/org/apache/samza/application/SamzaApplication.java
+++ b/samza-api/src/main/java/org/apache/samza/application/SamzaApplication.java
@@ -19,6 +19,7 @@
 package org.apache.samza.application;
 
 import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.application.descriptors.ApplicationDescriptor;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/application/StreamApplication.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/application/StreamApplication.java 
b/samza-api/src/main/java/org/apache/samza/application/StreamApplication.java
index a83cb37..fe77045 100644
--- 
a/samza-api/src/main/java/org/apache/samza/application/StreamApplication.java
+++ 
b/samza-api/src/main/java/org/apache/samza/application/StreamApplication.java
@@ -19,6 +19,7 @@
 package org.apache.samza.application;
 
 import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
 
 /**
  * Describes and initializes the transforms for processing message streams and 
generating results in high-level API. 

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/application/StreamApplicationDescriptor.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/application/StreamApplicationDescriptor.java
 
b/samza-api/src/main/java/org/apache/samza/application/StreamApplicationDescriptor.java
deleted file mode 100644
index dc24771..0000000
--- 
a/samza-api/src/main/java/org/apache/samza/application/StreamApplicationDescriptor.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.application;
-
-import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.KV;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.TableDescriptor;
-import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
-import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor;
-import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
-import org.apache.samza.table.Table;
-
-
-/**
- * The interface class to describe a {@link SamzaApplication} in high-level 
API in Samza.
- */
-@InterfaceStability.Evolving
-public interface StreamApplicationDescriptor extends 
ApplicationDescriptor<StreamApplicationDescriptor> {
-
-  /**
-   * Sets the default SystemDescriptor to use for intermediate streams. This 
is equivalent to setting
-   * {@code job.default.system} and its properties in configuration.
-   * <p>
-   * If the default system descriptor is set, it must be set <b>before</b> 
creating any input/output/intermediate streams.
-   *
-   * @param defaultSystemDescriptor the default system descriptor to use
-   * @return type {@code S} of {@link ApplicationDescriptor} with {@code 
defaultSystemDescriptor} set as its default system
-   */
-  StreamApplicationDescriptor withDefaultSystem(SystemDescriptor<?> 
defaultSystemDescriptor);
-
-  /**
-   * Gets the input {@link MessageStream} corresponding to the {@code 
inputDescriptor}.
-   * <p>
-   * A {@code MessageStream<KV<K, V>}, obtained by calling this method with a 
descriptor with a {@code KVSerde<K, V>},
-   * can receive messages of type {@code KV<K, V>}. An input {@code 
MessageStream<M>}, obtained using a descriptor with
-   * any other {@code Serde<M>}, can receive messages of type M - the key in 
the incoming message is ignored.
-   * <p>
-   * A {@code KVSerde<NoOpSerde, NoOpSerde>} or {@code NoOpSerde} may be used 
for the descriptor if the
-   * {@code SystemConsumer} deserializes the incoming messages itself, and no 
further deserialization is required from
-   * the framework.
-   * <p>
-   * Multiple invocations of this method with the same {@code inputDescriptor} 
will throw an
-   * {@link IllegalStateException}.
-   *
-   * @param inputDescriptor the descriptor for the stream
-   * @param <M> the type of messages in the input {@link MessageStream}
-   * @return the input {@link MessageStream}
-   * @throws IllegalStateException when invoked multiple times with the same 
{@code inputDescriptor}
-   */
-  <M> MessageStream<M> getInputStream(InputDescriptor<M, ?> inputDescriptor);
-
-  /**
-   * Gets the {@link OutputStream} corresponding to the {@code 
outputDescriptor}.
-   * <p>
-   * An {@code OutputStream<KV<K, V>>}, obtained by calling this method with a 
descriptor with a {@code KVSerde<K, V>},
-   * can send messages of type {@code KV<K, V>}. An {@code OutputStream<M>}, 
obtained using a descriptor with any
-   * other {@code Serde<M>}, can send messages of type M without a key.
-   * <p>
-   * A {@code KVSerde<NoOpSerde, NoOpSerde>} or {@code NoOpSerde} may be used 
for the descriptor if the
-   * {@code SystemProducer} serializes the outgoing messages itself, and no 
prior serialization is required from
-   * the framework.
-   * <p>
-   * When sending messages to an {@code OutputStream<KV<K, V>>}, messages are 
partitioned using their serialized key.
-   * When sending messages to any other {@code OutputStream<M>}, messages are 
partitioned using a null partition key.
-   * <p>
-   * Multiple invocations of this method with the same {@code 
outputDescriptor} will throw an
-   * {@link IllegalStateException}.
-   *
-   * @param outputDescriptor the descriptor for the stream
-   * @param <M> the type of messages in the {@link OutputStream}
-   * @return the {@link OutputStream}
-   * @throws IllegalStateException when invoked multiple times with the same 
{@code outputDescriptor}
-   */
-  <M> OutputStream<M> getOutputStream(OutputDescriptor<M, ?> outputDescriptor);
-
-  /**
-   * Gets the {@link Table} corresponding to the {@link TableDescriptor}.
-   * <p>
-   * Multiple invocations of this method with the same {@link TableDescriptor} 
will throw an
-   * {@link IllegalStateException}.
-   *
-   * @param tableDescriptor the {@link TableDescriptor}
-   * @param <K> the type of the key
-   * @param <V> the type of the value
-   * @return the {@link Table} corresponding to the {@code tableDescriptor}
-   * @throws IllegalStateException when invoked multiple times with the same 
{@link TableDescriptor}
-   */
-  <K, V> Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> tableDescriptor);
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/application/TaskApplication.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/application/TaskApplication.java 
b/samza-api/src/main/java/org/apache/samza/application/TaskApplication.java
index 424634d..d84aa12 100644
--- a/samza-api/src/main/java/org/apache/samza/application/TaskApplication.java
+++ b/samza-api/src/main/java/org/apache/samza/application/TaskApplication.java
@@ -19,6 +19,7 @@
 package org.apache.samza.application;
 
 import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/application/TaskApplicationDescriptor.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/application/TaskApplicationDescriptor.java
 
b/samza-api/src/main/java/org/apache/samza/application/TaskApplicationDescriptor.java
deleted file mode 100644
index 0226bb5..0000000
--- 
a/samza-api/src/main/java/org/apache/samza/application/TaskApplicationDescriptor.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.application;
-
-import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.TableDescriptor;
-import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
-import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor;
-import org.apache.samza.task.TaskFactory;
-
-
-/**
- *  The interface to describe a {@link SamzaApplication} that uses low-level 
API task for processing.
- */
-@InterfaceStability.Evolving
-public interface TaskApplicationDescriptor extends 
ApplicationDescriptor<TaskApplicationDescriptor> {
-
-  /**
-   * Sets the {@link TaskFactory} for the user application. The {@link 
TaskFactory#createInstance()} creates task instance
-   * that implements the main processing logic of the user application.
-   *
-   * @param factory the {@link TaskFactory} including the low-level task 
processing logic. The only allowed task factory
-   *                classes are {@link 
org.apache.samza.task.StreamTaskFactory} and {@link 
org.apache.samza.task.AsyncStreamTaskFactory}.
-   */
-  void setTaskFactory(TaskFactory factory);
-
-  /**
-   * Adds the input stream to the application.
-   *
-   * @param isd the {@link InputDescriptor}
-   */
-  void addInputStream(InputDescriptor isd);
-
-  /**
-   * Adds the output stream to the application.
-   *
-   * @param osd the {@link OutputDescriptor} of the output stream
-   */
-  void addOutputStream(OutputDescriptor osd);
-
-  /**
-   * Adds the {@link TableDescriptor} used in the application
-   *
-   * @param table {@link TableDescriptor}
-   */
-  void addTable(TableDescriptor table);
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptor.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptor.java
 
b/samza-api/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptor.java
new file mode 100644
index 0000000..b1e78b0
--- /dev/null
+++ 
b/samza-api/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptor.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.application.descriptors;
+
+import java.util.Map;
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.Config;
+import org.apache.samza.context.ApplicationContainerContextFactory;
+import org.apache.samza.context.ApplicationTaskContextFactory;
+import org.apache.samza.metrics.MetricsReporterFactory;
+import org.apache.samza.runtime.ProcessorLifecycleListenerFactory;
+
+
+/**
+ * The interface class to describe the configuration, input and output 
streams, and processing logic in a
+ * {@link org.apache.samza.application.SamzaApplication}.
+ * <p>
+ * Sub-classes {@link StreamApplicationDescriptor} and {@link 
TaskApplicationDescriptor} are specific interfaces for
+ * applications written in high-level {@link 
org.apache.samza.application.StreamApplication} and low-level
+ * {@link org.apache.samza.application.TaskApplication} APIs, respectively.
+ *
+ * @param <S> sub-class of user application descriptor.
+ */
+@InterfaceStability.Evolving
+public interface ApplicationDescriptor<S extends ApplicationDescriptor> {
+
+  /**
+   * Get the {@link Config} of the application
+   * @return config of the application
+   */
+  Config getConfig();
+
+  /**
+   * Sets the {@link ApplicationContainerContextFactory} for this application. 
Each task will be given access to a
+   * different instance of the {@link 
org.apache.samza.context.ApplicationContainerContext} that this creates. The
+   * context can be accessed through the {@link 
org.apache.samza.context.Context}.
+   * <p>
+   * Setting this is optional.
+   *
+   * @param factory the {@link ApplicationContainerContextFactory} for this 
application
+   * @return type {@code S} of {@link ApplicationDescriptor} with {@code 
factory} set as its
+   * {@link ApplicationContainerContextFactory}
+   */
+  S 
withApplicationContainerContextFactory(ApplicationContainerContextFactory<?> 
factory);
+
+  /**
+   * Sets the {@link ApplicationTaskContextFactory} for this application. Each 
task will be given access to a different
+   * instance of the {@link org.apache.samza.context.ApplicationTaskContext} 
that this creates. The context can be
+   * accessed through the {@link org.apache.samza.context.Context}.
+   * <p>
+   * Setting this is optional.
+   *
+   * @param factory the {@link ApplicationTaskContextFactory} for this 
application
+   * @return type {@code S} of {@link ApplicationDescriptor} with {@code 
factory} set as its
+   * {@link ApplicationTaskContextFactory}
+   */
+  S withApplicationTaskContextFactory(ApplicationTaskContextFactory<?> 
factory);
+
+  /**
+   * Sets the {@link ProcessorLifecycleListenerFactory} for this application.
+   *
+   * <p>Setting a {@link ProcessorLifecycleListenerFactory} is optional to a 
user application. It allows users to
+   * plug in optional code to be invoked in different stages before/after the 
main processing logic is started/stopped in
+   * the application.
+   *
+   * @param listenerFactory the user implemented {@link 
ProcessorLifecycleListenerFactory} that creates lifecycle listener
+   *                        with callback methods before and after the 
start/stop of each StreamProcessor in the application
+   * @return type {@code S} of {@link ApplicationDescriptor} with {@code 
listenerFactory} set as its {@link ProcessorLifecycleListenerFactory}
+   */
+  S withProcessorLifecycleListenerFactory(ProcessorLifecycleListenerFactory 
listenerFactory);
+
+  /**
+   * Sets a set of customized {@link MetricsReporterFactory}s in the 
application
+   *
+   * @param reporterFactories the map of customized {@link 
MetricsReporterFactory}s to be used
+   * @return type {@code S} of {@link ApplicationDescriptor} with {@code 
reporterFactories}
+   */
+  S withMetricsReporterFactories(Map<String, MetricsReporterFactory> 
reporterFactories);
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptor.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptor.java
 
b/samza-api/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptor.java
new file mode 100644
index 0000000..383e9ce
--- /dev/null
+++ 
b/samza-api/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptor.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.application.descriptors;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.system.descriptors.InputDescriptor;
+import org.apache.samza.system.descriptors.OutputDescriptor;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.table.descriptors.TableDescriptor;
+import org.apache.samza.system.descriptors.SystemDescriptor;
+import org.apache.samza.table.Table;
+
+
+/**
+ * The interface class to describe a {@link 
org.apache.samza.application.SamzaApplication} in high-level API in Samza.
+ */
+@InterfaceStability.Evolving
+public interface StreamApplicationDescriptor extends 
ApplicationDescriptor<StreamApplicationDescriptor> {
+
+  /**
+   * Sets the default SystemDescriptor to use for intermediate streams. This 
is equivalent to setting
+   * {@code job.default.system} and its properties in configuration.
+   * <p>
+   * If the default system descriptor is set, it must be set <b>before</b> 
creating any input/output/intermediate streams.
+   *
+   * @param defaultSystemDescriptor the default system descriptor to use
+   * @return the {@link StreamApplicationDescriptor} with {@code 
defaultSystemDescriptor} set as its default system
+   */
+  StreamApplicationDescriptor withDefaultSystem(SystemDescriptor<?> 
defaultSystemDescriptor);
+
+  /**
+   * Gets the input {@link MessageStream} corresponding to the {@code 
inputDescriptor}.
+   * <p>
+   * A {@code MessageStream<KV<K, V>>}, obtained by calling this method with a 
descriptor with a {@code KVSerde<K, V>},
+   * can receive messages of type {@code KV<K, V>}. An input {@code 
MessageStream<M>}, obtained using a descriptor with
+   * any other {@code Serde<M>}, can receive messages of type M - the key in 
the incoming message is ignored.
+   * <p>
+   * A {@code KVSerde<NoOpSerde, NoOpSerde>} or {@code NoOpSerde} may be used 
for the descriptor if the
+   * {@code SystemConsumer} deserializes the incoming messages itself, and no 
further deserialization is required from
+   * the framework.
+   * <p>
+   * Multiple invocations of this method with the same {@code inputDescriptor} 
will throw an
+   * {@link IllegalStateException}.
+   *
+   * @param inputDescriptor the descriptor for the stream
+   * @param <M> the type of messages in the input {@link MessageStream}
+   * @return the input {@link MessageStream}
+   * @throws IllegalStateException when invoked multiple times with the same 
{@code inputDescriptor}
+   */
+  <M> MessageStream<M> getInputStream(InputDescriptor<M, ?> inputDescriptor);
+
+  /**
+   * Gets the {@link OutputStream} corresponding to the {@code 
outputDescriptor}.
+   * <p>
+   * An {@code OutputStream<KV<K, V>>}, obtained by calling this method with a 
descriptor with a {@code KVSerde<K, V>},
+   * can send messages of type {@code KV<K, V>}. An {@code OutputStream<M>}, 
obtained using a descriptor with any
+   * other {@code Serde<M>}, can send messages of type M without a key.
+   * <p>
+   * A {@code KVSerde<NoOpSerde, NoOpSerde>} or {@code NoOpSerde} may be used 
for the descriptor if the
+   * {@code SystemProducer} serializes the outgoing messages itself, and no 
prior serialization is required from
+   * the framework.
+   * <p>
+   * When sending messages to an {@code OutputStream<KV<K, V>>}, messages are 
partitioned using their serialized key.
+   * When sending messages to any other {@code OutputStream<M>}, messages are 
partitioned using a null partition key.
+   * <p>
+   * Multiple invocations of this method with the same {@code 
outputDescriptor} will throw an
+   * {@link IllegalStateException}.
+   *
+   * @param outputDescriptor the descriptor for the stream
+   * @param <M> the type of messages in the {@link OutputStream}
+   * @return the {@link OutputStream}
+   * @throws IllegalStateException when invoked multiple times with the same 
{@code outputDescriptor}
+   */
+  <M> OutputStream<M> getOutputStream(OutputDescriptor<M, ?> outputDescriptor);
+
+  /**
+   * Gets the {@link Table} corresponding to the {@link TableDescriptor}.
+   * <p>
+   * Multiple invocations of this method with the same {@link TableDescriptor} 
will throw an
+   * {@link IllegalStateException}.
+   *
+   * @param tableDescriptor the {@link TableDescriptor}
+   * @param <K> the type of the key
+   * @param <V> the type of the value
+   * @return the {@link Table} corresponding to the {@code tableDescriptor}
+   * @throws IllegalStateException when invoked multiple times with the same 
{@link TableDescriptor}
+   */
+  <K, V> Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> tableDescriptor);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/application/descriptors/TaskApplicationDescriptor.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/application/descriptors/TaskApplicationDescriptor.java
 
b/samza-api/src/main/java/org/apache/samza/application/descriptors/TaskApplicationDescriptor.java
new file mode 100644
index 0000000..4730297
--- /dev/null
+++ 
b/samza-api/src/main/java/org/apache/samza/application/descriptors/TaskApplicationDescriptor.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.application.descriptors;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.system.descriptors.InputDescriptor;
+import org.apache.samza.system.descriptors.OutputDescriptor;
+import org.apache.samza.table.descriptors.TableDescriptor;
+import org.apache.samza.task.TaskFactory;
+
+
+/**
+ *  The interface to describe a {@link 
org.apache.samza.application.SamzaApplication} that uses low-level API task
+ *  for processing.
+ */
+@InterfaceStability.Evolving
+public interface TaskApplicationDescriptor extends 
ApplicationDescriptor<TaskApplicationDescriptor> {
+
+  /**
+   * Sets the {@link TaskFactory} for the user application. The {@link 
TaskFactory#createInstance()} creates task instance
+   * that implements the main processing logic of the user application.
+   *
+   * @param factory the {@link TaskFactory} including the low-level task 
processing logic. The only allowed task factory
+   *                classes are {@link 
org.apache.samza.task.StreamTaskFactory} and {@link 
org.apache.samza.task.AsyncStreamTaskFactory}.
+   */
+  void setTaskFactory(TaskFactory factory);
+
+  /**
+   * Adds the input stream to the application.
+   *
+   * @param isd the {@link InputDescriptor}
+   */
+  void addInputStream(InputDescriptor isd);
+
+  /**
+   * Adds the output stream to the application.
+   *
+   * @param osd the {@link OutputDescriptor} of the output stream
+   */
+  void addOutputStream(OutputDescriptor osd);
+
+  /**
+   * Adds the {@link TableDescriptor} used in the application
+   *
+   * @param table {@link TableDescriptor}
+   */
+  void addTable(TableDescriptor table);
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/context/ApplicationContainerContextFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/context/ApplicationContainerContextFactory.java
 
b/samza-api/src/main/java/org/apache/samza/context/ApplicationContainerContextFactory.java
index fbc2eef..074b0b4 100644
--- 
a/samza-api/src/main/java/org/apache/samza/context/ApplicationContainerContextFactory.java
+++ 
b/samza-api/src/main/java/org/apache/samza/context/ApplicationContainerContextFactory.java
@@ -29,7 +29,8 @@ import java.io.Serializable;
  * stage. At that stage, the framework-provided job-level and container-level 
contexts are available for creating the
  * {@link ApplicationContainerContext}.
  * <p>
- * This is {@link Serializable} because it is specified in {@link 
org.apache.samza.application.ApplicationDescriptor}.
+ * This is {@link Serializable} because it is specified in the
+ * {@link org.apache.samza.application.descriptors.ApplicationDescriptor}.
  * @param <T> concrete type of {@link ApplicationContainerContext} returned by 
this factory
  */
 public interface ApplicationContainerContextFactory<T extends 
ApplicationContainerContext> extends Serializable {

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/context/ApplicationTaskContextFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/context/ApplicationTaskContextFactory.java
 
b/samza-api/src/main/java/org/apache/samza/context/ApplicationTaskContextFactory.java
index af9ad68..619bbc7 100644
--- 
a/samza-api/src/main/java/org/apache/samza/context/ApplicationTaskContextFactory.java
+++ 
b/samza-api/src/main/java/org/apache/samza/context/ApplicationTaskContextFactory.java
@@ -29,7 +29,8 @@ import java.io.Serializable;
  * task. At that stage, the framework-provided job-level, container-level, and 
task-level contexts are available for
  * creating the {@link ApplicationTaskContext}. Also, the application-defined 
container-level context is available.
  * <p>
- * This is {@link Serializable} because it is specified in {@link 
org.apache.samza.application.ApplicationDescriptor}.
+ * This is {@link Serializable} because it is specified in the
+ * {@link org.apache.samza.application.descriptors.ApplicationDescriptor}.
  * @param <T> concrete type of {@link ApplicationTaskContext} returned by this 
factory
  */
 public interface ApplicationTaskContextFactory<T extends 
ApplicationTaskContext> extends Serializable {

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java 
b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
index 97ac65d..f951a84 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
@@ -40,7 +40,7 @@ import org.apache.samza.table.Table;
  * A stream of messages that can be transformed into another {@link 
MessageStream}.
  * <p>
  * A {@link MessageStream} corresponding to an input stream can be obtained 
using
- * {@link 
org.apache.samza.application.StreamApplicationDescriptor#getInputStream}.
+ * {@link 
org.apache.samza.application.descriptors.StreamApplicationDescriptor#getInputStream}.
  *
  * @param <M> the type of messages in this stream
  */
@@ -214,7 +214,8 @@ public interface MessageStream<M> {
 
   /**
    * Re-partitions this {@link MessageStream} using keys from the {@code 
keyExtractor} by creating a new
-   * intermediate stream on the default system provided via {@link 
org.apache.samza.application.StreamApplicationDescriptor#withDefaultSystem}.
+   * intermediate stream on the default system provided via
+   * {@link 
org.apache.samza.application.descriptors.StreamApplicationDescriptor#withDefaultSystem}.
    * This intermediate stream is both an output and input to the job.
    * <p>
    * Uses the provided {@link KVSerde} for serialization of keys and values.

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/operators/TableDescriptor.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/TableDescriptor.java 
b/samza-api/src/main/java/org/apache/samza/operators/TableDescriptor.java
deleted file mode 100644
index dbcd65e..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/TableDescriptor.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-import org.apache.samza.annotation.InterfaceStability;
-
-/**
- * User facing class to collect metadata that fully describes a
- * Samza table. This interface should be implemented by concrete table 
implementations.
- * <p>
- * Typical user code should look like the following, notice 
<code>withConfig()</code>
- * is defined in this class and the rest in subclasses.
- *
- * <pre>
- * {@code
- * TableDescriptor<Integer, String, ?> tableDesc = new 
RocksDbTableDescriptor("tbl",
- *         KVSerde.of(new IntegerSerde(), new StringSerde("UTF-8")))
- *     .withBlockSize(1024)
- *     .withConfig("some-key", "some-value");
- * }
- * </pre>
-
- * Once constructed, a table descriptor can be registered with the system. 
Internally,
- * the table descriptor is then converted to a {@link 
org.apache.samza.table.TableSpec},
- * which is used to track tables internally.
- *
- * @param <K> the type of the key in this table
- * @param <V> the type of the value in this table
- * @param <D> the type of the concrete table descriptor
- */
-@InterfaceStability.Unstable
-public interface TableDescriptor<K, V, D extends TableDescriptor<K, V, D>> {
-
-  /**
-   * Get the Id of the table
-   * @return Id of the table
-   */
-  String getTableId();
-
-  /**
-   * Add a configuration entry for the table
-   * @param key the key
-   * @param value the value
-   * @return this table descriptor instance
-   */
-  D withConfig(String key, String value);
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/operators/descriptors/GenericInputDescriptor.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/descriptors/GenericInputDescriptor.java
 
b/samza-api/src/main/java/org/apache/samza/operators/descriptors/GenericInputDescriptor.java
deleted file mode 100644
index 09dd381..0000000
--- 
a/samza-api/src/main/java/org/apache/samza/operators/descriptors/GenericInputDescriptor.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.descriptors;
-
-import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
-import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
-import org.apache.samza.serializers.Serde;
-
-/**
- * A descriptor for a generic input stream.
- * <p>
- * An instance of this descriptor may be obtained from an appropriately 
configured {@link GenericSystemDescriptor}.
- * <p>
- * If the system being used provides its own system and stream descriptor 
implementations, they should be used instead.
- * Otherwise, this {@link GenericInputDescriptor} may be used to provide 
Samza-specific properties of the input stream.
- * Additional system stream specific properties may be provided using {@link 
#withStreamConfigs}
- * <p>
- * Stream properties configured using a descriptor override corresponding 
properties provided in configuration.
- *
- * @param <StreamMessageType> type of messages in this stream.
- */
-public final class GenericInputDescriptor<StreamMessageType>
-    extends InputDescriptor<StreamMessageType, 
GenericInputDescriptor<StreamMessageType>> {
-  GenericInputDescriptor(String streamId, SystemDescriptor systemDescriptor, 
Serde serde) {
-    super(streamId, serde, systemDescriptor, null);
-  }
-
-  @Override
-  public GenericInputDescriptor<StreamMessageType> withPhysicalName(String 
physicalName) {
-    return super.withPhysicalName(physicalName);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/operators/descriptors/GenericOutputDescriptor.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/descriptors/GenericOutputDescriptor.java
 
b/samza-api/src/main/java/org/apache/samza/operators/descriptors/GenericOutputDescriptor.java
deleted file mode 100644
index 155bd4e..0000000
--- 
a/samza-api/src/main/java/org/apache/samza/operators/descriptors/GenericOutputDescriptor.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.descriptors;
-
-import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor;
-import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
-import org.apache.samza.serializers.Serde;
-
-/**
- * A descriptor for a generic output stream.
- * <p>
- * An instance of this descriptor may be obtained from an appropriately 
configured {@link GenericSystemDescriptor}.
- * <p>
- * If the system being used provides its own system and stream descriptor 
implementations, they should be used instead.
- * Otherwise, this {@link GenericOutputDescriptor} may be used to provide 
Samza-specific properties of the output stream.
- * Additional system stream specific properties may be provided using {@link 
#withStreamConfigs}
- * <p>
- * Stream properties configured using a descriptor override corresponding 
properties provided in configuration.
- *
- * @param <StreamMessageType> type of messages in this stream.
- */
-public final class GenericOutputDescriptor<StreamMessageType>
-    extends OutputDescriptor<StreamMessageType, 
GenericOutputDescriptor<StreamMessageType>> {
-  GenericOutputDescriptor(String streamId, SystemDescriptor systemDescriptor, 
Serde serde) {
-    super(streamId, serde, systemDescriptor);
-  }
-
-  @Override
-  public GenericOutputDescriptor<StreamMessageType> withPhysicalName(String 
physicalName) {
-    return super.withPhysicalName(physicalName);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/operators/descriptors/GenericSystemDescriptor.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/descriptors/GenericSystemDescriptor.java
 
b/samza-api/src/main/java/org/apache/samza/operators/descriptors/GenericSystemDescriptor.java
deleted file mode 100644
index 24f7932..0000000
--- 
a/samza-api/src/main/java/org/apache/samza/operators/descriptors/GenericSystemDescriptor.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.descriptors;
-
-
-import 
org.apache.samza.operators.descriptors.base.system.OutputDescriptorProvider;
-import 
org.apache.samza.operators.descriptors.base.system.SimpleInputDescriptorProvider;
-import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
-import org.apache.samza.serializers.Serde;
-
-/**
- * A descriptor for a generic system.
- * <p>
- * If the system being used provides its own system and stream descriptor implementations, they should be used instead.
- * Otherwise, this {@link GenericSystemDescriptor} may be used to provide Samza-specific properties of the system.
- * Additional system specific properties may be provided using {@link #withSystemConfigs}
- * <p>
- * System properties configured using a descriptor override corresponding properties provided in configuration.
- */
-public final class GenericSystemDescriptor extends SystemDescriptor<GenericSystemDescriptor>
-    implements SimpleInputDescriptorProvider, OutputDescriptorProvider {
-
-  /**
-   * Constructs a {@link GenericSystemDescriptor} instance with no system level serde.
-   * Serdes must be provided explicitly at stream level when getting input or output descriptors.
-   *
-   * @param systemName name of this system
-   * @param factoryClassName name of the SystemFactory class for this system
-   */
-  public GenericSystemDescriptor(String systemName, String factoryClassName) {
-    super(systemName, factoryClassName, null, null);
-  }
-
-  @Override
-  public <StreamMessageType> GenericInputDescriptor<StreamMessageType> getInputDescriptor(
-      String streamId, Serde<StreamMessageType> serde) {
-    return new GenericInputDescriptor<>(streamId, this, serde);
-  }
-
-  @Override
-  public <StreamMessageType> GenericOutputDescriptor<StreamMessageType> getOutputDescriptor(
-      String streamId, Serde<StreamMessageType> serde) {
-    return new GenericOutputDescriptor<>(streamId, this, serde);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/stream/InputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/stream/InputDescriptor.java b/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/stream/InputDescriptor.java
deleted file mode 100644
index 708dd2a..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/stream/InputDescriptor.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.descriptors.base.stream;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
-import org.apache.samza.operators.functions.InputTransformer;
-import org.apache.samza.serializers.Serde;
-import org.apache.samza.system.SystemStreamMetadata.OffsetType;
-
-/**
- * The base descriptor for an input stream. Allows setting properties that are common to all input streams.
- * <p>
- * Stream properties configured using a descriptor override corresponding properties provided in configuration.
- *
- * @param <StreamMessageType> type of messages in this stream.
- * @param <SubClass> type of the concrete sub-class
- */
-public abstract class InputDescriptor<StreamMessageType, SubClass extends InputDescriptor<StreamMessageType, SubClass>>
-    extends StreamDescriptor<StreamMessageType, SubClass> {
-  private static final String RESET_OFFSET_CONFIG_KEY = "streams.%s.samza.reset.offset";
-  private static final String OFFSET_DEFAULT_CONFIG_KEY = "streams.%s.samza.offset.default";
-  private static final String PRIORITY_CONFIG_KEY = "streams.%s.samza.priority";
-  private static final String BOOTSTRAP_CONFIG_KEY = "streams.%s.samza.bootstrap";
-  private static final String BOUNDED_CONFIG_KEY = "streams.%s.samza.bounded";
-  private static final String DELETE_COMMITTED_MESSAGES_CONFIG_KEY = "streams.%s.samza.delete.committed.messages";
-
-  private final Optional<InputTransformer> transformerOptional;
-
-  private Optional<Boolean> resetOffsetOptional = Optional.empty();
-  private Optional<OffsetType> offsetDefaultOptional = Optional.empty();
-  private Optional<Integer> priorityOptional = Optional.empty();
-  private Optional<Boolean> isBootstrapOptional = Optional.empty();
-  private Optional<Boolean> isBoundedOptional = Optional.empty();
-  private Optional<Boolean> deleteCommittedMessagesOptional = Optional.empty();
-
-  /**
-   * Constructs an {@link InputDescriptor} instance.
-   *
-   * @param streamId id of the stream
-   * @param serde serde for messages in the stream
-   * @param systemDescriptor system descriptor this stream descriptor was obtained from
-   * @param transformer stream level input stream transform function if available, else null
-   */
-  public InputDescriptor(String streamId, Serde serde, SystemDescriptor systemDescriptor, InputTransformer transformer) {
-    super(streamId, serde, systemDescriptor);
-
-    // stream level transformer takes precedence over system level transformer
-    if (transformer != null) {
-      this.transformerOptional = Optional.of(transformer);
-    } else {
-      this.transformerOptional = systemDescriptor.getTransformer();
-    }
-  }
-
-  /**
-   * If set, when a Samza container starts up, it ignores any checkpointed offset for this particular
-   * input stream. Its behavior is thus determined by the {@link #withOffsetDefault} setting.
-   * Note that the reset takes effect every time a container is started, which may be every time you restart your job,
-   * or more frequently if a container fails and is restarted by the framework.
-   *
-   * @return this input descriptor
-   */
-  public SubClass shouldResetOffset() {
-    this.resetOffsetOptional = Optional.of(true);
-    return (SubClass) this;
-  }
-
-  /**
-   * If a container starts up without a checkpoint, this property determines where in the input stream we should start
-   * consuming. The value must be an OffsetType, one of the following:
-   * <ul>
-   *  <li>upcoming: Start processing messages that are published after the job starts.
-   *                Any messages published while the job was not running are not processed.
-   *  <li>oldest: Start processing at the oldest available message in the system,
-   *              and reprocess the entire available message history.
-   * </ul>
-   * This property is for an individual stream. To set it for all streams within a system, see
-   * {@link SystemDescriptor#withDefaultStreamOffsetDefault}. If both are defined, the stream-level definition
-   * takes precedence.
-   *
-   * @param offsetDefault offset type to start processing from
-   * @return this input descriptor
-   */
-  public SubClass withOffsetDefault(OffsetType offsetDefault) {
-    this.offsetDefaultOptional = Optional.ofNullable(offsetDefault);
-    return (SubClass) this;
-  }
-
-  /**
-   * If one or more streams have a priority set (any positive integer), they will be processed with higher priority
-   * than the other streams.
-   * <p>
-   * You can set several streams to the same priority, or define multiple priority levels by assigning a
-   * higher number to the higher-priority streams.
-   * <p>
-   * If a higher-priority stream has any messages available, they will always be processed first;
-   * messages from lower-priority streams are only processed when there are no new messages on higher-priority inputs.
-   *
-   * @param priority priority for this input stream
-   * @return this input descriptor
-   */
-  public SubClass withPriority(int priority) {
-    this.priorityOptional = Optional.of(priority);
-    return (SubClass) this;
-  }
-
-  /**
-   * If set, this stream will be processed as a bootstrap stream. This means that every time a Samza container
-   * starts up, this stream will be fully consumed before messages from any other stream are processed.
-   *
-   * @return this input descriptor
-   */
-  public SubClass shouldBootstrap() {
-    this.isBootstrapOptional = Optional.of(true);
-    return (SubClass) this;
-  }
-
-  /**
-   * If set, this stream will be considered a bounded stream. If all input streams in an application are
-   * bounded, the job is considered to be running in batch processing mode.
-   *
-   * @return this input descriptor
-   */
-  public SubClass isBounded() {
-    this.isBoundedOptional = Optional.of(true);
-    return (SubClass) this;
-  }
-
-  /**
-   * If set, and supported by the system implementation, messages older than the latest checkpointed offset
-   * for this stream may be deleted after the commit.
-   *
-   * @return this input descriptor
-   */
-  public SubClass shouldDeleteCommittedMessages() {
-    this.deleteCommittedMessagesOptional = Optional.of(true);
-    return (SubClass) this;
-  }
-
-  public Optional<InputTransformer> getTransformer() {
-    return this.transformerOptional;
-  }
-
-  @Override
-  public Map<String, String> toConfig() {
-    HashMap<String, String> configs = new HashMap<>(super.toConfig());
-    String streamId = getStreamId();
-    this.offsetDefaultOptional.ifPresent(od ->
-        configs.put(String.format(OFFSET_DEFAULT_CONFIG_KEY, streamId), od.name().toLowerCase()));
-    this.resetOffsetOptional.ifPresent(resetOffset ->
-        configs.put(String.format(RESET_OFFSET_CONFIG_KEY, streamId), Boolean.toString(resetOffset)));
-    this.priorityOptional.ifPresent(priority ->
-        configs.put(String.format(PRIORITY_CONFIG_KEY, streamId), Integer.toString(priority)));
-    this.isBootstrapOptional.ifPresent(bootstrap ->
-        configs.put(String.format(BOOTSTRAP_CONFIG_KEY, streamId), Boolean.toString(bootstrap)));
-    this.isBoundedOptional.ifPresent(bounded ->
-        configs.put(String.format(BOUNDED_CONFIG_KEY, streamId), Boolean.toString(bounded)));
-    this.deleteCommittedMessagesOptional.ifPresent(deleteCommittedMessages ->
-        configs.put(String.format(DELETE_COMMITTED_MESSAGES_CONFIG_KEY, streamId),
-            Boolean.toString(deleteCommittedMessages)));
-    return Collections.unmodifiableMap(configs);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/stream/OutputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/stream/OutputDescriptor.java b/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/stream/OutputDescriptor.java
deleted file mode 100644
index 20bbc53..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/stream/OutputDescriptor.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.descriptors.base.stream;
-
-import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
-import org.apache.samza.serializers.Serde;
-
-/**
- * The base descriptor for an output stream. Allows setting properties that are common to all output streams.
- * <p>
- * Stream properties configured using a descriptor override corresponding properties provided in configuration.
- *
- * @param <StreamMessageType> type of messages in this stream.
- * @param <SubClass> type of the concrete sub-class
- */
-public abstract class OutputDescriptor<StreamMessageType, SubClass extends OutputDescriptor<StreamMessageType, SubClass>>
-    extends StreamDescriptor<StreamMessageType, SubClass> {
-  /**
-   * Constructs an {@link OutputDescriptor} instance.
-   *
-   * @param streamId id of the stream
-   * @param serde serde for messages in the stream
-   * @param systemDescriptor system descriptor this stream descriptor was obtained from
-   */
-  public OutputDescriptor(String streamId, Serde serde, SystemDescriptor systemDescriptor) {
-    super(streamId, serde, systemDescriptor);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/stream/StreamDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/stream/StreamDescriptor.java b/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/stream/StreamDescriptor.java
deleted file mode 100644
index f7de728..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/stream/StreamDescriptor.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.descriptors.base.stream;
-
-import com.google.common.base.Preconditions;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.regex.Pattern;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
-import org.apache.samza.serializers.Serde;
-
-/**
- * The base descriptor for an input or output stream. Allows setting properties that are common to all streams.
- * <p>
- * Stream properties configured using a descriptor override corresponding properties provided in configuration.
- *
- * @param <StreamMessageType> type of messages in this stream.
- * @param <SubClass> type of the concrete sub-class
- */
-public abstract class StreamDescriptor<StreamMessageType, SubClass extends StreamDescriptor<StreamMessageType, SubClass>> {
-  private static final String SYSTEM_CONFIG_KEY = "streams.%s.samza.system";
-  private static final String PHYSICAL_NAME_CONFIG_KEY = "streams.%s.samza.physical.name";
-  private static final String STREAM_CONFIGS_CONFIG_KEY = "streams.%s.%s";
-  private static final Pattern STREAM_ID_PATTERN = Pattern.compile("[\\d\\w-_]+");
-
-  private final String streamId;
-  private final Serde serde;
-  private final SystemDescriptor systemDescriptor;
-
-  private final Map<String, String> streamConfigs = new HashMap<>();
-  private Optional<String> physicalNameOptional = Optional.empty();
-
-  /**
-   * Constructs a {@link StreamDescriptor} instance.
-   *
-   * @param streamId id of the stream
-   * @param serde serde for messages in the stream
-   * @param systemDescriptor system descriptor this stream descriptor was obtained from
-   */
-  StreamDescriptor(String streamId, Serde serde, SystemDescriptor systemDescriptor) {
-    Preconditions.checkArgument(systemDescriptor != null,
-        String.format("SystemDescriptor must not be null. streamId: %s", streamId));
-    String systemName = systemDescriptor.getSystemName();
-    Preconditions.checkState(isValidStreamId(streamId),
-        String.format("streamId must be non-empty and must not contain spaces or special characters. " +
-            "streamId: %s, systemName: %s", streamId, systemName));
-    Preconditions.checkArgument(serde != null,
-        String.format("Serde must not be null. streamId: %s systemName: %s", streamId, systemName));
-    this.streamId = streamId;
-    this.serde = serde;
-    this.systemDescriptor = systemDescriptor;
-  }
-
-  /**
-   * The physical name of the stream on the system on which this stream will be accessed.
-   * This is opposed to the {@code streamId} which is the logical name that Samza uses to identify the stream.
-   * <p>
-   * A physical name could be a Kafka topic name, an HDFS file URN, or any other system-specific identifier.
-   * <p>
-   * If not provided, the logical {@code streamId} is used as the physical name.
-   *
-   * @param physicalName physical name for this stream.
-   * @return this stream descriptor.
-   */
-  protected SubClass withPhysicalName(String physicalName) {
-    this.physicalNameOptional = Optional.ofNullable(physicalName);
-    return (SubClass) this;
-  }
-
-  /**
-   * Additional system-specific properties for this stream.
-   * <p>
-   * These properties are added under the {@code streams.stream-id.*} scope.
-   *
-   * @param streamConfigs system-specific properties for this stream
-   * @return this stream descriptor
-   */
-  public SubClass withStreamConfigs(Map<String, String> streamConfigs) {
-    this.streamConfigs.putAll(streamConfigs);
-    return (SubClass) this;
-  }
-
-  public String getStreamId() {
-    return this.streamId;
-  }
-
-  public String getSystemName() {
-    return this.systemDescriptor.getSystemName();
-  }
-
-  public Serde getSerde() {
-    return this.serde;
-  }
-
-  public SystemDescriptor getSystemDescriptor() {
-    return this.systemDescriptor;
-  }
-
-  public Optional<String> getPhysicalName() {
-    return physicalNameOptional;
-  }
-
-  private boolean isValidStreamId(String id) {
-    return StringUtils.isNotBlank(id) && STREAM_ID_PATTERN.matcher(id).matches();
-  }
-
-  public Map<String, String> toConfig() {
-    HashMap<String, String> configs = new HashMap<>();
-    configs.put(String.format(SYSTEM_CONFIG_KEY, streamId), getSystemName());
-    this.physicalNameOptional.ifPresent(physicalName ->
-        configs.put(String.format(PHYSICAL_NAME_CONFIG_KEY, streamId), physicalName));
-    this.streamConfigs.forEach((key, value) ->
-        configs.put(String.format(STREAM_CONFIGS_CONFIG_KEY, streamId, key), value));
-    return Collections.unmodifiableMap(configs);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/ExpandingInputDescriptorProvider.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/ExpandingInputDescriptorProvider.java b/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/ExpandingInputDescriptorProvider.java
deleted file mode 100644
index 05179dd..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/ExpandingInputDescriptorProvider.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.descriptors.base.system;
-
-import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
-import org.apache.samza.serializers.Serde;
-
-/**
- * Interface for advanced {@code SystemDescriptor}s that constrain the type of returned {@code InputDescriptor}s to
- * their own {@code StreamExpander} function result types.
- *
- * @param <StreamExpanderType> type of the system level {@code StreamExpander} results
- */
-public interface ExpandingInputDescriptorProvider<StreamExpanderType> {
-
-  /**
-   * Gets a {@link InputDescriptor} for an input stream on this system. The stream has the provided
-   * stream level serde, and the default system level {@code StreamExpander}
-   * <p>
-   * The type of messages in the stream is the type of messages returned by the default system level
-   * {@code StreamExpander}
-   *
-   * @param streamId id of the input stream
-   * @param serde stream level serde to be propagated to expanded input streams
-   * @return an {@link InputDescriptor} for the input stream
-   */
-  InputDescriptor<StreamExpanderType, ? extends InputDescriptor> getInputDescriptor(String streamId, Serde serde);
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/OutputDescriptorProvider.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/OutputDescriptorProvider.java b/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/OutputDescriptorProvider.java
deleted file mode 100644
index c2ceb53..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/OutputDescriptorProvider.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.descriptors.base.system;
-
-import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor;
-import org.apache.samza.serializers.Serde;
-
-
-/**
- * Interface for simple {@code SystemDescriptors} that return {@code OutputDescriptors} parameterized by the type of
- * the provided stream level serde.
- */
-public interface OutputDescriptorProvider {
-
-  /**
-   * Gets an {@link OutputDescriptor} representing an output stream on this system that uses the provided
-   * stream specific serde instead of the default system serde.
-   * <p>
-   * An {@code OutputStream<KV<K, V>>}, obtained using a descriptor with a {@code KVSerde<K, V>}, can send messages
-   * of type {@code KV<K, V>}. An {@code OutputStream<M>} with any other {@code Serde<M>} can send messages of
-   * type M without a key.
-   * <p>
-   * A {@code KVSerde<NoOpSerde, NoOpSerde>} or {@code NoOpSerde} may be used if the {@code SystemProducer}
-   * serializes the outgoing messages itself, and no prior serialization is required from the framework.
-   *
-   * @param streamId id of the output stream
-   * @param serde serde for this output stream that overrides the default system serde, if any.
-   * @param <StreamMessageType> type of messages in the output stream
-   * @return the {@link OutputDescriptor} for the output stream
-   */
-  <StreamMessageType> OutputDescriptor<StreamMessageType, ? extends OutputDescriptor> getOutputDescriptor(String streamId, Serde<StreamMessageType> serde);
-}

Reply via email to