http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/test/java/org/apache/samza/operators/descriptors/TestSimpleInputDescriptor.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/test/java/org/apache/samza/operators/descriptors/TestSimpleInputDescriptor.java
 
b/samza-api/src/test/java/org/apache/samza/operators/descriptors/TestSimpleInputDescriptor.java
new file mode 100644
index 0000000..d8b51c5
--- /dev/null
+++ 
b/samza-api/src/test/java/org/apache/samza/operators/descriptors/TestSimpleInputDescriptor.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.descriptors;
+
+import java.util.Collections;
+import 
org.apache.samza.operators.descriptors.serde.ExampleSimpleInputDescriptor;
+import 
org.apache.samza.operators.descriptors.serde.ExampleSimpleOutputDescriptor;
+import 
org.apache.samza.operators.descriptors.serde.ExampleSimpleSystemDescriptor;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+public class TestSimpleInputDescriptor {
+  @Test
+  public void testAPIUsage() {
+    // does not assert anything, but acts as a compile-time check on expected 
descriptor type parameters
+    // and validates that the method calls can be chained.
+    ExampleSimpleSystemDescriptor kafkaSystem =
+        new ExampleSimpleSystemDescriptor("kafka-system")
+            .withSystemConfigs(Collections.emptyMap());
+    ExampleSimpleInputDescriptor<Integer> input1 = 
kafkaSystem.getInputDescriptor("input1", new IntegerSerde());
+    ExampleSimpleOutputDescriptor<Integer> output1 = 
kafkaSystem.getOutputDescriptor("output1", new IntegerSerde());
+
+    input1
+        .withBootstrap(false)
+        .withOffsetDefault(SystemStreamMetadata.OffsetType.NEWEST)
+        .withPhysicalName("input-1")
+        .withPriority(1)
+        .withResetOffset(false)
+        .withStreamConfigs(Collections.emptyMap());
+
+    output1
+        .withPhysicalName("output-1")
+        .withStreamConfigs(Collections.emptyMap());
+  }
+
+  @Test
+  public void testISDObjectsWithOverrides() {
+    ExampleSimpleSystemDescriptor ssd = new 
ExampleSimpleSystemDescriptor("kafka-system");
+    IntegerSerde streamSerde = new IntegerSerde();
+    ExampleSimpleInputDescriptor<Integer> isd = 
ssd.getInputDescriptor("input-stream", streamSerde);
+
+    assertEquals(streamSerde, isd.getSerde());
+    assertFalse(isd.getTransformer().isPresent());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/test/java/org/apache/samza/operators/descriptors/TestTransformingInputDescriptor.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/test/java/org/apache/samza/operators/descriptors/TestTransformingInputDescriptor.java
 
b/samza-api/src/test/java/org/apache/samza/operators/descriptors/TestTransformingInputDescriptor.java
new file mode 100644
index 0000000..60e4819
--- /dev/null
+++ 
b/samza-api/src/test/java/org/apache/samza/operators/descriptors/TestTransformingInputDescriptor.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.descriptors;
+
+import java.util.Collections;
+import 
org.apache.samza.operators.descriptors.transforming.ExampleTransformingInputDescriptor;
+import 
org.apache.samza.operators.descriptors.transforming.ExampleTransformingOutputDescriptor;
+import 
org.apache.samza.operators.descriptors.transforming.ExampleTransformingSystemDescriptor;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestTransformingInputDescriptor {
+  @Test
+  public void testAPIUsage() {
+    // does not assert anything, but acts as a compile-time check on expected 
descriptor type parameters
+    // and validates that the method calls can be chained.
+    ExampleTransformingSystemDescriptor imeTransformingSystem =
+        new ExampleTransformingSystemDescriptor("imeTransformingSystem")
+            .withSystemConfigs(Collections.emptyMap());
+    ExampleTransformingInputDescriptor<Long> input1 = 
imeTransformingSystem.getInputDescriptor("input1", new IntegerSerde());
+    ExampleTransformingOutputDescriptor<Integer> output1 = 
imeTransformingSystem.getOutputDescriptor("output1", new IntegerSerde());
+
+    input1
+        .withBootstrap(false)
+        .withOffsetDefault(SystemStreamMetadata.OffsetType.NEWEST)
+        .withPhysicalName("input-1")
+        .withPriority(1)
+        .withResetOffset(false)
+        .withStreamConfigs(Collections.emptyMap());
+
+    output1
+        .withPhysicalName("output-1")
+        .withStreamConfigs(Collections.emptyMap());
+  }
+
+  @Test
+  public void testISDObjectsWithOverrides() {
+    ExampleTransformingSystemDescriptor imeTransformingSystem =
+        new ExampleTransformingSystemDescriptor("imeTransformingSystem");
+    IntegerSerde streamSerde = new IntegerSerde();
+    ExampleTransformingInputDescriptor<Long> overridingISD =
+        imeTransformingSystem.getInputDescriptor("input-stream", streamSerde);
+
+    assertEquals(streamSerde, overridingISD.getSerde());
+    assertEquals(imeTransformingSystem.getTransformer().get(), 
overridingISD.getTransformer().get());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/test/java/org/apache/samza/operators/descriptors/expanding/ExampleExpandingInputDescriptor.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/test/java/org/apache/samza/operators/descriptors/expanding/ExampleExpandingInputDescriptor.java
 
b/samza-api/src/test/java/org/apache/samza/operators/descriptors/expanding/ExampleExpandingInputDescriptor.java
new file mode 100644
index 0000000..1f382a3
--- /dev/null
+++ 
b/samza-api/src/test/java/org/apache/samza/operators/descriptors/expanding/ExampleExpandingInputDescriptor.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.descriptors.expanding;
+
+import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
+import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
+import org.apache.samza.operators.functions.InputTransformer;
+import org.apache.samza.serializers.Serde;
+
+public class ExampleExpandingInputDescriptor<StreamMessageType> extends 
InputDescriptor<StreamMessageType, 
ExampleExpandingInputDescriptor<StreamMessageType>> {
+  ExampleExpandingInputDescriptor(String streamId, SystemDescriptor 
systemDescriptor, InputTransformer<StreamMessageType> transformer, Serde serde) 
{
+    super(streamId, serde, systemDescriptor, transformer);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/test/java/org/apache/samza/operators/descriptors/expanding/ExampleExpandingOutputDescriptor.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/test/java/org/apache/samza/operators/descriptors/expanding/ExampleExpandingOutputDescriptor.java
 
b/samza-api/src/test/java/org/apache/samza/operators/descriptors/expanding/ExampleExpandingOutputDescriptor.java
new file mode 100644
index 0000000..705b866
--- /dev/null
+++ 
b/samza-api/src/test/java/org/apache/samza/operators/descriptors/expanding/ExampleExpandingOutputDescriptor.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.descriptors.expanding;
+
+import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor;
+import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
+import org.apache.samza.serializers.Serde;
+
+public class ExampleExpandingOutputDescriptor<StreamMessageType> extends 
OutputDescriptor<StreamMessageType, 
ExampleExpandingOutputDescriptor<StreamMessageType>> {
+  ExampleExpandingOutputDescriptor(String streamId, SystemDescriptor 
systemDescriptor, Serde<StreamMessageType> serde) {
+    super(streamId, serde, systemDescriptor);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/test/java/org/apache/samza/operators/descriptors/expanding/ExampleExpandingSystemDescriptor.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/test/java/org/apache/samza/operators/descriptors/expanding/ExampleExpandingSystemDescriptor.java
 
b/samza-api/src/test/java/org/apache/samza/operators/descriptors/expanding/ExampleExpandingSystemDescriptor.java
new file mode 100644
index 0000000..c81c8aa
--- /dev/null
+++ 
b/samza-api/src/test/java/org/apache/samza/operators/descriptors/expanding/ExampleExpandingSystemDescriptor.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.descriptors.expanding;
+
+import org.apache.samza.operators.MessageStream;
+import 
org.apache.samza.operators.descriptors.base.system.ExpandingInputDescriptorProvider;
+import 
org.apache.samza.operators.descriptors.base.system.OutputDescriptorProvider;
+import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
+import org.apache.samza.operators.functions.InputTransformer;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.IncomingMessageEnvelope;
+
+public class ExampleExpandingSystemDescriptor extends 
SystemDescriptor<ExampleExpandingSystemDescriptor>
+    implements ExpandingInputDescriptorProvider<Long>, 
OutputDescriptorProvider {
+  private static final String FACTORY_CLASS_NAME = 
"org.apache.samza.GraphExpandingSystemFactory";
+
+  public ExampleExpandingSystemDescriptor(String systemName) {
+    super(systemName, FACTORY_CLASS_NAME,
+        (InputTransformer<String>) IncomingMessageEnvelope::toString,
+        (streamGraph, inputDescriptor) -> (MessageStream<Long>) 
streamGraph.getInputStream(inputDescriptor)
+    );
+  }
+
+  @Override
+  public ExampleExpandingInputDescriptor<Long> getInputDescriptor(String 
streamId, Serde serde) {
+    return new ExampleExpandingInputDescriptor<>(streamId, this, null, serde);
+  }
+
+  @Override
+  public <StreamMessageType> 
ExampleExpandingOutputDescriptor<StreamMessageType> getOutputDescriptor(String 
streamId, Serde<StreamMessageType> serde) {
+    return new ExampleExpandingOutputDescriptor<>(streamId, this, serde);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/test/java/org/apache/samza/operators/descriptors/serde/ExampleSimpleInputDescriptor.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/test/java/org/apache/samza/operators/descriptors/serde/ExampleSimpleInputDescriptor.java
 
b/samza-api/src/test/java/org/apache/samza/operators/descriptors/serde/ExampleSimpleInputDescriptor.java
new file mode 100644
index 0000000..c5df448
--- /dev/null
+++ 
b/samza-api/src/test/java/org/apache/samza/operators/descriptors/serde/ExampleSimpleInputDescriptor.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.descriptors.serde;
+
+import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
+import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
+import org.apache.samza.operators.functions.InputTransformer;
+import org.apache.samza.serializers.Serde;
+
+public class ExampleSimpleInputDescriptor<StreamMessageType> extends 
InputDescriptor<StreamMessageType, 
ExampleSimpleInputDescriptor<StreamMessageType>> {
+  ExampleSimpleInputDescriptor(String streamId, SystemDescriptor 
systemDescriptor, InputTransformer<StreamMessageType> transformer, Serde serde) 
{
+    super(streamId, serde, systemDescriptor, transformer);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/test/java/org/apache/samza/operators/descriptors/serde/ExampleSimpleOutputDescriptor.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/test/java/org/apache/samza/operators/descriptors/serde/ExampleSimpleOutputDescriptor.java
 
b/samza-api/src/test/java/org/apache/samza/operators/descriptors/serde/ExampleSimpleOutputDescriptor.java
new file mode 100644
index 0000000..aeda2e0
--- /dev/null
+++ 
b/samza-api/src/test/java/org/apache/samza/operators/descriptors/serde/ExampleSimpleOutputDescriptor.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.descriptors.serde;
+
+import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor;
+import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
+import org.apache.samza.serializers.Serde;
+
+public class ExampleSimpleOutputDescriptor<StreamMessageType> extends 
OutputDescriptor<StreamMessageType, 
ExampleSimpleOutputDescriptor<StreamMessageType>> {
+  ExampleSimpleOutputDescriptor(String streamId, SystemDescriptor 
systemDescriptor, Serde<StreamMessageType> serde) {
+    super(streamId, serde, systemDescriptor);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/test/java/org/apache/samza/operators/descriptors/serde/ExampleSimpleSystemDescriptor.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/test/java/org/apache/samza/operators/descriptors/serde/ExampleSimpleSystemDescriptor.java
 
b/samza-api/src/test/java/org/apache/samza/operators/descriptors/serde/ExampleSimpleSystemDescriptor.java
new file mode 100644
index 0000000..b0af4d8
--- /dev/null
+++ 
b/samza-api/src/test/java/org/apache/samza/operators/descriptors/serde/ExampleSimpleSystemDescriptor.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.descriptors.serde;
+
+import 
org.apache.samza.operators.descriptors.base.system.OutputDescriptorProvider;
+import 
org.apache.samza.operators.descriptors.base.system.SimpleInputDescriptorProvider;
+import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
+import org.apache.samza.serializers.Serde;
+
+public class ExampleSimpleSystemDescriptor extends 
SystemDescriptor<ExampleSimpleSystemDescriptor>
+    implements SimpleInputDescriptorProvider, OutputDescriptorProvider {
+  private static final String FACTORY_CLASS_NAME = 
"org.apache.kafka.KafkaSystemFactory";
+
+  public ExampleSimpleSystemDescriptor(String systemName) {
+    super(systemName, FACTORY_CLASS_NAME, null, null);
+  }
+
+  @Override
+  public <StreamMessageType> ExampleSimpleInputDescriptor<StreamMessageType> 
getInputDescriptor(String streamId, Serde<StreamMessageType> serde) {
+    return new ExampleSimpleInputDescriptor<>(streamId, this, null, serde);
+  }
+
+  @Override
+  public <StreamMessageType> ExampleSimpleOutputDescriptor<StreamMessageType> 
getOutputDescriptor(String streamId, Serde<StreamMessageType> serde) {
+    return new ExampleSimpleOutputDescriptor<>(streamId, this, serde);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/test/java/org/apache/samza/operators/descriptors/transforming/ExampleTransformingInputDescriptor.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/test/java/org/apache/samza/operators/descriptors/transforming/ExampleTransformingInputDescriptor.java
 
b/samza-api/src/test/java/org/apache/samza/operators/descriptors/transforming/ExampleTransformingInputDescriptor.java
new file mode 100644
index 0000000..78b6f33
--- /dev/null
+++ 
b/samza-api/src/test/java/org/apache/samza/operators/descriptors/transforming/ExampleTransformingInputDescriptor.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.descriptors.transforming;
+
+import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
+import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
+import org.apache.samza.operators.functions.InputTransformer;
+import org.apache.samza.serializers.Serde;
+
+public class ExampleTransformingInputDescriptor<StreamMessageType> extends 
InputDescriptor<StreamMessageType, 
ExampleTransformingInputDescriptor<StreamMessageType>> {
+  ExampleTransformingInputDescriptor(String streamId, SystemDescriptor 
systemDescriptor, InputTransformer<StreamMessageType> transformer, Serde serde) 
{
+    super(streamId, serde, systemDescriptor, transformer);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/test/java/org/apache/samza/operators/descriptors/transforming/ExampleTransformingOutputDescriptor.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/test/java/org/apache/samza/operators/descriptors/transforming/ExampleTransformingOutputDescriptor.java
 
b/samza-api/src/test/java/org/apache/samza/operators/descriptors/transforming/ExampleTransformingOutputDescriptor.java
new file mode 100644
index 0000000..c37906b
--- /dev/null
+++ 
b/samza-api/src/test/java/org/apache/samza/operators/descriptors/transforming/ExampleTransformingOutputDescriptor.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.descriptors.transforming;
+
+import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor;
+import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
+import org.apache.samza.serializers.Serde;
+
+public class ExampleTransformingOutputDescriptor<StreamMessageType> extends 
OutputDescriptor<StreamMessageType, 
ExampleTransformingOutputDescriptor<StreamMessageType>> {
+  ExampleTransformingOutputDescriptor(String streamId, SystemDescriptor 
systemDescriptor, Serde<StreamMessageType> serde) {
+    super(streamId, serde, systemDescriptor);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/test/java/org/apache/samza/operators/descriptors/transforming/ExampleTransformingSystemDescriptor.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/test/java/org/apache/samza/operators/descriptors/transforming/ExampleTransformingSystemDescriptor.java
 
b/samza-api/src/test/java/org/apache/samza/operators/descriptors/transforming/ExampleTransformingSystemDescriptor.java
new file mode 100644
index 0000000..f8aff61
--- /dev/null
+++ 
b/samza-api/src/test/java/org/apache/samza/operators/descriptors/transforming/ExampleTransformingSystemDescriptor.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.descriptors.transforming;
+
+import 
org.apache.samza.operators.descriptors.base.system.OutputDescriptorProvider;
+import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
+import 
org.apache.samza.operators.descriptors.base.system.TransformingInputDescriptorProvider;
+import org.apache.samza.serializers.Serde;
+
+public class ExampleTransformingSystemDescriptor extends 
SystemDescriptor<ExampleTransformingSystemDescriptor>
+    implements TransformingInputDescriptorProvider<Long>, 
OutputDescriptorProvider {
+  private static final String FACTORY_CLASS_NAME = 
"org.apache.samza.IMETransformingSystemFactory";
+
+  public ExampleTransformingSystemDescriptor(String systemName) {
+    super(systemName, FACTORY_CLASS_NAME, ime -> 1L, null);
+  }
+
+  @Override
+  public ExampleTransformingInputDescriptor<Long> getInputDescriptor(String 
streamId, Serde serde) {
+    return new ExampleTransformingInputDescriptor<>(streamId, this, null, 
serde);
+  }
+
+  @Override
+  public <StreamMessageType> 
ExampleTransformingOutputDescriptor<StreamMessageType> 
getOutputDescriptor(String streamId, Serde<StreamMessageType> serde) {
+    return new ExampleTransformingOutputDescriptor<>(streamId, this, serde);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
 
b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
index 4f2aa23..3a8d5c9 100644
--- 
a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
+++ 
b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
@@ -174,8 +174,8 @@ import org.codehaus.jackson.map.ObjectMapper;
         StreamJson inputJson = new StreamJson();
         opGraph.inputStreams.add(inputJson);
         inputJson.streamId = streamId;
-        Collection<OperatorSpec> specs = 
operatorSpec.getRegisteredOperatorSpecs();
-        inputJson.nextOperatorIds = 
specs.stream().map(OperatorSpec::getOpId).collect(Collectors.toSet());
+        inputJson.nextOperatorIds = 
operatorSpec.getRegisteredOperatorSpecs().stream()
+            .map(OperatorSpec::getOpId).collect(Collectors.toSet());
 
         updateOperatorGraphJson(operatorSpec, opGraph);
       });

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java 
b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
index 2b279ef..9be6c37 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
@@ -235,15 +235,27 @@ public class JobNode {
     inEdges.forEach(edge -> {
         String streamId = edge.getStreamSpec().getId();
         InputOperatorSpec inputOperatorSpec = inputOperators.get(streamId);
-        streamKeySerdes.put(streamId, inputOperatorSpec.getKeySerde());
-        streamMsgSerdes.put(streamId, inputOperatorSpec.getValueSerde());
+        Serde keySerde = inputOperatorSpec.getKeySerde();
+        if (keySerde != null) {
+          streamKeySerdes.put(streamId, keySerde);
+        }
+        Serde valueSerde = inputOperatorSpec.getValueSerde();
+        if (valueSerde != null) {
+          streamMsgSerdes.put(streamId, valueSerde);
+        }
       });
     Map<String, OutputStreamImpl> outputStreams = specGraph.getOutputStreams();
     outEdges.forEach(edge -> {
         String streamId = edge.getStreamSpec().getId();
         OutputStreamImpl outputStream = outputStreams.get(streamId);
-        streamKeySerdes.put(streamId, outputStream.getKeySerde());
-        streamMsgSerdes.put(streamId, outputStream.getValueSerde());
+        Serde keySerde = outputStream.getKeySerde();
+        if (keySerde != null) {
+          streamKeySerdes.put(streamId, keySerde);
+        }
+        Serde valueSerde = outputStream.getValueSerde();
+        if (valueSerde != null) {
+          streamMsgSerdes.put(streamId, valueSerde);
+        }
       });
 
     // collect all key and msg serde instances for stores

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java 
b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
index 6922c76..3c1a1dc 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
@@ -166,7 +166,7 @@ public class MessageStreamImpl<M> implements 
MessageStream<M> {
   public <K, V> MessageStream<KV<K, V>> partitionBy(MapFunction<? super M, ? 
extends K> keyExtractor,
       MapFunction<? super M, ? extends V> valueExtractor, KVSerde<K, V> serde, 
String userDefinedId) {
     String opId = this.graph.getNextOpId(OpCode.PARTITION_BY, userDefinedId);
-    IntermediateMessageStreamImpl<KV<K, V>> intermediateStream = 
this.graph.getIntermediateStream(opId, serde);
+    IntermediateMessageStreamImpl<KV<K, V>> intermediateStream = 
this.graph.getIntermediateStream(opId, serde, false);
     if (!intermediateStream.isKeyed()) {
       // this can only happen when the default serde partitionBy variant is 
being used
       throw new SamzaException("partitionBy can not be used with a default 
serde that is not a KVSerde.");

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-core/src/main/java/org/apache/samza/operators/StreamGraphSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphSpec.java 
b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphSpec.java
index 7dcd32e..8eb2425 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphSpec.java
@@ -18,17 +18,25 @@
  */
 package org.apache.samza.operators;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.regex.Pattern;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
+import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
+import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor;
+import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
+import org.apache.samza.operators.functions.InputTransformer;
+import org.apache.samza.operators.functions.StreamExpander;
 import org.apache.samza.operators.spec.InputOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec.OpCode;
 import org.apache.samza.operators.spec.OperatorSpecs;
@@ -42,9 +50,6 @@ import org.apache.samza.table.TableSpec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
 /**
  * This class defines:
  * 1) an implementation of {@link StreamGraph} that provides APIs for 
accessing {@link MessageStream}s to be used to
@@ -53,12 +58,14 @@ import com.google.common.base.Preconditions;
  */
 public class StreamGraphSpec implements StreamGraph {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(StreamGraphSpec.class);
-  public static final Pattern STREAM_ID_PATTERN = 
Pattern.compile("[\\d\\w-_.]+");
-  public static final Pattern TABLE_ID_PATTERN = 
Pattern.compile("[\\d\\w-_]+");
+  private static final Pattern ID_PATTERN = Pattern.compile("[\\d\\w-_]+");
 
   // We use a LHM for deterministic order in initializing and closing 
operators.
   private final Map<String, InputOperatorSpec> inputOperators = new 
LinkedHashMap<>();
   private final Map<String, OutputStreamImpl> outputStreams = new 
LinkedHashMap<>();
+  private final Map<String, InputDescriptor> inputDescriptors = new 
LinkedHashMap<>();
+  private final Map<String, OutputDescriptor> outputDescriptors = new 
LinkedHashMap<>();
+  private final Map<String, SystemDescriptor> systemDescriptors = new 
LinkedHashMap<>();
   private final Set<String> broadcastStreams = new HashSet<>();
   private final Map<TableSpec, TableImpl> tables = new LinkedHashMap<>();
   private final Config config;
@@ -70,29 +77,41 @@ public class StreamGraphSpec implements StreamGraph {
    */
   private int nextOpNum = 0;
   private final Set<String> operatorIds = new HashSet<>();
-  private Serde<?> defaultSerde = new KVSerde(new NoOpSerde(), new 
NoOpSerde());
   private ContextManager contextManager = null;
+  private Optional<SystemDescriptor> defaultSystemDescriptorOptional = 
Optional.empty();
 
   public StreamGraphSpec(Config config) {
     this.config = config;
   }
 
   @Override
-  public void setDefaultSerde(Serde<?> serde) {
-    Preconditions.checkNotNull(serde, "Default serde must not be null");
+  public void setDefaultSystem(SystemDescriptor<?> defaultSystemDescriptor) {
+    Preconditions.checkNotNull(defaultSystemDescriptor, "Provided 
defaultSystemDescriptor must not be null.");
+    String defaultSystemName = defaultSystemDescriptor.getSystemName();
     Preconditions.checkState(inputOperators.isEmpty() && 
outputStreams.isEmpty(),
-        "Default serde must be set before creating any input or output 
streams.");
-    this.defaultSerde = serde;
+        "Default system must be set before creating any input or output 
streams.");
+    checkSystemDescriptorUniqueness(defaultSystemDescriptor, 
defaultSystemName);
+    systemDescriptors.put(defaultSystemName, defaultSystemDescriptor);
+    this.defaultSystemDescriptorOptional = 
Optional.of(defaultSystemDescriptor);
   }
 
   @Override
-  public <M> MessageStream<M> getInputStream(String streamId, Serde<M> serde) {
-    Preconditions.checkState(isValidStreamId(streamId), String.format(
-        "streamId %s doesn't confirm to pattern %s", streamId, 
StreamGraphSpec.STREAM_ID_PATTERN));
-    Preconditions.checkNotNull(serde, "serde must not be null for an input 
stream.");
+  public <M> MessageStream<M> getInputStream(InputDescriptor<M, ?> 
inputDescriptor) {
+    SystemDescriptor systemDescriptor = inputDescriptor.getSystemDescriptor();
+    Optional<StreamExpander> expander = systemDescriptor.getExpander();
+    if (expander.isPresent()) {
+      return expander.get().apply(this, inputDescriptor);
+    }
+
+    String streamId = inputDescriptor.getStreamId();
     Preconditions.checkState(!inputOperators.containsKey(streamId),
         "getInputStream must not be called multiple times with the same 
streamId: " + streamId);
+    Preconditions.checkState(!inputDescriptors.containsKey(streamId),
+        "getInputStream must not be called multiple times with the same input 
descriptor: " + streamId);
+    String systemName = systemDescriptor.getSystemName();
+    checkSystemDescriptorUniqueness(systemDescriptor, systemName);
 
+    Serde serde = inputDescriptor.getSerde();
     KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde);
     if (outputStreams.containsKey(streamId)) {
       OutputStreamImpl outputStream = outputStreams.get(streamId);
@@ -104,26 +123,28 @@ public class StreamGraphSpec implements StreamGraph {
     }
 
     boolean isKeyed = serde instanceof KVSerde;
+    InputTransformer transformer = 
inputDescriptor.getTransformer().orElse(null);
     InputOperatorSpec inputOperatorSpec =
         OperatorSpecs.createInputOperatorSpec(streamId, kvSerdes.getKey(), 
kvSerdes.getValue(),
-            isKeyed, this.getNextOpId(OpCode.INPUT, null));
+            transformer, isKeyed, this.getNextOpId(OpCode.INPUT, null));
     inputOperators.put(streamId, inputOperatorSpec);
-    return new MessageStreamImpl<>(this, inputOperators.get(streamId));
+    inputDescriptors.put(streamId, inputDescriptor);
+    systemDescriptors.put(systemDescriptor.getSystemName(), systemDescriptor);
+    return new MessageStreamImpl(this, inputOperators.get(streamId));
   }
 
   @Override
-  public <M> MessageStream<M> getInputStream(String streamId) {
-    return (MessageStream<M>) getInputStream(streamId, defaultSerde);
-  }
-
-  @Override
-  public <M> OutputStream<M> getOutputStream(String streamId, Serde<M> serde) {
-    Preconditions.checkState(isValidStreamId(streamId), String.format(
-        "streamId %s doesn't confirm to pattern %s", streamId, 
StreamGraphSpec.STREAM_ID_PATTERN));
-    Preconditions.checkNotNull(serde, "serde must not be null for an output 
stream.");
+  public <M> OutputStream<M> getOutputStream(OutputDescriptor<M, ?> 
outputDescriptor) {
+    String streamId = outputDescriptor.getStreamId();
     Preconditions.checkState(!outputStreams.containsKey(streamId),
         "getOutputStream must not be called multiple times with the same 
streamId: " + streamId);
+    Preconditions.checkState(!outputDescriptors.containsKey(streamId),
+        "getOutputStream must not be called multiple times with the same 
output descriptor: " + streamId);
+    SystemDescriptor systemDescriptor = outputDescriptor.getSystemDescriptor();
+    String systemName = systemDescriptor.getSystemName();
+    checkSystemDescriptorUniqueness(systemDescriptor, systemName);
 
+    Serde serde = outputDescriptor.getSerde();
     KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde);
     if (inputOperators.containsKey(streamId)) {
       InputOperatorSpec inputOperatorSpec = inputOperators.get(streamId);
@@ -136,21 +157,21 @@ public class StreamGraphSpec implements StreamGraph {
 
     boolean isKeyed = serde instanceof KVSerde;
     outputStreams.put(streamId, new OutputStreamImpl<>(streamId, 
kvSerdes.getKey(), kvSerdes.getValue(), isKeyed));
+    outputDescriptors.put(streamId, outputDescriptor);
+    systemDescriptors.put(systemDescriptor.getSystemName(), systemDescriptor);
     return outputStreams.get(streamId);
   }
 
   @Override
-  public <M> OutputStream<M> getOutputStream(String streamId) {
-    return (OutputStream<M>) getOutputStream(streamId, defaultSerde);
-  }
-
-  @Override
-  public <K, V> Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> tableDesc) {
-    Preconditions.checkState(isValidTableId(tableDesc.getTableId()), 
String.format(
-        "tableId %s doesn't confirm to pattern %s", tableDesc.getTableId(), 
TABLE_ID_PATTERN.toString()));
-    TableSpec tableSpec = ((BaseTableDescriptor) tableDesc).getTableSpec();
-    Preconditions.checkState(!tables.containsKey(tableSpec), String.format(
-        "getTable() invoked multiple times with the same tableId: %s", 
tableDesc.getTableId()));
+  public <K, V> Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> 
tableDescriptor) {
+    String tableId = tableDescriptor.getTableId();
+    Preconditions.checkState(StringUtils.isNotBlank(tableId) && 
ID_PATTERN.matcher(tableId).matches(),
+        String.format("tableId: %s must confirm to pattern: %s", tableId, 
ID_PATTERN.toString()));
+    TableSpec tableSpec = ((BaseTableDescriptor) 
tableDescriptor).getTableSpec();
+    if (tables.containsKey(tableSpec)) {
+      throw new IllegalStateException(
+          String.format("getTable() invoked multiple times with the same 
tableId: %s", tableId));
+    }
     tables.put(tableSpec, new TableImpl(tableSpec));
     return tables.get(tableSpec);
   }
@@ -178,7 +199,7 @@ public class StreamGraphSpec implements StreamGraph {
    * @return the unique ID for the next operator in the graph
    */
   public String getNextOpId(OpCode opCode, String userDefinedId) {
-    if (StringUtils.isNotBlank(userDefinedId) && 
!STREAM_ID_PATTERN.matcher(userDefinedId).matches()) {
+    if (StringUtils.isNotBlank(userDefinedId) && 
!ID_PATTERN.matcher(userDefinedId).matches()) {
       throw new SamzaException("Operator ID must not contain spaces or special 
characters: " + userDefinedId);
     }
 
@@ -207,19 +228,6 @@ public class StreamGraphSpec implements StreamGraph {
   }
 
   /**
-   * See {@link StreamGraphSpec#getIntermediateStream(String, Serde, boolean)}.
-   *
-   * @param <M> type of messages in the intermediate stream
-   * @param streamId the id of the stream to be created
-   * @param serde the {@link Serde} to use for messages in the intermediate 
stream. If null, the default serde is used.
-   * @return  the intermediate {@link MessageStreamImpl}
-   */
-  @VisibleForTesting
-  public <M> IntermediateMessageStreamImpl<M> getIntermediateStream(String 
streamId, Serde<M> serde) {
-    return getIntermediateStream(streamId, serde, false);
-  }
-
-  /**
    * Internal helper for {@link MessageStreamImpl} to add an intermediate 
{@link MessageStream} to the graph.
    * An intermediate {@link MessageStream} is both an output and an input 
stream.
    *
@@ -231,21 +239,33 @@ public class StreamGraphSpec implements StreamGraph {
    * @return  the intermediate {@link MessageStreamImpl}
    */
   @VisibleForTesting
-  <M> IntermediateMessageStreamImpl<M> getIntermediateStream(String streamId, 
Serde<M> serde, boolean isBroadcast) {
+  public <M> IntermediateMessageStreamImpl<M> getIntermediateStream(String 
streamId, Serde<M> serde, boolean isBroadcast) {
     Preconditions.checkState(!inputOperators.containsKey(streamId) && 
!outputStreams.containsKey(streamId),
         "getIntermediateStream must not be called multiple times with the same 
streamId: " + streamId);
 
     if (serde == null) {
-      LOGGER.info("Using default serde for intermediate stream: " + streamId);
-      serde = (Serde<M>) defaultSerde;
+      LOGGER.info("No serde provided for intermediate stream: " + streamId +
+          ". Key and message serdes configured for the job.default.system will 
be used.");
     }
 
     if (isBroadcast) broadcastStreams.add(streamId);
-    boolean isKeyed = serde instanceof KVSerde;
-    KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde);
+
+    boolean isKeyed;
+    KV<Serde, Serde> kvSerdes;
+    if (serde == null) { // if no explicit serde available
+      isKeyed = true; // assume keyed stream
+      kvSerdes = new KV<>(null, null); // and that key and msg serdes are 
provided for job.default.system in configs
+    } else {
+      isKeyed = serde instanceof KVSerde;
+      kvSerdes = getKVSerdes(streamId, serde);
+    }
+
+    InputTransformer transformer = (InputTransformer) 
defaultSystemDescriptorOptional
+        .flatMap(SystemDescriptor::getTransformer).orElse(null);
+
     InputOperatorSpec inputOperatorSpec =
         OperatorSpecs.createInputOperatorSpec(streamId, kvSerdes.getKey(), 
kvSerdes.getValue(),
-            isKeyed, this.getNextOpId(OpCode.INPUT, null));
+            transformer, isKeyed, this.getNextOpId(OpCode.INPUT, null));
     inputOperators.put(streamId, inputOperatorSpec);
     outputStreams.put(streamId, new OutputStreamImpl(streamId, 
kvSerdes.getKey(), kvSerdes.getValue(), isKeyed));
     return new IntermediateMessageStreamImpl<>(this, 
inputOperators.get(streamId), outputStreams.get(streamId));
@@ -267,12 +287,28 @@ public class StreamGraphSpec implements StreamGraph {
     return Collections.unmodifiableMap(tables);
   }
 
-  public static boolean isValidStreamId(String id) {
-    return StringUtils.isNotBlank(id) && 
STREAM_ID_PATTERN.matcher(id).matches();
+  public Map<String, InputDescriptor> getInputDescriptors() {
+    return Collections.unmodifiableMap(inputDescriptors);
+  }
+
+  public Map<String, OutputDescriptor> getOutputDescriptors() {
+    return Collections.unmodifiableMap(outputDescriptors);
+  }
+
+  public Set<SystemDescriptor> getSystemDescriptors() {
+    // We enforce that users must not use different system descriptor 
instances for the same system name
+    // when getting an input/output stream or setting the default system 
descriptor
+    return Collections.unmodifiableSet(new 
HashSet<>(systemDescriptors.values()));
+  }
+
+  public Optional<SystemDescriptor> getDefaultSystemDescriptor() {
+    return this.defaultSystemDescriptorOptional;
   }
 
-  public static boolean isValidTableId(String id) {
-    return StringUtils.isNotBlank(id) && 
TABLE_ID_PATTERN.matcher(id).matches();
+  private void checkSystemDescriptorUniqueness(SystemDescriptor 
systemDescriptor, String systemName) {
+    Preconditions.checkState(!systemDescriptors.containsKey(systemName)
+            || systemDescriptors.get(systemName) == systemDescriptor,
+        "Must not use different system descriptor instances for the same 
system name: " + systemName);
   }
 
   private KV<Serde, Serde> getKVSerdes(String streamId, Serde serde) {

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-core/src/main/java/org/apache/samza/operators/descriptors/DelegatingSystemDescriptor.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/descriptors/DelegatingSystemDescriptor.java
 
b/samza-core/src/main/java/org/apache/samza/operators/descriptors/DelegatingSystemDescriptor.java
new file mode 100644
index 0000000..16accae
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/descriptors/DelegatingSystemDescriptor.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.descriptors;
+
+
+import com.google.common.annotations.VisibleForTesting;
+
+import 
org.apache.samza.operators.descriptors.base.system.OutputDescriptorProvider;
+import 
org.apache.samza.operators.descriptors.base.system.SimpleInputDescriptorProvider;
+import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
+import org.apache.samza.serializers.Serde;
+
+/**
+ * A descriptor for samza framework internal usage.
+ * <p>
+ * Allows creating a {@link SystemDescriptor} without setting the factory 
class name, and delegating
+ * rest of the system customization to configurations.
+ * <p>
+ * Useful for code-generation and testing use cases where the factory name is 
not known in advance.
+ */
+@SuppressWarnings("unchecked")
+public final class DelegatingSystemDescriptor extends 
SystemDescriptor<DelegatingSystemDescriptor>
+    implements SimpleInputDescriptorProvider, OutputDescriptorProvider {
+
+  /**
+   * Constructs an {@link DelegatingSystemDescriptor} instance with no system 
level serde.
+   * Serdes must be provided explicitly at stream level when getting input or 
output descriptors.
+   * SystemFactory class name must be provided in configuration.
+   *
+   * @param systemName name of this system
+   */
+  @VisibleForTesting
+  public DelegatingSystemDescriptor(String systemName) {
+    super(systemName, null, null, null);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public <StreamMessageType> GenericInputDescriptor<StreamMessageType> 
getInputDescriptor(
+      String streamId, Serde<StreamMessageType> serde) {
+    return new GenericInputDescriptor<>(streamId, this, serde);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public <StreamMessageType> GenericOutputDescriptor<StreamMessageType> 
getOutputDescriptor(
+      String streamId, Serde<StreamMessageType> serde) {
+    return new GenericOutputDescriptor<>(streamId, this, serde);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java
 
b/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java
index ef3c322..6cc57e0 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java
@@ -20,8 +20,10 @@ package org.apache.samza.operators.impl;
 
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.KV;
+import org.apache.samza.operators.functions.InputTransformer;
 import org.apache.samza.operators.spec.InputOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
@@ -32,15 +34,12 @@ import java.util.Collections;
 
 /**
  * An operator that builds the input message from the incoming message.
- *
- * @param <K> the type of key in the incoming message
- * @param <V> the type of message in the incoming message
  */
-public final class InputOperatorImpl<K, V> extends OperatorImpl<KV<K, V>, 
Object> { // Object == KV<K,V> | V
+public final class InputOperatorImpl extends 
OperatorImpl<IncomingMessageEnvelope, Object> {
 
-  private final InputOperatorSpec<K, V> inputOpSpec;
+  private final InputOperatorSpec inputOpSpec;
 
-  InputOperatorImpl(InputOperatorSpec<K, V> inputOpSpec) {
+  InputOperatorImpl(InputOperatorSpec inputOpSpec) {
     this.inputOpSpec = inputOpSpec;
   }
 
@@ -49,8 +48,14 @@ public final class InputOperatorImpl<K, V> extends 
OperatorImpl<KV<K, V>, Object
   }
 
   @Override
-  public Collection<Object> handleMessage(KV<K, V> pair, MessageCollector 
collector, TaskCoordinator coordinator) {
-    Object message = this.inputOpSpec.isKeyed() ? pair : pair.getValue();
+  public Collection<Object> handleMessage(IncomingMessageEnvelope ime, 
MessageCollector collector, TaskCoordinator coordinator) {
+    Object message;
+    InputTransformer transformer = inputOpSpec.getTransformer();
+    if (transformer != null) {
+      message = transformer.apply(ime);
+    } else {
+      message = this.inputOpSpec.isKeyed() ? KV.of(ime.getKey(), 
ime.getMessage()) : ime.getMessage();
+    }
     return Collections.singletonList(message);
   }
 
@@ -58,7 +63,7 @@ public final class InputOperatorImpl<K, V> extends 
OperatorImpl<KV<K, V>, Object
   protected void handleClose() {
   }
 
-  protected OperatorSpec<KV<K, V>, Object> getOperatorSpec() {
+  protected OperatorSpec<IncomingMessageEnvelope, Object> getOperatorSpec() {
     return this.inputOpSpec;
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
 
b/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
index 922a1f9..c49443d 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
@@ -18,47 +18,65 @@
  */
 package org.apache.samza.operators.spec;
 
-import org.apache.samza.operators.KV;
+import org.apache.samza.operators.functions.InputTransformer;
 import org.apache.samza.operators.functions.TimerFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.IncomingMessageEnvelope;
 
 /**
  * The spec for an operator that receives incoming messages from an input 
stream
- * and converts them to the input message.
- *
- * @param <K> the type of input key
- * @param <V> the type of input value
+ * and converts them to the input message. The input message type is:
+ * <ul>
+ *   <li>{@code T} if the input stream has an {@link InputTransformer} with 
result type T
+ *   <li>{@code KV<K, V>} if the input stream is keyed
+ *   <li>{@code V} if the input stream is unkeyed
+ * </ul>
  */
-public class InputOperatorSpec<K, V> extends OperatorSpec<KV<K, V>, Object> { 
// Object == KV<K, V> | V
+public class InputOperatorSpec extends OperatorSpec<IncomingMessageEnvelope, 
Object> {
 
-  private final boolean isKeyed;
   private final String streamId;
+  private final boolean isKeyed;
+  private final InputTransformer transformer; // may be null
 
   /**
-   * The following {@link Serde}s are serialized by the ExecutionPlanner when 
generating the configs for a stream, and deserialized
-   * once during startup in SamzaContainer. They don't need to be deserialized 
here on a per-task basis
+   * The following {@link Serde}s are serialized by the ExecutionPlanner when 
generating the configs for a stream, and
+   * deserialized once during startup in SamzaContainer. They don't need to be 
deserialized here on a per-task basis
+   *
+   * Serdes are optional for intermediate streams and may be specified for 
job.default.system in configuration instead.
    */
-  private transient final Serde<K> keySerde;
-  private transient final Serde<V> valueSerde;
+  private transient final Serde keySerde;
+  private transient final Serde valueSerde;
 
-  public InputOperatorSpec(String streamId, Serde<K> keySerde, Serde<V> 
valueSerde, boolean isKeyed, String opId) {
+  public InputOperatorSpec(String streamId, Serde keySerde, Serde valueSerde,
+      InputTransformer transformer, boolean isKeyed, String opId) {
     super(OpCode.INPUT, opId);
     this.streamId = streamId;
+    this.isKeyed = isKeyed;
+    this.transformer = transformer;
     this.keySerde = keySerde;
     this.valueSerde = valueSerde;
-    this.isKeyed = isKeyed;
   }
 
   public String getStreamId() {
     return this.streamId;
   }
 
-  public Serde<K> getKeySerde() {
+  /**
+   * Get the key serde for this input stream if any.
+   *
+   * @return the key serde if any, else null
+   */
+  public Serde getKeySerde() {
     return keySerde;
   }
 
-  public Serde<V> getValueSerde() {
+  /**
+   * Get the value serde for this input stream if any.
+   *
+   * @return the value serde if any, else null
+   */
+  public Serde getValueSerde() {
     return valueSerde;
   }
 
@@ -66,6 +84,15 @@ public class InputOperatorSpec<K, V> extends 
OperatorSpec<KV<K, V>, Object> { //
     return isKeyed;
   }
 
+  /**
+   * Get the {@link InputTransformer} for this input stream if any.
+   *
+   * @return the {@link InputTransformer} if any, else null
+   */
+  public InputTransformer getTransformer() {
+    return transformer;
+  }
+
   @Override
   public WatermarkFunction getWatermarkFn() {
     return null;

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java 
b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
index 9e788da..6ebbdae 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
@@ -22,6 +22,7 @@ package org.apache.samza.operators.spec;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.functions.FilterFunction;
 import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.functions.InputTransformer;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.operators.functions.SinkFunction;
@@ -44,15 +45,15 @@ public class OperatorSpecs {
    * @param streamId  the stream id for the input stream
    * @param keySerde  the serde for the input key
    * @param valueSerde  the serde for the input value
+   * @param transformer the input stream transformer
    * @param isKeyed  whether the input stream is keyed
    * @param opId  the unique ID of the operator
-   * @param <K>  type of input key
-   * @param <V>  type of input value
    * @return  the {@link InputOperatorSpec}
    */
-  public static <K, V> InputOperatorSpec<K, V> createInputOperatorSpec(
-    String streamId, Serde<K> keySerde, Serde<V> valueSerde, boolean isKeyed, 
String opId) {
-    return new InputOperatorSpec<>(streamId, keySerde, valueSerde, isKeyed, 
opId);
+  public static InputOperatorSpec createInputOperatorSpec(
+      String streamId, Serde keySerde, Serde valueSerde,
+      InputTransformer transformer, boolean isKeyed, String opId) {
+    return new InputOperatorSpec(streamId, keySerde, valueSerde, transformer, 
isKeyed, opId);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-core/src/main/java/org/apache/samza/operators/spec/OutputStreamImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputStreamImpl.java
 
b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputStreamImpl.java
index 5d70e6f..fc736c3 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputStreamImpl.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputStreamImpl.java
@@ -30,6 +30,8 @@ public class OutputStreamImpl<M> implements OutputStream<M>, 
Serializable {
   /**
    * The following fields are serialized by the ExecutionPlanner when 
generating the configs for the output stream, and
    * deserialized once during startup in SamzaContainer. They don't need to be 
deserialized here on a per-task basis
+   *
+   * Serdes are optional for intermediate streams and may be specified for 
job.default.system in configuration instead.
    */
   private transient final Serde keySerde;
   private transient final Serde valueSerde;
@@ -45,10 +47,20 @@ public class OutputStreamImpl<M> implements 
OutputStream<M>, Serializable {
     return streamId;
   }
 
+  /**
+   * Get the key serde for this output stream if any.
+   *
+   * @return the key serde if any, else null
+   */
   public Serde getKeySerde() {
     return keySerde;
   }
 
+  /**
+   * Get the value serde for this output stream if any.
+   *
+   * @return the value serde if any, else null
+   */
   public Serde getValueSerde() {
     return valueSerde;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java
 
b/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java
index 3bb8713..17c6903 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java
@@ -44,7 +44,7 @@ public class IntermediateMessageStreamImpl<M> extends 
MessageStreamImpl<M> imple
   private final OutputStreamImpl<M> outputStream;
   private final boolean isKeyed;
 
-  public IntermediateMessageStreamImpl(StreamGraphSpec graph, 
InputOperatorSpec<?, M> inputOperatorSpec,
+  public IntermediateMessageStreamImpl(StreamGraphSpec graph, 
InputOperatorSpec inputOperatorSpec,
       OutputStreamImpl<M> outputStream) {
     super(graph, (OperatorSpec<?, M>) inputOperatorSpec);
     this.outputStream = outputStream;

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java 
b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index 22550d5..ed67d80 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -164,7 +164,7 @@ public class StreamProcessor {
   }
 
   /**
-   * Same as {@link StreamProcessor(Config, Map, AsyncStreamTaskFactory, 
StreamProcessorLifecycleListener)}, except task
+   * Same as {@link #StreamProcessor(Config, Map, AsyncStreamTaskFactory, 
StreamProcessorLifecycleListener)}, except task
    * instances are created using the provided {@link StreamTaskFactory}.
    * @param config - config
    * @param customMetricsReporters metric Reporter

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
 
b/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
index f7ca122..7127ff7 100644
--- 
a/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
+++ 
b/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
@@ -30,6 +30,7 @@ import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.ApplicationConfig.ApplicationMode;
 import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.ShellCommandConfig;
 import org.apache.samza.config.StreamConfig;
@@ -81,6 +82,17 @@ public abstract class AbstractApplicationRunner extends 
ApplicationRunner {
         ? ApplicationMode.BATCH : ApplicationMode.STREAM;
     cfg.put(ApplicationConfig.APP_MODE, mode.name());
 
+    // merge user-provided configuration with input/output descriptor 
generated configuration
+    // descriptor generated configuration has higher priority
+    Map<String, String> systemStreamConfigs = new HashMap<>();
+    graphSpec.getInputDescriptors().forEach((key, value) -> 
systemStreamConfigs.putAll(value.toConfig()));
+    graphSpec.getOutputDescriptors().forEach((key, value) -> 
systemStreamConfigs.putAll(value.toConfig()));
+    graphSpec.getSystemDescriptors().forEach(sd -> 
systemStreamConfigs.putAll(sd.toConfig()));
+    graphSpec.getDefaultSystemDescriptor().ifPresent(dsd ->
+        systemStreamConfigs.put(JobConfig.JOB_DEFAULT_SYSTEM(), 
dsd.getSystemName()));
+    Map<String, String> appConfigs = new HashMap<>(cfg);
+    appConfigs.putAll(systemStreamConfigs);
+
     // create the physical execution plan
     Config generatedConfig = new MapConfig(cfg);
     StreamManager streamManager = buildAndStartStreamManager(generatedConfig);

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java 
b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
index fdd134f..dd8d6c3 100644
--- a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
+++ b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
@@ -23,7 +23,6 @@ import org.apache.samza.operators.OperatorSpecGraph;
 import org.apache.samza.system.EndOfStreamMessage;
 import org.apache.samza.system.MessageType;
 import org.apache.samza.operators.ContextManager;
-import org.apache.samza.operators.KV;
 import org.apache.samza.operators.impl.InputOperatorImpl;
 import org.apache.samza.operators.impl.OperatorImplGraph;
 import org.apache.samza.system.IncomingMessageEnvelope;
@@ -111,7 +110,7 @@ public class StreamOperatorTask implements StreamTask, 
InitableTask, WindowableT
     if (inputOpImpl != null) {
       switch (MessageType.of(ime.getMessage())) {
         case USER_MESSAGE:
-          inputOpImpl.onMessage(KV.of(ime.getKey(), ime.getMessage()), 
collector, coordinator);
+          inputOpImpl.onMessage(ime, collector, coordinator);
           break;
 
         case END_OF_STREAM:

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-core/src/main/java/org/apache/samza/util/StreamUtil.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/util/StreamUtil.java 
b/samza-core/src/main/java/org/apache/samza/util/StreamUtil.java
index e7a1e54..96cdd6c 100644
--- a/samza-core/src/main/java/org/apache/samza/util/StreamUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/util/StreamUtil.java
@@ -32,6 +32,7 @@ public class StreamUtil {
    * Gets the {@link SystemStream} corresponding to the provided stream, which 
may be
    * a streamId, or stream name of the format systemName.streamName.
    *
+   * @param config the config for the job
    * @param stream the stream name or id to get the {@link SystemStream} for.
    * @return the {@link SystemStream} for the stream
    */

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java 
b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
index 4d25ebb..9912d8b 100644
--- 
a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
+++ 
b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
@@ -36,8 +36,13 @@ import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.StreamGraphSpec;
 import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.descriptors.GenericInputDescriptor;
+import org.apache.samza.operators.descriptors.GenericOutputDescriptor;
+import org.apache.samza.operators.descriptors.GenericSystemDescriptor;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemAdmin;
@@ -61,12 +66,18 @@ public class TestExecutionPlanner {
   private StreamManager streamManager;
   private Config config;
 
-  private StreamSpec input1;
-  private StreamSpec input2;
-  private StreamSpec input3;
-  private StreamSpec input4;
-  private StreamSpec output1;
-  private StreamSpec output2;
+  private StreamSpec input1Spec;
+  private GenericInputDescriptor<KV<Object, Object>> input1Descriptor;
+  private StreamSpec input2Spec;
+  private GenericInputDescriptor<KV<Object, Object>> input2Descriptor;
+  private StreamSpec input3Spec;
+  private GenericInputDescriptor<KV<Object, Object>> input3Descriptor;
+  private StreamSpec input4Spec;
+  private GenericInputDescriptor<KV<Object, Object>> input4Descriptor;
+  private StreamSpec output1Spec;
+  private GenericOutputDescriptor<KV<Object, Object>> output1Descriptor;
+  private StreamSpec output2Spec;
+  private GenericOutputDescriptor<KV<Object, Object>> output2Descriptor;
 
   static SystemAdmin createSystemAdmin(Map<String, Integer> 
streamToPartitions) {
 
@@ -104,8 +115,8 @@ public class TestExecutionPlanner {
      *
      */
     StreamGraphSpec graphSpec = new StreamGraphSpec(config);
-    MessageStream<KV<Object, Object>> input1 = 
graphSpec.getInputStream("input1");
-    OutputStream<KV<Object, Object>> output1 = 
graphSpec.getOutputStream("output1");
+    MessageStream<KV<Object, Object>> input1 = 
graphSpec.getInputStream(input1Descriptor);
+    OutputStream<KV<Object, Object>> output1 = 
graphSpec.getOutputStream(output1Descriptor);
     input1
         .partitionBy(m -> m.key, m -> m.value, "p1")
         .map(kv -> kv)
@@ -128,19 +139,19 @@ public class TestExecutionPlanner {
 
     StreamGraphSpec graphSpec = new StreamGraphSpec(config);
     MessageStream<KV<Object, Object>> messageStream1 =
-        graphSpec.<KV<Object, Object>>getInputStream("input1")
+        graphSpec.getInputStream(input1Descriptor)
             .map(m -> m);
     MessageStream<KV<Object, Object>> messageStream2 =
-        graphSpec.<KV<Object, Object>>getInputStream("input2")
+        graphSpec.getInputStream(input2Descriptor)
             .partitionBy(m -> m.key, m -> m.value, "p1")
             .filter(m -> true);
     MessageStream<KV<Object, Object>> messageStream3 =
-        graphSpec.<KV<Object, Object>>getInputStream("input3")
+        graphSpec.getInputStream(input3Descriptor)
             .filter(m -> true)
             .partitionBy(m -> m.key, m -> m.value, "p2")
             .map(m -> m);
-    OutputStream<KV<Object, Object>> output1 = 
graphSpec.getOutputStream("output1");
-    OutputStream<KV<Object, Object>> output2 = 
graphSpec.getOutputStream("output2");
+    OutputStream<KV<Object, Object>> output1 = 
graphSpec.getOutputStream(output1Descriptor);
+    OutputStream<KV<Object, Object>> output2 = 
graphSpec.getOutputStream(output2Descriptor);
 
     messageStream1
         .join(messageStream2,
@@ -160,19 +171,19 @@ public class TestExecutionPlanner {
 
     StreamGraphSpec graphSpec = new StreamGraphSpec(config);
     MessageStream<KV<Object, Object>> messageStream1 =
-        graphSpec.<KV<Object, Object>>getInputStream("input1")
+        graphSpec.getInputStream(input1Descriptor)
             .map(m -> m);
     MessageStream<KV<Object, Object>> messageStream2 =
-        graphSpec.<KV<Object, Object>>getInputStream("input2")
+        graphSpec.getInputStream(input2Descriptor)
             .partitionBy(m -> m.key, m -> m.value, "p1")
             .filter(m -> true);
     MessageStream<KV<Object, Object>> messageStream3 =
-        graphSpec.<KV<Object, Object>>getInputStream("input3")
+        graphSpec.getInputStream(input3Descriptor)
             .filter(m -> true)
             .partitionBy(m -> m.key, m -> m.value, "p2")
             .map(m -> m);
-    OutputStream<KV<Object, Object>> output1 = 
graphSpec.getOutputStream("output1");
-    OutputStream<KV<Object, Object>> output2 = 
graphSpec.getOutputStream("output2");
+    OutputStream<KV<Object, Object>> output1 = 
graphSpec.getOutputStream(output1Descriptor);
+    OutputStream<KV<Object, Object>> output2 = 
graphSpec.getOutputStream(output2Descriptor);
 
     messageStream1.map(m -> m)
         .filter(m->true)
@@ -214,13 +225,24 @@ public class TestExecutionPlanner {
     StreamTestUtils.addStreamConfigs(configMap, "output2", "system2", 
"output2");
     config = new MapConfig(configMap);
 
-    input1 = new StreamSpec("input1", "input1", "system1");
-    input2 = new StreamSpec("input2", "input2", "system2");
-    input3 = new StreamSpec("input3", "input3", "system2");
-    input4 = new StreamSpec("input4", "input4", "system1");
-
-    output1 = new StreamSpec("output1", "output1", "system1");
-    output2 = new StreamSpec("output2", "output2", "system2");
+    input1Spec = new StreamSpec("input1", "input1", "system1");
+    input2Spec = new StreamSpec("input2", "input2", "system2");
+    input3Spec = new StreamSpec("input3", "input3", "system2");
+    input4Spec = new StreamSpec("input4", "input4", "system1");
+
+    output1Spec = new StreamSpec("output1", "output1", "system1");
+    output2Spec = new StreamSpec("output2", "output2", "system2");
+
+    KVSerde<Object, Object> kvSerde = new KVSerde<>(new NoOpSerde(), new 
NoOpSerde());
+    String mockSystemFactoryClass = "factory.class.name";
+    GenericSystemDescriptor system1 = new GenericSystemDescriptor("system1", 
mockSystemFactoryClass);
+    GenericSystemDescriptor system2 = new GenericSystemDescriptor("system2", 
mockSystemFactoryClass);
+    input1Descriptor = system1.getInputDescriptor("input1", kvSerde);
+    input2Descriptor = system2.getInputDescriptor("input2", kvSerde);
+    input3Descriptor = system2.getInputDescriptor("input3", kvSerde);
+    input4Descriptor = system1.getInputDescriptor("input4", kvSerde);
+    output1Descriptor = system1.getOutputDescriptor("output1", kvSerde);
+    output2Descriptor = system2.getOutputDescriptor("output2", kvSerde);
 
     // set up external partition count
     Map<String, Integer> system1Map = new HashMap<>();
@@ -258,11 +280,11 @@ public class TestExecutionPlanner {
     JobGraph jobGraph = 
planner.createJobGraph(graphSpec.getOperatorSpecGraph());
 
     ExecutionPlanner.updateExistingPartitions(jobGraph, streamManager);
-    assertTrue(jobGraph.getOrCreateStreamEdge(input1).getPartitionCount() == 
64);
-    assertTrue(jobGraph.getOrCreateStreamEdge(input2).getPartitionCount() == 
16);
-    assertTrue(jobGraph.getOrCreateStreamEdge(input3).getPartitionCount() == 
32);
-    assertTrue(jobGraph.getOrCreateStreamEdge(output1).getPartitionCount() == 
8);
-    assertTrue(jobGraph.getOrCreateStreamEdge(output2).getPartitionCount() == 
16);
+    assertTrue(jobGraph.getOrCreateStreamEdge(input1Spec).getPartitionCount() 
== 64);
+    assertTrue(jobGraph.getOrCreateStreamEdge(input2Spec).getPartitionCount() 
== 16);
+    assertTrue(jobGraph.getOrCreateStreamEdge(input3Spec).getPartitionCount() 
== 32);
+    assertTrue(jobGraph.getOrCreateStreamEdge(output1Spec).getPartitionCount() 
== 8);
+    assertTrue(jobGraph.getOrCreateStreamEdge(output2Spec).getPartitionCount() 
== 16);
 
     jobGraph.getIntermediateStreamEdges().forEach(edge -> {
         assertTrue(edge.getPartitionCount() == -1);
@@ -394,13 +416,13 @@ public class TestExecutionPlanner {
   @Test
   public void testMaxPartition() {
     Collection<StreamEdge> edges = new ArrayList<>();
-    StreamEdge edge = new StreamEdge(input1, false, false, config);
+    StreamEdge edge = new StreamEdge(input1Spec, false, false, config);
     edge.setPartitionCount(2);
     edges.add(edge);
-    edge = new StreamEdge(input2, false, false, config);
+    edge = new StreamEdge(input2Spec, false, false, config);
     edge.setPartitionCount(32);
     edges.add(edge);
-    edge = new StreamEdge(input3, false, false, config);
+    edge = new StreamEdge(input3Spec, false, false, config);
     edge.setPartitionCount(16);
     edges.add(edge);
 
@@ -417,8 +439,8 @@ public class TestExecutionPlanner {
     ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
     StreamGraphSpec graphSpec = new StreamGraphSpec(config);
 
-    MessageStream<KV<Object, Object>> input1 = 
graphSpec.getInputStream("input4");
-    OutputStream<KV<Object, Object>> output1 = 
graphSpec.getOutputStream("output1");
+    MessageStream<KV<Object, Object>> input1 = 
graphSpec.getInputStream(input4Descriptor);
+    OutputStream<KV<Object, Object>> output1 = 
graphSpec.getOutputStream(output1Descriptor);
     input1.partitionBy(m -> m.key, m -> m.value, "p1").map(kv -> 
kv).sendTo(output1);
     JobGraph jobGraph = (JobGraph) 
planner.plan(graphSpec.getOperatorSpecGraph());
 

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
 
b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
index b0f3843..960693f 100644
--- 
a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
+++ 
b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
@@ -29,8 +29,12 @@ import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.StreamGraphSpec;
+import org.apache.samza.operators.descriptors.GenericInputDescriptor;
+import org.apache.samza.operators.descriptors.GenericOutputDescriptor;
+import org.apache.samza.operators.descriptors.GenericSystemDescriptor;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.LongSerde;
 import org.apache.samza.serializers.NoOpSerde;
@@ -99,21 +103,30 @@ public class TestJobGraphJsonGenerator {
     StreamManager streamManager = new StreamManager(systemAdmins);
 
     StreamGraphSpec graphSpec = new StreamGraphSpec(config);
-    graphSpec.setDefaultSerde(KVSerde.of(new NoOpSerde<>(), new 
NoOpSerde<>()));
+    KVSerde<Object, Object> kvSerde = new KVSerde<>(new NoOpSerde(), new 
NoOpSerde());
+    String mockSystemFactoryClass = "factory.class.name";
+    GenericSystemDescriptor system1 = new GenericSystemDescriptor("system1", 
mockSystemFactoryClass);
+    GenericSystemDescriptor system2 = new GenericSystemDescriptor("system2", 
mockSystemFactoryClass);
+    GenericInputDescriptor<KV<Object, Object>> input1Descriptor = 
system1.getInputDescriptor("input1", kvSerde);
+    GenericInputDescriptor<KV<Object, Object>> input2Descriptor = 
system2.getInputDescriptor("input2", kvSerde);
+    GenericInputDescriptor<KV<Object, Object>> input3Descriptor = 
system2.getInputDescriptor("input3", kvSerde);
+    GenericOutputDescriptor<KV<Object, Object>>  output1Descriptor = 
system1.getOutputDescriptor("output1", kvSerde);
+    GenericOutputDescriptor<KV<Object, Object>> output2Descriptor = 
system2.getOutputDescriptor("output2", kvSerde);
+
     MessageStream<KV<Object, Object>> messageStream1 =
-        graphSpec.<KV<Object, Object>>getInputStream("input1")
+        graphSpec.getInputStream(input1Descriptor)
             .map(m -> m);
     MessageStream<KV<Object, Object>> messageStream2 =
-        graphSpec.<KV<Object, Object>>getInputStream("input2")
+        graphSpec.getInputStream(input2Descriptor)
             .partitionBy(m -> m.key, m -> m.value, "p1")
             .filter(m -> true);
     MessageStream<KV<Object, Object>> messageStream3 =
-        graphSpec.<KV<Object, Object>>getInputStream("input3")
+        graphSpec.getInputStream(input3Descriptor)
             .filter(m -> true)
             .partitionBy(m -> m.key, m -> m.value, "p2")
             .map(m -> m);
-    OutputStream<KV<Object, Object>> outputStream1 = 
graphSpec.getOutputStream("output1");
-    OutputStream<KV<Object, Object>> outputStream2 = 
graphSpec.getOutputStream("output2");
+    OutputStream<KV<Object, Object>> outputStream1 = 
graphSpec.getOutputStream(output1Descriptor);
+    OutputStream<KV<Object, Object>> outputStream2 = 
graphSpec.getOutputStream(output2Descriptor);
 
     messageStream1
         .join(messageStream2,
@@ -165,7 +178,16 @@ public class TestJobGraphJsonGenerator {
     StreamManager streamManager = new StreamManager(systemAdmins);
 
     StreamGraphSpec graphSpec = new StreamGraphSpec(config);
-    MessageStream<KV<String, PageViewEvent>> inputStream = 
graphSpec.getInputStream("PageView");
+    KVSerde<String, PageViewEvent> pvSerde = KVSerde.of(new StringSerde(), new 
JsonSerdeV2<>(PageViewEvent.class));
+    GenericSystemDescriptor isd = new GenericSystemDescriptor("hdfs", 
"mockSystemFactoryClass");
+    GenericInputDescriptor<KV<String, PageViewEvent>> pageView = 
isd.getInputDescriptor("PageView", pvSerde);
+
+    KVSerde<String, Long> pvcSerde = KVSerde.of(new StringSerde(), new 
LongSerde());
+    GenericSystemDescriptor osd = new GenericSystemDescriptor("kafka", 
"mockSystemFactoryClass");
+    GenericOutputDescriptor<KV<String, Long>> pageViewCount = 
osd.getOutputDescriptor("PageViewCount", pvcSerde);
+
+    MessageStream<KV<String, PageViewEvent>> inputStream = 
graphSpec.getInputStream(pageView);
+    OutputStream<KV<String, Long>> outputStream = 
graphSpec.getOutputStream(pageViewCount);
     inputStream
         .partitionBy(kv -> kv.getValue().getCountry(), kv -> kv.getValue(), 
"keyed-by-country")
         .window(Windows.keyedTumblingWindow(kv -> kv.getValue().getCountry(),
@@ -175,7 +197,7 @@ public class TestJobGraphJsonGenerator {
             new StringSerde(),
             new LongSerde()), "count-by-country")
         .map(pane -> new KV<>(pane.getKey().getKey(), pane.getMessage()))
-        .sendTo(graphSpec.getOutputStream("PageViewCount"));
+        .sendTo(outputStream);
 
     ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
     ExecutionPlan plan = planner.plan(graphSpec.getOperatorSpecGraph());

Reply via email to