http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java 
b/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
index cefe128..864c3fc 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
@@ -19,14 +19,22 @@
 
 package org.apache.samza.execution;
 
+import java.time.Duration;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.SerializerConfig;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.StreamGraphSpec;
 import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraphSpec;
+import org.apache.samza.operators.descriptors.GenericInputDescriptor;
+import org.apache.samza.operators.descriptors.GenericOutputDescriptor;
+import org.apache.samza.operators.descriptors.GenericSystemDescriptor;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.impl.store.TimestampedValueSerde;
 import org.apache.samza.serializers.JsonSerdeV2;
@@ -37,12 +45,6 @@ import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.system.StreamSpec;
 import org.junit.Test;
 
-import java.time.Duration;
-import java.util.Base64;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.stream.Collectors;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.anyString;
@@ -65,13 +67,18 @@ public class TestJobNode {
     when(mockConfig.get(eq(JobConfig.JOB_ID()), 
anyString())).thenReturn("jobId");
 
     StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig);
-    graphSpec.setDefaultSerde(KVSerde.of(new StringSerde(), new 
JsonSerdeV2<>()));
-    MessageStream<KV<String, Object>> input1 = 
graphSpec.getInputStream("input1");
-    MessageStream<KV<String, Object>> input2 = 
graphSpec.getInputStream("input2");
-    OutputStream<KV<String, Object>> output = 
graphSpec.getOutputStream("output");
+    KVSerde<String, Object> serde = KVSerde.of(new StringSerde(), new 
JsonSerdeV2<>());
+    GenericSystemDescriptor sd = new GenericSystemDescriptor("system1", 
"mockSystemFactoryClass");
+    GenericInputDescriptor<KV<String, Object>> inputDescriptor1 = 
sd.getInputDescriptor("input1", serde);
+    GenericInputDescriptor<KV<String, Object>> inputDescriptor2 = 
sd.getInputDescriptor("input2", serde);
+    GenericOutputDescriptor<KV<String, Object>> outputDescriptor = 
sd.getOutputDescriptor("output", serde);
+    MessageStream<KV<String, Object>> input1 = 
graphSpec.getInputStream(inputDescriptor1);
+    MessageStream<KV<String, Object>> input2 = 
graphSpec.getInputStream(inputDescriptor2);
+    OutputStream<KV<String, Object>> output = 
graphSpec.getOutputStream(outputDescriptor);
     JoinFunction<String, Object, Object, KV<String, Object>> mockJoinFn = 
mock(JoinFunction.class);
     input1
-        .partitionBy(KV::getKey, KV::getValue, "p1").map(kv -> kv.value)
+        .partitionBy(KV::getKey, KV::getValue, serde, "p1")
+        .map(kv -> kv.value)
         .join(input2.map(kv -> kv.value), mockJoinFn,
             new StringSerde(), new JsonSerdeV2<>(Object.class), new 
JsonSerdeV2<>(Object.class),
             Duration.ofHours(1), "j1")
@@ -133,7 +140,7 @@ public class TestJobNode {
         outputKeySerde.startsWith(StringSerde.class.getSimpleName()));
     assertTrue("Serialized serdes should contain output msg serde",
         deserializedSerdes.containsKey(outputMsgSerde));
-    assertTrue("Serialized output msg serde should be a StringSerde",
+    assertTrue("Serialized output msg serde should be a JsonSerdeV2",
         outputMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName()));
 
     String partitionByKeySerde = 
mapConfig.get("streams.jobName-jobId-partition_by-p1.samza.key.serde");
@@ -145,7 +152,7 @@ public class TestJobNode {
     assertTrue("Serialized serdes should contain intermediate stream msg 
serde",
         deserializedSerdes.containsKey(partitionByMsgSerde));
     assertTrue(
-        "Serialized intermediate stream msg serde should be a StringSerde",
+        "Serialized intermediate stream msg serde should be a JsonSerdeV2",
         partitionByMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName()));
 
     String leftJoinStoreKeySerde = 
mapConfig.get("stores.jobName-jobId-join-j1-L.key.serde");
@@ -171,4 +178,50 @@ public class TestJobNode {
         
rightJoinStoreMsgSerde.startsWith(TimestampedValueSerde.class.getSimpleName()));
   }
 
+  @Test
+  public void testAddSerdeConfigsForRepartitionWithNoDefaultSystem() {
+    StreamSpec inputSpec = new StreamSpec("input", "input", "input-system");
+    StreamSpec partitionBySpec =
+        new StreamSpec("jobName-jobId-partition_by-p1", "partition_by-p1", 
"intermediate-system");
+
+    Config mockConfig = mock(Config.class);
+    when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("jobName");
+    when(mockConfig.get(eq(JobConfig.JOB_ID()), 
anyString())).thenReturn("jobId");
+
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig);
+    GenericSystemDescriptor sd = new GenericSystemDescriptor("system1", 
"mockSystemFactoryClassName");
+    GenericInputDescriptor<KV<String, Object>> inputDescriptor1 =
+        sd.getInputDescriptor("input", KVSerde.of(new StringSerde(), new 
JsonSerdeV2<>()));
+    MessageStream<KV<String, Object>> input = 
graphSpec.getInputStream(inputDescriptor1);
+    input.partitionBy(KV::getKey, KV::getValue, "p1");
+
+    JobNode jobNode = new JobNode("jobName", "jobId", 
graphSpec.getOperatorSpecGraph(), mockConfig);
+    Config config = new MapConfig();
+    StreamEdge input1Edge = new StreamEdge(inputSpec, false, false, config);
+    StreamEdge repartitionEdge = new StreamEdge(partitionBySpec, true, false, 
config);
+    jobNode.addInEdge(input1Edge);
+    jobNode.addInEdge(repartitionEdge);
+    jobNode.addOutEdge(repartitionEdge);
+
+    Map<String, String> configs = new HashMap<>();
+    jobNode.addSerdeConfigs(configs);
+
+    MapConfig mapConfig = new MapConfig(configs);
+    Config serializers = mapConfig.subset("serializers.registry.", true);
+
+    // make sure that the serializers deserialize correctly
+    SerializableSerde<Serde> serializableSerde = new SerializableSerde<>();
+    Map<String, Serde> deserializedSerdes = 
serializers.entrySet().stream().collect(Collectors.toMap(
+        e -> e.getKey().replace(SerializerConfig.SERIALIZED_INSTANCE_SUFFIX(), 
""),
+        e -> 
serializableSerde.fromBytes(Base64.getDecoder().decode(e.getValue().getBytes()))
+    ));
+    assertEquals(2, serializers.size()); // 2 input stream
+
+    String partitionByKeySerde = 
mapConfig.get("streams.jobName-jobId-partition_by-p1.samza.key.serde");
+    String partitionByMsgSerde = 
mapConfig.get("streams.jobName-jobId-partition_by-p1.samza.msg.serde");
+    assertTrue("Serialized serdes should not contain intermediate stream key 
serde",
+        !deserializedSerdes.containsKey(partitionByKeySerde));
+    assertTrue("Serialized serdes should not contain intermediate stream msg 
serde",
+        !deserializedSerdes.containsKey(partitionByMsgSerde));
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java 
b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
index 7054727..0759aba 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
@@ -19,12 +19,15 @@
 package org.apache.samza.operators;
 
 import com.google.common.collect.ImmutableSet;
+
 import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.container.TaskContextImpl;
 import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.operators.descriptors.GenericInputDescriptor;
+import org.apache.samza.operators.descriptors.GenericSystemDescriptor;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.impl.store.TestInMemoryStore;
 import org.apache.samza.operators.impl.store.TimestampedValueSerde;
@@ -38,10 +41,10 @@ import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.StreamOperatorTask;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.testUtils.StreamTestUtils;
 import org.apache.samza.testUtils.TestClock;
 import org.apache.samza.util.Clock;
 import org.apache.samza.util.SystemClock;
