SAMZA-1804: System and Stream Descriptors

Design details: 
https://cwiki.apache.org/confluence/display/SAMZA/SEP-14%3A+System+and+Stream+Descriptors

Author: Prateek Maheshwari <pmaheshw...@apache.org>

Reviewers: Yi Pan <nickpa...@gmail.com>, Cameron Lee <ca...@linkedin.com>

Closes #603 from prateekm/stream-descriptor


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/2a71baf7
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/2a71baf7
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/2a71baf7

Branch: refs/heads/master
Commit: 2a71baf7c05eeacdc0bf4463ca946c23da7995a0
Parents: 2d6b199
Author: Prateek Maheshwari <pmaheshw...@apache.org>
Authored: Mon Aug 27 12:06:27 2018 -0700
Committer: Prateek Maheshwari <pmaheshw...@apache.org>
Committed: Mon Aug 27 12:06:27 2018 -0700

----------------------------------------------------------------------
 build.gradle                                    |   5 +-
 .../apache/samza/operators/MessageStream.java   |  12 +-
 .../org/apache/samza/operators/StreamGraph.java | 109 ++----
 .../descriptors/GenericInputDescriptor.java     |  43 +++
 .../descriptors/GenericOutputDescriptor.java    |  43 +++
 .../descriptors/GenericSystemDescriptor.java    |  67 ++++
 .../base/stream/InputDescriptor.java            | 187 +++++++++
 .../base/stream/OutputDescriptor.java           |  44 +++
 .../base/stream/StreamDescriptor.java           | 136 +++++++
 .../ExpandingInputDescriptorProvider.java       |  44 +++
 .../base/system/OutputDescriptorProvider.java   |  48 +++
 .../system/SimpleInputDescriptorProvider.java   |  43 +++
 .../base/system/SystemDescriptor.java           | 177 +++++++++
 .../TransformingInputDescriptorProvider.java    |  44 +++
 .../operators/functions/InputTransformer.java   |  45 +++
 .../operators/functions/StreamExpander.java     |  58 +++
 .../TestExpandingInputDescriptor.java           |  61 +++
 .../descriptors/TestGenericInputDescriptor.java | 123 ++++++
 .../TestGenericSystemDescriptor.java            |  63 ++++
 .../descriptors/TestSimpleInputDescriptor.java  |  65 ++++
 .../TestTransformingInputDescriptor.java        |  66 ++++
 .../ExampleExpandingInputDescriptor.java        |  30 ++
 .../ExampleExpandingOutputDescriptor.java       |  29 ++
 .../ExampleExpandingSystemDescriptor.java       |  49 +++
 .../serde/ExampleSimpleInputDescriptor.java     |  30 ++
 .../serde/ExampleSimpleOutputDescriptor.java    |  29 ++
 .../serde/ExampleSimpleSystemDescriptor.java    |  43 +++
 .../ExampleTransformingInputDescriptor.java     |  30 ++
 .../ExampleTransformingOutputDescriptor.java    |  29 ++
 .../ExampleTransformingSystemDescriptor.java    |  43 +++
 .../samza/execution/JobGraphJsonGenerator.java  |   4 +-
 .../org/apache/samza/execution/JobNode.java     |  20 +-
 .../samza/operators/MessageStreamImpl.java      |   2 +-
 .../apache/samza/operators/StreamGraphSpec.java | 158 +++++---
 .../descriptors/DelegatingSystemDescriptor.java |  70 ++++
 .../samza/operators/impl/InputOperatorImpl.java |  23 +-
 .../samza/operators/spec/InputOperatorSpec.java |  57 ++-
 .../samza/operators/spec/OperatorSpecs.java     |  11 +-
 .../samza/operators/spec/OutputStreamImpl.java  |  12 +
 .../stream/IntermediateMessageStreamImpl.java   |   2 +-
 .../apache/samza/processor/StreamProcessor.java |   2 +-
 .../runtime/AbstractApplicationRunner.java      |  12 +
 .../apache/samza/task/StreamOperatorTask.java   |   3 +-
 .../java/org/apache/samza/util/StreamUtil.java  |   1 +
 .../samza/execution/TestExecutionPlanner.java   |  92 +++--
 .../execution/TestJobGraphJsonGenerator.java    |  38 +-
 .../org/apache/samza/execution/TestJobNode.java |  81 +++-
 .../samza/operators/TestJoinOperator.java       |  50 ++-
 .../samza/operators/TestMessageStreamImpl.java  |   4 +-
 .../samza/operators/TestOperatorSpecGraph.java  |   4 +-
 .../samza/operators/TestStreamGraphSpec.java    | 376 +++++++++----------
 .../operators/impl/TestInputOperatorImpl.java   |  80 ++++
 .../operators/impl/TestOperatorImplGraph.java   |  96 +++--
 .../operators/impl/TestWindowOperator.java      |  24 +-
 .../samza/operators/spec/TestOperatorSpec.java  |  16 +-
 .../spec/TestPartitionByOperatorSpec.java       |  63 +++-
 .../scala/org/apache/samza/util/TestUtil.scala  |   2 -
 .../system/kafka/KafkaInputDescriptor.java      | 108 ++++++
 .../system/kafka/KafkaOutputDescriptor.java     |  39 ++
 .../system/kafka/KafkaSystemDescriptor.java     | 251 +++++++++++++
 .../system/kafka/TestKafkaInputDescriptor.java  |  68 ++++
 .../system/kafka/TestKafkaSystemDescriptor.java |  69 ++++
 .../kv/BaseLocalStoreBackedTableProvider.java   |  15 +-
 .../samza/sql/translator/QueryTranslator.java   |  10 +-
 .../samza/sql/translator/ScanTranslator.java    |  13 +-
 .../samza/sql/translator/TranslatorContext.java |  18 +-
 .../sql/translator/TestJoinTranslator.java      |   2 +-
 .../sql/translator/TestQueryTranslator.java     |   5 +-
 .../example/AppWithGlobalConfigExample.java     |  17 +-
 .../apache/samza/example/BroadcastExample.java  |  24 +-
 .../samza/example/KeyValueStoreExample.java     |  27 +-
 .../org/apache/samza/example/MergeExample.java  |  25 +-
 .../samza/example/OrderShipmentJoinExample.java |  29 +-
 .../samza/example/PageViewCounterExample.java   |  17 +-
 .../samza/example/RepartitionExample.java       |  24 +-
 .../org/apache/samza/example/WindowExample.java |  19 +-
 .../apache/samza/test/framework/TestRunner.java |   2 +-
 .../system/CollectionStreamSystemSpec.java      |  25 +-
 .../TestStandaloneIntegrationApplication.java   |  21 +-
 .../EndOfStreamIntegrationTest.java             |  11 +-
 .../WatermarkIntegrationTest.java               |  12 +-
 .../test/framework/BroadcastAssertApp.java      |  11 +-
 .../StreamApplicationIntegrationTest.java       |  29 +-
 .../samza/test/framework/TestTimerApp.java      |  17 +-
 .../test/operator/RepartitionJoinWindowApp.java |  32 +-
 .../test/operator/RepartitionWindowApp.java     |  19 +-
 .../samza/test/operator/SessionWindowApp.java   |  19 +-
 .../operator/TestRepartitionJoinWindowApp.java  |  12 +-
 .../test/operator/TestRepartitionWindowApp.java |   6 +-
 .../samza/test/operator/TumblingWindowApp.java  |  19 +-
 .../test/processor/TestStreamApplication.java   |  20 +-
 .../processor/TestZkLocalApplicationRunner.java |  60 ++-
 .../apache/samza/test/table/TestLocalTable.java |  28 +-
 .../table/TestLocalTableWithSideInputs.java     |  11 +-
 .../samza/test/table/TestRemoteTable.java       |  17 +-
 .../benchmark/SystemConsumerWithSamzaBench.java |  11 +-
 96 files changed, 3541 insertions(+), 719 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index e4583e4..7d23717 100644
