http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaInputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaInputDescriptor.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaInputDescriptor.java
new file mode 100644
index 0000000..d3fe252
--- /dev/null
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaInputDescriptor.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.kafka;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
+import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
+import org.apache.samza.operators.functions.InputTransformer;
+import org.apache.samza.serializers.Serde;
+
+/**
+ * A descriptor for a Kafka input stream.
+ * <p>
+ * An instance of this descriptor may be obtained from an appropriately configured {@link KafkaSystemDescriptor}.
+ * <p>
+ * Stream properties configured using a descriptor override corresponding properties provided in configuration.
+ *
+ * @param <StreamMessageType> type of messages in this stream.
+ */
+public class KafkaInputDescriptor<StreamMessageType>
+    extends InputDescriptor<StreamMessageType, KafkaInputDescriptor<StreamMessageType>> {
+  private static final String CONSUMER_AUTO_OFFSET_RESET_CONFIG_KEY = "systems.%s.streams.%s.consumer.auto.offset.reset";
+  private static final String CONSUMER_FETCH_MESSAGE_MAX_BYTES_CONFIG_KEY = "systems.%s.streams.%s.consumer.fetch.message.max.bytes";
+
+  private Optional<String> consumerAutoOffsetResetOptional = Optional.empty();
+  private Optional<Long> consumerFetchMessageMaxBytesOptional = Optional.empty();
+
+  KafkaInputDescriptor(String streamId, SystemDescriptor systemDescriptor, Serde serde, InputTransformer transformer) {
+    super(streamId, serde, systemDescriptor, transformer);
+  }
+
+  /**
+   * This setting determines what happens if a consumer attempts to read an offset for this topic that is outside of
+   * the current valid range. This could happen if the topic does not exist, or if a checkpoint is older than the
+   * maximum message history retained by the brokers. This property is not to be confused with
+   * {@link InputDescriptor#withOffsetDefault}, which determines what happens if there is no checkpoint.
+   * <p>
+   * The following are valid values for auto.offset.reset:
+   * <ul>
+   *   <li>smallest: Start consuming at the smallest (oldest) offset available on the broker
+   *   (process as much message history as available).
+   *   <li>largest: Start consuming at the largest (newest) offset available on the broker
+   *   (skip any messages published while the job was not running).
+   *   <li>anything else: Throw an exception and refuse to start up the job.
+   * </ul>
+   * <p>
+   * Note: This property may be set at a system level using {@link KafkaSystemDescriptor#withConsumerAutoOffsetReset}
+   *
+   * @param consumerAutoOffsetReset consumer auto offset reset policy for the input
+   * @return this input descriptor
+   */
+  public KafkaInputDescriptor<StreamMessageType> withConsumerAutoOffsetReset(String consumerAutoOffsetReset) {
+    this.consumerAutoOffsetResetOptional = Optional.of(StringUtils.stripToNull(consumerAutoOffsetReset));
+    return this;
+  }
+
+  /**
+   * The number of bytes of messages to attempt to fetch for each topic-partition for this topic in each fetch request.
+   * These bytes will be read into memory for each partition, so this helps control the memory used by the consumer.
+   * The fetch request size must be at least as large as the maximum message size the server allows or else it is
+   * possible for the producer to send messages larger than the consumer can fetch.
+   * <p>
+   * Note: This property may be set at a system level using {@link KafkaSystemDescriptor#withConsumerFetchMessageMaxBytes}
+   *
+   * @param fetchMessageMaxBytes number of bytes of messages to attempt to fetch for each topic-partition
+   *                             in each fetch request
+   * @return this input descriptor
+   */
+  public KafkaInputDescriptor<StreamMessageType> withConsumerFetchMessageMaxBytes(long fetchMessageMaxBytes) {
+    this.consumerFetchMessageMaxBytesOptional = Optional.of(fetchMessageMaxBytes);
+    return this;
+  }
+
+  @Override
+  public Map<String, String> toConfig() {
+    HashMap<String, String> configs = new HashMap<>(super.toConfig());
+    // Note: Kafka configuration needs the topic's physical name, not the stream-id.
+    // We won't have that here if the user only specified it in configs, or if it got rewritten
+    // by the planner to something different than what's in this descriptor.
+    String streamName = getPhysicalName().orElse(getStreamId());
+    String systemName = getSystemName();
+
+    consumerAutoOffsetResetOptional.ifPresent(autoOffsetReset ->
+        configs.put(String.format(CONSUMER_AUTO_OFFSET_RESET_CONFIG_KEY, systemName, streamName), autoOffsetReset));
+    consumerFetchMessageMaxBytesOptional.ifPresent(fetchMessageMaxBytes ->
+        configs.put(String.format(CONSUMER_FETCH_MESSAGE_MAX_BYTES_CONFIG_KEY, systemName, streamName), Long.toString(fetchMessageMaxBytes)));
+    return configs;
+  }
+}
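
For orientation, a minimal usage sketch of the new input descriptor (the
"page-views" stream and the PageView type are illustrative, not part of this
patch; the fluent overrides map to the systems.%s.streams.%s.consumer.* keys
generated by toConfig() above):

    KafkaSystemDescriptor kafkaSystem = new KafkaSystemDescriptor("kafka");

    // Per-stream overrides for auto.offset.reset and fetch.message.max.bytes.
    KafkaInputDescriptor<PageView> pageViews =
        kafkaSystem.getInputDescriptor("page-views", new JsonSerdeV2<>(PageView.class))
            .withConsumerAutoOffsetReset("smallest")
            .withConsumerFetchMessageMaxBytes(1024 * 1024);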

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaOutputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaOutputDescriptor.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaOutputDescriptor.java
new file mode 100644
index 0000000..1142276
--- /dev/null
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaOutputDescriptor.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.kafka;
+
+import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor;
+import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
+import org.apache.samza.serializers.Serde;
+
+/**
+ * A descriptor for a Kafka output stream.
+ * <p>
+ * An instance of this descriptor may be obtained from an appropriately configured {@link KafkaSystemDescriptor}.
+ * <p>
+ * Stream properties configured using a descriptor override corresponding properties provided in configuration.
+ *
+ * @param <StreamMessageType> type of messages in this stream.
+ */
+public class KafkaOutputDescriptor<StreamMessageType>
+    extends OutputDescriptor<StreamMessageType, KafkaOutputDescriptor<StreamMessageType>> {
+  KafkaOutputDescriptor(String streamId, SystemDescriptor systemDescriptor, Serde<StreamMessageType> serde) {
+    super(streamId, serde, systemDescriptor);
+  }
+}
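
The output side is symmetric; a brief sketch reusing the kafkaSystem
descriptor from the sketch above (stream and message types again
illustrative):

    KafkaOutputDescriptor<KV<String, PageViewCount>> pageViewCounts =
        kafkaSystem.getOutputDescriptor("page-view-counts",
            KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewCount.class)));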

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemDescriptor.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemDescriptor.java
new file mode 100644
index 0000000..6fa8915
--- /dev/null
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemDescriptor.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.kafka;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.config.KafkaConfig;
+import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
+import org.apache.samza.operators.descriptors.base.system.OutputDescriptorProvider;
+import org.apache.samza.operators.descriptors.base.system.SimpleInputDescriptorProvider;
+import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
+import org.apache.samza.serializers.Serde;
+
+
+/**
+ * A descriptor for a Kafka system.
+ * <p>
+ * System properties configured using a descriptor override corresponding properties provided in configuration.
+ */
+@SuppressWarnings("unchecked")
+public class KafkaSystemDescriptor extends SystemDescriptor<KafkaSystemDescriptor>
+    implements SimpleInputDescriptorProvider, OutputDescriptorProvider {
+  private static final String FACTORY_CLASS_NAME = KafkaSystemFactory.class.getName();
+  private static final String CONSUMER_ZK_CONNECT_CONFIG_KEY = "systems.%s.consumer.zookeeper.connect";
+  private static final String CONSUMER_AUTO_OFFSET_RESET_CONFIG_KEY = "systems.%s.consumer.auto.offset.reset";
+  private static final String CONSUMER_FETCH_THRESHOLD_CONFIG_KEY = KafkaConfig.CONSUMER_FETCH_THRESHOLD();
+  private static final String CONSUMER_FETCH_THRESHOLD_BYTES_CONFIG_KEY = KafkaConfig.CONSUMER_FETCH_THRESHOLD_BYTES();
+  private static final String CONSUMER_FETCH_MESSAGE_MAX_BYTES_KEY = "systems.%s.consumer.fetch.message.max.bytes";
+  private static final String CONSUMER_CONFIGS_CONFIG_KEY = "systems.%s.consumer.%s";
+  private static final String PRODUCER_BOOTSTRAP_SERVERS_CONFIG_KEY = "systems.%s.producer.bootstrap.servers";
+  private static final String PRODUCER_CONFIGS_CONFIG_KEY = "systems.%s.producer.%s";
+
+  private List<String> consumerZkConnect = Collections.emptyList();
+  private Optional<String> consumerAutoOffsetResetOptional = Optional.empty();
+  private Optional<Integer> consumerFetchThresholdOptional = Optional.empty();
+  private Optional<Long> consumerFetchThresholdBytesOptional = Optional.empty();
+  private Optional<Long> consumerFetchMessageMaxBytesOptional = Optional.empty();
+  private Map<String, String> consumerConfigs = Collections.emptyMap();
+  private List<String> producerBootstrapServers = Collections.emptyList();
+  private Map<String, String> producerConfigs = Collections.emptyMap();
+
+  /**
+   * Constructs a {@link KafkaSystemDescriptor} instance with no system level serde.
+   * Serdes must be provided explicitly at stream level when getting input or output descriptors.
+   *
+   * @param systemName name of this system
+   */
+  public KafkaSystemDescriptor(String systemName) {
+    super(systemName, FACTORY_CLASS_NAME, null, null);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public <StreamMessageType> KafkaInputDescriptor<StreamMessageType> getInputDescriptor(String streamId, Serde<StreamMessageType> serde) {
+    return new KafkaInputDescriptor<>(streamId, this, serde, null);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public <StreamMessageType> KafkaOutputDescriptor<StreamMessageType> getOutputDescriptor(String streamId, Serde<StreamMessageType> serde) {
+    return new KafkaOutputDescriptor<>(streamId, this, serde);
+  }
+
+  /**
+   * The hostname and port of one or more Zookeeper nodes where information about the Kafka cluster can be found.
+   * This is given as a list of hostname:port pairs, such as
+   * {@code ImmutableList.of("zk1.example.com:2181", "zk2.example.com:2181", "zk3.example.com:2181")}.
+   * If the cluster information is at some sub-path of the Zookeeper namespace, you need to include the path at the
+   * end of the list of hostnames, for example:
+   * {@code ImmutableList.of("zk1.example.com:2181", "zk2.example.com:2181/clusters/my-kafka")}.
+   *
+   * @param consumerZkConnect Zookeeper connection information for the system
+   * @return this system descriptor
+   */
+  public KafkaSystemDescriptor withConsumerZkConnect(List<String> consumerZkConnect) {
+    this.consumerZkConnect = consumerZkConnect;
+    return this;
+  }
+
+  /**
+   * This setting determines what happens if a consumer attempts to read an offset that is outside of the current
+   * valid range. This could happen if the topic does not exist, or if a checkpoint is older than the maximum message
+   * history retained by the brokers. This property is not to be confused with {@link InputDescriptor#withOffsetDefault},
+   * which determines what happens if there is no checkpoint.
+   * <p>
+   * The following are valid values for auto.offset.reset:
+   * <ul>
+   *   <li>smallest: Start consuming at the smallest (oldest) offset available on the broker
+   *   (process as much message history as available).
+   *   <li>largest: Start consuming at the largest (newest) offset available on the broker
+   *   (skip any messages published while the job was not running).
+   *   <li>anything else: Throw an exception and refuse to start up the job.
+   * </ul>
+   * <p>
+   * Note: This property may be set at a topic level using {@link KafkaInputDescriptor#withConsumerAutoOffsetReset}
+   *
+   * @param consumerAutoOffsetReset consumer auto offset reset policy for the system
+   * @return this system descriptor
+   */
+  public KafkaSystemDescriptor withConsumerAutoOffsetReset(String consumerAutoOffsetReset) {
+    this.consumerAutoOffsetResetOptional = Optional.of(StringUtils.stripToNull(consumerAutoOffsetReset));
+    return this;
+  }
+
+  /**
+   * When consuming streams from Kafka, a Samza container maintains an in-memory buffer for incoming messages in
+   * order to increase throughput (the stream task can continue processing buffered messages while new messages are
+   * fetched from Kafka). This parameter determines the number of messages we aim to buffer across all stream partitions
+   * consumed by a container. For example, with the default threshold of 50000, a container consuming 50 partitions
+   * will try to buffer 1000 messages per partition. When the number of buffered messages falls below that threshold,
+   * Samza fetches more messages from the Kafka broker to replenish the buffer. Increasing this parameter can increase
+   * a job's processing throughput, but also increases the amount of memory used.
+   *
+   * @param fetchThreshold number of incoming messages to buffer in-memory
+   * @return this system descriptor
+   */
+  public KafkaSystemDescriptor withSamzaFetchThreshold(int fetchThreshold) {
+    this.consumerFetchThresholdOptional = Optional.of(fetchThreshold);
+    return this;
+  }
+
+  /**
+   * When consuming streams from Kafka, a Samza container maintains an in-memory buffer for incoming messages in
+   * order to increase throughput (the stream task can continue processing buffered messages while new messages are
+   * fetched from Kafka). This parameter determines the total size, in bytes, of messages we aim to buffer across all
+   * stream partitions consumed by a container, i.e. how many bytes to use for the buffered prefetched messages for the
+   * job as a whole. The bytes for a single system/stream/partition are computed from this value. Since entire messages
+   * are fetched, this byte limit is a soft one: the actual usage can be the byte limit plus the size of the largest
+   * message in the partition for a given stream. If the value of this property is &gt; 0, it takes precedence over
+   * systems.system-name.samza.fetch.threshold.
+   * <p>
+   * For example, if fetchThresholdBytes is set to 100000 bytes, and there are 50 SystemStreamPartitions registered,
+   * then the per-partition threshold is (100000 / 2) / 50 = 1000 bytes. As this is a soft limit, the actual usage
+   * can be 1000 bytes + the size of the largest message. As soon as a SystemStreamPartition's buffered bytes drop
+   * below 1000, a fetch request will be executed to get more data for it. Increasing this parameter will decrease
+   * the latency between when a queue is drained of messages and when new messages are enqueued, but also leads
+   * to an increase in memory usage since more messages will be held in memory. The default value is -1,
+   * which means this is not used.
+   *
+   * @param fetchThresholdBytes number of bytes for incoming messages to buffer in-memory
+   * @return this system descriptor
+   */
+  public KafkaSystemDescriptor withSamzaFetchThresholdBytes(long fetchThresholdBytes) {
+    this.consumerFetchThresholdBytesOptional = Optional.of(fetchThresholdBytes);
+    return this;
+  }
+
+  /**
+   * The number of bytes of messages to attempt to fetch for each topic-partition in each fetch request.
+   * These bytes will be read into memory for each partition, so this helps control the memory used by the consumer.
+   * The fetch request size must be at least as large as the maximum message size the server allows or else it is
+   * possible for the producer to send messages larger than the consumer can fetch.
+   * <p>
+   * Note: This property may be set at a topic level using {@link KafkaInputDescriptor#withConsumerFetchMessageMaxBytes}
+   *
+   * @param fetchMessageMaxBytes number of bytes of messages to attempt to fetch for each topic-partition
+   *                             in each fetch request
+   * @return this system descriptor
+   */
+  public KafkaSystemDescriptor withConsumerFetchMessageMaxBytes(long fetchMessageMaxBytes) {
+    this.consumerFetchMessageMaxBytesOptional = Optional.of(fetchMessageMaxBytes);
+    return this;
+  }
+
+  /**
+   * Any Kafka consumer configuration can be included here. For example, to change the socket timeout,
+   * you can set socket.timeout.ms. (There is no need to configure group.id or client.id, as they are automatically
+   * configured by Samza. Also, there is no need to set auto.commit.enable because Samza has its own
+   * checkpointing mechanism.)
+   *
+   * @param consumerConfigs additional consumer configuration
+   * @return this system descriptor
+   */
+  public KafkaSystemDescriptor withConsumerConfigs(Map<String, String> consumerConfigs) {
+    this.consumerConfigs = consumerConfigs;
+    return this;
+  }
+
+  /**
+   * A list of network endpoints where the Kafka brokers are running. This is given as a list of hostname:port pairs,
+   * for example {@code ImmutableList.of("kafka1.example.com:9092", "kafka2.example.com:9092", "kafka3.example.com:9092")}.
+   * It's not necessary to list every single Kafka node in the cluster: Samza uses this property in order to discover
+   * which topics and partitions are hosted on which broker. This property is needed even if you are only consuming
+   * from Kafka, and not writing to it, because Samza uses it to discover metadata about streams being consumed.
+   *
+   * @param producerBootstrapServers network endpoints where the Kafka brokers are running
+   * @return this system descriptor
+   */
+  public KafkaSystemDescriptor withProducerBootstrapServers(List<String> producerBootstrapServers) {
+    this.producerBootstrapServers = producerBootstrapServers;
+    return this;
+  }
+
+  /**
+   * Any Kafka producer configuration can be included here. For example, to change the request timeout,
+   * you can set timeout.ms. (There is no need to configure client.id as it is automatically configured by Samza.)
+   *
+   * @param producerConfigs additional producer configuration
+   * @return this system descriptor
+   */
+  public KafkaSystemDescriptor withProducerConfigs(Map<String, String> producerConfigs) {
+    this.producerConfigs = producerConfigs;
+    return this;
+  }
+
+  @Override
+  public Map<String, String> toConfig() {
+    Map<String, String> configs = new HashMap<>(super.toConfig());
+    if (!consumerZkConnect.isEmpty()) {
+      configs.put(String.format(CONSUMER_ZK_CONNECT_CONFIG_KEY, getSystemName()), String.join(",", consumerZkConnect));
+    }
+    consumerAutoOffsetResetOptional.ifPresent(consumerAutoOffsetReset ->
+        configs.put(String.format(CONSUMER_AUTO_OFFSET_RESET_CONFIG_KEY, getSystemName()), consumerAutoOffsetReset));
+    consumerFetchThresholdOptional.ifPresent(consumerFetchThreshold ->
+        configs.put(String.format(CONSUMER_FETCH_THRESHOLD_CONFIG_KEY, getSystemName()), Integer.toString(consumerFetchThreshold)));
+    consumerFetchThresholdBytesOptional.ifPresent(consumerFetchThresholdBytes ->
+        configs.put(String.format(CONSUMER_FETCH_THRESHOLD_BYTES_CONFIG_KEY, getSystemName()), Long.toString(consumerFetchThresholdBytes)));
+    consumerFetchMessageMaxBytesOptional.ifPresent(consumerFetchMessageMaxBytes ->
+        configs.put(String.format(CONSUMER_FETCH_MESSAGE_MAX_BYTES_KEY, getSystemName()), Long.toString(consumerFetchMessageMaxBytes)));
+    consumerConfigs.forEach((key, value) -> configs.put(String.format(CONSUMER_CONFIGS_CONFIG_KEY, getSystemName(), key), value));
+    if (!producerBootstrapServers.isEmpty()) {
+      configs.put(String.format(PRODUCER_BOOTSTRAP_SERVERS_CONFIG_KEY, getSystemName()), String.join(",", producerBootstrapServers));
+    }
+    producerConfigs.forEach((key, value) -> configs.put(String.format(PRODUCER_CONFIGS_CONFIG_KEY, getSystemName(), key), value));
+    return configs;
+  }
+}
\ No newline at end of file
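
Putting the fluent API together, a fully configured system descriptor might
look like the sketch below (endpoints are placeholders); each with* call
corresponds to one systems.kafka.* key emitted by toConfig(), as the tests
that follow verify:

    KafkaSystemDescriptor kafkaSystem = new KafkaSystemDescriptor("kafka")
        .withConsumerZkConnect(ImmutableList.of("zk1.example.com:2181"))
        .withProducerBootstrapServers(ImmutableList.of("kafka1.example.com:9092", "kafka2.example.com:9092"))
        .withConsumerAutoOffsetReset("smallest")     // systems.kafka.consumer.auto.offset.reset
        .withSamzaFetchThreshold(10000)              // systems.kafka.samza.fetch.threshold
        .withSamzaFetchThresholdBytes(1024 * 1024);  // systems.kafka.samza.fetch.threshold.bytes

    Map<String, String> generated = kafkaSystem.toConfig();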

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaInputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaInputDescriptor.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaInputDescriptor.java
new file mode 100644
index 0000000..ac90cf0
--- /dev/null
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaInputDescriptor.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.kafka;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+import java.util.Map;
+import org.apache.samza.SamzaException;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.descriptors.GenericSystemDescriptor;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestKafkaInputDescriptor {
+  @Test
+  public void testISDConfigsWithOverrides() {
+    KafkaSystemDescriptor sd = new KafkaSystemDescriptor("kafka");
+
+    KafkaInputDescriptor<KV<String, Integer>> isd =
+        sd.getInputDescriptor("input-stream", KVSerde.of(new StringSerde(), 
new IntegerSerde()))
+            .withPhysicalName("physical-name")
+            .withConsumerAutoOffsetReset("largest")
+            .withConsumerFetchMessageMaxBytes(1024 * 1024);
+
+    Map<String, String> generatedConfigs = isd.toConfig();
+    assertEquals("kafka", generatedConfigs.get("streams.input-stream.samza.system"));
+    assertEquals("physical-name", generatedConfigs.get("streams.input-stream.samza.physical.name"));
+    assertEquals("largest", generatedConfigs.get("systems.kafka.streams.physical-name.consumer.auto.offset.reset"));
+    assertEquals("1048576", generatedConfigs.get("systems.kafka.streams.physical-name.consumer.fetch.message.max.bytes"));
+  }
+
+  @Test
+  public void testISDConfigsWithDefaults() {
+    KafkaSystemDescriptor sd = new KafkaSystemDescriptor("kafka")
+        .withConsumerZkConnect(ImmutableList.of("localhost:123"))
+        .withProducerBootstrapServers(ImmutableList.of("localhost:567", 
"localhost:890"));
+
+    KafkaInputDescriptor<KV<String, Integer>> isd =
+        sd.getInputDescriptor("input-stream", KVSerde.of(new StringSerde(), 
new IntegerSerde()));
+
+    Map<String, String> generatedConfigs = isd.toConfig();
+    assertEquals("kafka", 
generatedConfigs.get("streams.input-stream.samza.system"));
+    assertEquals(1, generatedConfigs.size()); // verify that there are no 
other configs
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemDescriptor.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemDescriptor.java
new file mode 100644
index 0000000..e1ddac0
--- /dev/null
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemDescriptor.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.kafka;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+import java.util.Map;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestKafkaSystemDescriptor {
+  @Test
+  public void testSDConfigsWithOverrides() {
+    KafkaSystemDescriptor sd =
+        new KafkaSystemDescriptor("kafka")
+            .withConsumerZkConnect(ImmutableList.of("localhost:1234"))
+            .withProducerBootstrapServers(ImmutableList.of("localhost:567", "localhost:890"))
+            .withDefaultStreamOffsetDefault(SystemStreamMetadata.OffsetType.OLDEST)
+            .withConsumerAutoOffsetReset("smallest")
+            .withConsumerFetchMessageMaxBytes(1024 * 1024)
+            .withSamzaFetchThreshold(10000)
+            .withSamzaFetchThresholdBytes(1024 * 1024)
+            .withConsumerConfigs(ImmutableMap.of("custom-consumer-config-key", "custom-consumer-config-value"))
+            .withProducerConfigs(ImmutableMap.of("custom-producer-config-key", "custom-producer-config-value"))
+            .withDefaultStreamConfigs(ImmutableMap.of("custom-stream-config-key", "custom-stream-config-value"));
+
+    Map<String, String> generatedConfigs = sd.toConfig();
+    assertEquals("org.apache.samza.system.kafka.KafkaSystemFactory", 
generatedConfigs.get("systems.kafka.samza.factory"));
+    assertEquals("localhost:1234", 
generatedConfigs.get("systems.kafka.consumer.zookeeper.connect"));
+    assertEquals("localhost:567,localhost:890", 
generatedConfigs.get("systems.kafka.producer.bootstrap.servers"));
+    assertEquals("smallest", 
generatedConfigs.get("systems.kafka.consumer.auto.offset.reset"));
+    assertEquals("1048576", 
generatedConfigs.get("systems.kafka.consumer.fetch.message.max.bytes"));
+    assertEquals("10000", 
generatedConfigs.get("systems.kafka.samza.fetch.threshold"));
+    assertEquals("1048576", 
generatedConfigs.get("systems.kafka.samza.fetch.threshold.bytes"));
+    assertEquals("custom-consumer-config-value", 
generatedConfigs.get("systems.kafka.consumer.custom-consumer-config-key"));
+    assertEquals("custom-producer-config-value", 
generatedConfigs.get("systems.kafka.producer.custom-producer-config-key"));
+    assertEquals("custom-stream-config-value", 
generatedConfigs.get("systems.kafka.default.stream.custom-stream-config-key"));
+    assertEquals("oldest", 
generatedConfigs.get("systems.kafka.default.stream.samza.offset.default"));
+    assertEquals(11, generatedConfigs.size());
+  }
+
+  @Test
+  public void testSDConfigsWithoutOverrides() {
+    KafkaSystemDescriptor sd = new KafkaSystemDescriptor("kafka");
+
+    Map<String, String> generatedConfigs = sd.toConfig();
+    assertEquals("org.apache.samza.system.kafka.KafkaSystemFactory", 
generatedConfigs.get("systems.kafka.samza.factory"));
+    assertEquals(1, generatedConfigs.size()); // verify that there are no 
other configs
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java
index 16da035..07f4f55 100644
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java
+++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java
@@ -21,6 +21,7 @@ package org.apache.samza.storage.kv;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.regex.Pattern;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.SamzaException;
@@ -31,7 +32,6 @@ import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.StorageConfig;
 import org.apache.samza.container.SamzaContainerContext;
-import org.apache.samza.operators.StreamGraphSpec;
 import org.apache.samza.table.ReadableTable;
 import org.apache.samza.table.Table;
 import org.apache.samza.table.TableSpec;
@@ -50,6 +50,7 @@ import com.google.common.base.Preconditions;
  * stores.
  */
 abstract public class BaseLocalStoreBackedTableProvider extends BaseTableProvider {
+  public static final Pattern SYSTEM_STREAM_NAME_PATTERN = Pattern.compile("[\\d\\w-_.]+");
 
   protected KeyValueStore kvStore;
 
@@ -101,8 +102,8 @@ abstract public class BaseLocalStoreBackedTableProvider extends BaseTableProvide
 
     List<String> sideInputs = tableSpec.getSideInputs();
     if (sideInputs != null && !sideInputs.isEmpty()) {
-      sideInputs.forEach(si -> Preconditions.checkState(StreamGraphSpec.isValidStreamId(si), String.format(
-          "Side input stream %s doesn't confirm to pattern %s", si, StreamGraphSpec.STREAM_ID_PATTERN)));
+      sideInputs.forEach(si -> Preconditions.checkState(isValidSystemStreamName(si), String.format(
+          "Side input stream %s doesn't conform to pattern %s", si, SYSTEM_STREAM_NAME_PATTERN)));
       String formattedSideInputs = String.join(",", sideInputs);
       storeConfig.put(String.format(JavaStorageConfig.SIDE_INPUTS, tableSpec.getId()), formattedSideInputs);
       storeConfig.put(String.format(JavaStorageConfig.SIDE_INPUTS_PROCESSOR_SERIALIZED_INSTANCE, tableSpec.getId()),
@@ -121,8 +122,8 @@ abstract public class BaseLocalStoreBackedTableProvider extends BaseTableProvide
             tableSpec.getId());
       }
 
-      Preconditions.checkState(StreamGraphSpec.isValidStreamId(changelogStream), String.format(
-          "Changelog stream %s doesn't confirm to pattern %s", changelogStream, StreamGraphSpec.STREAM_ID_PATTERN));
+      Preconditions.checkState(isValidSystemStreamName(changelogStream), String.format(
+          "Changelog stream %s doesn't conform to pattern %s", changelogStream, SYSTEM_STREAM_NAME_PATTERN));
       storeConfig.put(String.format(StorageConfig.CHANGELOG_STREAM(), tableSpec.getId()), changelogStream);
 
       String changelogReplicationFactor = tableSpec.getConfig().get(
@@ -140,4 +141,8 @@ abstract public class BaseLocalStoreBackedTableProvider extends BaseTableProvide
   public void close() {
     logger.info("Shutting down table provider for table " + tableSpec.getId());
   }
+
+  private boolean isValidSystemStreamName(String name) {
+    return StringUtils.isNotBlank(name) && SYSTEM_STREAM_NAME_PATTERN.matcher(name).matches();
+  }
 }
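
As a quick illustration of the new name validation (a standalone sketch, not
part of the patch):

    import java.util.regex.Pattern;

    // Same pattern as above: digits, word characters, '-', '_' and '.' are allowed.
    Pattern p = Pattern.compile("[\\d\\w-_.]+");
    p.matcher("my-changelog_stream.1").matches();  // true
    p.matcher("my changelog stream").matches();    // false: spaces are rejected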

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
index 1db3000..c422130 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
@@ -37,8 +37,12 @@ import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.MessageStreamImpl;
 import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor;
+import org.apache.samza.operators.descriptors.GenericOutputDescriptor;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.operators.TableDescriptor;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.sql.data.SamzaSqlExecutionContext;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.interfaces.SamzaRelConverter;
@@ -151,7 +155,11 @@ public class QueryTranslator {
 
     Optional<TableDescriptor> tableDescriptor = sinkConfig.getTableDescriptor();
     if (!tableDescriptor.isPresent()) {
-      outputStream.sendTo(streamGraph.getOutputStream(sinkConfig.getStreamName()));
+      KVSerde<Object, Object> noOpKVSerde = KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>());
+      String systemName = sinkConfig.getSystemName();
+      DelegatingSystemDescriptor sd = context.getSystemDescriptors().computeIfAbsent(systemName, DelegatingSystemDescriptor::new);
+      GenericOutputDescriptor<KV<Object, Object>> osd = sd.getOutputDescriptor(sinkConfig.getStreamName(), noOpKVSerde);
+      outputStream.sendTo(streamGraph.getOutputStream(osd));
     } else {
       Table outputTable = streamGraph.getTable(tableDescriptor.get());
       if (outputTable == null) {

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
index fa3d9d3..46e0840 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
@@ -27,7 +27,11 @@ import org.apache.samza.config.Config;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.descriptors.GenericInputDescriptor;
+import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor;
 import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.interfaces.SamzaRelConverter;
 import org.apache.samza.task.TaskContext;
@@ -74,9 +78,14 @@ class ScanTranslator {
     String sourceName = SqlIOConfig.getSourceFromSourceParts(tableNameParts);
 
     Validate.isTrue(relMsgConverters.containsKey(sourceName), String.format("Unknown source %s", sourceName));
-    final String streamName = systemStreamConfig.get(sourceName).getStreamName();
+    SqlIOConfig sqlIOConfig = systemStreamConfig.get(sourceName);
+    final String systemName = sqlIOConfig.getSystemName();
+    final String streamName = sqlIOConfig.getStreamName();
 
-    MessageStream<KV<Object, Object>> inputStream = streamGraph.getInputStream(streamName);
+    KVSerde<Object, Object> noOpKVSerde = KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>());
+    DelegatingSystemDescriptor sd = context.getSystemDescriptors().computeIfAbsent(systemName, DelegatingSystemDescriptor::new);
+    GenericInputDescriptor<KV<Object, Object>> isd = sd.getInputDescriptor(streamName, noOpKVSerde);
+    MessageStream<KV<Object, Object>> inputStream = streamGraph.getInputStream(isd);
     MessageStream<SamzaSqlRelMessage> samzaSqlRelMessageStream = inputStream.map(new ScanMapFunction(sourceName));
 
     context.registerMessageStream(tableScan.getId(), samzaSqlRelMessageStream);
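
Both SQL translators now follow the same recipe: look up (or lazily create) a
per-system DelegatingSystemDescriptor in the shared TranslatorContext, then
derive a stream descriptor with a pass-through serde. Condensed, with an
illustrative system and stream name:

    KVSerde<Object, Object> noOpKVSerde = KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>());
    DelegatingSystemDescriptor sd =
        context.getSystemDescriptors().computeIfAbsent("kafka", DelegatingSystemDescriptor::new);
    GenericInputDescriptor<KV<Object, Object>> isd = sd.getInputDescriptor("my-stream", noOpKVSerde);
    MessageStream<KV<Object, Object>> input = streamGraph.getInputStream(isd);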

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java
index fcebeef..e622d55 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java
@@ -35,6 +35,7 @@ import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor;
 import org.apache.samza.sql.data.RexToJavaCompiler;
 import org.apache.samza.sql.data.SamzaSqlExecutionContext;
 import org.apache.samza.sql.interfaces.SamzaRelConverter;
@@ -50,8 +51,9 @@ public class TranslatorContext implements Cloneable {
   private final StreamGraph streamGraph;
   private final RexToJavaCompiler compiler;
   private final Map<String, SamzaRelConverter> relSamzaConverters;
-  private final Map<Integer, MessageStream> messsageStreams;
+  private final Map<Integer, MessageStream> messageStreams;
   private final Map<Integer, RelNode> relNodes;
+  private final Map<String, DelegatingSystemDescriptor> systemDescriptors;
 
   /**
   * The internal variables that are not shared among all cloned {@link TranslatorContext}
@@ -123,10 +125,11 @@ public class TranslatorContext implements Cloneable {
     this.streamGraph  = other.streamGraph;
     this.compiler = other.compiler;
     this.relSamzaConverters = other.relSamzaConverters;
-    this.messsageStreams = other.messsageStreams;
+    this.messageStreams = other.messageStreams;
     this.relNodes = other.relNodes;
     this.executionContext = other.executionContext.clone();
     this.dataContext = new DataContextImpl();
+    this.systemDescriptors = other.systemDescriptors;
   }
 
   /**
@@ -142,8 +145,9 @@ public class TranslatorContext implements Cloneable {
     this.executionContext = executionContext;
     this.dataContext = new DataContextImpl();
     this.relSamzaConverters = converters;
-    this.messsageStreams = new HashMap<>();
+    this.messageStreams = new HashMap<>();
     this.relNodes = new HashMap<>();
+    this.systemDescriptors = new HashMap<>();
   }
 
   /**
@@ -184,7 +188,7 @@ public class TranslatorContext implements Cloneable {
    * @param stream the stream
    */
   void registerMessageStream(int id, MessageStream stream) {
-    messsageStreams.put(id, stream);
+    messageStreams.put(id, stream);
   }
 
   /**
@@ -194,7 +198,7 @@ public class TranslatorContext implements Cloneable {
    * @return the message stream
    */
   MessageStream getMessageStream(int id) {
-    return messsageStreams.get(id);
+    return messageStreams.get(id);
   }
 
   void registerRelNode(int id, RelNode relNode) {
@@ -209,6 +213,10 @@ public class TranslatorContext implements Cloneable {
     return this.relSamzaConverters.get(source);
   }
 
+  Map<String, DelegatingSystemDescriptor> getSystemDescriptors() {
+    return this.systemDescriptors;
+  }
+
   /**
    * This method helps to create a per task instance of translator context
    *

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
index 2de4856..7395a3d 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
@@ -147,7 +147,7 @@ public class TestJoinTranslator extends TranslatorTestBase {
     when(mockOutputStream.isKeyed()).thenReturn(true);
     IntermediateMessageStreamImpl
         mockPartitionedStream = new IntermediateMessageStreamImpl(mockGraph, mockInputOp, mockOutputStream);
-    when(mockGraph.getIntermediateStream(any(String.class), any(Serde.class))).thenReturn(mockPartitionedStream);
+    when(mockGraph.getIntermediateStream(any(String.class), any(Serde.class), eq(false))).thenReturn(mockPartitionedStream);
 
     
     doAnswer(this.getRegisterMessageStreamAnswer()).when(mockContext).registerMessageStream(eq(3), any(MessageStream.class));
     RexToJavaCompiler mockCompiler = mock(RexToJavaCompiler.class);

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
index ede7995..1776067 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
@@ -52,7 +52,7 @@ public class TestQueryTranslator {
     Assert.assertTrue(originContext.getStreamGraph() == clonedContext.getStreamGraph());
     Assert.assertTrue(originContext.getExpressionCompiler() == clonedContext.getExpressionCompiler());
     Assert.assertTrue(Whitebox.getInternalState(originContext, "relSamzaConverters") == Whitebox.getInternalState(clonedContext, "relSamzaConverters"));
-    Assert.assertTrue(Whitebox.getInternalState(originContext, "messsageStreams") == Whitebox.getInternalState(clonedContext, "messsageStreams"));
+    Assert.assertTrue(Whitebox.getInternalState(originContext, "messageStreams") == Whitebox.getInternalState(clonedContext, "messageStreams"));
     Assert.assertTrue(Whitebox.getInternalState(originContext, "relNodes") == Whitebox.getInternalState(clonedContext, "relNodes"));
     Assert.assertNotEquals(originContext.getDataContext(), clonedContext.getDataContext());
     validateClonedExecutionContext(originContext.getExecutionContext(), clonedContext.getExecutionContext());
@@ -137,8 +137,7 @@ public class TestQueryTranslator {
     SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamGraphSpec
-        graphSpec = new StreamGraphSpec(samzaConfig);
+    StreamGraphSpec graphSpec = new StreamGraphSpec(samzaConfig);
     translator.translate(queryInfo, graphSpec);
     OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph();
 

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-test/src/main/java/org/apache/samza/example/AppWithGlobalConfigExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/AppWithGlobalConfigExample.java b/samza-test/src/main/java/org/apache/samza/example/AppWithGlobalConfigExample.java
index c029eb4..d7f805d 100644
--- a/samza-test/src/main/java/org/apache/samza/example/AppWithGlobalConfigExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/AppWithGlobalConfigExample.java
@@ -31,6 +31,9 @@ import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.kafka.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.KafkaSystemDescriptor;
 import org.apache.samza.util.CommandLine;
 
 
@@ -51,13 +54,21 @@ public class AppWithGlobalConfigExample implements StreamApplication {
 
   @Override
   public void init(StreamGraph graph, Config config) {
-    graph.getInputStream("myPageViewEevent", KVSerde.of(new 
StringSerde("UTF-8"), new JsonSerdeV2<>(PageViewEvent.class)))
-        .map(KV::getValue)
+    KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking");
+
+    KafkaInputDescriptor<PageViewEvent> inputStreamDescriptor =
+        trackingSystem.getInputDescriptor("pageViewEvent", new 
JsonSerdeV2<>(PageViewEvent.class));
+
+    KafkaOutputDescriptor<KV<String, PageViewCount>> outputStreamDescriptor =
+        trackingSystem.getOutputDescriptor("pageViewEventPerMember",
+            KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewCount.class)));
+
+    graph.getInputStream(inputStreamDescriptor)
         .window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(m -> m.memberId, Duration.ofSeconds(10), () -> 0, (m, c) -> c + 1, null, null)
             .setEarlyTrigger(Triggers.repeat(Triggers.count(5)))
             .setAccumulationMode(AccumulationMode.DISCARDING), "w1")
         .map(m -> KV.of(m.getKey().getKey(), new PageViewCount(m)))
-        .sendTo(graph.getOutputStream("pageViewEventPerMemberStream", 
KVSerde.of(new StringSerde("UTF-8"), new JsonSerdeV2<>(PageViewCount.class))));
+        .sendTo(graph.getOutputStream(outputStreamDescriptor));
   }
 
   class PageViewEvent {

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-test/src/main/java/org/apache/samza/example/BroadcastExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/BroadcastExample.java b/samza-test/src/main/java/org/apache/samza/example/BroadcastExample.java
index 9ca4f35..1c1b4be 100644
--- a/samza-test/src/main/java/org/apache/samza/example/BroadcastExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/BroadcastExample.java
@@ -22,12 +22,15 @@ package org.apache.samza.example;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.KV;
+import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.operators.MessageStream;
 import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.kafka.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.KafkaSystemDescriptor;
 import org.apache.samza.util.CommandLine;
 
 
@@ -50,12 +53,21 @@ public class BroadcastExample implements StreamApplication {
 
   @Override
   public void init(StreamGraph graph, Config config) {
-    KVSerde<String, PageViewEvent> pgeMsgSerde = KVSerde.of(new StringSerde("UTF-8"), new JsonSerdeV2<>(PageViewEvent.class));
-    MessageStream<KV<String, PageViewEvent>> inputStream = graph.getInputStream("pageViewEventStream", pgeMsgSerde);
+    KVSerde<String, PageViewEvent> serde = KVSerde.of(new StringSerde("UTF-8"), new JsonSerdeV2<>(PageViewEvent.class));
+    KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking");
+    KafkaInputDescriptor<KV<String, PageViewEvent>> pageViewEvent =
+        trackingSystem.getInputDescriptor("pageViewEvent", serde);
+    KafkaOutputDescriptor<KV<String, PageViewEvent>> outStream1 =
+        trackingSystem.getOutputDescriptor("outStream1", serde);
+    KafkaOutputDescriptor<KV<String, PageViewEvent>> outStream2 =
+        trackingSystem.getOutputDescriptor("outStream2", serde);
+    KafkaOutputDescriptor<KV<String, PageViewEvent>> outStream3 =
+        trackingSystem.getOutputDescriptor("outStream3", serde);
 
-    inputStream.filter(m -> m.key.equals("key1")).sendTo(graph.getOutputStream("outStream1", pgeMsgSerde));
-    inputStream.filter(m -> m.key.equals("key2")).sendTo(graph.getOutputStream("outStream2", pgeMsgSerde));
-    inputStream.filter(m -> m.key.equals("key3")).sendTo(graph.getOutputStream("outStream3", pgeMsgSerde));
+    MessageStream<KV<String, PageViewEvent>> inputStream = graph.getInputStream(pageViewEvent);
+    inputStream.filter(m -> m.key.equals("key1")).sendTo(graph.getOutputStream(outStream1));
+    inputStream.filter(m -> m.key.equals("key2")).sendTo(graph.getOutputStream(outStream2));
+    inputStream.filter(m -> m.key.equals("key3")).sendTo(graph.getOutputStream(outStream3));
   }
 
   class PageViewEvent {

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java b/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java
index 9edaabe..4d3307b 100644
--- a/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java
@@ -19,6 +19,10 @@
 package org.apache.samza.example;
 
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.KV;
@@ -31,14 +35,12 @@ import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.kafka.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.KafkaSystemDescriptor;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.util.CommandLine;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
 
 /**
  * Example code using {@link KeyValueStore} to implement event-time window
@@ -58,12 +60,19 @@ public class KeyValueStoreExample implements StreamApplication {
 
   @Override
   public void init(StreamGraph graph, Config config) {
-    MessageStream<PageViewEvent> pageViewEvents =
-        graph.getInputStream("pageViewEventStream", new 
JsonSerdeV2<>(PageViewEvent.class));
-    OutputStream<KV<String, StatsOutput>> pageViewEventPerMember =
-        graph.getOutputStream("pageViewEventPerMember",
+    KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking");
+
+    KafkaInputDescriptor<PageViewEvent> inputStreamDescriptor =
+        trackingSystem.getInputDescriptor("pageViewEvent", new 
JsonSerdeV2<>(PageViewEvent.class));
+
+    KafkaOutputDescriptor<KV<String, StatsOutput>> outputStreamDescriptor =
+        trackingSystem.getOutputDescriptor("pageViewEventPerMember",
             KVSerde.of(new StringSerde(), new JsonSerdeV2<>(StatsOutput.class)));
 
+    graph.setDefaultSystem(trackingSystem);
+    MessageStream<PageViewEvent> pageViewEvents = graph.getInputStream(inputStreamDescriptor);
+    OutputStream<KV<String, StatsOutput>> pageViewEventPerMember = graph.getOutputStream(outputStreamDescriptor);
+
     pageViewEvents
         .partitionBy(pve -> pve.memberId, pve -> pve,
             KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewEvent.class)), "partitionBy")

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-test/src/main/java/org/apache/samza/example/MergeExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/MergeExample.java b/samza-test/src/main/java/org/apache/samza/example/MergeExample.java
index ff983a4..33d60d6 100644
--- a/samza-test/src/main/java/org/apache/samza/example/MergeExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/MergeExample.java
@@ -20,14 +20,19 @@
 package org.apache.samza.example;
 
 import com.google.common.collect.ImmutableList;
+
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.operators.MessageStream;
 import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.kafka.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.KafkaSystemDescriptor;
 import org.apache.samza.util.CommandLine;
 
 public class MergeExample implements StreamApplication {
@@ -45,14 +50,22 @@ public class MergeExample implements StreamApplication {
 
   @Override
   public void init(StreamGraph graph, Config config) {
+    KVSerde<String, PageViewEvent> serde = KVSerde.of(new StringSerde("UTF-8"), new JsonSerdeV2<>(PageViewEvent.class));
+    KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking");
 
-    KVSerde<String, PageViewEvent>
-        pgeMsgSerde = KVSerde.of(new StringSerde("UTF-8"), new JsonSerdeV2<>(PageViewEvent.class));
+    KafkaInputDescriptor<KV<String, PageViewEvent>> isd1 =
+        trackingSystem.getInputDescriptor("pageViewStream1", serde);
+    KafkaInputDescriptor<KV<String, PageViewEvent>> isd2 =
+        trackingSystem.getInputDescriptor("pageViewStream2", serde);
+    KafkaInputDescriptor<KV<String, PageViewEvent>> isd3 =
+        trackingSystem.getInputDescriptor("pageViewStream3", serde);
 
-    MessageStream.mergeAll(ImmutableList.of(graph.getInputStream("viewStream1", pgeMsgSerde),
-        graph.getInputStream("viewStream2", pgeMsgSerde), graph.getInputStream("viewStream3", pgeMsgSerde)))
-        .sendTo(graph.getOutputStream("mergedStream", pgeMsgSerde));
+    KafkaOutputDescriptor<KV<String, PageViewEvent>> osd =
+        trackingSystem.getOutputDescriptor("mergedStream", serde);
 
+    MessageStream
+        .mergeAll(ImmutableList.of(graph.getInputStream(isd1), graph.getInputStream(isd2), graph.getInputStream(isd3)))
+        .sendTo(graph.getOutputStream(osd));
   }
 
   class PageViewEvent {

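Since the three inputs share one message type, the descriptors above differ only by stream id and can reuse a single KVSerde. A hedged sketch of the same merge built in a loop (the stream-id list and the java.util.stream.Collectors import are assumptions for illustration, not part of the commit):

    List<String> streamIds = ImmutableList.of("pageViewStream1", "pageViewStream2", "pageViewStream3");
    List<MessageStream<KV<String, PageViewEvent>>> streams = streamIds.stream()
        .map(id -> graph.getInputStream(trackingSystem.getInputDescriptor(id, serde)))
        .collect(Collectors.toList());
    // mergeAll requires every stream to carry the same message type.
    MessageStream.mergeAll(streams).sendTo(graph.getOutputStream(osd));
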
http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-test/src/main/java/org/apache/samza/example/OrderShipmentJoinExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/OrderShipmentJoinExample.java b/samza-test/src/main/java/org/apache/samza/example/OrderShipmentJoinExample.java
index 1c0bc25..34b5fc6 100644
--- a/samza-test/src/main/java/org/apache/samza/example/OrderShipmentJoinExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/OrderShipmentJoinExample.java
@@ -18,21 +18,21 @@
  */
 package org.apache.samza.example;
 
+import java.time.Duration;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.KV;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.kafka.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.KafkaSystemDescriptor;
 import org.apache.samza.util.CommandLine;
 
-import java.time.Duration;
-
 
 /**
  * Simple 2-way stream-to-stream join example
@@ -52,21 +52,22 @@ public class OrderShipmentJoinExample implements StreamApplication {
 
   @Override
   public void init(StreamGraph graph, Config config) {
-
-    MessageStream<OrderRecord> orders =
-        graph.getInputStream("orders", new JsonSerdeV2<>(OrderRecord.class));
-    MessageStream<ShipmentRecord> shipments =
-        graph.getInputStream("shipments", new JsonSerdeV2<>(ShipmentRecord.class));
-    OutputStream<KV<String, FulfilledOrderRecord>> fulfilledOrders =
-        graph.getOutputStream("fulfilledOrders",
+    KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking");
+
+    KafkaInputDescriptor<OrderRecord> orderStreamDescriptor =
+        trackingSystem.getInputDescriptor("orders", new JsonSerdeV2<>(OrderRecord.class));
+    KafkaInputDescriptor<ShipmentRecord> shipmentStreamDescriptor =
+        trackingSystem.getInputDescriptor("shipments", new JsonSerdeV2<>(ShipmentRecord.class));
+    KafkaOutputDescriptor<KV<String, FulfilledOrderRecord>> fulfilledOrdersStreamDescriptor =
+        trackingSystem.getOutputDescriptor("fulfilledOrders",
             KVSerde.of(new StringSerde(), new JsonSerdeV2<>(FulfilledOrderRecord.class)));
 
-    orders
-        .join(shipments, new MyJoinFunction(),
+    graph.getInputStream(orderStreamDescriptor)
+        .join(graph.getInputStream(shipmentStreamDescriptor), new MyJoinFunction(),
             new StringSerde(), new JsonSerdeV2<>(OrderRecord.class), new JsonSerdeV2<>(ShipmentRecord.class),
             Duration.ofMinutes(1), "join")
         .map(fulFilledOrder -> KV.of(fulFilledOrder.orderId, fulFilledOrder))
-        .sendTo(fulfilledOrders);
+        .sendTo(graph.getOutputStream(fulfilledOrdersStreamDescriptor));
 
   }
 

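The rewritten join reads: join the orders stream with the shipments stream via MyJoinFunction, passing serdes for the join key, the left message, and the right message, and keep join state for a one-minute TTL under the operator id "join". MyJoinFunction itself sits outside this hunk; a hedged sketch of what it plausibly implements, following Samza's JoinFunction contract (the record fields used here are hypothetical):

    class MyJoinFunction implements JoinFunction<String, OrderRecord, ShipmentRecord, FulfilledOrderRecord> {
      @Override
      public FulfilledOrderRecord apply(OrderRecord order, ShipmentRecord shipment) {
        // Combine both sides once their keys match (fields are hypothetical).
        return new FulfilledOrderRecord(order.orderId, shipment.shipTimestamp);
      }

      @Override
      public String getFirstKey(OrderRecord order) {
        return order.orderId;    // join key from the left stream
      }

      @Override
      public String getSecondKey(ShipmentRecord shipment) {
        return shipment.orderId; // join key from the right stream
      }
    }
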
http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java b/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java
index 2581506..dc5eb74 100644
--- a/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java
@@ -35,6 +35,9 @@ import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.kafka.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.KafkaSystemDescriptor;
 import org.apache.samza.util.CommandLine;
 
 
@@ -56,13 +59,18 @@ public class PageViewCounterExample implements StreamApplication {
 
   @Override
   public void init(StreamGraph graph, Config config) {
+    KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking");
 
-    MessageStream<PageViewEvent> pageViewEvents = null;
-    pageViewEvents = graph.getInputStream("pageViewEventStream", new JsonSerdeV2<>(PageViewEvent.class));
-    OutputStream<KV<String, PageViewCount>> pageViewEventPerMemberStream =
-        graph.getOutputStream("pageViewEventPerMemberStream",
+    KafkaInputDescriptor<PageViewEvent> inputStreamDescriptor =
+        trackingSystem.getInputDescriptor("pageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
+
+    KafkaOutputDescriptor<KV<String, PageViewCount>> outputStreamDescriptor =
+        trackingSystem.getOutputDescriptor("pageViewEventPerMember",
             KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewCount.class)));
 
+    MessageStream<PageViewEvent> pageViewEvents = graph.getInputStream(inputStreamDescriptor);
+    OutputStream<KV<String, PageViewCount>> pageViewEventPerMemberStream = graph.getOutputStream(outputStreamDescriptor);
+
     SupplierFunction<Integer> initialValue = () -> 0;
     FoldLeftFunction<PageViewEvent, Integer> foldLeftFn = (m, c) -> c + 1;
     pageViewEvents
@@ -71,7 +79,6 @@ public class PageViewCounterExample implements StreamApplication {
             .setAccumulationMode(AccumulationMode.DISCARDING), "tumblingWindow")
         .map(windowPane -> KV.of(windowPane.getKey().getKey(), new PageViewCount(windowPane)))
         .sendTo(pageViewEventPerMemberStream);
-
   }
 
   class PageViewEvent {

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-test/src/main/java/org/apache/samza/example/RepartitionExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/RepartitionExample.java b/samza-test/src/main/java/org/apache/samza/example/RepartitionExample.java
index 7f28346..b776c7d 100644
--- a/samza-test/src/main/java/org/apache/samza/example/RepartitionExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/RepartitionExample.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.example;
 
+import java.time.Duration;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.KV;
@@ -30,10 +31,11 @@ import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.kafka.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.KafkaSystemDescriptor;
 import org.apache.samza.util.CommandLine;
 
-import java.time.Duration;
-
 
 /**
  * Example {@link StreamApplication} code to test the API methods with the re-partition operator
@@ -53,18 +55,24 @@ public class RepartitionExample implements StreamApplication {
 
   @Override
   public void init(StreamGraph graph, Config config) {
+    KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking");
 
-    MessageStream<PageViewEvent> pageViewEvents =
-        graph.getInputStream("pageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
-    OutputStream<KV<String, MyStreamOutput>> pageViewEventPerMember =
-        graph.getOutputStream("pageViewEventPerMember",
+    KafkaInputDescriptor<PageViewEvent> inputStreamDescriptor =
+        trackingSystem.getInputDescriptor("pageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
+
+    KafkaOutputDescriptor<KV<String, MyStreamOutput>> outputStreamDescriptor =
+        trackingSystem.getOutputDescriptor("pageViewEventPerMember",
             KVSerde.of(new StringSerde(), new JsonSerdeV2<>(MyStreamOutput.class)));
 
+    graph.setDefaultSystem(trackingSystem);
+    MessageStream<PageViewEvent> pageViewEvents = graph.getInputStream(inputStreamDescriptor);
+    OutputStream<KV<String, MyStreamOutput>> pageViewEventPerMember = graph.getOutputStream(outputStreamDescriptor);
+
     pageViewEvents
         .partitionBy(pve -> pve.memberId, pve -> pve,
             KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewEvent.class)), "partitionBy")
-        .window(Windows.keyedTumblingWindow(KV::getKey, Duration.ofMinutes(5), () -> 0, (m, c) -> c + 1, null, null),
-            "window")
+        .window(Windows.keyedTumblingWindow(
+            KV::getKey, Duration.ofMinutes(5), () -> 0, (m, c) -> c + 1, null, null), "window")
         .map(windowPane -> KV.of(windowPane.getKey().getKey(), new MyStreamOutput(windowPane)))
         .sendTo(pageViewEventPerMember);
 

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-test/src/main/java/org/apache/samza/example/WindowExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/WindowExample.java b/samza-test/src/main/java/org/apache/samza/example/WindowExample.java
index 4950695..cbc1e8e 100644
--- a/samza-test/src/main/java/org/apache/samza/example/WindowExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/WindowExample.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.example;
 
+import java.time.Duration;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.MessageStream;
@@ -26,16 +27,17 @@ import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.operators.functions.FoldLeftFunction;
 import org.apache.samza.operators.functions.SupplierFunction;
-import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.triggers.Triggers;
+import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.Windows;
 import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.system.kafka.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.KafkaSystemDescriptor;
 import org.apache.samza.util.CommandLine;
 
-import java.time.Duration;
-
 
 /**
  * Example implementation of a simple user-defined task w/ a window operator.
@@ -56,11 +58,18 @@ public class WindowExample implements StreamApplication {
 
   @Override
   public void init(StreamGraph graph, Config config) {
+    KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking");
+
+    KafkaInputDescriptor<PageViewEvent> inputStreamDescriptor =
+        trackingSystem.getInputDescriptor("pageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
+    KafkaOutputDescriptor<Integer> outputStreamDescriptor =
+        trackingSystem.getOutputDescriptor("pageViewEventPerMember", new IntegerSerde());
+
+    MessageStream<PageViewEvent> inputStream = graph.getInputStream(inputStreamDescriptor);
+    OutputStream<Integer> outputStream = graph.getOutputStream(outputStreamDescriptor);
 
     SupplierFunction<Integer> initialValue = () -> 0;
     FoldLeftFunction<PageViewEvent, Integer> counter = (m, c) -> c == null ? 1 : c + 1;
-    MessageStream<PageViewEvent> inputStream = graph.getInputStream("inputStream", new JsonSerdeV2<PageViewEvent>());
-    OutputStream<Integer> outputStream = graph.getOutputStream("outputStream", new IntegerSerde());
 
     // create a tumbling window that outputs the number of messages collected every 10 minutes.
     // also emit early results if either the number of messages collected reaches 30000, or if no new messages arrive

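The comment above describes an early-triggered tumbling window. A minimal sketch of how the Triggers and Windows imports added in this hunk combine to express that behavior (reconstructed from the comment, not copied from the commit; the one-minute inactivity duration is illustrative):

    inputStream
        .window(Windows.tumblingWindow(Duration.ofMinutes(10), initialValue, counter, new IntegerSerde())
            .setEarlyTrigger(Triggers.repeat(Triggers.any(
                Triggers.count(30000),                                   // early result at 30000 messages...
                Triggers.timeSinceLastMessage(Duration.ofMinutes(1))))), // ...or after a quiet minute
            "window")
        .map(WindowPane::getMessage)
        .sendTo(outputStream);
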
http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
index 3c45967..5ca497a 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
@@ -138,7 +138,7 @@ public class TestRunner {
    */
   private void registerSystem(String systemName) {
     if (!systems.containsKey(systemName)) {
-      systems.put(systemName, CollectionStreamSystemSpec.create(systemName));
+      systems.put(systemName, CollectionStreamSystemSpec.create(systemName, JOB_NAME));
       configs.putAll(systems.get(systemName).getSystemConfigs());
     }
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-test/src/main/java/org/apache/samza/test/framework/system/CollectionStreamSystemSpec.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/system/CollectionStreamSystemSpec.java b/samza-test/src/main/java/org/apache/samza/test/framework/system/CollectionStreamSystemSpec.java
index c005c41..5658f61 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/system/CollectionStreamSystemSpec.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/system/CollectionStreamSystemSpec.java
@@ -22,6 +22,7 @@ package org.apache.samza.test.framework.system;
 import com.google.common.base.Preconditions;
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.system.inmemory.InMemorySystemFactory;
 
 
@@ -32,11 +33,17 @@ import org.apache.samza.system.inmemory.InMemorySystemFactory;
  * The following system-level configs are set by default:
  * <ol>
  *   <li>"systems.%s.default.stream.samza.offset.default" = "oldest"</li>
+ *   <li>"jobs.job-name.systems.%s.default.stream.samza.offset.default" = "oldest"</li>
  *   <li>"systems.%s.samza.factory" = {@link InMemorySystemFactory}</li>
+ *   <li>"jobs.job-name.systems.%s.samza.factory" = {@link InMemorySystemFactory}</li>
  * </ol>
- *
+ * The "systems.*" configs are required since the planner uses the system to get metadata about streams during
+ * planning. The "jobs.job-name.systems.*" configs are required since configs generated from user-provided
+ * system/stream descriptors override configs originally supplied to the planner. Configs in the "jobs.job-name.*"
+ * scope have the highest precedence.
  */
 public class CollectionStreamSystemSpec {
+  private static final String CONFIG_OVERRIDE_PREFIX = "jobs.%s."; // prefix to override configs generated by the planner
   private static final String SYSTEM_FACTORY = "systems.%s.samza.factory";
   private static final String SYSTEM_OFFSET = "systems.%s.default.stream.samza.offset.default";
 
@@ -51,11 +58,13 @@ public class CollectionStreamSystemSpec {
    * <p>
    * @param systemName represents unique name of the system
    */
-  private CollectionStreamSystemSpec(String systemName) {
+  private CollectionStreamSystemSpec(String systemName, String jobName) {
     this.systemName = systemName;
     systemConfigs = new HashMap<String, String>();
     systemConfigs.put(String.format(SYSTEM_FACTORY, systemName), InMemorySystemFactory.class.getName());
+    systemConfigs.put(String.format(CONFIG_OVERRIDE_PREFIX + SYSTEM_FACTORY, jobName, systemName), InMemorySystemFactory.class.getName());
     systemConfigs.put(String.format(SYSTEM_OFFSET, systemName), "oldest");
+    systemConfigs.put(String.format(CONFIG_OVERRIDE_PREFIX + SYSTEM_OFFSET, jobName, systemName), "oldest");
   }
 
   public String getSystemName() {
@@ -67,13 +76,15 @@ public class CollectionStreamSystemSpec {
   }
 
   /**
-   * Creates a {@link CollectionStreamSystemSpec} with name {@code name}
-   * @param name represents name of the {@link CollectionStreamSystemSpec}
+   * Creates a {@link CollectionStreamSystemSpec} with name {@code systemName}
+   * @param systemName represents name of the {@link CollectionStreamSystemSpec}
+   * @param jobName name of the job
    * @return an instance of {@link CollectionStreamSystemSpec}
    */
-  public static CollectionStreamSystemSpec create(String name) {
-    Preconditions.checkState(name != null);
-    return new CollectionStreamSystemSpec(name);
+  public static CollectionStreamSystemSpec create(String systemName, String jobName) {
+    Preconditions.checkState(StringUtils.isNotBlank(systemName));
+    Preconditions.checkState(StringUtils.isNotBlank(jobName));
+    return new CollectionStreamSystemSpec(systemName, jobName);
   }
 }
 

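Concretely, a hypothetical create("test", "word-count") call now yields each config twice, once in the base scope and once in the job scope; per the Javadoc above, the "jobs.*"-prefixed entries win when the planner's generated configs are merged in:

    systems.test.samza.factory = org.apache.samza.system.inmemory.InMemorySystemFactory
    systems.test.default.stream.samza.offset.default = oldest
    jobs.word-count.systems.test.samza.factory = org.apache.samza.system.inmemory.InMemorySystemFactory
    jobs.word-count.systems.test.default.stream.samza.offset.default = oldest
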
http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java b/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java
index f6e3d5f..af20fd7 100644
--- a/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java
+++ b/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java
@@ -20,7 +20,13 @@ package org.apache.samza.test.integration;
 
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
 import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.system.kafka.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.KafkaSystemDescriptor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -29,14 +35,21 @@ import org.slf4j.LoggerFactory;
  * Acts as a pass-through filter for all the events from an input stream.
  */
 public class TestStandaloneIntegrationApplication implements StreamApplication {
-
   private static final Logger LOGGER = LoggerFactory.getLogger(TestStandaloneIntegrationApplication.class);
 
   @Override
   public void init(StreamGraph graph, Config config) {
-    String inputStream = config.get("input.stream.name");
+    String systemName = "testSystemName";
+    String inputStreamName = config.get("input.stream.name");
     String outputStreamName = "standaloneIntegrationTestKafkaOutputTopic";
-    LOGGER.info("Publishing message to: {}.", outputStreamName);
-    graph.getInputStream(inputStream).sendTo(graph.getOutputStream(outputStreamName));
+    LOGGER.info("Publishing message from: {} to: {}.", inputStreamName, outputStreamName);
+    KafkaSystemDescriptor kafkaSystemDescriptor = new KafkaSystemDescriptor(systemName);
+
+    KVSerde<Object, Object> noOpSerde = KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>());
+    KafkaInputDescriptor<KV<Object, Object>> isd =
+        kafkaSystemDescriptor.getInputDescriptor(inputStreamName, noOpSerde);
+    KafkaOutputDescriptor<KV<Object, Object>> osd =
+        kafkaSystemDescriptor.getOutputDescriptor(outputStreamName, noOpSerde);
+    graph.getInputStream(isd).sendTo(graph.getOutputStream(osd));
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
index 3301af8..b86c6af 100644
--- a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
@@ -31,8 +31,12 @@ import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
 import org.apache.samza.operators.KV;
+import org.apache.samza.operators.descriptors.GenericInputDescriptor;
+import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory;
 import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
 import org.apache.samza.test.controlmessages.TestData.PageView;
@@ -40,8 +44,8 @@ import org.apache.samza.test.controlmessages.TestData.PageViewJsonSerdeFactory;
 import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
 import org.apache.samza.test.util.ArraySystemFactory;
 import org.apache.samza.test.util.Base64Serializer;
-
 import org.junit.Test;
+
 import static org.junit.Assert.assertEquals;
 
 /**
@@ -92,7 +96,10 @@ public class EndOfStreamIntegrationTest extends AbstractIntegrationTestHarness {
 
     final LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(configs));
     final StreamApplication app = (streamGraph, cfg) -> {
-      streamGraph.<KV<String, PageView>>getInputStream("PageView")
+      DelegatingSystemDescriptor sd = new DelegatingSystemDescriptor("test");
+      GenericInputDescriptor<KV<String, PageView>> isd =
+          sd.getInputDescriptor("PageView", KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()));
+      streamGraph.getInputStream(isd)
         .map(Values.create())
         .partitionBy(pv -> pv.getMemberId(), pv -> pv, "p1")
         .sink((m, collector, coordinator) -> {

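Unlike the Kafka examples earlier in this patch, this test reads from an in-memory array system, so it uses the generic descriptors: DelegatingSystemDescriptor carries no system-specific configs (the factory is wired via config), and NoOpSerde skips (de)serialization because the test system already hands the application deserialized objects. A hedged sketch of that pattern in isolation (the "myTestSystem" name and stream id are illustrative):

    // Generic descriptors for a system defined purely through config,
    // e.g. systems.myTestSystem.samza.factory=...; no serde work happens here.
    DelegatingSystemDescriptor sd = new DelegatingSystemDescriptor("myTestSystem");
    GenericInputDescriptor<KV<String, PageView>> isd =
        sd.getInputDescriptor("SomeStream", KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()));
    MessageStream<KV<String, PageView>> pageViews = streamGraph.getInputStream(isd);
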
http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
index d4dc4ed..5336595 100644
--- a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
@@ -19,6 +19,8 @@
 
 package org.apache.samza.test.controlmessages;
 
+import scala.collection.JavaConverters;
+
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -38,6 +40,8 @@ import org.apache.samza.container.TaskName;
 import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.operators.KV;
+import org.apache.samza.operators.descriptors.GenericInputDescriptor;
+import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor;
 import org.apache.samza.operators.impl.InputOperatorImpl;
 import org.apache.samza.operators.impl.OperatorImpl;
 import org.apache.samza.operators.impl.OperatorImplGraph;
@@ -48,6 +52,8 @@ import org.apache.samza.processor.TestStreamProcessorUtil;
 import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.runtime.TestLocalApplicationRunner;
 import org.apache.samza.serializers.IntegerSerdeFactory;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.serializers.StringSerdeFactory;
 import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory;
 import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
@@ -66,7 +72,6 @@ import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
 import org.apache.samza.test.util.SimpleSystemAdmin;
 import org.apache.samza.test.util.TestStreamConsumer;
 import org.junit.Test;
-import scala.collection.JavaConverters;
 
 import static org.junit.Assert.assertEquals;
 
@@ -143,7 +148,10 @@ public class WatermarkIntegrationTest extends AbstractIntegrationTestHarness {
 
     List<PageView> received = new ArrayList<>();
     final StreamApplication app = (streamGraph, cfg) -> {
-      streamGraph.<KV<String, PageView>>getInputStream("PageView")
+      DelegatingSystemDescriptor sd = new DelegatingSystemDescriptor("test");
+      GenericInputDescriptor<KV<String, PageView>> isd =
+          sd.getInputDescriptor("PageView", KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()));
+      streamGraph.getInputStream(isd)
           .map(EndOfStreamIntegrationTest.Values.create())
           .partitionBy(pv -> pv.getMemberId(), pv -> pv, "p1")
           .sink((m, collector, coordinator) -> {

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-test/src/test/java/org/apache/samza/test/framework/BroadcastAssertApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/BroadcastAssertApp.java b/samza-test/src/test/java/org/apache/samza/test/framework/BroadcastAssertApp.java
index ec52aa4..aca6c40 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/BroadcastAssertApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/BroadcastAssertApp.java
@@ -19,17 +19,18 @@
 
 package org.apache.samza.test.framework;
 
+import java.util.Arrays;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.system.kafka.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.KafkaSystemDescriptor;
 import org.apache.samza.test.operator.data.PageView;
 
-import java.util.Arrays;
-
 public class BroadcastAssertApp implements StreamApplication {
-
+  public static final String SYSTEM = "kafka";
   public static final String INPUT_TOPIC_NAME_PROP = "inputTopicName";
 
 
@@ -38,8 +39,10 @@ public class BroadcastAssertApp implements StreamApplication {
     String inputTopic = config.get(INPUT_TOPIC_NAME_PROP);
 
     final JsonSerdeV2<PageView> serde = new JsonSerdeV2<>(PageView.class);
+    KafkaSystemDescriptor ksd = new KafkaSystemDescriptor(SYSTEM);
+    KafkaInputDescriptor<PageView> isd = ksd.getInputDescriptor(inputTopic, serde);
     final MessageStream<PageView> broadcastPageViews = graph
-        .getInputStream(inputTopic, serde)
+        .getInputStream(isd)
         .broadcast(serde, "pv");
 
     /**

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
index ba4c985..6fdf887 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
@@ -27,27 +27,40 @@ import java.util.Random;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.operators.KV;
+import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.kafka.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.KafkaSystemDescriptor;
 import org.apache.samza.test.controlmessages.TestData;
 import org.apache.samza.test.framework.stream.CollectionStream;
-import static org.apache.samza.test.controlmessages.TestData.PageView;
 import org.junit.Assert;
 import org.junit.Test;
 
+import static org.apache.samza.test.controlmessages.TestData.PageView;
+
 
 public class StreamApplicationIntegrationTest {
 
   final StreamApplication pageViewFilter = (streamGraph, cfg) -> {
-    streamGraph.<KV<String, TestData.PageView>>getInputStream("PageView").map(
-        StreamApplicationIntegrationTest.Values.create()).filter(pv -> pv.getPageKey().equals("inbox"));
+    KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("test");
+    KafkaInputDescriptor<KV<String, PageView>> isd =
+        ksd.getInputDescriptor("PageView", KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()));
+    MessageStream<KV<String, TestData.PageView>> inputStream = streamGraph.getInputStream(isd);
+    inputStream.map(StreamApplicationIntegrationTest.Values.create()).filter(pv -> pv.getPageKey().equals("inbox"));
   };
 
-  final StreamApplication pageViewParition = (streamGraph, cfg) -> {
-    streamGraph.<KV<String, PageView>>getInputStream("PageView")
+  final StreamApplication pageViewRepartition = (streamGraph, cfg) -> {
+    KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("test");
+    KafkaInputDescriptor<KV<String, PageView>> isd =
+        ksd.getInputDescriptor("PageView", KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()));
+    MessageStream<KV<String, TestData.PageView>> inputStream = streamGraph.getInputStream(isd);
+    inputStream
         .map(Values.create())
-        .partitionBy(pv -> pv.getMemberId(), pv -> pv, "p1")
+        .partitionBy(PageView::getMemberId, pv -> pv, "p1")
         .sink((m, collector, coordinator) -> {
             collector.send(new OutgoingMessageEnvelope(new SystemStream("test", "Output"),
                 m.getKey(), m.getKey(),
@@ -74,7 +87,7 @@ public class StreamApplicationIntegrationTest {
     CollectionStream output = CollectionStream.empty("test", "Output", 10);
 
     TestRunner
-        .of(pageViewParition)
+        .of(pageViewRepartition)
         .addInputStream(input)
         .addOutputStream(output)
         .addOverrideConfig("job.default.system", "test")
@@ -99,7 +112,7 @@ public class StreamApplicationIntegrationTest {
     CollectionStream output = CollectionStream.empty("test", "Output", 10);
 
     TestRunner
-        .of(pageViewParition)
+        .of(pageViewRepartition)
         .addInputStream(input)
         .addOutputStream(output)
         .run(Duration.ofMillis(1000));