-import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -67,17 +70,6 @@ public class TestJoinOperator {
   private final TaskCoordinator taskCoordinator = mock(TaskCoordinator.class);
   private final Set<Integer> numbers = ImmutableSet.of(1, 2, 3, 4, 5, 6, 7, 8, 
9, 10);
 
-  private Config config;
-
-  @Before
-  public void setUp() {
-    Map<String, String> mapConfig = new HashMap<>();
-    mapConfig.put("job.default.system", "insystem");
-    mapConfig.put("job.name", "jobName");
-    mapConfig.put("job.id", "jobId");
-    config = new MapConfig(mapConfig);
-  }
-
   @Test
   public void join() throws Exception {
     StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new 
TestJoinFunction());
@@ -96,12 +88,19 @@ public class TestJoinOperator {
 
   @Test(expected = SamzaException.class)
   public void joinWithSelfThrowsException() throws Exception {
-    config.put("streams.instream.system", "insystem");
-
+    Map<String, String> mapConfig = new HashMap<>();
+    mapConfig.put("job.name", "jobName");
+    mapConfig.put("job.id", "jobId");
+    StreamTestUtils.addStreamConfigs(mapConfig, "inStream", "insystem", 
"instream");
+    Config config = new MapConfig(mapConfig);
     StreamGraphSpec graphSpec = new StreamGraphSpec(config);
+
     IntegerSerde integerSerde = new IntegerSerde();
     KVSerde<Integer, Integer> kvSerde = KVSerde.of(integerSerde, integerSerde);
-    MessageStream<KV<Integer, Integer>> inStream = 
graphSpec.getInputStream("instream", kvSerde);
+    GenericSystemDescriptor sd = new GenericSystemDescriptor("insystem", 
"mockFactoryClassName");
+    GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor = 
sd.getInputDescriptor("inStream", kvSerde);
+
+    MessageStream<KV<Integer, Integer>> inStream = 
graphSpec.getInputStream(inputDescriptor);
 
     inStream.join(inStream, new TestJoinFunction(), integerSerde, kvSerde, 
kvSerde, JOIN_TTL, "join");
 
@@ -297,7 +296,12 @@ public class TestJoinOperator {
   }
 
   private StreamOperatorTask createStreamOperatorTask(Clock clock, 
StreamGraphSpec graphSpec) throws Exception {
-
+    Map<String, String> mapConfig = new HashMap<>();
+    mapConfig.put("job.name", "jobName");
+    mapConfig.put("job.id", "jobId");
+    StreamTestUtils.addStreamConfigs(mapConfig, "inStream", "insystem", 
"instream");
+    StreamTestUtils.addStreamConfigs(mapConfig, "inStream2", "insystem", 
"instream2");
+    Config config = new MapConfig(mapConfig);
     TaskContextImpl taskContext = mock(TaskContextImpl.class);
     when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet
         .of(new SystemStreamPartition("insystem", "instream", new 
Partition(0)),
@@ -317,11 +321,21 @@ public class TestJoinOperator {
   }
 
   private StreamGraphSpec getTestJoinStreamGraph(TestJoinFunction joinFn) 
throws IOException {
+    Map<String, String> mapConfig = new HashMap<>();
+    mapConfig.put("job.name", "jobName");
+    mapConfig.put("job.id", "jobId");
+    StreamTestUtils.addStreamConfigs(mapConfig, "inStream", "insystem", 
"instream");
+    StreamTestUtils.addStreamConfigs(mapConfig, "inStream2", "insystem", 
"instream2");
+    Config config = new MapConfig(mapConfig);
     StreamGraphSpec graphSpec = new StreamGraphSpec(config);
     IntegerSerde integerSerde = new IntegerSerde();
     KVSerde<Integer, Integer> kvSerde = KVSerde.of(integerSerde, integerSerde);
-    MessageStream<KV<Integer, Integer>> inStream = 
graphSpec.getInputStream("instream", kvSerde);
-    MessageStream<KV<Integer, Integer>> inStream2 = 
graphSpec.getInputStream("instream2", kvSerde);
+    GenericSystemDescriptor sd = new GenericSystemDescriptor("insystem", 
"mockFactoryClassName");
+    GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor1 = 
sd.getInputDescriptor("inStream", kvSerde);
+    GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor2 = 
sd.getInputDescriptor("inStream2", kvSerde);
+
+    MessageStream<KV<Integer, Integer>> inStream = 
graphSpec.getInputStream(inputDescriptor1);
+    MessageStream<KV<Integer, Integer>> inStream2 = 
graphSpec.getInputStream(inputDescriptor2);
 
     inStream
         .join(inStream2, joinFn, integerSerde, kvSerde, kvSerde, JOIN_TTL, 
"j1")

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
 
b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
index fff85e8..001ffda 100644
--- 
a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
@@ -207,7 +207,7 @@ public class TestMessageStreamImpl {
     OutputStreamImpl mockOutputStreamImpl = mock(OutputStreamImpl.class);
     KVSerde mockKVSerde = mock(KVSerde.class);
     IntermediateMessageStreamImpl mockIntermediateStream = 
mock(IntermediateMessageStreamImpl.class);
-    when(mockGraph.getIntermediateStream(eq(mockOpName), eq(mockKVSerde)))
+    when(mockGraph.getIntermediateStream(eq(mockOpName), eq(mockKVSerde), 
eq(false)))
         .thenReturn(mockIntermediateStream);
     when(mockIntermediateStream.getOutputStream())
         .thenReturn(mockOutputStreamImpl);
@@ -237,7 +237,7 @@ public class TestMessageStreamImpl {
     when(mockGraph.getNextOpId(anyObject(), 
anyObject())).thenReturn(mockOpName);
     OutputStreamImpl mockOutputStreamImpl = mock(OutputStreamImpl.class);
     IntermediateMessageStreamImpl mockIntermediateStream = 
mock(IntermediateMessageStreamImpl.class);
-    when(mockGraph.getIntermediateStream(eq(mockOpName), eq(null)))
+    when(mockGraph.getIntermediateStream(eq(mockOpName), eq(null), eq(false)))
         .thenReturn(mockIntermediateStream);
     when(mockIntermediateStream.getOutputStream())
         .thenReturn(mockOutputStreamImpl);

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java
 
b/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java
index 4cfc66a..6469326 100644
--- 
a/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java
@@ -73,14 +73,14 @@ public class TestOperatorSpecGraph {
      */
     String inputStreamId1 = "test-input-1";
     String outputStreamId = "test-output-1";
-    InputOperatorSpec testInput = new InputOperatorSpec(inputStreamId1, new 
NoOpSerde(), new NoOpSerde(), true, inputStreamId1);
+    InputOperatorSpec testInput = new InputOperatorSpec(inputStreamId1, new 
NoOpSerde(), new NoOpSerde(), null, true, inputStreamId1);
     StreamOperatorSpec filterOp = OperatorSpecs.createFilterOperatorSpec(m -> 
true, "test-filter-2");
     OutputStreamImpl outputStream1 = new OutputStreamImpl(outputStreamId, 
null, null, true);
     OutputOperatorSpec outputSpec = 
OperatorSpecs.createSendToOperatorSpec(outputStream1, "test-output-3");
     testInput.registerNextOperatorSpec(filterOp);
     filterOp.registerNextOperatorSpec(outputSpec);
     String streamId2 = "test-input-2";
-    InputOperatorSpec testInput2 = new InputOperatorSpec(streamId2, new 
NoOpSerde(), new NoOpSerde(), true, "test-input-4");
+    InputOperatorSpec testInput2 = new InputOperatorSpec(streamId2, new 
NoOpSerde(), new NoOpSerde(), null, true, "test-input-4");
     StreamOperatorSpec testMap = OperatorSpecs.createMapOperatorSpec(m -> m, 
"test-map-5");
     SinkOperatorSpec testSink = OperatorSpecs.createSinkOperatorSpec((m, mc, 
tc) -> { }, "test-sink-6");
     testInput2.registerNextOperatorSpec(testMap);

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphSpec.java 
b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphSpec.java
index 109c138..9629efa 100644
--- 
a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphSpec.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphSpec.java
@@ -19,18 +19,30 @@
 package org.apache.samza.operators;
 
 import com.google.common.collect.ImmutableList;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
-
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.operators.data.TestMessageEnvelope;
+import org.apache.samza.operators.descriptors.GenericInputDescriptor;
+import org.apache.samza.operators.descriptors.GenericOutputDescriptor;
+import org.apache.samza.operators.descriptors.GenericSystemDescriptor;
+import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
+import 
org.apache.samza.operators.descriptors.base.system.ExpandingInputDescriptorProvider;
+import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
+import 
org.apache.samza.operators.descriptors.base.system.TransformingInputDescriptorProvider;
+import org.apache.samza.operators.functions.InputTransformer;
+import org.apache.samza.operators.functions.StreamExpander;
 import org.apache.samza.operators.spec.InputOperatorSpec;
+import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec.OpCode;
 import org.apache.samza.operators.spec.OutputStreamImpl;
 import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
+import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.serializers.Serde;
@@ -38,7 +50,9 @@ import org.apache.samza.table.TableSpec;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.anyString;
@@ -47,6 +61,7 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+@SuppressWarnings("unchecked")
 public class TestStreamGraphSpec {
 
   @Test
@@ -55,13 +70,15 @@ public class TestStreamGraphSpec {
 
     String streamId = "test-stream-1";
     Serde mockValueSerde = mock(Serde.class);
-    MessageStream<TestMessageEnvelope> inputStream = 
graphSpec.getInputStream(streamId, mockValueSerde);
+    GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", 
"mockSystemFactoryClass");
+    GenericInputDescriptor isd = sd.getInputDescriptor(streamId, 
mockValueSerde);
+    MessageStream<TestMessageEnvelope> inputStream = 
graphSpec.getInputStream(isd);
 
-    InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
-        (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) 
inputStream).getOperatorSpec();
+    InputOperatorSpec inputOpSpec = (InputOperatorSpec) ((MessageStreamImpl) 
inputStream).getOperatorSpec();
     assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
     assertEquals(graphSpec.getInputOperators().get(streamId), inputOpSpec);
     assertEquals(streamId, inputOpSpec.getStreamId());
+    assertEquals(isd, graphSpec.getInputDescriptors().get(streamId));
     assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde);
     assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
   }
@@ -76,93 +93,71 @@ public class TestStreamGraphSpec {
     Serde mockValueSerde = mock(Serde.class);
     doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
     doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
-    MessageStream<TestMessageEnvelope> inputStream = 
graphSpec.getInputStream(streamId, mockKVSerde);
+    GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", 
"mockSystemFactoryClass");
+    GenericInputDescriptor isd = sd.getInputDescriptor(streamId, mockKVSerde);
+    MessageStream<TestMessageEnvelope> inputStream = 
graphSpec.getInputStream(isd);
 
-    InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
-        (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) 
inputStream).getOperatorSpec();
+    InputOperatorSpec inputOpSpec = (InputOperatorSpec) ((MessageStreamImpl) 
inputStream).getOperatorSpec();
     assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
     assertEquals(graphSpec.getInputOperators().get(streamId), inputOpSpec);
     assertEquals(streamId, inputOpSpec.getStreamId());
+    assertEquals(isd, graphSpec.getInputDescriptors().get(streamId));
     assertEquals(mockKeySerde, inputOpSpec.getKeySerde());
     assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
   }
 
-  @Test(expected = NullPointerException.class)
+  @Test(expected = IllegalArgumentException.class)
   public void testGetInputStreamWithNullSerde() {
     StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
-
-    graphSpec.getInputStream("test-stream-1", null);
+    GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", 
"mockSystemFactoryClass");
+    GenericInputDescriptor isd = sd.getInputDescriptor("mockStreamId", null);
+    graphSpec.getInputStream(isd);
   }
 
   @Test
-  public void testGetInputStreamWithDefaultValueSerde() {
+  public void testGetInputStreamWithTransformFunction() {
     String streamId = "test-stream-1";
     StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
 
     Serde mockValueSerde = mock(Serde.class);
-    graphSpec.setDefaultSerde(mockValueSerde);
-    MessageStream<TestMessageEnvelope> inputStream = 
graphSpec.getInputStream(streamId);
-
-    InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
-        (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) 
inputStream).getOperatorSpec();
-    assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
-    assertEquals(graphSpec.getInputOperators().get(streamId), inputOpSpec);
-    assertEquals(streamId, inputOpSpec.getStreamId());
-    assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde);
-    assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
-  }
-
-  @Test
-  public void testGetInputStreamWithDefaultKeyValueSerde() {
-    String streamId = "test-stream-1";
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
-
-    KVSerde mockKVSerde = mock(KVSerde.class);
-    Serde mockKeySerde = mock(Serde.class);
-    Serde mockValueSerde = mock(Serde.class);
-    doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
-    doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
-    graphSpec.setDefaultSerde(mockKVSerde);
-    MessageStream<TestMessageEnvelope> inputStream = 
graphSpec.getInputStream(streamId);
+    InputTransformer transformer = ime -> ime;
+    MockTransformingSystemDescriptor sd = new 
MockTransformingSystemDescriptor("mockSystem", transformer);
+    MockInputDescriptor isd = sd.getInputDescriptor(streamId, mockValueSerde);
+    MessageStream inputStream = graphSpec.getInputStream(isd);
 
-    InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
-        (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) 
inputStream).getOperatorSpec();
+    InputOperatorSpec inputOpSpec = (InputOperatorSpec) ((MessageStreamImpl) 
inputStream).getOperatorSpec();
     assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
     assertEquals(graphSpec.getInputOperators().get(streamId), inputOpSpec);
     assertEquals(streamId, inputOpSpec.getStreamId());
-    assertEquals(mockKeySerde, inputOpSpec.getKeySerde());
-    assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
+    assertEquals(isd, graphSpec.getInputDescriptors().get(streamId));
+    assertEquals(transformer, inputOpSpec.getTransformer());
   }
 
   @Test
-  public void testGetInputStreamWithDefaultDefaultSerde() {
+  public void testGetInputStreamWithExpandingSystem() {
     String streamId = "test-stream-1";
-
-    // default default serde == user hasn't provided a default serde
+    String expandedStreamId = "expanded-stream";
     StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
-    MessageStream<TestMessageEnvelope> inputStream = 
graphSpec.getInputStream(streamId);
-
-    InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
-        (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) 
inputStream).getOperatorSpec();
+    AtomicInteger expandCallCount = new AtomicInteger();
+    StreamExpander expander = (sg, isd) -> {
+      expandCallCount.incrementAndGet();
+      InputDescriptor expandedISD =
+          new GenericSystemDescriptor("expanded-system", "mockFactoryClass")
+              .getInputDescriptor(expandedStreamId, new IntegerSerde());
+
+      return sg.getInputStream(expandedISD);
+    };
+    MockExpandingSystemDescriptor sd = new 
MockExpandingSystemDescriptor("mock-system", expander);
+    MockInputDescriptor isd = sd.getInputDescriptor(streamId, new 
IntegerSerde());
+    MessageStream inputStream = graphSpec.getInputStream(isd);
+    InputOperatorSpec inputOpSpec = (InputOperatorSpec) ((MessageStreamImpl) 
inputStream).getOperatorSpec();
+    assertEquals(1, expandCallCount.get());
     assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
-    assertEquals(graphSpec.getInputOperators().get(streamId), inputOpSpec);
-    assertEquals(streamId, inputOpSpec.getStreamId());
-    assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde);
-    assertTrue(inputOpSpec.getValueSerde() instanceof NoOpSerde);
-  }
-
-  @Test
-  public void testGetInputStreamWithRelaxedTypes() {
-    String streamId = "test-stream-1";
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
-
-    MessageStream<TestMessageEnvelope> inputStream = 
graphSpec.getInputStream(streamId);
-
-    InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
-        (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) 
inputStream).getOperatorSpec();
-    assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
-    assertEquals(graphSpec.getInputOperators().get(streamId), inputOpSpec);
-    assertEquals(streamId, inputOpSpec.getStreamId());
+    assertEquals(inputOpSpec, 
graphSpec.getInputOperators().get(expandedStreamId));
+    assertFalse(graphSpec.getInputOperators().containsKey(streamId));
+    assertFalse(graphSpec.getInputDescriptors().containsKey(streamId));
+    assertTrue(graphSpec.getInputDescriptors().containsKey(expandedStreamId));
+    assertEquals(expandedStreamId, inputOpSpec.getStreamId());
   }
 
   @Test