--- a/build.gradle
+++ b/build.gradle
@@ -139,11 +139,12 @@ project(':samza-api') {
   apply plugin: 'java'
 
   dependencies {
-    compile "org.slf4j:slf4j-api:$slf4jVersion"
+    compile "org.apache.commons:commons-lang3:$commonsLang3Version"
     compile "org.codehaus.jackson:jackson-mapper-asl:$jacksonVersion"
+    compile "com.google.guava:guava:$guavaVersion"
+    compile "org.slf4j:slf4j-api:$slf4jVersion"
     testCompile "junit:junit:$junitVersion"
     testCompile "org.mockito:mockito-core:$mockitoVersion"
-    testCompile "com.google.guava:guava:$guavaVersion"
   }
   checkstyle {
     configFile = new File(rootDir, "checkstyle/checkstyle.xml")

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java 
b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
index 7797f9a..e3a61c4 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
@@ -213,13 +213,11 @@ public interface MessageStream<M> {
 
   /**
    * Re-partitions this {@link MessageStream} using keys from the {@code 
keyExtractor} by creating a new
-   * intermediate stream on the {@code job.default.system}. This intermediate 
stream is both an output and
-   * input to the job.
+   * intermediate stream on the default system provided via {@link 
StreamGraph#setDefaultSystem}. This intermediate
+   * stream is both an output and input to the job.
    * <p>
    * Uses the provided {@link KVSerde} for serialization of keys and values. 
If the provided {@code serde} is null,
-   * uses the default serde provided via {@link StreamGraph#setDefaultSerde}, 
which must be a KVSerde. If the default
-   * serde is not a {@link KVSerde}, a runtime exception will be thrown. If no 
default serde has been provided
-   * <b>before</b> calling this method, a {@code KVSerde<NoOpSerde, 
NoOpSerde>} is used.
+   * uses the key and message serde configured for the job's default system.
    * <p>
    * The number of partitions for this intermediate stream is determined as 
follows:
    * If the stream is an eventual input to a {@link #join}, and the number of 
partitions for the other stream is known,
@@ -251,9 +249,7 @@ public interface MessageStream<M> {
   /**
    * Same as calling {@link #partitionBy(MapFunction, MapFunction, KVSerde, 
String)} with a null KVSerde.
    * <p>
-   * Uses the default serde provided via {@link StreamGraph#setDefaultSerde}, 
which must be a KVSerde. If the default
-   * serde is not a {@link KVSerde}, a runtime exception will be thrown. If no 
default serde has been provided
-   * <b>before</b> calling this method, a {@code KVSerde<NoOpSerde, 
NoOpSerde>} is used.
+   * Uses the key and message serde configured for the job's default system.
    *
    * @param keyExtractor the {@link MapFunction} to extract the message and 
partition key from the input message
    * @param valueExtractor the {@link MapFunction} to extract the value from 
the input message

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java 
b/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
index 6871bc7..ec6e4b7 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
@@ -19,7 +19,9 @@
 package org.apache.samza.operators;
 
 import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.serializers.Serde;
+import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
+import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor;
+import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
 import org.apache.samza.table.Table;
 
 
@@ -30,96 +32,65 @@ import org.apache.samza.table.Table;
 public interface StreamGraph {
 
   /**
-   * Sets the default {@link Serde} to use for (de)serializing messages.
-   * <p>.
-   * If the default serde is set, it must be set <b>before</b> creating any 
input or output streams.
+   * Sets the default SystemDescriptor to use for intermediate streams. This 
is equivalent to setting
+   * {@code job.default.system} and its properties in configuration.
    * <p>
-   * If no explicit or default serdes are provided, a {@code 
KVSerde<NoOpSerde, NoOpSerde>} is used. This means that
-   * any streams created without explicit or default serdes should be cast to 
{@code MessageStream<KV<Object, Object>>}.
+   * If the default system descriptor is set, it must be set <b>before</b> 
creating any intermediate streams.
    * <p>
-   * Providing an incompatible message type for the input/output streams that 
use the default serde will result in
+   * If the intermediate stream is created with a stream-level Serde, it 
will be used, else the serde specified
+   * for the {@code job.default.system} in configuration will be used.
+   * <p>
+   * Providing an incompatible message type for the intermediate streams that 
use the default serde will result in
    * {@link ClassCastException}s at runtime.
    *
-   * @param serde the default message {@link Serde} to use
+   * @param defaultSystemDescriptor the default system descriptor to use
    */
-  void setDefaultSerde(Serde<?> serde);
+  void setDefaultSystem(SystemDescriptor<?> defaultSystemDescriptor);
 
   /**
-   * Gets the input {@link MessageStream} corresponding to the {@code 
streamId}.
+   * Gets the input {@link MessageStream} corresponding to the {@code 
inputDescriptor}.
    * <p>
-   * An input {@code MessageStream<KV<K, V>}, which can be obtained by calling 
this method with a {@code KVSerde<K, V>},
-   * can receive messages of type {@code KV<K, V>}. An input {@code 
MessageStream<M>} with any other {@code Serde<M>}
-   * can receive messages of type M - the key in the incoming message is 
ignored.
+   * A {@code MessageStream<KV<K, V>>}, obtained by calling this method with a 
descriptor with a {@code KVSerde<K, V>},
+   * can receive messages of type {@code KV<K, V>}. An input {@code 
MessageStream<M>}, obtained using a descriptor with
+   * any other {@code Serde<M>}, can receive messages of type M - the key in 
the incoming message is ignored.
    * <p>
-   * A {@code KVSerde<NoOpSerde, NoOpSerde>} or {@code NoOpSerde} may be used 
if the {@code SystemConsumer}
-   * deserializes the incoming messages itself, and no further deserialization 
is required from the framework.
+   * A {@code KVSerde<NoOpSerde, NoOpSerde>} or {@code NoOpSerde} may be used 
for the descriptor if the
+   * {@code SystemConsumer} deserializes the incoming messages itself, and no 
further deserialization is required from
+   * the framework.
    * <p>
-   * Multiple invocations of this method with the same {@code streamId} will 
throw an {@link IllegalStateException}.
+   * Multiple invocations of this method with the same {@code inputDescriptor} 
will throw an
+   * {@link IllegalStateException}.
    *
-   * @param streamId the unique ID for the stream
-   * @param serde the {@link Serde} to use for deserializing incoming messages
+   * @param inputDescriptor the descriptor for the stream
    * @param <M> the type of messages in the input {@link MessageStream}
    * @return the input {@link MessageStream}
-   * @throws IllegalStateException when invoked multiple times with the same 
{@code streamId}
-   */
-  <M> MessageStream<M> getInputStream(String streamId, Serde<M> serde);
-
-  /**
-   * Same as {@link #getInputStream(String, Serde)}, but uses the default 
{@link Serde} provided via
-   * {@link #setDefaultSerde(Serde)} for deserializing input messages.
-   * <p>
-   * If no default serde has been provided <b>before</b> calling this method, 
a {@code KVSerde<NoOpSerde, NoOpSerde>}
-   * is used. Providing a message type {@code M} that is incompatible with the 
default Serde will result in
-   * {@link ClassCastException}s at runtime.
-   * <p>
-   * Multiple invocations of this method with the same {@code streamId} will 
throw an {@link IllegalStateException}.
-   *
-   * @param streamId the unique ID for the stream
-   * @param <M> the type of message in the input {@link MessageStream}
-   * @return the input {@link MessageStream}
-   * @throws IllegalStateException when invoked multiple times with the same 
{@code streamId}
+   * @throws IllegalStateException when invoked multiple times with the same 
{@code inputDescriptor}
    */
-  <M> MessageStream<M> getInputStream(String streamId);
+  <M> MessageStream<M> getInputStream(InputDescriptor<M, ?> inputDescriptor);
 
   /**
-   * Gets the {@link OutputStream} corresponding to the {@code streamId}.
+   * Gets the {@link OutputStream} corresponding to the {@code 
outputDescriptor}.
    * <p>
-   * An {@code OutputStream<KV<K, V>>}, which can be obtained by calling this 
method with a {@code KVSerde<K, V>},
-   * can send messages of type {@code KV<K, V>}. An {@code OutputStream<M>} 
with any other {@code Serde<M>} can
-   * send messages of type M without a key.
+   * An {@code OutputStream<KV<K, V>>}, obtained by calling this method with a 
descriptor with a {@code KVSerde<K, V>},
+   * can send messages of type {@code KV<K, V>}. An {@code OutputStream<M>}, 
obtained using a descriptor with any
+   * other {@code Serde<M>}, can send messages of type M without a key.
    * <p>
-   * A {@code KVSerde<NoOpSerde, NoOpSerde>} or {@code NoOpSerde} may be used 
if the {@code SystemProducer}
-   * serializes the outgoing messages itself, and no prior serialization is 
required from the framework.
+   * A {@code KVSerde<NoOpSerde, NoOpSerde>} or {@code NoOpSerde} may be used 
for the descriptor if the
+   * {@code SystemProducer} serializes the outgoing messages itself, and no 
prior serialization is required from
+   * the framework.
    * <p>
    * When sending messages to an {@code OutputStream<KV<K, V>>}, messages are 
partitioned using their serialized key.
    * When sending messages to any other {@code OutputStream<M>}, messages are 
partitioned using a null partition key.
    * <p>
-   * Multiple invocations of this method with the same {@code streamId} will 
throw an {@link IllegalStateException}.
-   *
-   * @param streamId the unique ID for the stream
-   * @param serde the {@link Serde} to use for serializing outgoing messages
-   * @param <M> the type of messages in the {@link OutputStream}
-   * @return the output {@link MessageStream}
-   * @throws IllegalStateException when invoked multiple times with the same 
{@code streamId}
-   */
-  <M> OutputStream<M> getOutputStream(String streamId, Serde<M> serde);
-
-  /**
-   * Same as {@link #getOutputStream(String, Serde)}, but uses the default 
{@link Serde} provided via
-   * {@link #setDefaultSerde(Serde)} for serializing output messages.
-   * <p>
-   * If no default serde has been provided <b>before</b> calling this method, 
a {@code KVSerde<NoOpSerde, NoOpSerde>}
-   * is used. Providing a message type {@code M} that is incompatible with the 
default Serde will result in
-   * {@link ClassCastException}s at runtime.
-   * <p>
-   * Multiple invocations of this method with the same {@code streamId} will 
throw an {@link IllegalStateException}.
+   * Multiple invocations of this method with the same {@code 
outputDescriptor} will throw an
+   * {@link IllegalStateException}.
    *
-   * @param streamId the unique ID for the stream
+   * @param outputDescriptor the descriptor for the stream
    * @param <M> the type of messages in the {@link OutputStream}
-   * @return the output {@link MessageStream}
-   * @throws IllegalStateException when invoked multiple times with the same 
{@code streamId}
+   * @return the {@link OutputStream}
+   * @throws IllegalStateException when invoked multiple times with the same 
{@code outputDescriptor}
    */
-  <M> OutputStream<M> getOutputStream(String streamId);
+  <M> OutputStream<M> getOutputStream(OutputDescriptor<M, ?> outputDescriptor);
 
   /**
    * Gets the {@link Table} corresponding to the {@link TableDescriptor}.
@@ -127,13 +98,13 @@ public interface StreamGraph {
    * Multiple invocations of this method with the same {@link TableDescriptor} 
will throw an
    * {@link IllegalStateException}.
    *
-   * @param tableDesc the {@link TableDescriptor}
+   * @param tableDescriptor the {@link TableDescriptor}
    * @param <K> the type of the key
    * @param <V> the type of the value
-   * @return the {@link Table} corresponding to the {@code tableDesc}
+   * @return the {@link Table} corresponding to the {@code tableDescriptor}
    * @throws IllegalStateException when invoked multiple times with the same 
{@link TableDescriptor}
    */
-  <K, V> Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> tableDesc);
+  <K, V> Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> tableDescriptor);
 
   /**
    * Sets the {@link ContextManager} for this {@link StreamGraph}.

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/main/java/org/apache/samza/operators/descriptors/GenericInputDescriptor.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/descriptors/GenericInputDescriptor.java
 
b/samza-api/src/main/java/org/apache/samza/operators/descriptors/GenericInputDescriptor.java
new file mode 100644
index 0000000..a2df29a
--- /dev/null
+++ 
b/samza-api/src/main/java/org/apache/samza/operators/descriptors/GenericInputDescriptor.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.descriptors;
+
+import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
+import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
+import org.apache.samza.serializers.Serde;
+
+/**
+ * A descriptor for a generic input stream.
+ * <p>
+ * An instance of this descriptor may be obtained from an appropriately 
configured {@link GenericSystemDescriptor}.
+ * <p>
+ * If the system being used provides its own system and stream descriptor 
implementations, they should be used instead.
+ * Otherwise, this {@link GenericInputDescriptor} may be used to provide 
Samza-specific properties of the input stream.
+ * Additional system stream specific properties may be provided using {@link 
#withStreamConfigs}.
+ * <p>
+ * Stream properties configured using a descriptor override corresponding 
properties provided in configuration.
+ *
+ * @param <StreamMessageType> type of messages in this stream.
+ */
+public final class GenericInputDescriptor<StreamMessageType>
+    extends InputDescriptor<StreamMessageType, 
GenericInputDescriptor<StreamMessageType>> {
+  GenericInputDescriptor(String streamId, SystemDescriptor systemDescriptor, 
Serde serde) {
+    super(streamId, serde, systemDescriptor, null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/main/java/org/apache/samza/operators/descriptors/GenericOutputDescriptor.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/descriptors/GenericOutputDescriptor.java
 
b/samza-api/src/main/java/org/apache/samza/operators/descriptors/GenericOutputDescriptor.java
new file mode 100644
index 0000000..b13ac21
--- /dev/null
+++ 
b/samza-api/src/main/java/org/apache/samza/operators/descriptors/GenericOutputDescriptor.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.descriptors;
+
+import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor;
+import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
+import org.apache.samza.serializers.Serde;
+
+/**
+ * A descriptor for a generic output stream.
+ * <p>
+ * An instance of this descriptor may be obtained from an appropriately 
configured {@link GenericSystemDescriptor}.
+ * <p>
+ * If the system being used provides its own system and stream descriptor 
implementations, they should be used instead.
+ * Otherwise, this {@link GenericOutputDescriptor} may be used to provide 
Samza-specific properties of the output stream.
+ * Additional system stream specific properties may be provided using {@link 
#withStreamConfigs}.
+ * <p>
+ * Stream properties configured using a descriptor override corresponding 
properties provided in configuration.
+ *
+ * @param <StreamMessageType> type of messages in this stream.
+ */
+public final class GenericOutputDescriptor<StreamMessageType>
+    extends OutputDescriptor<StreamMessageType, 
GenericOutputDescriptor<StreamMessageType>> {
+  GenericOutputDescriptor(String streamId, SystemDescriptor systemDescriptor, 
Serde serde) {
+    super(streamId, serde, systemDescriptor);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/main/java/org/apache/samza/operators/descriptors/GenericSystemDescriptor.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/descriptors/GenericSystemDescriptor.java
 
b/samza-api/src/main/java/org/apache/samza/operators/descriptors/GenericSystemDescriptor.java
new file mode 100644
index 0000000..362745f
--- /dev/null
+++ 
b/samza-api/src/main/java/org/apache/samza/operators/descriptors/GenericSystemDescriptor.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.descriptors;
+
+
+import 
org.apache.samza.operators.descriptors.base.system.OutputDescriptorProvider;
+import 
org.apache.samza.operators.descriptors.base.system.SimpleInputDescriptorProvider;
+import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
+import org.apache.samza.serializers.Serde;
+
+/**
+ * A descriptor for a generic system.
+ * <p>
+ * If the system being used provides its own system and stream descriptor 
implementations, they should be used instead.
+ * Otherwise, this {@link GenericSystemDescriptor} may be used to provide 
Samza-specific properties of the system.
+ * Additional system specific properties may be provided using {@link 
#withSystemConfigs}.
+ * <p>
+ * System properties configured using a descriptor override corresponding 
properties provided in configuration.
+ */
+public final class GenericSystemDescriptor extends 
SystemDescriptor<GenericSystemDescriptor>
+    implements SimpleInputDescriptorProvider, OutputDescriptorProvider {
+
+  /**
+   * Constructs a {@link GenericSystemDescriptor} instance with no system 
level serde.
+   * Serdes must be provided explicitly at stream level when getting input or 
output descriptors.
+   *
+   * @param systemName name of this system
+   * @param factoryClassName name of the SystemFactory class for this system
+   */
+  public GenericSystemDescriptor(String systemName, String factoryClassName) {
+    super(systemName, factoryClassName, null, null);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public <StreamMessageType> GenericInputDescriptor<StreamMessageType> 
getInputDescriptor(
+      String streamId, Serde<StreamMessageType> serde) {
+    return new GenericInputDescriptor<>(streamId, this, serde);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public <StreamMessageType> GenericOutputDescriptor<StreamMessageType> 
getOutputDescriptor(
+      String streamId, Serde<StreamMessageType> serde) {
+    return new GenericOutputDescriptor<>(streamId, this, serde);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/stream/InputDescriptor.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/stream/InputDescriptor.java
 
b/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/stream/InputDescriptor.java
new file mode 100644
index 0000000..bb38ee4
--- /dev/null
+++ 
b/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/stream/InputDescriptor.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.descriptors.base.stream;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
+import org.apache.samza.operators.functions.InputTransformer;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.SystemStreamMetadata.OffsetType;
+
+/**
+ * The base descriptor for an input stream. Allows setting properties that are 
common to all input streams.
+ * <p>
+ * Stream properties configured using a descriptor override corresponding 
properties provided in configuration.
+ *
+ * @param <StreamMessageType> type of messages in this stream.
+ * @param <SubClass> type of the concrete sub-class
+ */
+public abstract class InputDescriptor<StreamMessageType, SubClass extends 
InputDescriptor<StreamMessageType, SubClass>>
+    extends StreamDescriptor<StreamMessageType, SubClass> {
+  private static final String RESET_OFFSET_CONFIG_KEY = 
"streams.%s.samza.reset.offset";
+  private static final String OFFSET_DEFAULT_CONFIG_KEY = 
"streams.%s.samza.offset.default";
+  private static final String PRIORITY_CONFIG_KEY = 
"streams.%s.samza.priority";
+  private static final String BOOTSTRAP_CONFIG_KEY = 
"streams.%s.samza.bootstrap";
+  private static final String BOUNDED_CONFIG_KEY = "streams.%s.samza.bounded";
+  private static final String DELETE_COMMITTED_MESSAGES_CONFIG_KEY = 
"streams.%s.samza.delete.committed.messages";
+
+  private final Optional<InputTransformer> transformerOptional;
+
+  private Optional<Boolean> resetOffsetOptional = Optional.empty();
+  private Optional<OffsetType> offsetDefaultOptional = Optional.empty();
+  private Optional<Integer> priorityOptional = Optional.empty();
+  private Optional<Boolean> isBootstrapOptional = Optional.empty();
+  private Optional<Boolean> isBoundedOptional = Optional.empty();
+  private Optional<Boolean> deleteCommittedMessagesOptional = Optional.empty();
+
+  /**
+   * Constructs an {@link InputDescriptor} instance.
+   *
+   * @param streamId id of the stream
+   * @param serde serde for messages in the stream
+   * @param systemDescriptor system descriptor this stream descriptor was 
obtained from
+   * @param transformer stream level input stream transform function if 
available, else null
+   */
+  public InputDescriptor(String streamId, Serde serde, SystemDescriptor 
systemDescriptor, InputTransformer transformer) {
+    super(streamId, serde, systemDescriptor);
+
+    // stream level transformer takes precedence over system level transformer
+    if (transformer != null) {
+      this.transformerOptional = Optional.of(transformer);
+    } else {
+      this.transformerOptional = systemDescriptor.getTransformer();
+    }
+  }
+
+  /**
+   * If set to true, when a Samza container starts up, it ignores any 
checkpointed offset for this particular
+   * input stream. Its behavior is thus determined by the {@link 
#withOffsetDefault} setting.
+   * Note that the reset takes effect every time a container is started, which 
may be every time you restart your job,
+   * or more frequently if a container fails and is restarted by the framework.
+   *
+   * @param resetOffset whether the container should ignore any checkpointed 
offset when starting
+   * @return this input descriptor
+   */
+  public SubClass withResetOffset(boolean resetOffset) {
+    this.resetOffsetOptional = Optional.of(resetOffset);
+    return (SubClass) this;
+  }
+
+  /**
+   * If a container starts up without a checkpoint, this property determines 
where in the input stream we should start
+   * consuming. The value must be an OffsetType, one of the following:
+   * <ul>
+   *  <li>upcoming: Start processing messages that are published after the job 
starts.
+   *                Any messages published while the job was not running are 
not processed.
+   *  <li>oldest: Start processing at the oldest available message in the 
system,
+   *              and reprocess the entire available message history.
+   * </ul>
+   * This property is for an individual stream. To set it for all streams 
within a system, see
+   * {@link SystemDescriptor#withDefaultStreamOffsetDefault}. If both are 
defined, the stream-level definition
+   * takes precedence.
+   *
+   * @param offsetDefault offset type to start processing from
+   * @return this input descriptor
+   */
+  public SubClass withOffsetDefault(OffsetType offsetDefault) {
+    this.offsetDefaultOptional = Optional.ofNullable(offsetDefault);
+    return (SubClass) this;
+  }
+
+  /**
+   * If one or more streams have a priority set (any positive integer), they 
will be processed with higher priority
+   * than the other streams.
+   * <p>
+   * You can set several streams to the same priority, or define multiple 
priority levels by assigning a
+   * higher number to the higher-priority streams.
+   * <p>
+   * If a higher-priority stream has any messages available, they will always 
be processed first;
+   * messages from lower-priority streams are only processed when there are no 
new messages on higher-priority inputs.
+   *
+   * @param priority priority for this input stream
+   * @return this input descriptor
+   */
+  public SubClass withPriority(int priority) {
+    this.priorityOptional = Optional.of(priority);
+    return (SubClass) this;
+  }
+
+  /**
+   * If set to true, this stream will be processed as a bootstrap stream. This 
means that every time a Samza container
+   * starts up, this stream will be fully consumed before messages from any 
other stream are processed.
+   *
+   * @param bootstrap whether this stream should be processed as a bootstrap 
stream
+   * @return this input descriptor
+   */
+  public SubClass withBootstrap(boolean bootstrap) {
+    this.isBootstrapOptional = Optional.of(bootstrap);
+    return (SubClass) this;
+  }
+
+  /**
+   * If set to true, this stream will be considered a bounded stream. If all 
input streams in an application are
+   * bounded, the job is considered to be running in batch processing mode.
+   *
+   * @param isBounded whether this stream is a bounded
+   * @return this input descriptor
+   */
+  public SubClass withBounded(boolean isBounded) {
+    this.isBoundedOptional = Optional.of(isBounded);
+    return (SubClass) this;
+  }
+
+  /**
+   * If set to true, and supported by the system implementation, messages 
older than the latest checkpointed offset
+   * for this stream may be deleted after the commit.
+   *
+   * @param deleteCommittedMessages whether the system should attempt to 
delete checkpointed messages
+   * @return this input descriptor
+   */
+  public SubClass withDeleteCommittedMessages(boolean deleteCommittedMessages) 
{
+    this.deleteCommittedMessagesOptional = 
Optional.of(deleteCommittedMessages);
+    return (SubClass) this;
+  }
+
+  /**
+   * Returns the transformer chosen at construction time: the stream level transformer if one was given,
+   * otherwise the one reported by the system descriptor.
+   *
+   * @return the {@link InputTransformer} for this stream, possibly empty
+   */
+  public Optional<InputTransformer> getTransformer() {
+    return transformerOptional;
+  }
+
+  /**
+   * Builds the configuration entries for this input stream.
+   * <p>
+   * Starts from the entries produced by {@code super.toConfig()} and adds one entry for every optional
+   * property that has been set on this descriptor. Entries added here replace entries from the super class
+   * that use the same key.
+   *
+   * @return an unmodifiable map of config keys to values for this stream
+   */
+  @Override
+  public Map<String, String> toConfig() {
+    HashMap<String, String> configs = new HashMap<>(super.toConfig());
+    String streamId = getStreamId();
+    // Lower-case with Locale.ROOT so the emitted value does not depend on the JVM default locale.
+    // With a Turkish default locale, "UPCOMING".toLowerCase() would contain a dotless i.
+    this.offsetDefaultOptional.ifPresent(od ->
+        configs.put(String.format(OFFSET_DEFAULT_CONFIG_KEY, streamId),
+            od.name().toLowerCase(java.util.Locale.ROOT)));
+    this.resetOffsetOptional.ifPresent(resetOffset ->
+        configs.put(String.format(RESET_OFFSET_CONFIG_KEY, streamId), Boolean.toString(resetOffset)));
+    this.priorityOptional.ifPresent(priority ->
+        configs.put(String.format(PRIORITY_CONFIG_KEY, streamId), Integer.toString(priority)));
+    this.isBootstrapOptional.ifPresent(bootstrap ->
+        configs.put(String.format(BOOTSTRAP_CONFIG_KEY, streamId), Boolean.toString(bootstrap)));
+    this.isBoundedOptional.ifPresent(bounded ->
+        configs.put(String.format(BOUNDED_CONFIG_KEY, streamId), Boolean.toString(bounded)));
+    this.deleteCommittedMessagesOptional.ifPresent(deleteCommittedMessages ->
+        configs.put(String.format(DELETE_COMMITTED_MESSAGES_CONFIG_KEY, streamId),
+            Boolean.toString(deleteCommittedMessages)));
+    return Collections.unmodifiableMap(configs);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/stream/OutputDescriptor.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/stream/OutputDescriptor.java
 
b/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/stream/OutputDescriptor.java
new file mode 100644
index 0000000..20bbc53
--- /dev/null
+++ 
b/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/stream/OutputDescriptor.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.descriptors.base.stream;
+
+import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
+import org.apache.samza.serializers.Serde;
+
+/**
+ * Base descriptor for an output stream. Allows setting properties that are common to all output streams.
+ * <p>
+ * Stream properties configured using a descriptor override corresponding properties provided in configuration.
+ *
+ * @param <StreamMessageType> type of messages in this stream.
+ * @param <SubClass> type of the concrete sub-class
+ */
+public abstract class OutputDescriptor<StreamMessageType, SubClass extends OutputDescriptor<StreamMessageType, SubClass>>
+    extends StreamDescriptor<StreamMessageType, SubClass> {
+
+  /**
+   * Creates an {@link OutputDescriptor}. All arguments are handed to the {@link StreamDescriptor} constructor.
+   *
+   * @param streamId id of the stream
+   * @param serde serde for messages in the stream
+   * @param systemDescriptor system descriptor this stream descriptor was obtained from
+   */
+  public OutputDescriptor(String streamId, Serde serde, SystemDescriptor systemDescriptor) {
+    super(streamId, serde, systemDescriptor);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/stream/StreamDescriptor.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/stream/StreamDescriptor.java
 
b/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/stream/StreamDescriptor.java
new file mode 100644
index 0000000..e2f93db
--- /dev/null
+++ 
b/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/stream/StreamDescriptor.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.descriptors.base.stream;
+
+import com.google.common.base.Preconditions;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Pattern;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
+import org.apache.samza.serializers.Serde;
+
+/**
+ * Common base for input and output stream descriptors. Allows setting properties that are common to all streams.
+ * <p>
+ * Stream properties configured using a descriptor override corresponding properties provided in configuration.
+ *
+ * @param <StreamMessageType> type of messages in this stream.
+ * @param <SubClass> type of the concrete sub-class
+ */
+public abstract class StreamDescriptor<StreamMessageType, SubClass extends StreamDescriptor<StreamMessageType, SubClass>> {
+  // Config key templates; the first %s is always the stream id.
+  private static final String SYSTEM_CONFIG_KEY = "streams.%s.samza.system";
+  private static final String PHYSICAL_NAME_CONFIG_KEY = "streams.%s.samza.physical.name";
+  private static final String STREAM_CONFIGS_CONFIG_KEY = "streams.%s.%s";
+  private static final Pattern STREAM_ID_PATTERN = Pattern.compile("[\\d\\w-_]+");
+
+  private final String streamId;
+  private final Serde serde;
+  private final SystemDescriptor systemDescriptor;
+
+  private final Map<String, String> streamConfigs = new HashMap<>();
+  private Optional<String> physicalNameOptional = Optional.empty();
+
+  /**
+   * Creates a {@link StreamDescriptor} after validating its arguments.
+   *
+   * @param streamId id of the stream
+   * @param serde serde for messages in the stream
+   * @param systemDescriptor system descriptor this stream descriptor was obtained from
+   * @throws IllegalArgumentException if {@code systemDescriptor} or {@code serde} is null
+   * @throws IllegalStateException if {@code streamId} is blank or does not match the stream id pattern
+   */
+  StreamDescriptor(String streamId, Serde serde, SystemDescriptor systemDescriptor) {
+    // The system descriptor is checked first because the later messages include its system name.
+    Preconditions.checkArgument(systemDescriptor != null,
+        String.format("SystemDescriptor must not be null. streamId: %s", streamId));
+    String systemName = systemDescriptor.getSystemName();
+    Preconditions.checkState(isValidStreamId(streamId),
+        String.format("streamId must be non-empty and must not contain spaces or special characters. " +
+            "streamId: %s, systemName: %s", streamId, systemName));
+    Preconditions.checkArgument(serde != null,
+        String.format("Serde must not be null. streamId: %s systemName: %s", streamId, systemName));
+    this.streamId = streamId;
+    this.serde = serde;
+    this.systemDescriptor = systemDescriptor;
+  }
+
+  /**
+   * Sets the physical name of the stream on the system on which this stream will be accessed.
+   * This is opposed to the {@code streamId} which is the logical name that Samza uses to identify the stream.
+   * <p>
+   * A physical name could be a Kafka topic name, an HDFS file URN, or any other system-specific identifier.
+   * <p>
+   * If not provided, the logical {@code streamId} is used as the physical name.
+   * Passing null clears any previously set value.
+   *
+   * @param physicalName physical name for this stream.
+   * @return this stream descriptor.
+   */
+  public SubClass withPhysicalName(String physicalName) {
+    @SuppressWarnings("unchecked") // self-type cast cannot be verified by the compiler
+    SubClass self = (SubClass) this;
+    this.physicalNameOptional = Optional.ofNullable(physicalName);
+    return self;
+  }
+
+  /**
+   * Adds system-specific properties for this stream.
+   * <p>
+   * These properties are added under the {@code streams.stream-id.*} scope. The given entries are merged into
+   * any entries supplied by earlier calls; entries with the same key are replaced.
+   *
+   * @param streamConfigs system-specific properties for this stream
+   * @return this stream descriptor
+   */
+  public SubClass withStreamConfigs(Map<String, String> streamConfigs) {
+    @SuppressWarnings("unchecked") // self-type cast cannot be verified by the compiler
+    SubClass self = (SubClass) this;
+    this.streamConfigs.putAll(streamConfigs);
+    return self;
+  }
+
+  /**
+   * @return the logical id of this stream
+   */
+  public String getStreamId() {
+    return streamId;
+  }
+
+  /**
+   * @return the name of the system this stream belongs to, as reported by its system descriptor
+   */
+  public String getSystemName() {
+    return systemDescriptor.getSystemName();
+  }
+
+  /**
+   * @return the serde for messages in this stream
+   */
+  public Serde getSerde() {
+    return serde;
+  }
+
+  /**
+   * @return the system descriptor this stream descriptor was obtained from
+   */
+  public SystemDescriptor getSystemDescriptor() {
+    return systemDescriptor;
+  }
+
+  /**
+   * @return the physical name set through {@link #withPhysicalName}, or empty if none was set
+   */
+  public Optional<String> getPhysicalName() {
+    return physicalNameOptional;
+  }
+
+  // A valid stream id is non-blank and consists only of characters accepted by STREAM_ID_PATTERN.
+  private static boolean isValidStreamId(String candidate) {
+    if (StringUtils.isBlank(candidate)) {
+      return false;
+    }
+    return STREAM_ID_PATTERN.matcher(candidate).matches();
+  }
+
+  /**
+   * Builds the configuration entries for this stream: the system name, the physical name if one was set,
+   * and every entry supplied through {@link #withStreamConfigs}. Stream config entries are written last.
+   *
+   * @return an unmodifiable map of config keys to values for this stream
+   */
+  public Map<String, String> toConfig() {
+    Map<String, String> result = new HashMap<>();
+    result.put(String.format(SYSTEM_CONFIG_KEY, streamId), getSystemName());
+    physicalNameOptional.ifPresent(name ->
+        result.put(String.format(PHYSICAL_NAME_CONFIG_KEY, streamId), name));
+    for (Map.Entry<String, String> entry : streamConfigs.entrySet()) {
+      result.put(String.format(STREAM_CONFIGS_CONFIG_KEY, streamId, entry.getKey()), entry.getValue());
+    }
+    return Collections.unmodifiableMap(result);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/ExpandingInputDescriptorProvider.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/ExpandingInputDescriptorProvider.java
 
b/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/ExpandingInputDescriptorProvider.java
new file mode 100644
index 0000000..05179dd
--- /dev/null
+++ 
b/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/ExpandingInputDescriptorProvider.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.descriptors.base.system;
+
+import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
+import org.apache.samza.serializers.Serde;
+
+/**
+ * Interface for advanced {@code SystemDescriptor}s that constrain the type of returned {@code InputDescriptor}s
+ * to their own {@code StreamExpander} function result types.
+ *
+ * @param <StreamExpanderType> type of the system level {@code StreamExpander} results
+ */
+public interface ExpandingInputDescriptorProvider<StreamExpanderType> {
+
+  /**
+   * Gets an {@link InputDescriptor} for an input stream on this system. The stream has the provided
+   * stream level serde, and the default system level {@code StreamExpander}.
+   * <p>
+   * The type of messages in the stream is the type of messages returned by the default system level
+   * {@code StreamExpander}.
+   *
+   * @param streamId id of the input stream
+   * @param serde stream level serde to be propagated to expanded input streams
+   * @return an {@link InputDescriptor} for the input stream
+   */
+  InputDescriptor<StreamExpanderType, ? extends InputDescriptor> getInputDescriptor(String streamId, Serde serde);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/OutputDescriptorProvider.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/OutputDescriptorProvider.java
 
b/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/OutputDescriptorProvider.java
new file mode 100644
index 0000000..c2ceb53
--- /dev/null
+++ 
b/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/OutputDescriptorProvider.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.descriptors.base.system;
+
+import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor;
+import org.apache.samza.serializers.Serde;
+
+
+/**
+ * Interface for simple {@code SystemDescriptors} that return {@code 
OutputDescriptors} parameterized by the type of
+ * the provided stream level serde.
+ */
+public interface OutputDescriptorProvider {
+
+  /**
+   * Gets an {@link OutputDescriptor} representing an output stream on this 
system that uses the provided
+   * stream specific serde instead of the default system serde.
+   * <p>
+   * An {@code OutputStream<KV<K, V>>}, obtained using a descriptor with a 
{@code KVSerde<K, V>}, can send messages
+   * of type {@code KV<K, V>}. An {@code OutputStream<M>} with any other 
{@code Serde<M>} can send messages of
+   * type M without a key.
+   * <p>
+   * A {@code KVSerde<NoOpSerde, NoOpSerde>} or {@code NoOpSerde} may be used 
if the {@code SystemProducer}
+   * serializes the outgoing messages itself, and no prior serialization is 
required from the framework.
+   *
+   * @param streamId id of the output stream
+   * @param serde serde for this output stream that overrides the default 
system serde, if any.
+   * @param <StreamMessageType> type of messages in the output stream
+   * @return the {@link OutputDescriptor} for the output stream
+   */
+  <StreamMessageType> OutputDescriptor<StreamMessageType, ? extends 
OutputDescriptor> getOutputDescriptor(String streamId, Serde<StreamMessageType> 
serde);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/SimpleInputDescriptorProvider.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/SimpleInputDescriptorProvider.java
 
b/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/SimpleInputDescriptorProvider.java
new file mode 100644
index 0000000..de40932
--- /dev/null
+++ 
b/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/SimpleInputDescriptorProvider.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.descriptors.base.system;
+
+import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
+import org.apache.samza.serializers.Serde;
+
+
+/**
+ * Interface for simple {@code SystemDescriptors} that return {@code 
InputDescriptors} parameterized by the type of
+ * the provided stream level serde.
+ */
+public interface SimpleInputDescriptorProvider {
+
+  /**
+   * Gets an {@link InputDescriptor} for an input stream on this system. The 
stream has the provided
+   * stream level serde.
+   * <p>
+   * The type of messages in the stream is the type of the provided stream 
level serde.
+   *
+   * @param streamId id of the input stream
+   * @param serde stream level serde for the input stream
+   * @param <StreamMessageType> type of messages in this stream
+   * @return an {@link InputDescriptor} for the input stream
+   */
+  <StreamMessageType> InputDescriptor<StreamMessageType, ? extends 
InputDescriptor> getInputDescriptor(String streamId, Serde<StreamMessageType> 
serde);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/SystemDescriptor.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/SystemDescriptor.java
 
b/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/SystemDescriptor.java
new file mode 100644
index 0000000..4bb121d
--- /dev/null
+++ 
b/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/SystemDescriptor.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.descriptors.base.system;
+
+import com.google.common.base.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Pattern;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.operators.functions.InputTransformer;
+import org.apache.samza.operators.functions.StreamExpander;
+import org.apache.samza.system.SystemStreamMetadata.OffsetType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The base descriptor for a system. Allows setting properties that are common to all systems.
+ * <p>
+ * System properties configured using a descriptor override corresponding properties provided in configuration.
+ * <p>
+ * Systems may provide an {@link InputTransformer} to be used for input streams on the system. An
+ * {@link InputTransformer} transforms an {@code IncomingMessageEnvelope} with deserialized key and message
+ * to another message that is delivered to the {@code MessageStream}. It is applied at runtime in
+ * {@code InputOperatorImpl}.
+ * <p>
+ * Systems may provide a {@link StreamExpander} to be used for input streams on the system. A
+ * {@link StreamExpander} expands the provided {@code InputDescriptor} to a sub-DAG of one or more operators
+ * on the {@code StreamGraph}, and returns a new {@code MessageStream} with the combined results. It is called
+ * during graph description in {@code StreamGraph#getInputStream}.
+ * <p>
+ * Systems that support consuming messages from a stream should provide users means of obtaining an
+ * {@code InputDescriptor}. Recommended interfaces for doing so are {@link TransformingInputDescriptorProvider}
+ * for systems that support system level {@link InputTransformer}, {@link ExpandingInputDescriptorProvider} for
+ * systems that support system level {@link StreamExpander} functions, and
+ * {@link SimpleInputDescriptorProvider} otherwise.
+ * <p>
+ * Systems that support producing messages to a stream should provide users means of obtaining an
+ * {@code OutputDescriptor}. Recommended interface for doing so is {@link OutputDescriptorProvider}.
+ * <p>
+ * It is not required for SystemDescriptors to implement one of the Provider interfaces above. System
+ * implementers may choose to expose additional or alternate APIs for obtaining Input/Output Descriptors by
+ * extending SystemDescriptor directly.
+ *
+ * @param <SubClass> type of the concrete sub-class
+ */
+public abstract class SystemDescriptor<SubClass extends SystemDescriptor<SubClass>> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SystemDescriptor.class);
+  // Config key templates; the first %s is always the system name.
+  private static final String FACTORY_CONFIG_KEY = "systems.%s.samza.factory";
+  private static final String DEFAULT_STREAM_OFFSET_DEFAULT_CONFIG_KEY =
+      "systems.%s.default.stream.samza.offset.default";
+  private static final String DEFAULT_STREAM_CONFIGS_CONFIG_KEY = "systems.%s.default.stream.%s";
+  private static final String SYSTEM_CONFIGS_CONFIG_KEY = "systems.%s.%s";
+  private static final Pattern SYSTEM_NAME_PATTERN = Pattern.compile("[\\d\\w-_]+");
+
+  private final String systemName;
+  private final Optional<String> factoryClassNameOptional;
+  private final Optional<InputTransformer> transformerOptional;
+  private final Optional<StreamExpander> expanderOptional;
+
+  private final Map<String, String> systemConfigs = new HashMap<>();
+  private final Map<String, String> defaultStreamConfigs = new HashMap<>();
+  private Optional<OffsetType> defaultStreamOffsetDefaultOptional = Optional.empty();
+
+  /**
+   * Constructs a {@link SystemDescriptor} instance.
+   * <p>
+   * A blank {@code factoryClassName} is accepted: a warning is logged and no factory entry is emitted by
+   * {@link #toConfig()}, so the value has to come from configuration instead.
+   *
+   * @param systemName name of this system
+   * @param factoryClassName name of the SystemFactory class for this system
+   * @param transformer the {@link InputTransformer} for the system if any, else null
+   * @param expander the {@link StreamExpander} for the system if any, else null
+   * @throws IllegalArgumentException if {@code systemName} is blank or does not match the system name pattern
+   */
+  public SystemDescriptor(String systemName, String factoryClassName, InputTransformer transformer,
+      StreamExpander expander) {
+    Preconditions.checkArgument(isValidSystemName(systemName),
+        String.format("systemName: %s must be non-empty and must not contain spaces or special characters.",
+            systemName));
+    if (StringUtils.isBlank(factoryClassName)) {
+      LOGGER.warn("Blank SystemFactory class name for system: {}. A value must be provided in configuration using {}.",
+          systemName, String.format(FACTORY_CONFIG_KEY, systemName));
+    }
+    this.systemName = systemName;
+    // stripToNull turns a blank name into null, which leaves the Optional empty
+    this.factoryClassNameOptional = Optional.ofNullable(StringUtils.stripToNull(factoryClassName));
+    this.transformerOptional = Optional.ofNullable(transformer);
+    this.expanderOptional = Optional.ofNullable(expander);
+  }
+
+  /**
+   * If a container starts up without a checkpoint, this property determines where in the input stream we
+   * should start consuming. The value must be an {@link OffsetType}, one of the following:
+   * <ul>
+   *  <li>upcoming: Start processing messages that are published after the job starts.
+   *                Any messages published while the job was not running are not processed.
+   *  <li>oldest: Start processing at the oldest available message in the system,
+   *              and reprocess the entire available message history.
+   * </ul>
+   * This property is for all streams obtained using this system descriptor. To set it for an individual stream,
+   * see {@link org.apache.samza.operators.descriptors.base.stream.InputDescriptor#withOffsetDefault}.
+   * If both are defined, the stream-level definition takes precedence.
+   * <p>
+   * Passing null clears any previously set value.
+   *
+   * @param offsetType offset type to start processing from
+   * @return this system descriptor
+   */
+  public SubClass withDefaultStreamOffsetDefault(OffsetType offsetType) {
+    this.defaultStreamOffsetDefaultOptional = Optional.ofNullable(offsetType);
+    return (SubClass) this;
+  }
+
+  /**
+   * Additional system-specific properties for this system.
+   * <p>
+   * These properties are added under the {@code systems.system-name.*} scope. The given entries are merged
+   * into any entries supplied by earlier calls.
+   *
+   * @param systemConfigs system-specific properties for this system
+   * @return this system descriptor
+   */
+  public SubClass withSystemConfigs(Map<String, String> systemConfigs) {
+    this.systemConfigs.putAll(systemConfigs);
+    return (SubClass) this;
+  }
+
+  /**
+   * Default properties for any stream obtained using this system descriptor.
+   * <p>
+   * For example, if "systems.kafka-system.default.stream.replication.factor"=2 was configured,
+   * then every Kafka stream created on the kafka-system will have a replication factor of 2
+   * unless the property is explicitly overridden using the stream descriptor.
+   * <p>
+   * The given entries are merged into any entries supplied by earlier calls.
+   *
+   * @param defaultStreamConfigs default stream properties
+   * @return this system descriptor
+   */
+  public SubClass withDefaultStreamConfigs(Map<String, String> defaultStreamConfigs) {
+    this.defaultStreamConfigs.putAll(defaultStreamConfigs);
+    return (SubClass) this;
+  }
+
+  /**
+   * @return the name of this system
+   */
+  public String getSystemName() {
+    return this.systemName;
+  }
+
+  /**
+   * @return the system level {@link InputTransformer}, or empty if none was given at construction
+   */
+  public Optional<InputTransformer> getTransformer() {
+    return this.transformerOptional;
+  }
+
+  /**
+   * @return the system level {@link StreamExpander}, or empty if none was given at construction
+   */
+  public Optional<StreamExpander> getExpander() {
+    return this.expanderOptional;
+  }
+
+  // A valid system name is non-blank and consists only of characters accepted by SYSTEM_NAME_PATTERN.
+  private boolean isValidSystemName(String id) {
+    return StringUtils.isNotBlank(id) && SYSTEM_NAME_PATTERN.matcher(id).matches();
+  }
+
+  /**
+   * Builds the configuration entries for this system: the factory class name and the default stream offset
+   * default when present, followed by the default stream configs and then the system configs.
+   * <p>
+   * NOTE(review): the returned map is the mutable {@link HashMap} built here, not an unmodifiable view.
+   * Confirm whether subclasses rely on mutating it before wrapping it.
+   *
+   * @return a map of config keys to values for this system
+   */
+  public Map<String, String> toConfig() {
+    HashMap<String, String> configs = new HashMap<>();
+    this.factoryClassNameOptional.ifPresent(name ->
+        configs.put(String.format(FACTORY_CONFIG_KEY, systemName), name));
+    // Lower-case with Locale.ROOT so the emitted value does not depend on the JVM default locale.
+    // With a Turkish default locale, "UPCOMING".toLowerCase() would contain a dotless i.
+    this.defaultStreamOffsetDefaultOptional.ifPresent(dsod ->
+        configs.put(String.format(DEFAULT_STREAM_OFFSET_DEFAULT_CONFIG_KEY, systemName),
+            dsod.name().toLowerCase(java.util.Locale.ROOT)));
+    this.defaultStreamConfigs.forEach((key, value) ->
+        configs.put(String.format(DEFAULT_STREAM_CONFIGS_CONFIG_KEY, getSystemName(), key), value));
+    this.systemConfigs.forEach((key, value) ->
+        configs.put(String.format(SYSTEM_CONFIGS_CONFIG_KEY, getSystemName(), key), value));
+    return configs;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/TransformingInputDescriptorProvider.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/TransformingInputDescriptorProvider.java
 
b/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/TransformingInputDescriptorProvider.java
new file mode 100644
index 0000000..5b43fbd
--- /dev/null
+++ 
b/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/TransformingInputDescriptorProvider.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.descriptors.base.system;
+
+import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
+import org.apache.samza.serializers.Serde;
+
+/**
+ * Interface for advanced {@code SystemDescriptor}s that constrain the type of the
+ * {@code InputDescriptor}s they return to the result type of their own {@code InputTransformer}
+ * function.
+ *
+ * @param <InputTransformerType> type of the system level {@code InputTransformer} results
+ */
+public interface TransformingInputDescriptorProvider<InputTransformerType> {
+
+  /**
+   * Gets an {@link InputDescriptor} for an input stream on this system. The stream has the
+   * provided stream level serde, and the default system level {@code InputTransformer}.
+   * <p>
+   * The type of messages in the stream is the type of messages returned by the default system
+   * level {@code InputTransformer}.
+   *
+   * @param streamId id of the input stream
+   * @param serde stream level serde for the input stream
+   * @return an {@link InputDescriptor} for the input stream
+   */
+  InputDescriptor<InputTransformerType, ? extends InputDescriptor> getInputDescriptor(
+      String streamId, Serde serde);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/main/java/org/apache/samza/operators/functions/InputTransformer.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/InputTransformer.java b/samza-api/src/main/java/org/apache/samza/operators/functions/InputTransformer.java
new file mode 100644
index 0000000..704d187
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/InputTransformer.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.functions;
+
+import java.io.Serializable;
+import org.apache.samza.system.IncomingMessageEnvelope;
+
+/**
+ * Transforms an {@link IncomingMessageEnvelope} with deserialized key and message to a message
+ * of type {@code OM} which is delivered to the {@code MessageStream}. Called in
+ * {@code InputOperatorImpl} when incoming messages from a {@code SystemConsumer} are being
+ * delivered to the application.
+ * <p>
+ * This is provided by default by transforming system descriptor implementations and cannot be
+ * overridden or set on a per stream level.
+ *
+ * @param <OM> type of the transformed message
+ */
+public interface InputTransformer<OM> extends InitableFunction, ClosableFunction, Serializable {
+
+  /**
+   * Transforms the provided {@link IncomingMessageEnvelope} with deserialized key and message
+   * into another message which is delivered to the {@code MessageStream}.
+   *
+   * @param ime the {@link IncomingMessageEnvelope} to be transformed
+   * @return the transformed message
+   */
+  OM apply(IncomingMessageEnvelope ime);
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/main/java/org/apache/samza/operators/functions/StreamExpander.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/StreamExpander.java b/samza-api/src/main/java/org/apache/samza/operators/functions/StreamExpander.java
new file mode 100644
index 0000000..085a98d
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/StreamExpander.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.functions;
+
+import java.io.Serializable;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
+
+/**
+ * Expands the provided {@link InputDescriptor} to a sub-DAG of one or more operators on the
+ * {@link StreamGraph}, and returns a new {@link MessageStream} with the combined results.
+ * Called when {@link StreamGraph#getInputStream} is being used to get a {@link MessageStream}
+ * using an {@link InputDescriptor} from an expanding system descriptor.
+ * <p>
+ * This is provided by default by expanding system descriptor implementations and cannot be
+ * overridden or set on a per stream level.
+ *
+ * @param <OM> type of the messages in the resultant {@link MessageStream}
+ */
+public interface StreamExpander<OM> extends Serializable {
+
+  /**
+   * Expands the provided {@link InputDescriptor} to a sub-DAG of one or more operators on the
+   * {@link StreamGraph}, and returns a new {@link MessageStream} with the combined results.
+   * Called when the {@link InputDescriptor} is being used to get a {@link MessageStream} using
+   * {@link StreamGraph#getInputStream}.
+   * <p>
+   * Notes for system implementers:
+   * <p>
+   * Take care to avoid infinite recursion in the implementation; e.g., by ensuring that it
+   * doesn't call {@link StreamGraph#getInputStream} with an {@link InputDescriptor} from an
+   * expanding system descriptor (like this one) again.
+   * <p>
+   * It's the {@link StreamExpander}'s responsibility to propagate any properties, including
+   * serde, from the user-provided {@link InputDescriptor} to the expanded input descriptors.
+   *
+   * @param streamGraph the {@link StreamGraph} to register the expanded sub-DAG of operators on
+   * @param inputDescriptor the {@link InputDescriptor} to be expanded
+   * @return the {@link MessageStream} containing the combined results of the sub-DAG of operators
+   */
+  MessageStream<OM> apply(StreamGraph streamGraph, InputDescriptor inputDescriptor);
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/test/java/org/apache/samza/operators/descriptors/TestExpandingInputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/descriptors/TestExpandingInputDescriptor.java b/samza-api/src/test/java/org/apache/samza/operators/descriptors/TestExpandingInputDescriptor.java
new file mode 100644
index 0000000..e635338
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/operators/descriptors/TestExpandingInputDescriptor.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.descriptors;
+
+import java.util.Collections;
+import 
org.apache.samza.operators.descriptors.expanding.ExampleExpandingInputDescriptor;
+import 
org.apache.samza.operators.descriptors.expanding.ExampleExpandingOutputDescriptor;
+import 
org.apache.samza.operators.descriptors.expanding.ExampleExpandingSystemDescriptor;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the input descriptors obtained from an {@code ExampleExpandingSystemDescriptor}.
+ */
+public class TestExpandingInputDescriptor {
+  /**
+   * Asserts nothing; primarily a compile-time check on the expected descriptor type parameters
+   * and on the ability to chain the builder-style method calls. Annotated with {@code @Test} so
+   * that the chained calls are also executed by the test runner.
+   */
+  @Test
+  public void testAPIUsage() {
+    ExampleExpandingSystemDescriptor expandingSystem =
+        new ExampleExpandingSystemDescriptor("expandingSystem");
+    ExampleExpandingInputDescriptor<Long> input1 =
+        expandingSystem.getInputDescriptor("input1", new IntegerSerde());
+    ExampleExpandingOutputDescriptor<Integer> output1 =
+        expandingSystem.getOutputDescriptor("output1", new IntegerSerde());
+
+    input1
+        .withBootstrap(false)
+        .withOffsetDefault(SystemStreamMetadata.OffsetType.NEWEST)
+        .withPhysicalName("input-1")
+        .withPriority(1)
+        .withResetOffset(false)
+        .withStreamConfigs(Collections.emptyMap());
+
+    output1
+        .withPhysicalName("output-1")
+        .withStreamConfigs(Collections.emptyMap());
+  }
+
+  /**
+   * Verifies that the input descriptor exposes the stream level serde it was requested with and
+   * the same transformer as the system descriptor it was obtained from.
+   */
+  @Test
+  public void testISDObjectsWithOverrides() {
+    ExampleExpandingSystemDescriptor expandingSystem =
+        new ExampleExpandingSystemDescriptor("expandingSystem");
+    IntegerSerde streamSerde = new IntegerSerde();
+    ExampleExpandingInputDescriptor<Long> expandingISD =
+        expandingSystem.getInputDescriptor("input-stream", streamSerde);
+
+    assertEquals(streamSerde, expandingISD.getSerde());
+    assertEquals(expandingSystem.getTransformer().get(), expandingISD.getTransformer().get());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/test/java/org/apache/samza/operators/descriptors/TestGenericInputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/descriptors/TestGenericInputDescriptor.java b/samza-api/src/test/java/org/apache/samza/operators/descriptors/TestGenericInputDescriptor.java
new file mode 100644
index 0000000..9b39d41
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/operators/descriptors/TestGenericInputDescriptor.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.descriptors;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.serializers.DoubleSerde;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * Tests for the input descriptors obtained from a {@code GenericSystemDescriptor}.
+ */
+public class TestGenericInputDescriptor {
+
+  /**
+   * Creates the system descriptor used by every test in this class: system "input-system" with
+   * factory "factory.class.name", and empty system configs and default stream configs.
+   */
+  private static GenericSystemDescriptor newSystemDescriptor() {
+    return new GenericSystemDescriptor("input-system", "factory.class.name")
+        .withSystemConfigs(Collections.emptyMap())
+        .withDefaultStreamConfigs(Collections.emptyMap());
+  }
+
+  /**
+   * Asserts nothing; acts as a compile-time check on the expected descriptor type parameters
+   * and validates that the method calls can be chained.
+   */
+  @Test
+  public void testAPIUsage() {
+    GenericSystemDescriptor system = newSystemDescriptor();
+    GenericInputDescriptor<Integer> input1 =
+        system.getInputDescriptor("input1", new IntegerSerde());
+    GenericOutputDescriptor<Integer> output1 =
+        system.getOutputDescriptor("output1", new IntegerSerde());
+
+    input1
+        .withPhysicalName("input-1")
+        .withBootstrap(false)
+        .withOffsetDefault(SystemStreamMetadata.OffsetType.NEWEST)
+        .withPriority(1)
+        .withResetOffset(false)
+        .withBounded(false)
+        .withDeleteCommittedMessages(true)
+        .withStreamConfigs(Collections.emptyMap());
+
+    output1
+        .withPhysicalName("output-1")
+        .withStreamConfigs(Collections.emptyMap());
+  }
+
+  /**
+   * Verifies the configs generated when every stream level setting is explicitly overridden.
+   */
+  @Test
+  public void testISDConfigsWithOverrides() {
+    GenericSystemDescriptor system = newSystemDescriptor();
+
+    GenericInputDescriptor<Double> descriptor =
+        system.getInputDescriptor("input-stream", new DoubleSerde())
+            .withPhysicalName("physical-name")
+            .withBootstrap(true)
+            .withBounded(true)
+            .withDeleteCommittedMessages(true)
+            .withOffsetDefault(SystemStreamMetadata.OffsetType.OLDEST)
+            .withPriority(12)
+            .withResetOffset(true)
+            .withStreamConfigs(ImmutableMap.of("custom-config-key", "custom-config-value"));
+
+    Map<String, String> actual = descriptor.toConfig();
+
+    Map<String, String> expected = new HashMap<>();
+    expected.put("streams.input-stream.samza.system", "input-system");
+    expected.put("streams.input-stream.samza.physical.name", "physical-name");
+    expected.put("streams.input-stream.samza.bootstrap", "true");
+    expected.put("streams.input-stream.samza.bounded", "true");
+    expected.put("streams.input-stream.samza.delete.committed.messages", "true");
+    expected.put("streams.input-stream.samza.reset.offset", "true");
+    expected.put("streams.input-stream.samza.offset.default", "oldest");
+    expected.put("streams.input-stream.samza.priority", "12");
+    expected.put("streams.input-stream.custom-config-key", "custom-config-value");
+
+    assertEquals(expected, actual);
+  }
+
+  /**
+   * Verifies that, with no stream level overrides, only the system name config is generated,
+   * the provided serde is retained, and no transformer is present.
+   */
+  @Test
+  public void testISDConfigsWithDefaults() {
+    GenericSystemDescriptor system = newSystemDescriptor();
+
+    DoubleSerde streamSerde = new DoubleSerde();
+    GenericInputDescriptor<Double> descriptor =
+        system.getInputDescriptor("input-stream", streamSerde);
+
+    Map<String, String> actual = descriptor.toConfig();
+    Map<String, String> expected =
+        ImmutableMap.of("streams.input-stream.samza.system", "input-system");
+    assertEquals(expected, actual);
+    assertEquals(streamSerde, descriptor.getSerde());
+    assertFalse(descriptor.getTransformer().isPresent());
+  }
+
+  /**
+   * Verifies that the input descriptor exposes the provided stream level serde and has no
+   * transformer.
+   */
+  @Test
+  public void testISDObjectsWithOverrides() {
+    GenericSystemDescriptor system = newSystemDescriptor();
+    IntegerSerde streamSerde = new IntegerSerde();
+    GenericInputDescriptor<Integer> descriptor =
+        system.getInputDescriptor("input-stream", streamSerde);
+
+    assertEquals(streamSerde, descriptor.getSerde());
+    assertFalse(descriptor.getTransformer().isPresent());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/test/java/org/apache/samza/operators/descriptors/TestGenericSystemDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/descriptors/TestGenericSystemDescriptor.java b/samza-api/src/test/java/org/apache/samza/operators/descriptors/TestGenericSystemDescriptor.java
new file mode 100644
index 0000000..937b56a
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/operators/descriptors/TestGenericSystemDescriptor.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.descriptors;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.util.Map;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@code GenericSystemDescriptor}.
+ */
+public class TestGenericSystemDescriptor {
+  /**
+   * Verifies the configs generated for the factory, custom system configs, custom default
+   * stream configs and the default stream offset default.
+   */
+  @Test
+  public void testSDConfigs() {
+    GenericSystemDescriptor descriptor =
+        new GenericSystemDescriptor("input-system", "factory.class.name")
+            .withSystemConfigs(ImmutableMap.of("custom-config-key", "custom-config-value"))
+            .withDefaultStreamConfigs(
+                ImmutableMap.of("custom-stream-config-key", "custom-stream-config-value"))
+            .withDefaultStreamOffsetDefault(SystemStreamMetadata.OffsetType.UPCOMING);
+
+    Map<String, String> actual = descriptor.toConfig();
+    Map<String, String> expected = ImmutableMap.of(
+        "systems.input-system.samza.factory", "factory.class.name",
+        "systems.input-system.custom-config-key", "custom-config-value",
+        "systems.input-system.default.stream.custom-stream-config-key", "custom-stream-config-value",
+        "systems.input-system.default.stream.samza.offset.default", "upcoming"
+    );
+    assertEquals(expected, actual);
+  }
+
+  /** Requesting an input descriptor with a null serde is expected to be rejected. */
+  @Test(expected = IllegalArgumentException.class)
+  public void testGetInputDescriptorWithNullSerde() {
+    GenericSystemDescriptor descriptor =
+        new GenericSystemDescriptor("input-system", "factory.class.name");
+    descriptor.getInputDescriptor("streamId", null); // should throw an exception
+  }
+
+  /** Constructing a system descriptor with a null system name is expected to be rejected. */
+  @Test(expected = IllegalArgumentException.class)
+  public void testGetSystemDescriptorWithNullSystemName() {
+    new GenericSystemDescriptor(null, "factory.class.name");
+  }
+
+  /** Constructing a system descriptor with a blank system name is expected to be rejected. */
+  @Test(expected = IllegalArgumentException.class)
+  public void testGetSystemDescriptorWithEmptySystemName() {
+    new GenericSystemDescriptor(" ", "factory.class.name");
+  }
+}

Reply via email to