Repository: samza
Updated Branches:
  refs/heads/master 2d6b19953 -> 2a71baf7c


http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-test/src/test/java/org/apache/samza/test/framework/TestTimerApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/TestTimerApp.java b/samza-test/src/test/java/org/apache/samza/test/framework/TestTimerApp.java
index 09da838..91234b7 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/TestTimerApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/TestTimerApp.java
@@ -19,6 +19,11 @@
 
 package org.apache.samza.test.framework;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.MessageStream;
@@ -27,21 +32,19 @@ import org.apache.samza.operators.TimerRegistry;
 import org.apache.samza.operators.functions.FlatMapFunction;
 import org.apache.samza.operators.functions.TimerFunction;
 import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.system.kafka.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.KafkaSystemDescriptor;
 import org.apache.samza.test.operator.data.PageView;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-
 public class TestTimerApp implements StreamApplication {
   public static final String PAGE_VIEWS = "page-views";
 
   @Override
   public void init(StreamGraph graph, Config config) {
     final JsonSerdeV2<PageView> serde = new JsonSerdeV2<>(PageView.class);
-    final MessageStream<PageView> pageViews = graph.getInputStream(PAGE_VIEWS, serde);
+    KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("kafka");
+    KafkaInputDescriptor<PageView> isd = ksd.getInputDescriptor(PAGE_VIEWS, serde);
+    final MessageStream<PageView> pageViews = graph.getInputStream(isd);
     final MessageStream<PageView> output = pageViews.flatMap(new FlatmapTimerFn());
 
     MessageStreamAssert.that("Output from timer function should container all complete messages", output, serde)

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
index ec9c05d..8ee7e00 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
@@ -35,6 +35,8 @@ import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.kafka.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.KafkaSystemDescriptor;
 import org.apache.samza.task.TaskCoordinator;
 import org.apache.samza.test.operator.data.AdClick;
 import org.apache.samza.test.operator.data.PageView;
@@ -48,10 +50,10 @@ import org.apache.samza.util.CommandLine;
  * A {@link StreamApplication} that demonstrates a partitionBy, stream-stream join and a windowed count.
  */
 public class RepartitionJoinWindowApp implements StreamApplication {
-
-  public static final String INPUT_TOPIC_NAME_1_PROP = "inputTopicName1";
-  public static final String INPUT_TOPIC_NAME_2_PROP = "inputTopicName2";
-  public static final String OUTPUT_TOPIC_NAME_PROP = "outputTopicName";
+  public static final String SYSTEM = "kafka";
+  public static final String INPUT_TOPIC_1_CONFIG_KEY = "inputTopic1";
+  public static final String INPUT_TOPIC_2_CONFIG_KEY = "inputTopic2";
+  public static final String OUTPUT_TOPIC_CONFIG_KEY = "outputTopic";
 
   private final List<String> intermediateStreamIds = new ArrayList<>();
 
@@ -68,12 +70,17 @@ public class RepartitionJoinWindowApp implements StreamApplication {
 
   @Override
   public void init(StreamGraph graph, Config config) {
-    String inputTopicName1 = config.get(INPUT_TOPIC_NAME_1_PROP);
-    String inputTopicName2 = config.get(INPUT_TOPIC_NAME_2_PROP);
-    String outputTopic = config.get(OUTPUT_TOPIC_NAME_PROP);
-
-    MessageStream<PageView> pageViews = graph.getInputStream(inputTopicName1, new JsonSerdeV2<>(PageView.class));
-    MessageStream<AdClick> adClicks = graph.getInputStream(inputTopicName2, new JsonSerdeV2<>(AdClick.class));
+    // offset.default = oldest required for tests since checkpoint topic is empty on start and messages are published
+    // before the application is run
+    String inputTopic1 = config.get(INPUT_TOPIC_1_CONFIG_KEY);
+    String inputTopic2 = config.get(INPUT_TOPIC_2_CONFIG_KEY);
+    String outputTopic = config.get(OUTPUT_TOPIC_CONFIG_KEY);
+    KafkaSystemDescriptor ksd = new KafkaSystemDescriptor(SYSTEM);
+    KafkaInputDescriptor<PageView> id1 = ksd.getInputDescriptor(inputTopic1, new JsonSerdeV2<>(PageView.class));
+    KafkaInputDescriptor<AdClick> id2 = ksd.getInputDescriptor(inputTopic2, new JsonSerdeV2<>(AdClick.class));
+
+    MessageStream<PageView> pageViews = graph.getInputStream(id1);
+    MessageStream<AdClick> adClicks = graph.getInputStream(id2);
 
     MessageStream<KV<String, PageView>> pageViewsRepartitionedByViewId = pageViews
         .partitionBy(PageView::getViewId, pv -> pv,
@@ -101,14 +108,15 @@ public class RepartitionJoinWindowApp implements StreamApplication {
         .map(windowPane -> KV.of(windowPane.getKey().getKey(), String.valueOf(windowPane.getMessage().size())))
         .sink((message, messageCollector, taskCoordinator) -> {
             taskCoordinator.commit(TaskCoordinator.RequestScope.ALL_TASKS_IN_CONTAINER);
-            messageCollector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", outputTopic), null, message.getKey(), message.getValue()));
+            messageCollector.send(
+                new OutgoingMessageEnvelope(
+                    new SystemStream("kafka", outputTopic), null, message.getKey(), message.getValue()));
           });
 
 
     intermediateStreamIds.add(((IntermediateMessageStreamImpl) pageViewsRepartitionedByViewId).getStreamId());
     intermediateStreamIds.add(((IntermediateMessageStreamImpl) adClicksRepartitionedByViewId).getStreamId());
     intermediateStreamIds.add(((IntermediateMessageStreamImpl) userPageAdClicksByUserId).getStreamId());
-
   }
 
   List<String> getIntermediateStreamIds() {
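The renamed *_CONFIG_KEY constants above are populated by TestRepartitionJoinWindowApp further down in this commit. A hypothetical test-side sketch of how they could be wired up; the topic names are placeholders, and the offset.default entry is included only to illustrate the setting the comment in this hunk refers to (assuming the usual per-system Samza offset property key):

    Map<String, String> configs = new HashMap<>();
    // Placeholder topic names; the real test harness creates its own topics.
    configs.put(RepartitionJoinWindowApp.INPUT_TOPIC_1_CONFIG_KEY, "page-views");
    configs.put(RepartitionJoinWindowApp.INPUT_TOPIC_2_CONFIG_KEY, "ad-clicks");
    configs.put(RepartitionJoinWindowApp.OUTPUT_TOPIC_CONFIG_KEY, "user-page-ad-click-counts");
    // Start from the oldest offset because messages are published before the app runs.
    configs.put("systems.kafka.samza.offset.default", "oldest");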

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java
index e233793..ae3669f 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java
@@ -30,6 +30,9 @@ import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.kafka.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.KafkaSystemDescriptor;
 import org.apache.samza.test.operator.data.PageView;
 import org.apache.samza.util.CommandLine;
 import org.slf4j.Logger;
@@ -41,7 +44,7 @@ import org.slf4j.LoggerFactory;
 public class RepartitionWindowApp implements StreamApplication {
 
   private static final Logger LOG = LoggerFactory.getLogger(RepartitionWindowApp.class);
-
+  static final String SYSTEM = "kafka";
   static final String INPUT_TOPIC = "page-views";
   static final String OUTPUT_TOPIC = "Result";
 
@@ -57,16 +60,18 @@ public class RepartitionWindowApp implements StreamApplication {
 
   @Override
   public void init(StreamGraph graph, Config config) {
+    KVSerde<String, PageView> inputSerde = KVSerde.of(new StringSerde("UTF-8"), new JsonSerdeV2<>(PageView.class));
+    KVSerde<String, String> outputSerde = KVSerde.of(new StringSerde(), new StringSerde());
+    KafkaSystemDescriptor ksd = new KafkaSystemDescriptor(SYSTEM);
+    KafkaInputDescriptor<KV<String, PageView>> id = ksd.getInputDescriptor(INPUT_TOPIC, inputSerde);
+    KafkaOutputDescriptor<KV<String, String>> od = ksd.getOutputDescriptor(OUTPUT_TOPIC, outputSerde);
 
-    KVSerde<String, PageView>
-        pgeMsgSerde = KVSerde.of(new StringSerde("UTF-8"), new JsonSerdeV2<>(PageView.class));
-
-    graph.getInputStream(INPUT_TOPIC, pgeMsgSerde)
+    graph.getInputStream(id)
         .map(KV::getValue)
-        .partitionBy(PageView::getUserId, m -> m, pgeMsgSerde, "p1")
+        .partitionBy(PageView::getUserId, m -> m, inputSerde, "p1")
         .window(Windows.keyedSessionWindow(m -> m.getKey(), Duration.ofSeconds(3), () -> 0, (m, c) -> c + 1, new StringSerde("UTF-8"), new IntegerSerde()), "w1")
         .map(wp -> KV.of(wp.getKey().getKey().toString(), String.valueOf(wp.getMessage())))
-        .sendTo(graph.getOutputStream(OUTPUT_TOPIC));
+        .sendTo(graph.getOutputStream(od));
 
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java
index 3224d24..3eb2662 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java
@@ -32,19 +32,19 @@ import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.kafka.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.KafkaSystemDescriptor;
 import org.apache.samza.test.operator.data.PageView;
 import org.apache.samza.util.CommandLine;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * A {@link StreamApplication} that demonstrates a filter followed by a session window.
  */
 public class SessionWindowApp implements StreamApplication {
+  private static final String SYSTEM = "kafka";
   private static final String INPUT_TOPIC = "page-views";
   private static final String OUTPUT_TOPIC = "page-view-counts";
-
-  private static final Logger LOG = LoggerFactory.getLogger(SessionWindowApp.class);
   private static final String FILTER_KEY = "badKey";
 
   public static void main(String[] args) {
@@ -59,10 +59,14 @@ public class SessionWindowApp implements StreamApplication {
 
   @Override
   public void init(StreamGraph graph, Config config) {
+    JsonSerdeV2<PageView> inputSerde = new JsonSerdeV2<>(PageView.class);
+    KVSerde<String, Integer> outputSerde = KVSerde.of(new StringSerde(), new IntegerSerde());
+    KafkaSystemDescriptor ksd = new KafkaSystemDescriptor(SYSTEM);
+    KafkaInputDescriptor<PageView> id = ksd.getInputDescriptor(INPUT_TOPIC, inputSerde);
+    KafkaOutputDescriptor<KV<String, Integer>> od = ksd.getOutputDescriptor(OUTPUT_TOPIC, outputSerde);
 
-    MessageStream<PageView> pageViews = graph.getInputStream(INPUT_TOPIC, new JsonSerdeV2<>(PageView.class));
-    OutputStream<KV<String, Integer>> outputStream =
-        graph.getOutputStream(OUTPUT_TOPIC, new KVSerde<>(new StringSerde(), new IntegerSerde()));
+    MessageStream<PageView> pageViews = graph.getInputStream(id);
+    OutputStream<KV<String, Integer>> outputStream = graph.getOutputStream(od);
 
     pageViews
         .filter(m -> !FILTER_KEY.equals(m.getUserId()))
@@ -70,6 +74,5 @@ public class SessionWindowApp implements StreamApplication {
             new StringSerde(), new JsonSerdeV2<>(PageView.class)), "sessionWindow")
         .map(m -> KV.of(m.getKey().getKey(), m.getMessage().size()))
         .sendTo(outputStream);
-
   }
 }
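SessionWindowApp (above) and TumblingWindowApp (below) migrate the output side the same way. A sketch under the same assumptions, with graph in scope and counts standing in for the KV<String, Integer> stream produced by the window and map stages:

    // Output descriptors pair a topic with the serde of what will be written to it.
    KafkaSystemDescriptor kafkaSystem = new KafkaSystemDescriptor("kafka");
    KVSerde<String, Integer> countSerde = KVSerde.of(new StringSerde(), new IntegerSerde());
    KafkaOutputDescriptor<KV<String, Integer>> countOutput =
        kafkaSystem.getOutputDescriptor("page-view-counts", countSerde);
    OutputStream<KV<String, Integer>> outputStream = graph.getOutputStream(countOutput);
    // Terminal operator: write the windowed counts back to Kafka.
    counts.sendTo(outputStream);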

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
index 2f75103..5fee9cf 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
@@ -78,9 +78,9 @@ public class TestRepartitionJoinWindowApp extends StreamApplicationIntegrationTe
     String appName = "UserPageAdClickCounter";
     Map<String, String> configs = new HashMap<>();
     configs.put("systems.kafka.samza.delete.committed.messages", "false");
-    configs.put(RepartitionJoinWindowApp.INPUT_TOPIC_NAME_1_PROP, inputTopicName1);
-    configs.put(RepartitionJoinWindowApp.INPUT_TOPIC_NAME_2_PROP, inputTopicName2);
-    configs.put(RepartitionJoinWindowApp.OUTPUT_TOPIC_NAME_PROP, outputTopicName);
+    configs.put(RepartitionJoinWindowApp.INPUT_TOPIC_1_CONFIG_KEY, inputTopicName1);
+    configs.put(RepartitionJoinWindowApp.INPUT_TOPIC_2_CONFIG_KEY, inputTopicName2);
+    configs.put(RepartitionJoinWindowApp.OUTPUT_TOPIC_CONFIG_KEY, outputTopicName);
 
     runApplication(app, appName, configs);
 
@@ -104,9 +104,9 @@ public class TestRepartitionJoinWindowApp extends StreamApplicationIntegrationTe
     final String appName = "UserPageAdClickCounter2";
     Map<String, String> configs = new HashMap<>();
     configs.put("systems.kafka.samza.delete.committed.messages", "true");
-    configs.put(RepartitionJoinWindowApp.INPUT_TOPIC_NAME_1_PROP, inputTopicName1);
-    configs.put(RepartitionJoinWindowApp.INPUT_TOPIC_NAME_2_PROP, inputTopicName2);
-    configs.put(RepartitionJoinWindowApp.OUTPUT_TOPIC_NAME_PROP, outputTopicName);
+    configs.put(RepartitionJoinWindowApp.INPUT_TOPIC_1_CONFIG_KEY, inputTopicName1);
+    configs.put(RepartitionJoinWindowApp.INPUT_TOPIC_2_CONFIG_KEY, inputTopicName2);
+    configs.put(RepartitionJoinWindowApp.OUTPUT_TOPIC_CONFIG_KEY, outputTopicName);
 
     RunApplicationContext runApplicationContext = runApplication(app, appName, configs);
 

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
index 058a690..6373292 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
@@ -39,8 +39,7 @@ import static org.apache.samza.test.operator.RepartitionWindowApp.*;
  * Test driver for {@link RepartitionWindowApp}.
  */
 public class TestRepartitionWindowApp extends StreamApplicationIntegrationTestHarness {
-
-  static final String APP_NAME = "PageViewCounterApp";
+  private static final String APP_NAME = "PageViewCounterApp";
 
   @Test
   public void testRepartitionedSessionWindowCounter() throws Exception {
@@ -65,8 +64,6 @@ public class TestRepartitionWindowApp extends StreamApplicationIntegrationTestHa
     configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
     configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, "org.apache.samza.standalone.PassthroughCoordinationUtilsFactory");
     configs.put(TaskConfig.GROUPER_FACTORY(), "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
-    configs.put(String.format("streams.%s.samza.msg.serde", INPUT_TOPIC), "string");
-    configs.put(String.format("streams.%s.samza.key.serde", INPUT_TOPIC), "string");
 
     // run the application
     runApplication(new RepartitionWindowApp(), APP_NAME, new MapConfig(configs));
@@ -86,6 +83,5 @@ public class TestRepartitionWindowApp extends StreamApplicationIntegrationTestHa
         Assert.assertEquals(value, "1");
       }
     }
-
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java
index 40a3f91..d77a2a6 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java
@@ -32,20 +32,20 @@ import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.kafka.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.KafkaSystemDescriptor;
 import org.apache.samza.test.operator.data.PageView;
 import org.apache.samza.util.CommandLine;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 /**
  * A {@link StreamApplication} that demonstrates a filter followed by a tumbling window.
  */
 public class TumblingWindowApp implements StreamApplication {
+  private static final String SYSTEM = "kafka";
   private static final String INPUT_TOPIC = "page-views";
   private static final String OUTPUT_TOPIC = "page-view-counts";
-
-  private static final Logger LOG = LoggerFactory.getLogger(TumblingWindowApp.class);
   private static final String FILTER_KEY = "badKey";
 
   public static void main(String[] args) {
@@ -60,11 +60,14 @@ public class TumblingWindowApp implements StreamApplication {
 
   @Override
   public void init(StreamGraph graph, Config config) {
+    JsonSerdeV2<PageView> inputSerde = new JsonSerdeV2<>(PageView.class);
+    KVSerde<String, Integer> outputSerde = KVSerde.of(new StringSerde(), new IntegerSerde());
+    KafkaSystemDescriptor ksd = new KafkaSystemDescriptor(SYSTEM);
+    KafkaInputDescriptor<PageView> id = ksd.getInputDescriptor(INPUT_TOPIC, inputSerde);
+    KafkaOutputDescriptor<KV<String, Integer>> od = ksd.getOutputDescriptor(OUTPUT_TOPIC, outputSerde);
 
-    MessageStream<PageView> pageViews =
-        graph.getInputStream(INPUT_TOPIC, new JsonSerdeV2<>(PageView.class));
-    OutputStream<KV<String, Integer>> outputStream =
-        graph.getOutputStream(OUTPUT_TOPIC, new KVSerde<>(new StringSerde(), new IntegerSerde()));
+    MessageStream<PageView> pageViews = graph.getInputStream(id);
+    OutputStream<KV<String, Integer>> outputStream = graph.getOutputStream(od);
 
     pageViews
         .filter(m -> !FILTER_KEY.equals(m.getUserId()))

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamApplication.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamApplication.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamApplication.java
index db12351..1b778e8 100644
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamApplication.java
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamApplication.java
@@ -30,9 +30,14 @@ import org.apache.samza.config.JobConfig;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.descriptors.GenericInputDescriptor;
+import org.apache.samza.operators.descriptors.GenericOutputDescriptor;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.kafka.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.KafkaSystemDescriptor;
 
 
 /**
@@ -40,12 +45,15 @@ import org.apache.samza.serializers.StringSerde;
  */
 public class TestStreamApplication implements StreamApplication, Serializable {
 
+  private final String systemName;
   private final String inputTopic;
   private final String outputTopic;
   private final String appName;
   private final String processorName;
 
-  private TestStreamApplication(String inputTopic, String outputTopic, String appName, String processorName) {
+  private TestStreamApplication(String systemName, String inputTopic, String outputTopic,
+      String appName, String processorName) {
+    this.systemName = systemName;
     this.inputTopic = inputTopic;
     this.outputTopic = outputTopic;
     this.appName = appName;
@@ -54,8 +62,11 @@ public class TestStreamApplication implements StreamApplication, Serializable {
 
   @Override
   public void init(StreamGraph graph, Config config) {
-    MessageStream<String> inputStream = graph.getInputStream(inputTopic, new NoOpSerde<String>());
-    OutputStream<String> outputStream = graph.getOutputStream(outputTopic, new StringSerde());
+    KafkaSystemDescriptor ksd = new KafkaSystemDescriptor(systemName);
+    KafkaInputDescriptor<String> isd = ksd.getInputDescriptor(inputTopic, new NoOpSerde<>());
+    KafkaOutputDescriptor<String> osd = ksd.getOutputDescriptor(outputTopic, new StringSerde());
+    MessageStream<String> inputStream = graph.getInputStream(isd);
+    OutputStream<String> outputStream = graph.getOutputStream(osd);
     inputStream.map(new MapFunction<String, String>() {
       transient CountDownLatch latch1;
       transient CountDownLatch latch2;
@@ -124,6 +135,7 @@ public class TestStreamApplication implements StreamApplication, Serializable {
   }
 
   public static StreamApplication getInstance(
+      String systemName,
       String inputTopic,
       String outputTopic,
       CountDownLatch processedMessageLatch,
@@ -134,7 +146,7 @@ public class TestStreamApplication implements StreamApplication, Serializable {
     String processorName = config.get(JobConfig.PROCESSOR_ID());
     registerLatches(processedMessageLatch, kafkaEventsConsumedLatch, callback, appName, processorName);
 
-    StreamApplication app = new TestStreamApplication(inputTopic, outputTopic, appName, processorName);
+    StreamApplication app = new TestStreamApplication(systemName, inputTopic, outputTopic, appName, processorName);
     return app;
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
index bfa78a0..e34ee4a 100644
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
@@ -244,7 +244,8 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
     CountDownLatch processedMessagesLatch = new CountDownLatch(NUM_KAFKA_EVENTS);
     Config testAppConfig2 = new MapConfig(applicationConfig2, testConfig);
     LocalApplicationRunner localApplicationRunner2 = new LocalApplicationRunner(testAppConfig2);
-    StreamApplication streamApp2 = TestStreamApplication.getInstance(inputSinglePartitionKafkaTopic, outputSinglePartitionKafkaTopic,
+    StreamApplication streamApp2 = TestStreamApplication.getInstance(
+        TEST_SYSTEM, inputSinglePartitionKafkaTopic, outputSinglePartitionKafkaTopic,
         processedMessagesLatch, null, null, testAppConfig2);
 
     // Callback handler for streamApp1.
@@ -266,7 +267,8 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
     // Set up stream app 1.
     Config testAppConfig1 = new MapConfig(applicationConfig1, testConfig);
     LocalApplicationRunner localApplicationRunner1 = new LocalApplicationRunner(testAppConfig1);
-    StreamApplication streamApp1 = TestStreamApplication.getInstance(inputSinglePartitionKafkaTopic, outputSinglePartitionKafkaTopic,
+    StreamApplication streamApp1 = TestStreamApplication.getInstance(
+        TEST_SYSTEM, inputSinglePartitionKafkaTopic, outputSinglePartitionKafkaTopic,
         null, streamApplicationCallback, kafkaEventsConsumedLatch, testAppConfig1);
     localApplicationRunner1.run(streamApp1);
 
@@ -325,7 +327,8 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
     CountDownLatch processedMessagesLatch = new CountDownLatch(NUM_KAFKA_EVENTS * 2);
     Config testAppConfig2 = new MapConfig(applicationConfig2, testConfig);
     LocalApplicationRunner localApplicationRunner2 = new LocalApplicationRunner(testAppConfig2);
-    StreamApplication streamApp2 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch, null, null, testAppConfig2);
+    StreamApplication streamApp2 = TestStreamApplication.getInstance(
+        TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, processedMessagesLatch, null, null, testAppConfig2);
 
     // Callback handler for streamApp1.
     StreamApplicationCallback streamApplicationCallback = message -> {
@@ -348,7 +351,8 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
     // Set up stream app 1.
     Config testAppConfig1 = new MapConfig(applicationConfig1, testConfig);
     LocalApplicationRunner localApplicationRunner1 = new LocalApplicationRunner(testAppConfig1);
-    StreamApplication streamApp1 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, null,
+    StreamApplication streamApp1 = TestStreamApplication.getInstance(
+        TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, null,
         streamApplicationCallback, kafkaEventsConsumedLatch, testAppConfig1);
     localApplicationRunner1.run(streamApp1);
 
@@ -401,9 +405,15 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
     CountDownLatch processedMessagesLatch2 = new CountDownLatch(1);
     CountDownLatch processedMessagesLatch3 = new CountDownLatch(1);
 
-    StreamApplication streamApp1 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, null, kafkaEventsConsumedLatch, applicationConfig1);
-    StreamApplication streamApp2 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch2, null, kafkaEventsConsumedLatch, applicationConfig2);
-    StreamApplication streamApp3 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch3, null, kafkaEventsConsumedLatch, applicationConfig3);
+    StreamApplication streamApp1 = TestStreamApplication.getInstance(
+        TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic,
+        processedMessagesLatch1, null, kafkaEventsConsumedLatch, applicationConfig1);
+    StreamApplication streamApp2 = TestStreamApplication.getInstance(
+        TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic,
+        processedMessagesLatch2, null, kafkaEventsConsumedLatch, applicationConfig2);
+    StreamApplication streamApp3 = TestStreamApplication.getInstance(
+        TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic,
+        processedMessagesLatch3, null, kafkaEventsConsumedLatch, applicationConfig3);
 
     // Create LocalApplicationRunners
     LocalApplicationRunner applicationRunner1 = new LocalApplicationRunner(applicationConfig1);
@@ -470,8 +480,12 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
     CountDownLatch processedMessagesLatch1 = new CountDownLatch(1);
     CountDownLatch processedMessagesLatch2 = new CountDownLatch(1);
 
-    StreamApplication streamApp1 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, null, kafkaEventsConsumedLatch, applicationConfig1);
-    StreamApplication streamApp2 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch2, null, kafkaEventsConsumedLatch, applicationConfig2);
+    StreamApplication streamApp1 = TestStreamApplication.getInstance(
+        TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic,
+        processedMessagesLatch1, null, kafkaEventsConsumedLatch, applicationConfig1);
+    StreamApplication streamApp2 = TestStreamApplication.getInstance(
+        TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic,
+        processedMessagesLatch2, null, kafkaEventsConsumedLatch, applicationConfig2);
 
     // Create LocalApplicationRunners
     LocalApplicationRunner applicationRunner1 = new LocalApplicationRunner(applicationConfig1);
@@ -489,7 +503,9 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
     LocalApplicationRunner applicationRunner3 = new LocalApplicationRunner(appConfig);
 
     // Create a stream app with same processor id as SP2 and run it. It should fail.
-    StreamApplication streamApp3 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, null, null, kafkaEventsConsumedLatch, applicationConfig2);
+    StreamApplication streamApp3 = TestStreamApplication.getInstance(
+        TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic,
+        null, null, kafkaEventsConsumedLatch, applicationConfig2);
     // Fail when the duplicate processor joins.
     expectedException.expect(SamzaException.class);
     try {
@@ -508,7 +524,8 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
     // Set up kafka topics.
     publishKafkaEvents(inputKafkaTopic, 0, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
 
-    Map<String, String> configMap = buildStreamApplicationConfigMap(TEST_SYSTEM, inputKafkaTopic, testStreamAppName, testStreamAppId);
+    Map<String, String> configMap = buildStreamApplicationConfigMap(
+        TEST_SYSTEM, inputKafkaTopic, testStreamAppName, testStreamAppId);
 
     configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[0]);
     Config applicationConfig1 = new MapConfig(configMap);
@@ -524,8 +541,12 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
     CountDownLatch processedMessagesLatch1 = new CountDownLatch(1);
     CountDownLatch processedMessagesLatch2 = new CountDownLatch(1);
 
-    StreamApplication streamApp1 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, null, kafkaEventsConsumedLatch, applicationConfig1);
-    StreamApplication streamApp2 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch2, null, kafkaEventsConsumedLatch, applicationConfig2);
+    StreamApplication streamApp1 = TestStreamApplication.getInstance(
+        TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic,
+        processedMessagesLatch1, null, kafkaEventsConsumedLatch, applicationConfig1);
+    StreamApplication streamApp2 = TestStreamApplication.getInstance(
+        TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic,
+        processedMessagesLatch2, null, kafkaEventsConsumedLatch, applicationConfig2);
 
     // Run stream application.
     applicationRunner1.run(streamApp1);
@@ -546,7 +567,9 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
     LocalApplicationRunner applicationRunner4 = new LocalApplicationRunner(applicationConfig1);
     processedMessagesLatch1 = new CountDownLatch(1);
     publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, 2 * NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
-    streamApp1 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, null, kafkaEventsConsumedLatch, applicationConfig1);
+    streamApp1 = TestStreamApplication.getInstance(
+        TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic,
+        processedMessagesLatch1, null, kafkaEventsConsumedLatch, applicationConfig1);
     applicationRunner4.run(streamApp1);
 
     processedMessagesLatch1.await();
@@ -587,8 +610,10 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
     CountDownLatch processedMessagesLatch1 = new CountDownLatch(1);
     CountDownLatch processedMessagesLatch2 = new CountDownLatch(1);
 
-    StreamApplication streamApp1 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, null, kafkaEventsConsumedLatch, applicationConfig1);
-    StreamApplication streamApp2 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch2, null, kafkaEventsConsumedLatch, applicationConfig2);
+    StreamApplication streamApp1 = TestStreamApplication.getInstance(TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic,
+        processedMessagesLatch1, null, kafkaEventsConsumedLatch, applicationConfig1);
+    StreamApplication streamApp2 = TestStreamApplication.getInstance(TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic,
+        processedMessagesLatch2, null, kafkaEventsConsumedLatch, applicationConfig2);
 
     applicationRunner1.run(streamApp1);
     applicationRunner2.run(streamApp2);
@@ -606,7 +631,8 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
     LocalApplicationRunner applicationRunner3 = new LocalApplicationRunner(applicationConfig3);
     CountDownLatch processedMessagesLatch3 = new CountDownLatch(1);
 
-    StreamApplication streamApp3 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch3, null, kafkaEventsConsumedLatch, applicationConfig3);
+    StreamApplication streamApp3 = TestStreamApplication.getInstance(TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic,
+        processedMessagesLatch3, null, kafkaEventsConsumedLatch, applicationConfig3);
     applicationRunner3.run(streamApp3);
 
     publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, 2 * NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
index e5775f0..5b29fe7 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
@@ -39,6 +39,8 @@ import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.metrics.Timer;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.descriptors.GenericInputDescriptor;
+import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.serializers.IntegerSerde;
@@ -97,8 +99,9 @@ public class TestLocalTable extends AbstractIntegrationTestHarness {
 
       Table<KV<Integer, Profile>> table = streamGraph.getTable(new InMemoryTableDescriptor("t1")
           .withSerde(KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
-
-      streamGraph.getInputStream("Profile", new NoOpSerde<Profile>())
+      DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
+      GenericInputDescriptor<Profile> isd = ksd.getInputDescriptor("Profile", 
new NoOpSerde<>());
+      streamGraph.getInputStream(isd)
           .map(mapFn)
           .sendTo(table);
     };
@@ -132,12 +135,14 @@ public class TestLocalTable extends AbstractIntegrationTestHarness {
 
         Table<KV<Integer, Profile>> table = streamGraph.getTable(
             new InMemoryTableDescriptor("t1").withSerde(KVSerde.of(new 
IntegerSerde(), new ProfileJsonSerde())));
-
-        streamGraph.getInputStream("Profile", new NoOpSerde<Profile>())
+        DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
+        GenericInputDescriptor<Profile> profileISD = ksd.getInputDescriptor("Profile", new NoOpSerde<>());
+        streamGraph.getInputStream(profileISD)
             .map(m -> new KV(m.getMemberId(), m))
             .sendTo(table);
 
-        streamGraph.getInputStream("PageView", new NoOpSerde<PageView>())
+        GenericInputDescriptor<PageView> pageViewISD = ksd.getInputDescriptor("PageView", new NoOpSerde<>());
+        streamGraph.getInputStream(pageViewISD)
             .map(pv -> {
                 received.add(pv);
                 return pv;
@@ -207,8 +212,11 @@ public class TestLocalTable extends AbstractIntegrationTestHarness {
         Table<KV<Integer, Profile>> profileTable = streamGraph.getTable(new InMemoryTableDescriptor("t1")
             .withSerde(profileKVSerde));
 
-        MessageStream<Profile> profileStream1 = streamGraph.getInputStream("Profile1", new NoOpSerde<Profile>());
-        MessageStream<Profile> profileStream2 = streamGraph.getInputStream("Profile2", new NoOpSerde<Profile>());
+        DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
+        GenericInputDescriptor<Profile> profileISD1 = ksd.getInputDescriptor("Profile1", new NoOpSerde<>());
+        GenericInputDescriptor<Profile> profileISD2 = ksd.getInputDescriptor("Profile2", new NoOpSerde<>());
+        MessageStream<Profile> profileStream1 = streamGraph.getInputStream(profileISD1);
+        MessageStream<Profile> profileStream2 = streamGraph.getInputStream(profileISD2);
 
         profileStream1
             .map(m -> {
@@ -223,8 +231,10 @@ public class TestLocalTable extends AbstractIntegrationTestHarness {
               })
             .sendTo(profileTable);
 
-        MessageStream<PageView> pageViewStream1 = streamGraph.getInputStream("PageView1", new NoOpSerde<PageView>());
-        MessageStream<PageView> pageViewStream2 = streamGraph.getInputStream("PageView2", new NoOpSerde<PageView>());
+        GenericInputDescriptor<PageView> pageViewISD1 = ksd.getInputDescriptor("PageView1", new NoOpSerde<PageView>());
+        GenericInputDescriptor<PageView> pageViewISD2 = ksd.getInputDescriptor("PageView2", new NoOpSerde<PageView>());
+        MessageStream<PageView> pageViewStream1 = streamGraph.getInputStream(pageViewISD1);
+        MessageStream<PageView> pageViewStream2 = streamGraph.getInputStream(pageViewISD2);
 
         pageViewStream1
             .partitionBy(PageView::getMemberId, v -> v, pageViewKVSerde, "p1")

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
index 1e45f5e..cc969be 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
@@ -20,6 +20,7 @@
 package org.apache.samza.test.table;
 
 import com.google.common.collect.ImmutableList;
+
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -40,6 +41,7 @@ import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.storage.kv.RocksDbTableDescriptor;
 import org.apache.samza.storage.kv.inmemory.InMemoryTableDescriptor;
+import org.apache.samza.system.kafka.KafkaSystemDescriptor;
 import org.apache.samza.table.Table;
 import org.apache.samza.test.framework.TestRunner;
 import org.apache.samza.test.framework.stream.CollectionStream;
@@ -47,10 +49,10 @@ import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
 import org.junit.Test;
 
 import static org.apache.samza.test.table.TestTableData.*;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+
 public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness {
   private static final String PAGEVIEW_STREAM = "pageview";
   private static final String PROFILE_STREAM = "profile";
@@ -126,11 +128,12 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness
     @Override
     public void init(StreamGraph graph, Config config) {
       Table<KV<Integer, TestTableData.Profile>> table = graph.getTable(getTableDescriptor());
-
-      graph.getInputStream(PAGEVIEW_STREAM, new NoOpSerde<TestTableData.PageView>())
+      KafkaSystemDescriptor sd =
+          new KafkaSystemDescriptor(config.get(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), PAGEVIEW_STREAM)));
+      graph.getInputStream(sd.getInputDescriptor(PAGEVIEW_STREAM, new NoOpSerde<TestTableData.PageView>()))
           .partitionBy(TestTableData.PageView::getMemberId, v -> v, "partition-page-view")
           .join(table, new PageViewToProfileJoinFunction())
-          .sendTo(graph.getOutputStream(ENRICHED_PAGEVIEW_STREAM, new NoOpSerde<>()));
+          .sendTo(graph.getOutputStream(sd.getOutputDescriptor(ENRICHED_PAGEVIEW_STREAM, new NoOpSerde<>())));
     }
 
     protected TableDescriptor<Integer, Profile, ?> getTableDescriptor() {

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
index eb9fbe9..e6e73bb 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
@@ -19,6 +19,8 @@
 
 package org.apache.samza.test.table;
 
+import com.google.common.cache.CacheBuilder;
+
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.time.Duration;
@@ -32,7 +34,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
-
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.MapConfig;
@@ -42,27 +43,27 @@ import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.metrics.Timer;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.descriptors.GenericInputDescriptor;
+import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor;
 import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.table.Table;
 import org.apache.samza.table.caching.CachingTableDescriptor;
 import org.apache.samza.table.caching.guava.GuavaCacheTableDescriptor;
+import org.apache.samza.table.remote.RemoteReadWriteTable;
+import org.apache.samza.table.remote.RemoteReadableTable;
+import org.apache.samza.table.remote.RemoteTableDescriptor;
 import org.apache.samza.table.remote.TableRateLimiter;
 import org.apache.samza.table.remote.TableReadFunction;
 import org.apache.samza.table.remote.TableWriteFunction;
-import org.apache.samza.table.remote.RemoteReadableTable;
-import org.apache.samza.table.remote.RemoteTableDescriptor;
-import org.apache.samza.table.remote.RemoteReadWriteTable;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
 import org.apache.samza.test.util.Base64Serializer;
 import org.apache.samza.util.RateLimiter;
-import com.google.common.cache.CacheBuilder;
 import org.junit.Assert;
 import org.junit.Test;
 
 import static org.apache.samza.test.table.TestTableData.*;
-
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doReturn;
@@ -185,7 +186,9 @@ public class TestRemoteTable extends AbstractIntegrationTestHarness {
         inputTable = getCachingTable(inputTable, defaultCache, "input", streamGraph);
       }
 
-      streamGraph.getInputStream("PageView", new NoOpSerde<PageView>())
+      DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
+      GenericInputDescriptor<TestTableData.PageView> isd = ksd.getInputDescriptor("PageView", new NoOpSerde<>());
+      streamGraph.getInputStream(isd)
           .map(pv -> new KV<>(pv.getMemberId(), pv))
           .join(inputTable, new PageViewToProfileJoinFunction())
           .map(m -> new KV(m.getMemberId(), m))

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java
----------------------------------------------------------------------
diff --git a/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java b/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java
index 5456db6..dc11c3f 100644
--- a/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java
+++ b/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java
@@ -20,6 +20,8 @@
 package org.apache.samza.tools.benchmark;
 
 import com.google.common.base.Joiner;
+import scala.Option;
+
 import java.io.IOException;
 import java.time.Duration;
 import java.time.Instant;
@@ -32,10 +34,14 @@ import org.apache.commons.cli.ParseException;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.config.SystemConfig;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.descriptors.GenericInputDescriptor;
+import org.apache.samza.operators.descriptors.GenericSystemDescriptor;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
 
 
@@ -69,7 +75,10 @@ public class SystemConsumerWithSamzaBench extends AbstractSamzaBench {
     super.start();
     MessageConsumer consumeFn = new MessageConsumer();
     StreamApplication app = (graph, config) -> {
-      MessageStream<Object> stream = graph.getInputStream(streamId);
+      String systemFactoryName = new SystemConfig(config).getSystemFactory(systemName).get();
+      GenericSystemDescriptor sd = new GenericSystemDescriptor(systemName, systemFactoryName);
+      GenericInputDescriptor<Object> isd = sd.getInputDescriptor(streamId, new NoOpSerde<>());
+      MessageStream<Object> stream = graph.getInputStream(isd);
       stream.map(consumeFn);
     };
 