@@ -171,12 +166,15 @@ public class TestStreamGraphSpec {
     String streamId2 = "test-stream-2";
 
     StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
-    MessageStream<Object> inputStream1 = graphSpec.getInputStream(streamId1);
-    MessageStream<Object> inputStream2 = graphSpec.getInputStream(streamId2);
+    GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", 
"mockSystemFactoryClass");
+    GenericInputDescriptor isd1 = sd.getInputDescriptor(streamId1, 
mock(Serde.class));
+    GenericInputDescriptor isd2 = sd.getInputDescriptor(streamId2, 
mock(Serde.class));
+    MessageStream<Object> inputStream1 = graphSpec.getInputStream(isd1);
+    MessageStream<Object> inputStream2 = graphSpec.getInputStream(isd2);
 
-    InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec1 =
+    InputOperatorSpec inputOpSpec1 =
         (InputOperatorSpec) ((MessageStreamImpl<Object>) 
inputStream1).getOperatorSpec();
-    InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec2 =
+    InputOperatorSpec inputOpSpec2 =
         (InputOperatorSpec) ((MessageStreamImpl<Object>) 
inputStream2).getOperatorSpec();
 
     assertEquals(graphSpec.getInputOperators().size(), 2);
@@ -188,137 +186,127 @@ public class TestStreamGraphSpec {
   public void testGetSameInputStreamTwice() {
     String streamId = "test-stream-1";
     StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
-    graphSpec.getInputStream(streamId);
+    GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", 
"mockSystemFactoryClass");
+    GenericInputDescriptor isd1 = sd.getInputDescriptor(streamId, 
mock(Serde.class));
+    GenericInputDescriptor isd2 = sd.getInputDescriptor(streamId, 
mock(Serde.class));
+    graphSpec.getInputStream(isd1);
     // should throw exception
-    graphSpec.getInputStream(streamId);
+    graphSpec.getInputStream(isd2);
   }
 
   @Test
-  public void testGetOutputStreamWithValueSerde() {
+  public void testMultipleSystemDescriptorForSameSystemName() {
     String streamId = "test-stream-1";
     StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
+    GenericSystemDescriptor sd1 = new GenericSystemDescriptor("mockSystem", 
"mockSystemFactoryClass");
+    GenericSystemDescriptor sd2 = new GenericSystemDescriptor("mockSystem", 
"mockSystemFactoryClass");
+    GenericInputDescriptor isd1 = sd1.getInputDescriptor(streamId, 
mock(Serde.class));
+    GenericInputDescriptor isd2 = sd2.getInputDescriptor(streamId, 
mock(Serde.class));
+    GenericOutputDescriptor osd1 = sd2.getOutputDescriptor(streamId, 
mock(Serde.class));
+
+    graphSpec.getInputStream(isd1);
+    boolean passed = false;
+    try {
+      graphSpec.getInputStream(isd2);
+      passed = true;
+    } catch (IllegalStateException e) { }
 
-    Serde mockValueSerde = mock(Serde.class);
-    OutputStream<TestMessageEnvelope> outputStream =
-        graphSpec.getOutputStream(streamId, mockValueSerde);
-
-    OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = 
(OutputStreamImpl) outputStream;
-    assertEquals(graphSpec.getOutputStreams().get(streamId), outputStreamImpl);
-    assertEquals(streamId, outputStreamImpl.getStreamId());
-    assertTrue(outputStreamImpl.getKeySerde() instanceof NoOpSerde);
-    assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
-  }
-
-  @Test
-  public void testGetOutputStreamWithKeyValueSerde() {
-    String streamId = "test-stream-1";
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
-    KVSerde mockKVSerde = mock(KVSerde.class);
-    Serde mockKeySerde = mock(Serde.class);
-    Serde mockValueSerde = mock(Serde.class);
-    doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
-    doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
-    graphSpec.setDefaultSerde(mockKVSerde);
-    OutputStream<TestMessageEnvelope> outputStream = 
graphSpec.getOutputStream(streamId, mockKVSerde);
-
-    OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = 
(OutputStreamImpl) outputStream;
-    assertEquals(graphSpec.getOutputStreams().get(streamId), outputStreamImpl);
-    assertEquals(streamId, outputStreamImpl.getStreamId());
-    assertEquals(mockKeySerde, outputStreamImpl.getKeySerde());
-    assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
-  }
+    try {
+      graphSpec.getOutputStream(osd1);
+      passed = true;
+    } catch (IllegalStateException e) { }
 
-  @Test(expected = NullPointerException.class)
-  public void testGetOutputStreamWithNullSerde() {
-    String streamId = "test-stream-1";
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
+    try {
+      graphSpec.setDefaultSystem(sd2);
+      passed = true;
+    } catch (IllegalStateException e) { }
 
-    graphSpec.getOutputStream(streamId, null);
+    assertFalse(passed);
   }
 
   @Test
-  public void testGetOutputStreamWithDefaultValueSerde() {
+  public void testGetOutputStreamWithValueSerde() {
     String streamId = "test-stream-1";
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
 
     Serde mockValueSerde = mock(Serde.class);
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
-    graphSpec.setDefaultSerde(mockValueSerde);
-    OutputStream<TestMessageEnvelope> outputStream = 
graphSpec.getOutputStream(streamId);
+    GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", 
"mockSystemFactoryClass");
+    GenericOutputDescriptor osd = sd.getOutputDescriptor(streamId, 
mockValueSerde);
+    OutputStream<TestMessageEnvelope> outputStream = 
graphSpec.getOutputStream(osd);
 
     OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = 
(OutputStreamImpl) outputStream;
     assertEquals(graphSpec.getOutputStreams().get(streamId), outputStreamImpl);
     assertEquals(streamId, outputStreamImpl.getStreamId());
+    assertEquals(osd, graphSpec.getOutputDescriptors().get(streamId));
     assertTrue(outputStreamImpl.getKeySerde() instanceof NoOpSerde);
     assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
   }
 
   @Test
-  public void testGetOutputStreamWithDefaultKeyValueSerde() {
+  public void testGetOutputStreamWithKeyValueSerde() {
     String streamId = "test-stream-1";
-
     StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
     KVSerde mockKVSerde = mock(KVSerde.class);
     Serde mockKeySerde = mock(Serde.class);
     Serde mockValueSerde = mock(Serde.class);
     doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
     doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
-    graphSpec.setDefaultSerde(mockKVSerde);
-
-    OutputStream<TestMessageEnvelope> outputStream = 
graphSpec.getOutputStream(streamId);
+    GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", 
"mockSystemFactoryClass");
+    GenericOutputDescriptor osd = sd.getOutputDescriptor(streamId, 
mockKVSerde);
+    OutputStream<TestMessageEnvelope> outputStream = 
graphSpec.getOutputStream(osd);
 
     OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = 
(OutputStreamImpl) outputStream;
     assertEquals(graphSpec.getOutputStreams().get(streamId), outputStreamImpl);
     assertEquals(streamId, outputStreamImpl.getStreamId());
+    assertEquals(osd, graphSpec.getOutputDescriptors().get(streamId));
     assertEquals(mockKeySerde, outputStreamImpl.getKeySerde());
     assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
   }
 
-  @Test
-  public void testGetOutputStreamWithDefaultDefaultSerde() {
+  @Test(expected = IllegalArgumentException.class)
+  public void testGetOutputStreamWithNullSerde() {
     String streamId = "test-stream-1";
-
     StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
-
-    OutputStream<TestMessageEnvelope> outputStream = 
graphSpec.getOutputStream(streamId);
-
-    OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = 
(OutputStreamImpl) outputStream;
-    assertEquals(graphSpec.getOutputStreams().get(streamId), outputStreamImpl);
-    assertEquals(streamId, outputStreamImpl.getStreamId());
-    assertTrue(outputStreamImpl.getKeySerde() instanceof NoOpSerde);
-    assertTrue(outputStreamImpl.getValueSerde() instanceof NoOpSerde);
+    GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", 
"mockSystemFactoryClass");
+    GenericOutputDescriptor osd = sd.getOutputDescriptor(streamId, null);
+    graphSpec.getOutputStream(osd);
   }
 
   @Test(expected = IllegalStateException.class)
-  public void testSetDefaultSerdeAfterGettingStreams() {
-    String streamId = "test-stream-1";
-
+  public void testSetDefaultSystemDescriptorAfterGettingInputStream() {
     StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
-    graphSpec.getInputStream(streamId);
-    graphSpec.setDefaultSerde(mock(Serde.class)); // should throw exception
+    GenericInputDescriptor id = new GenericSystemDescriptor("system", 
"factory.class.name")
+        .getInputDescriptor("input-stream", mock(Serde.class));
+    graphSpec.getInputStream(id);
+    graphSpec.setDefaultSystem(new GenericSystemDescriptor("mockSystem", 
"mockFactory")); // should throw exception
   }
 
   @Test(expected = IllegalStateException.class)
-  public void testSetDefaultSerdeAfterGettingOutputStream() {
-    String streamId = "test-stream-1";
+  public void testSetDefaultSystemDescriptorAfterGettingOutputStream() {
     StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
-    graphSpec.getOutputStream(streamId);
-    graphSpec.setDefaultSerde(mock(Serde.class)); // should throw exception
+    GenericOutputDescriptor od = new GenericSystemDescriptor("system", 
"factory.class.name")
+        .getOutputDescriptor("output-stream", mock(Serde.class));
+    graphSpec.getOutputStream(od);
+    graphSpec.setDefaultSystem(new GenericSystemDescriptor("mockSystem", 
"mockFactory")); // should throw exception
   }
 
   @Test(expected = IllegalStateException.class)
   public void testSetDefaultSerdeAfterGettingIntermediateStream() {
     String streamId = "test-stream-1";
     StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
-    graphSpec.getIntermediateStream(streamId, null);
-    graphSpec.setDefaultSerde(mock(Serde.class)); // should throw exception
+    graphSpec.getIntermediateStream(streamId, mock(Serde.class), false);
+    graphSpec.setDefaultSystem(new GenericSystemDescriptor("mockSystem", 
"mockFactory")); // should throw exception
   }
 
   @Test(expected = IllegalStateException.class)
   public void testGetSameOutputStreamTwice() {
     String streamId = "test-stream-1";
     StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
-    graphSpec.getOutputStream(streamId);
-    graphSpec.getOutputStream(streamId); // should throw exception
+    GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", 
"mockSystemFactoryClass");
+    GenericOutputDescriptor osd1 = sd.getOutputDescriptor(streamId, 
mock(Serde.class));
+    GenericOutputDescriptor osd2 = sd.getOutputDescriptor(streamId, 
mock(Serde.class));
+    graphSpec.getOutputStream(osd1);
+    graphSpec.getOutputStream(osd2); // should throw exception
   }
 
   @Test
@@ -328,15 +316,15 @@ public class TestStreamGraphSpec {
 
     Serde mockValueSerde = mock(Serde.class);
     IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
-        graphSpec.getIntermediateStream(streamId, mockValueSerde);
+        graphSpec.getIntermediateStream(streamId, mockValueSerde, false);
 
     assertEquals(graphSpec.getInputOperators().get(streamId), 
intermediateStreamImpl.getOperatorSpec());
     assertEquals(graphSpec.getOutputStreams().get(streamId), 
intermediateStreamImpl.getOutputStream());
     assertEquals(streamId, intermediateStreamImpl.getStreamId());
     assertTrue(intermediateStreamImpl.getOutputStream().getKeySerde() 
instanceof NoOpSerde);
     assertEquals(mockValueSerde, 
intermediateStreamImpl.getOutputStream().getValueSerde());
-    assertTrue(((InputOperatorSpec) 
intermediateStreamImpl.getOperatorSpec()).getKeySerde() instanceof NoOpSerde);
-    assertEquals(mockValueSerde, ((InputOperatorSpec) 
intermediateStreamImpl.getOperatorSpec()).getValueSerde());
+    assertTrue(((InputOperatorSpec) (OperatorSpec) 
intermediateStreamImpl.getOperatorSpec()).getKeySerde() instanceof NoOpSerde);
+    assertEquals(mockValueSerde, ((InputOperatorSpec) (OperatorSpec) 
intermediateStreamImpl.getOperatorSpec()).getValueSerde());
   }
 
   @Test
@@ -350,84 +338,40 @@ public class TestStreamGraphSpec {
     doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
     doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
     IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
-        graphSpec.getIntermediateStream(streamId, mockKVSerde);
-
-    assertEquals(graphSpec.getInputOperators().get(streamId), 
intermediateStreamImpl.getOperatorSpec());
-    assertEquals(graphSpec.getOutputStreams().get(streamId), 
intermediateStreamImpl.getOutputStream());
-    assertEquals(streamId, intermediateStreamImpl.getStreamId());
-    assertEquals(mockKeySerde, 
intermediateStreamImpl.getOutputStream().getKeySerde());
-    assertEquals(mockValueSerde, 
intermediateStreamImpl.getOutputStream().getValueSerde());
-    assertEquals(mockKeySerde, ((InputOperatorSpec) 
intermediateStreamImpl.getOperatorSpec()).getKeySerde());
-    assertEquals(mockValueSerde, ((InputOperatorSpec) 
intermediateStreamImpl.getOperatorSpec()).getValueSerde());
-  }
-
-  @Test
-  public void testGetIntermediateStreamWithDefaultValueSerde() {
-    String streamId = "streamId";
-    StreamGraphSpec graph = new StreamGraphSpec(mock(Config.class));
-
-    Serde mockValueSerde = mock(Serde.class);
-    graph.setDefaultSerde(mockValueSerde);
-    IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
-        graph.getIntermediateStream(streamId, null);
-
-    assertEquals(graph.getInputOperators().get(streamId), 
intermediateStreamImpl.getOperatorSpec());
-    assertEquals(graph.getOutputStreams().get(streamId), 
intermediateStreamImpl.getOutputStream());
-    assertEquals(streamId, intermediateStreamImpl.getStreamId());
-    assertTrue(intermediateStreamImpl.getOutputStream().getKeySerde() 
instanceof NoOpSerde);
-    assertEquals(mockValueSerde, 
intermediateStreamImpl.getOutputStream().getValueSerde());
-    assertTrue(((InputOperatorSpec) 
intermediateStreamImpl.getOperatorSpec()).getKeySerde() instanceof NoOpSerde);
-    assertEquals(mockValueSerde, ((InputOperatorSpec) 
intermediateStreamImpl.getOperatorSpec()).getValueSerde());
-  }
-
-  @Test
-  public void testGetIntermediateStreamWithDefaultKeyValueSerde() {
-    Config mockConfig = mock(Config.class);
-    String streamId = "streamId";
-
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig);
-
-    KVSerde mockKVSerde = mock(KVSerde.class);
-    Serde mockKeySerde = mock(Serde.class);
-    Serde mockValueSerde = mock(Serde.class);
-    doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
-    doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
-    graphSpec.setDefaultSerde(mockKVSerde);
-    IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
-        graphSpec.getIntermediateStream(streamId, null);
+        graphSpec.getIntermediateStream(streamId, mockKVSerde, false);
 
     assertEquals(graphSpec.getInputOperators().get(streamId), 
intermediateStreamImpl.getOperatorSpec());
     assertEquals(graphSpec.getOutputStreams().get(streamId), 
intermediateStreamImpl.getOutputStream());
     assertEquals(streamId, intermediateStreamImpl.getStreamId());
     assertEquals(mockKeySerde, 
intermediateStreamImpl.getOutputStream().getKeySerde());
     assertEquals(mockValueSerde, 
intermediateStreamImpl.getOutputStream().getValueSerde());
-    assertEquals(mockKeySerde, ((InputOperatorSpec) 
intermediateStreamImpl.getOperatorSpec()).getKeySerde());
-    assertEquals(mockValueSerde, ((InputOperatorSpec) 
intermediateStreamImpl.getOperatorSpec()).getValueSerde());
+    assertEquals(mockKeySerde, ((InputOperatorSpec) (OperatorSpec)  
intermediateStreamImpl.getOperatorSpec()).getKeySerde());
+    assertEquals(mockValueSerde, ((InputOperatorSpec) (OperatorSpec)  
intermediateStreamImpl.getOperatorSpec()).getValueSerde());
   }
 
   @Test
-  public void testGetIntermediateStreamWithDefaultDefaultSerde() {
+  public void testGetIntermediateStreamWithNoSerde() {
     Config mockConfig = mock(Config.class);
     String streamId = "streamId";
 
     StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig);
     IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
-        graphSpec.getIntermediateStream(streamId, null);
+        graphSpec.getIntermediateStream(streamId, null, false);
 
     assertEquals(graphSpec.getInputOperators().get(streamId), 
intermediateStreamImpl.getOperatorSpec());
     assertEquals(graphSpec.getOutputStreams().get(streamId), 
intermediateStreamImpl.getOutputStream());
     assertEquals(streamId, intermediateStreamImpl.getStreamId());
-    assertTrue(intermediateStreamImpl.getOutputStream().getKeySerde() 
instanceof NoOpSerde);
-    assertTrue(intermediateStreamImpl.getOutputStream().getValueSerde() 
instanceof NoOpSerde);
-    assertTrue(((InputOperatorSpec) 
intermediateStreamImpl.getOperatorSpec()).getKeySerde() instanceof NoOpSerde);
-    assertTrue(((InputOperatorSpec) 
intermediateStreamImpl.getOperatorSpec()).getValueSerde() instanceof NoOpSerde);
+    assertNull(intermediateStreamImpl.getOutputStream().getKeySerde());
+    assertNull(intermediateStreamImpl.getOutputStream().getValueSerde());
+    assertNull(((InputOperatorSpec) (OperatorSpec)  
intermediateStreamImpl.getOperatorSpec()).getKeySerde());
+    assertNull(((InputOperatorSpec) (OperatorSpec)  
intermediateStreamImpl.getOperatorSpec()).getValueSerde());
   }
 
   @Test(expected = IllegalStateException.class)
   public void testGetSameIntermediateStreamTwice() {
     StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
-    graphSpec.getIntermediateStream("test-stream-1", mock(Serde.class));
-    graphSpec.getIntermediateStream("test-stream-1", mock(Serde.class));
+    graphSpec.getIntermediateStream("test-stream-1", mock(Serde.class), false);
+    graphSpec.getIntermediateStream("test-stream-1", mock(Serde.class), false);
   }
 
   @Test
@@ -454,7 +398,7 @@ public class TestStreamGraphSpec {
   }
 
   @Test
-  public void testUserDefinedIdValidation() {
+  public void testIdValidation() {
     Config mockConfig = mock(Config.class);
     when(mockConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName");
     when(mockConfig.get(eq(JobConfig.JOB_ID()), 
anyString())).thenReturn("1234");
@@ -471,7 +415,7 @@ public class TestStreamGraphSpec {
       fail("Received an error with a null or empty operator ID instead of 
defaulting to auto-generated ID.");
     }
 
-    List<String> validOpIds = ImmutableList.of("op.id", "op_id", "op-id", 
"1000", "op_1", "OP_ID");
+    List<String> validOpIds = ImmutableList.of("op_id", "op-id", "1000", 
"op_1", "OP_ID");
     for (String validOpId: validOpIds) {
       try {
         graphSpec.getNextOpId(OpCode.FILTER, validOpId);
@@ -498,10 +442,10 @@ public class TestStreamGraphSpec {
     String testStreamId1 = "test-stream-1";
     String testStreamId2 = "test-stream-2";
     String testStreamId3 = "test-stream-3";
-
-    graphSpec.getInputStream("test-stream-1");
-    graphSpec.getInputStream("test-stream-2");
-    graphSpec.getInputStream("test-stream-3");
+    GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", 
"mockSystemFactoryClass");
+    graphSpec.getInputStream(sd.getInputDescriptor(testStreamId1, 
mock(Serde.class)));
+    graphSpec.getInputStream(sd.getInputDescriptor(testStreamId2, 
mock(Serde.class)));
+    graphSpec.getInputStream(sd.getInputDescriptor(testStreamId3, 
mock(Serde.class)));
 
     List<InputOperatorSpec> inputSpecs = new 
ArrayList<>(graphSpec.getInputOperators().values());
     assertEquals(inputSpecs.size(), 3);
@@ -531,4 +475,32 @@ public class TestStreamGraphSpec {
     when(mockTableDescriptor.getTableId()).thenReturn("my.table");
     graphSpec.getTable(mockTableDescriptor);
   }
+
+  class MockExpandingSystemDescriptor extends 
SystemDescriptor<MockExpandingSystemDescriptor> implements 
ExpandingInputDescriptorProvider<Integer> {
+    public MockExpandingSystemDescriptor(String systemName, StreamExpander 
expander) {
+      super(systemName, "factory.class", null, expander);
+    }
+
+    @Override
+    public MockInputDescriptor<Integer> getInputDescriptor(String streamId, 
Serde serde) {
+      return new MockInputDescriptor<>(streamId, this, serde);
+    }
+  }
+
+  class MockTransformingSystemDescriptor extends 
SystemDescriptor<MockTransformingSystemDescriptor> implements 
TransformingInputDescriptorProvider<Integer> {
+    public MockTransformingSystemDescriptor(String systemName, 
InputTransformer transformer) {
+      super(systemName, "factory.class", transformer, null);
+    }
+
+    @Override
+    public MockInputDescriptor<Integer> getInputDescriptor(String streamId, 
Serde serde) {
+      return new MockInputDescriptor<>(streamId, this, serde);
+    }
+  }
+
+  public class MockInputDescriptor<StreamMessageType> extends 
InputDescriptor<StreamMessageType, MockInputDescriptor<StreamMessageType>> {
+    MockInputDescriptor(String streamId, SystemDescriptor systemDescriptor, 
Serde serde) {
+      super(streamId, serde, systemDescriptor, null);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-core/src/test/java/org/apache/samza/operators/impl/TestInputOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestInputOperatorImpl.java
 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestInputOperatorImpl.java
new file mode 100644
index 0000000..55a708b
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestInputOperatorImpl.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import java.util.Collection;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.spec.InputOperatorSpec;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+public class TestInputOperatorImpl {
+  @Test
+  public void testWithKeyedInput() {
+    InputOperatorImpl inputOperator =
+        new InputOperatorImpl(new InputOperatorSpec("stream-id", null, null, 
null, true, "input-op-id"));
+
+    IncomingMessageEnvelope ime =
+        new IncomingMessageEnvelope(mock(SystemStreamPartition.class), "123", 
"key", "msg");
+
+    Collection<Object> results =
+        inputOperator.handleMessage(ime, mock(MessageCollector.class), 
mock(TaskCoordinator.class));
+
+    Object result = results.iterator().next();
+    assertEquals("key", ((KV) result).getKey());
+    assertEquals("msg", ((KV) result).getValue());
+  }
+
+  @Test
+  public void testWithUnkeyedInput() {
+    InputOperatorImpl inputOperator =
+        new InputOperatorImpl(new InputOperatorSpec("stream-id", null, null, 
null, false, "input-op-id"));
+
+    IncomingMessageEnvelope ime =
+        new IncomingMessageEnvelope(mock(SystemStreamPartition.class), "123", 
"key", "msg");
+
+    Collection<Object> results =
+        inputOperator.handleMessage(ime, mock(MessageCollector.class), 
mock(TaskCoordinator.class));
+
+    Object result = results.iterator().next();
+    assertEquals("msg", result);
+  }
+
+  @Test
+  public void testWithInputTransformer() {
+    InputOperatorSpec inputOpSpec =
+        new InputOperatorSpec("stream-id", null, null, 
IncomingMessageEnvelope::getOffset, true, "input-op-id");
+    InputOperatorImpl inputOperator = new InputOperatorImpl(inputOpSpec);
+
+    IncomingMessageEnvelope ime =
+        new IncomingMessageEnvelope(mock(SystemStreamPartition.class), "123", 
"key", "msg");
+
+    Collection<Object> results =
+        inputOperator.handleMessage(ime, mock(MessageCollector.class), 
mock(TaskCoordinator.class));
+
+    Object result = results.iterator().next();
+    assertEquals("123", result);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
index 4e77fae..604c72d 100644
--- 
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
@@ -50,6 +50,9 @@ import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.StreamGraphSpec;
+import org.apache.samza.operators.descriptors.GenericInputDescriptor;
+import org.apache.samza.operators.descriptors.GenericOutputDescriptor;
+import org.apache.samza.operators.descriptors.GenericSystemDescriptor;
 import org.apache.samza.operators.functions.ClosableFunction;
 import org.apache.samza.operators.functions.FilterFunction;
 import org.apache.samza.operators.functions.InitableFunction;
@@ -59,10 +62,10 @@ import org.apache.samza.util.TimestampedValue;
 import org.apache.samza.operators.spec.OperatorSpec.OpCode;
 import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.task.MessageCollector;
@@ -242,9 +245,11 @@ public class TestOperatorImplGraph {
     Config config = new MapConfig(configs);
 
     StreamGraphSpec graphSpec = new StreamGraphSpec(config);
-
-    MessageStream<Object> inputStream = 
graphSpec.getInputStream(inputStreamId);
-    OutputStream<Object> outputStream = 
graphSpec.getOutputStream(outputStreamId);
+    GenericSystemDescriptor sd = new GenericSystemDescriptor(inputSystem, 
"mockFactoryClass");
+    GenericInputDescriptor inputDescriptor = 
sd.getInputDescriptor(inputStreamId, mock(Serde.class));
+    GenericOutputDescriptor outputDescriptor = 
sd.getOutputDescriptor(outputStreamId, mock(Serde.class));
+    MessageStream<Object> inputStream = 
graphSpec.getInputStream(inputDescriptor);
+    OutputStream<Object> outputStream = 
graphSpec.getOutputStream(outputDescriptor);
 
     inputStream
         .filter(mock(FilterFunction.class))
@@ -293,9 +298,13 @@ public class TestOperatorImplGraph {
     Config config = new MapConfig(configs);
 
     StreamGraphSpec graphSpec = new StreamGraphSpec(config);
-    MessageStream<Object> inputStream = 
graphSpec.getInputStream(inputStreamId);
-    OutputStream<KV<Integer, String>> outputStream = graphSpec
-        .getOutputStream(outputStreamId, KVSerde.of(mock(IntegerSerde.class), 
mock(StringSerde.class)));
+    GenericSystemDescriptor isd = new GenericSystemDescriptor(inputSystem, 
"mockFactoryClass");
+    GenericSystemDescriptor osd = new GenericSystemDescriptor(outputSystem, 
"mockFactoryClass");
+    GenericInputDescriptor inputDescriptor = 
isd.getInputDescriptor(inputStreamId, mock(Serde.class));
+    GenericOutputDescriptor outputDescriptor = 
osd.getOutputDescriptor(outputStreamId,
+        KVSerde.of(mock(IntegerSerde.class), mock(StringSerde.class)));
+    MessageStream<Object> inputStream = 
graphSpec.getInputStream(inputDescriptor);
+    OutputStream<KV<Integer, String>> outputStream = 
graphSpec.getOutputStream(outputDescriptor);
 
     inputStream
         .partitionBy(Object::hashCode, Object::toString,
@@ -337,12 +346,16 @@ public class TestOperatorImplGraph {
   @Test
   public void testBroadcastChain() {
     String inputStreamId = "input";
+    String inputSystem = "input-system";
+    String inputPhysicalName = "input-stream";
     HashMap<String, String> configMap = new HashMap<>();
-    StreamTestUtils.addStreamConfigs(configMap, "input", "input-system", 
"input-stream");
+    StreamTestUtils.addStreamConfigs(configMap, inputStreamId, inputSystem, 
inputPhysicalName);
     Config config = new MapConfig(configMap);
     StreamGraphSpec graphSpec = new StreamGraphSpec(config);
 
-    MessageStream<Object> inputStream = 
graphSpec.getInputStream(inputStreamId);
+    GenericSystemDescriptor sd = new GenericSystemDescriptor(inputSystem, 
"mockFactoryClass");
+    GenericInputDescriptor inputDescriptor = 
sd.getInputDescriptor(inputStreamId, mock(Serde.class));
+    MessageStream<Object> inputStream = 
graphSpec.getInputStream(inputDescriptor);
     inputStream.filter(mock(FilterFunction.class));
     inputStream.map(mock(MapFunction.class));
 
@@ -351,7 +364,7 @@ public class TestOperatorImplGraph {
     OperatorImplGraph opImplGraph =
         new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), config, 
mockTaskContext, mock(Clock.class));
 
-    InputOperatorImpl inputOpImpl = opImplGraph.getInputOperator(new 
SystemStream("input-system", "input-stream"));
+    InputOperatorImpl inputOpImpl = opImplGraph.getInputOperator(new 
SystemStream(inputSystem, inputPhysicalName));
     assertEquals(2, inputOpImpl.registeredOperators.size());
     assertTrue(inputOpImpl.registeredOperators.stream()
         .anyMatch(opImpl -> ((OperatorImpl) 
opImpl).getOperatorSpec().getOpCode() == OpCode.FILTER));
@@ -362,9 +375,16 @@ public class TestOperatorImplGraph {
   @Test
   public void testMergeChain() {
     String inputStreamId = "input";
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
+    String inputSystem = "input-system";
+    String inputPhysicalName = "input-stream";
+    HashMap<String, String> configs = new HashMap<>();
+    StreamTestUtils.addStreamConfigs(configs, inputStreamId, inputSystem, 
inputPhysicalName);
+    Config config = new MapConfig(configs);
+    StreamGraphSpec graphSpec = new StreamGraphSpec(config);
 
-    MessageStream<Object> inputStream = 
graphSpec.getInputStream(inputStreamId);
+    GenericSystemDescriptor sd = new GenericSystemDescriptor(inputSystem, 
"mockFactoryClass");
+    GenericInputDescriptor inputDescriptor = 
sd.getInputDescriptor(inputStreamId, mock(Serde.class));
+    MessageStream<Object> inputStream = 
graphSpec.getInputStream(inputDescriptor);
     MessageStream<Object> stream1 = 
inputStream.filter(mock(FilterFunction.class));
     MessageStream<Object> stream2 = inputStream.map(mock(MapFunction.class));
     MessageStream<Object> mergedStream = 
stream1.merge(Collections.singleton(stream2));
@@ -396,12 +416,14 @@ public class TestOperatorImplGraph {
   public void testJoinChain() {
     String inputStreamId1 = "input1";
     String inputStreamId2 = "input2";
-
+    String inputSystem = "input-system";
+    String inputPhysicalName1 = "input-stream1";
+    String inputPhysicalName2 = "input-stream2";
     HashMap<String, String> configs = new HashMap<>();
     configs.put(JobConfig.JOB_NAME(), "jobName");
     configs.put(JobConfig.JOB_ID(), "jobId");
-    StreamTestUtils.addStreamConfigs(configs, "input1", "input-system", 
"input-stream1");
-    StreamTestUtils.addStreamConfigs(configs, "input2", "input-system", 
"input-stream2");
+    StreamTestUtils.addStreamConfigs(configs, inputStreamId1, inputSystem, 
inputPhysicalName1);
+    StreamTestUtils.addStreamConfigs(configs, inputStreamId2, inputSystem, 
inputPhysicalName2);
     Config config = new MapConfig(configs);
     StreamGraphSpec graphSpec = new StreamGraphSpec(config);
 
@@ -409,8 +431,13 @@ public class TestOperatorImplGraph {
     Function<Object, Integer> keyFn = (Function & Serializable) m -> joinKey;
     JoinFunction testJoinFunction = new 
TestJoinFunction("jobName-jobId-join-j1",
         (BiFunction & Serializable) (m1, m2) -> KV.of(m1, m2), keyFn, keyFn);
-    MessageStream<Object> inputStream1 = 
graphSpec.getInputStream(inputStreamId1, new NoOpSerde<>());
-    MessageStream<Object> inputStream2 = 
graphSpec.getInputStream(inputStreamId2, new NoOpSerde<>());
+
+    GenericSystemDescriptor sd = new GenericSystemDescriptor(inputSystem, 
"mockFactoryClass");
+    GenericInputDescriptor inputDescriptor1 = 
sd.getInputDescriptor(inputStreamId1, mock(Serde.class));
+    GenericInputDescriptor inputDescriptor2 = 
sd.getInputDescriptor(inputStreamId2, mock(Serde.class));
+    MessageStream<Object> inputStream1 = 
graphSpec.getInputStream(inputDescriptor1);
+    MessageStream<Object> inputStream2 = 
graphSpec.getInputStream(inputDescriptor2);
+
     inputStream1.join(inputStream2, testJoinFunction,
         mock(Serde.class), mock(Serde.class), mock(Serde.class), 
Duration.ofHours(1), "j1");
 
@@ -428,8 +455,8 @@ public class TestOperatorImplGraph {
     // verify that join function is initialized once.
     assertEquals(TestJoinFunction.getInstanceByTaskName(mockTaskName, 
"jobName-jobId-join-j1").numInitCalled, 1);
 
-    InputOperatorImpl inputOpImpl1 = opImplGraph.getInputOperator(new 
SystemStream("input-system", "input-stream1"));
-    InputOperatorImpl inputOpImpl2 = opImplGraph.getInputOperator(new 
SystemStream("input-system", "input-stream2"));
+    InputOperatorImpl inputOpImpl1 = opImplGraph.getInputOperator(new 
SystemStream(inputSystem, inputPhysicalName1));
+    InputOperatorImpl inputOpImpl2 = opImplGraph.getInputOperator(new 
SystemStream(inputSystem, inputPhysicalName2));
     PartialJoinOperatorImpl leftPartialJoinOpImpl =
         (PartialJoinOperatorImpl) 
inputOpImpl1.registeredOperators.iterator().next();
     PartialJoinOperatorImpl rightPartialJoinOpImpl =
@@ -442,12 +469,14 @@ public class TestOperatorImplGraph {
     Object mockLeftMessage = mock(Object.class);
     long currentTimeMillis = System.currentTimeMillis();
     when(mockLeftStore.get(eq(joinKey))).thenReturn(new 
TimestampedValue<>(mockLeftMessage, currentTimeMillis));
-    inputOpImpl1.onMessage(KV.of("", mockLeftMessage), 
mock(MessageCollector.class), mock(TaskCoordinator.class));
+    IncomingMessageEnvelope leftMessage = new 
IncomingMessageEnvelope(mock(SystemStreamPartition.class), "", "", 
mockLeftMessage);
+    inputOpImpl1.onMessage(leftMessage, mock(MessageCollector.class), 
mock(TaskCoordinator.class));
 
     // verify that right partial join operator calls getSecondKey
     Object mockRightMessage = mock(Object.class);
     when(mockRightStore.get(eq(joinKey))).thenReturn(new 
TimestampedValue<>(mockRightMessage, currentTimeMillis));
-    inputOpImpl2.onMessage(KV.of("", mockRightMessage), 
mock(MessageCollector.class), mock(TaskCoordinator.class));
+    IncomingMessageEnvelope rightMessage = new 
IncomingMessageEnvelope(mock(SystemStreamPartition.class), "", "", 
mockRightMessage);
+    inputOpImpl2.onMessage(rightMessage, mock(MessageCollector.class), 
mock(TaskCoordinator.class));
 
 
     // verify that the join function apply is called with the correct messages 
on match
@@ -461,6 +490,7 @@ public class TestOperatorImplGraph {
   public void testOperatorGraphInitAndClose() {
     String inputStreamId1 = "input1";
     String inputStreamId2 = "input2";
+    String inputSystem = "input-system";
     Config mockConfig = mock(Config.class);
     TaskName mockTaskName = mock(TaskName.class);
     TaskContextImpl mockContext = mock(TaskContextImpl.class);
@@ -468,8 +498,11 @@ public class TestOperatorImplGraph {
     when(mockContext.getMetricsRegistry()).thenReturn(new 
MetricsRegistryMap());
     StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig);
 
-    MessageStream<Object> inputStream1 = 
graphSpec.getInputStream(inputStreamId1);
-    MessageStream<Object> inputStream2 = 
graphSpec.getInputStream(inputStreamId2);
+    GenericSystemDescriptor sd = new GenericSystemDescriptor(inputSystem, 
"mockFactoryClass");
+    GenericInputDescriptor inputDescriptor1 = 
sd.getInputDescriptor(inputStreamId1, mock(Serde.class));
+    GenericInputDescriptor inputDescriptor2 = 
sd.getInputDescriptor(inputStreamId2, mock(Serde.class));
+    MessageStream<Object> inputStream1 = 
graphSpec.getInputStream(inputDescriptor1);
+    MessageStream<Object> inputStream2 = 
graphSpec.getInputStream(inputDescriptor2);
 
     Function mapFn = (Function & Serializable) m -> m;
     inputStream1.map(new TestMapFunction<Object, Object>("1", mapFn))
@@ -560,15 +593,22 @@ public class TestOperatorImplGraph {
     Config config = new MapConfig(configs);
 
     StreamGraphSpec graphSpec = new StreamGraphSpec(config);
-    MessageStream messageStream1 = 
graphSpec.getInputStream(inputStreamId1).map(m -> m);
-    MessageStream messageStream2 = 
graphSpec.getInputStream(inputStreamId2).filter(m -> true);
+    GenericSystemDescriptor isd = new GenericSystemDescriptor(inputSystem, 
"mockFactoryClass");
+    GenericInputDescriptor inputDescriptor1 = 
isd.getInputDescriptor(inputStreamId1, mock(Serde.class));
+    GenericInputDescriptor inputDescriptor2 = 
isd.getInputDescriptor(inputStreamId2, mock(Serde.class));
+    GenericInputDescriptor inputDescriptor3 = 
isd.getInputDescriptor(inputStreamId3, mock(Serde.class));
+    GenericSystemDescriptor osd = new GenericSystemDescriptor(outputSystem, 
"mockFactoryClass");
+    GenericOutputDescriptor outputDescriptor1 = 
osd.getOutputDescriptor(outputStreamId1, mock(Serde.class));
+    GenericOutputDescriptor outputDescriptor2 = 
osd.getOutputDescriptor(outputStreamId2, mock(Serde.class));
+    MessageStream messageStream1 = 
graphSpec.getInputStream(inputDescriptor1).map(m -> m);
+    MessageStream messageStream2 = 
graphSpec.getInputStream(inputDescriptor2).filter(m -> true);
     MessageStream messageStream3 =
-        graphSpec.getInputStream(inputStreamId3)
+        graphSpec.getInputStream(inputDescriptor3)
             .filter(m -> true)
             .partitionBy(m -> "m", m -> m, "p1")
             .map(m -> m);
-    OutputStream<Object> outputStream1 = 
graphSpec.getOutputStream(outputStreamId1);
-    OutputStream<Object> outputStream2 = 
graphSpec.getOutputStream(outputStreamId2);
+    OutputStream<Object> outputStream1 = 
graphSpec.getOutputStream(outputDescriptor1);
+    OutputStream<Object> outputStream2 = 
graphSpec.getOutputStream(outputDescriptor2);
 
     messageStream1
         .join(messageStream2, mock(JoinFunction.class),

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
index 0ef6680..6c34fcd 100644
--- 
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
@@ -32,6 +32,8 @@ import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.StreamGraphSpec;
+import org.apache.samza.operators.descriptors.GenericInputDescriptor;
+import org.apache.samza.operators.descriptors.GenericSystemDescriptor;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.operators.impl.store.TestInMemoryStore;
 import org.apache.samza.operators.impl.store.TimeSeriesKeySerde;
@@ -547,9 +549,10 @@ public class TestWindowOperator {
   private StreamGraphSpec getKeyedTumblingWindowStreamGraph(AccumulationMode 
mode,
       Duration duration, Trigger<KV<Integer, Integer>> earlyTrigger) throws 
IOException {
     StreamGraphSpec graph = new StreamGraphSpec(config);
-
     KVSerde<Integer, Integer> kvSerde = KVSerde.of(new IntegerSerde(), new 
IntegerSerde());
-    graph.getInputStream("integers", kvSerde)
+    GenericSystemDescriptor sd = new GenericSystemDescriptor("kafka", 
"mockFactoryClass");
+    GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor = 
sd.getInputDescriptor("integers", kvSerde);
+    graph.getInputStream(inputDescriptor)
         .window(Windows.keyedTumblingWindow(KV::getKey, duration, new 
IntegerSerde(), kvSerde)
             .setEarlyTrigger(earlyTrigger).setAccumulationMode(mode), "w1")
         .sink((message, messageCollector, taskCoordinator) -> {
@@ -563,9 +566,10 @@ public class TestWindowOperator {
   private StreamGraphSpec getTumblingWindowStreamGraph(AccumulationMode mode,
       Duration duration, Trigger<KV<Integer, Integer>> earlyTrigger) throws 
IOException {
     StreamGraphSpec graph = new StreamGraphSpec(config);
-
     KVSerde<Integer, Integer> kvSerde = KVSerde.of(new IntegerSerde(), new 
IntegerSerde());
-    graph.getInputStream("integers", kvSerde)
+    GenericSystemDescriptor sd = new GenericSystemDescriptor("kafka", 
"mockFactoryClass");
+    GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor = 
sd.getInputDescriptor("integers", kvSerde);
+    graph.getInputStream(inputDescriptor)
         .window(Windows.tumblingWindow(duration, 
kvSerde).setEarlyTrigger(earlyTrigger)
             .setAccumulationMode(mode), "w1")
         .sink((message, messageCollector, taskCoordinator) -> {
@@ -577,9 +581,10 @@ public class TestWindowOperator {
 
   private StreamGraphSpec getKeyedSessionWindowStreamGraph(AccumulationMode 
mode, Duration duration) throws IOException {
     StreamGraphSpec graph = new StreamGraphSpec(config);
-
     KVSerde<Integer, Integer> kvSerde = KVSerde.of(new IntegerSerde(), new 
IntegerSerde());
-    graph.getInputStream("integers", kvSerde)
+    GenericSystemDescriptor sd = new GenericSystemDescriptor("kafka", 
"mockFactoryClass");
+    GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor = 
sd.getInputDescriptor("integers", kvSerde);
+    graph.getInputStream(inputDescriptor)
         .window(Windows.keyedSessionWindow(KV::getKey, duration, new 
IntegerSerde(), kvSerde)
             .setAccumulationMode(mode), "w1")
         .sink((message, messageCollector, taskCoordinator) -> {
@@ -592,9 +597,10 @@ public class TestWindowOperator {
   private StreamGraphSpec 
getAggregateTumblingWindowStreamGraph(AccumulationMode mode, Duration 
timeDuration,
         Trigger<IntegerEnvelope> earlyTrigger) throws IOException {
     StreamGraphSpec graph = new StreamGraphSpec(config);
-
-    MessageStream<KV<Integer, Integer>> integers = 
graph.getInputStream("integers",
-        KVSerde.of(new IntegerSerde(), new IntegerSerde()));
+    KVSerde<Integer, Integer> kvSerde = KVSerde.of(new IntegerSerde(), new 
IntegerSerde());
+    GenericSystemDescriptor sd = new GenericSystemDescriptor("kafka", 
"mockFactoryClass");
+    GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor = 
sd.getInputDescriptor("integers", kvSerde);
+    MessageStream<KV<Integer, Integer>> integers = 
graph.getInputStream(inputDescriptor);
 
     integers
         .map(new KVMapFunction())

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java
 
b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java
index b27c944..a9ccd12 100644
--- 
a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java
@@ -228,9 +228,9 @@ public class TestOperatorSpec {
       }
     };
 
-    InputOperatorSpec<String, Object> inputOperatorSpec = new 
InputOperatorSpec<>(
-        "mockStreamId", new StringSerde("UTF-8"), objSerde, true, "op0");
-    InputOperatorSpec<String, Object> inputOpCopy = (InputOperatorSpec<String, 
Object>) OperatorSpecTestUtils.copyOpSpec(inputOperatorSpec);
+    InputOperatorSpec inputOperatorSpec = new InputOperatorSpec(
+        "mockStreamId", new StringSerde("UTF-8"), objSerde, null, true, "op0");
+    InputOperatorSpec inputOpCopy = (InputOperatorSpec) 
OperatorSpecTestUtils.copyOpSpec(inputOperatorSpec);
 
     assertNotEquals("Expected deserialized copy of operator spec should not be 
the same as the original operator spec", inputOperatorSpec, inputOpCopy);
     assertTrue(inputOperatorSpec.isClone(inputOpCopy));
@@ -271,12 +271,10 @@ public class TestOperatorSpec {
   @Test
   public void testJoinOperatorSpec() {
 
-    InputOperatorSpec<TestMessageEnvelope, Object> leftOpSpec = new 
InputOperatorSpec<>(
-        "test-input-1", new NoOpSerde<>(),
-        new NoOpSerde<>(), false, "op0");
-    InputOperatorSpec<TestMessageEnvelope, Object> rightOpSpec = new 
InputOperatorSpec<>(
-        "test-input-2", new NoOpSerde<>(),
-        new NoOpSerde<>(), false, "op1");
+    InputOperatorSpec leftOpSpec = new InputOperatorSpec(
+        "test-input-1", new NoOpSerde<>(), new NoOpSerde<>(), null, false, 
"op0");
+    InputOperatorSpec rightOpSpec = new InputOperatorSpec(
+        "test-input-2", new NoOpSerde<>(), new NoOpSerde<>(), null, false, 
"op1");
 
     Serde<Object> objSerde = new Serde<Object>() {
 

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java
 
b/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java
index 9bbcbfa..86a553f 100644
--- 
a/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java
@@ -26,16 +26,23 @@ import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OperatorSpecGraph;
 import org.apache.samza.operators.StreamGraphSpec;
 import org.apache.samza.operators.TimerRegistry;
+import org.apache.samza.operators.descriptors.GenericInputDescriptor;
+import org.apache.samza.operators.descriptors.GenericSystemDescriptor;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.operators.functions.TimerFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
+import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.serializers.Serde;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.internal.util.reflection.Whitebox;
 
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 
 /**
@@ -44,10 +51,12 @@ import static org.mockito.Mockito.*;
 public class TestPartitionByOperatorSpec {
 
   private final Config mockConfig = mock(Config.class);
-  private final String testInputId = "test-input-1";
+  private final GenericInputDescriptor testinputDescriptor =
+      new GenericSystemDescriptor("mockSystem", "mockFactoryClassName")
+          .getInputDescriptor("test-input-1", mock(Serde.class));
   private final String testJobName = "testJob";
   private final String testJobId = "1";
-  private final String testReparStreamName = "parByKey";
+  private final String testRepartitionedStreamName = "parByKey";
   private StreamGraphSpec graphSpec = null;
 
   class TimerMapFn implements MapFunction<Object, String>, 
TimerFunction<String, Object> {
@@ -95,13 +104,14 @@ public class TestPartitionByOperatorSpec {
 
   @Test
   public void testPartitionBy() {
-    MessageStream inputStream = graphSpec.getInputStream(testInputId);
+    MessageStream inputStream = graphSpec.getInputStream(testinputDescriptor);
     MapFunction<Object, String> keyFn = m -> m.toString();
     MapFunction<Object, Object> valueFn = m -> m;
+    KVSerde<Object, Object> partitionBySerde = KVSerde.of(new NoOpSerde<>(), 
new NoOpSerde<>());
     MessageStream<KV<String, Object>>
-        reparStream = inputStream.partitionBy(keyFn, valueFn, 
testReparStreamName);
+        reparStream = inputStream.partitionBy(keyFn, valueFn, 
partitionBySerde, testRepartitionedStreamName);
     InputOperatorSpec inputOpSpec = (InputOperatorSpec) 
Whitebox.getInternalState(reparStream, "operatorSpec");
-    assertEquals(inputOpSpec.getStreamId(), 
String.format("%s-%s-partition_by-%s", testJobName, testJobId, 
testReparStreamName));
+    assertEquals(inputOpSpec.getStreamId(), 
String.format("%s-%s-partition_by-%s", testJobName, testJobId, 
testRepartitionedStreamName));
     assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde);
     assertTrue(inputOpSpec.getValueSerde() instanceof NoOpSerde);
     assertTrue(inputOpSpec.isKeyed());
@@ -110,7 +120,32 @@ public class TestPartitionByOperatorSpec {
     InputOperatorSpec originInputSpec = (InputOperatorSpec) 
Whitebox.getInternalState(inputStream, "operatorSpec");
     assertTrue(originInputSpec.getRegisteredOperatorSpecs().toArray()[0] 
instanceof PartitionByOperatorSpec);
     PartitionByOperatorSpec reparOpSpec  = (PartitionByOperatorSpec) 
originInputSpec.getRegisteredOperatorSpecs().toArray()[0];
-    assertEquals(reparOpSpec.getOpId(), String.format("%s-%s-partition_by-%s", 
testJobName, testJobId, testReparStreamName));
+    assertEquals(reparOpSpec.getOpId(), String.format("%s-%s-partition_by-%s", 
testJobName, testJobId, testRepartitionedStreamName));
+    assertEquals(reparOpSpec.getKeyFunction(), keyFn);
+    assertEquals(reparOpSpec.getValueFunction(), valueFn);
+    assertEquals(reparOpSpec.getOutputStream().getStreamId(), 
reparOpSpec.getOpId());
+    assertNull(reparOpSpec.getTimerFn());
+    assertNull(reparOpSpec.getWatermarkFn());
+  }
+
+  @Test
+  public void testPartitionByWithNoSerde() {
+    MessageStream inputStream = graphSpec.getInputStream(testinputDescriptor);
+    MapFunction<Object, String> keyFn = m -> m.toString();
+    MapFunction<Object, Object> valueFn = m -> m;
+    MessageStream<KV<String, Object>>
+        reparStream = inputStream.partitionBy(keyFn, valueFn, 
testRepartitionedStreamName);
+    InputOperatorSpec inputOpSpec = (InputOperatorSpec) 
Whitebox.getInternalState(reparStream, "operatorSpec");
+    assertEquals(inputOpSpec.getStreamId(), 
String.format("%s-%s-partition_by-%s", testJobName, testJobId, 
testRepartitionedStreamName));
+    assertNull(inputOpSpec.getKeySerde());
+    assertNull(inputOpSpec.getValueSerde());
+    assertTrue(inputOpSpec.isKeyed());
+    assertNull(inputOpSpec.getTimerFn());
+    assertNull(inputOpSpec.getWatermarkFn());
+    InputOperatorSpec originInputSpec = (InputOperatorSpec) 
Whitebox.getInternalState(inputStream, "operatorSpec");
+    assertTrue(originInputSpec.getRegisteredOperatorSpecs().toArray()[0] 
instanceof PartitionByOperatorSpec);
+    PartitionByOperatorSpec reparOpSpec  = (PartitionByOperatorSpec) 
originInputSpec.getRegisteredOperatorSpecs().toArray()[0];
+    assertEquals(reparOpSpec.getOpId(), String.format("%s-%s-partition_by-%s", 
testJobName, testJobId, testRepartitionedStreamName));
     assertEquals(reparOpSpec.getKeyFunction(), keyFn);
     assertEquals(reparOpSpec.getValueFunction(), valueFn);
     assertEquals(reparOpSpec.getOutputStream().getStreamId(), 
reparOpSpec.getOpId());
@@ -120,8 +155,8 @@ public class TestPartitionByOperatorSpec {
 
   @Test
   public void testCopy() {
-    MessageStream inputStream = graphSpec.getInputStream(testInputId);
-    inputStream.partitionBy(m -> m.toString(), m -> m, testReparStreamName);
+    MessageStream inputStream = graphSpec.getInputStream(testinputDescriptor);
+    inputStream.partitionBy(m -> m.toString(), m -> m, 
testRepartitionedStreamName);
     OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph();
     OperatorSpecGraph clonedGraph = specGraph.clone();
     OperatorSpecTestUtils.assertClonedGraph(specGraph, clonedGraph);
@@ -130,28 +165,28 @@ public class TestPartitionByOperatorSpec {
   @Test(expected = IllegalArgumentException.class)
   public void testTimerFunctionAsKeyFn() {
     TimerMapFn keyFn = new TimerMapFn();
-    MessageStream<Object> inputStream = graphSpec.getInputStream(testInputId);
+    MessageStream inputStream = graphSpec.getInputStream(testinputDescriptor);
     inputStream.partitionBy(keyFn, m -> m, "parByKey");
   }
 
   @Test(expected = IllegalArgumentException.class)
   public void testWatermarkFunctionAsKeyFn() {
     WatermarkMapFn keyFn = new WatermarkMapFn();
-    MessageStream<Object> inputStream = graphSpec.getInputStream(testInputId);
+    MessageStream inputStream = graphSpec.getInputStream(testinputDescriptor);
     inputStream.partitionBy(keyFn, m -> m, "parByKey");
   }
 
   @Test(expected = IllegalArgumentException.class)
   public void testTimerFunctionAsValueFn() {
     TimerMapFn valueFn = new TimerMapFn();
-    MessageStream<Object> inputStream = graphSpec.getInputStream(testInputId);
+    MessageStream inputStream = graphSpec.getInputStream(testinputDescriptor);
     inputStream.partitionBy(m -> m.toString(), valueFn, "parByKey");
   }
 
   @Test(expected = IllegalArgumentException.class)
   public void testWatermarkFunctionAsValueFn() {
     WatermarkMapFn valueFn = new WatermarkMapFn();
-    MessageStream<Object> inputStream = graphSpec.getInputStream(testInputId);
+    MessageStream inputStream = graphSpec.getInputStream(testinputDescriptor);
     inputStream.partitionBy(m -> m.toString(), valueFn, "parByKey");
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala 
b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
index f0b8a17..ba3b5df 100644
--- a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
+++ b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
@@ -22,8 +22,6 @@ package org.apache.samza.util
 import org.junit.Assert._
 import org.junit.Test
 import org.apache.samza.config.MapConfig
-import org.apache.samza.serializers._
-import org.apache.samza.SamzaException
 
 class TestUtil {
   @Test

Reply via email to