SAMZA-1073: moving all operator classes into samza-core

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/8515448a
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/8515448a
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/8515448a

Branch: refs/heads/master
Commit: 8515448a2023ae6c78b9b0bb8e297cf346775e13
Parents: daaad7b
Author: Yi Pan (Data Infrastructure) <nickpa...@gmail.com>
Authored: Thu Feb 16 15:04:01 2017 -0800
Committer: Yi Pan (Data Infrastructure) <nickpa...@gmail.com>
Committed: Thu Feb 16 15:04:01 2017 -0800

----------------------------------------------------------------------
 build.gradle                                    |  24 +-
 gradle/dependency-versions.gradle               |   1 -
 .../samza/operators/MessageStreamImpl.java      | 182 +++++++++++++
 .../apache/samza/operators/StreamGraphImpl.java | 260 +++++++++++++++++++
 .../functions/PartialJoinFunction.java          |  56 ++++
 .../samza/operators/impl/OperatorGraph.java     | 164 ++++++++++++
 .../samza/operators/impl/OperatorImpl.java      |  67 +++++
 .../operators/impl/PartialJoinOperatorImpl.java |  47 ++++
 .../samza/operators/impl/RootOperatorImpl.java  |  35 +++
 .../impl/SessionWindowOperatorImpl.java         |  52 ++++
 .../samza/operators/impl/SinkOperatorImpl.java  |  44 ++++
 .../operators/impl/StreamOperatorImpl.java      |  49 ++++
 .../operators/impl/WindowOperatorImpl.java      |  43 +++
 .../samza/operators/spec/OperatorSpec.java      |  62 +++++
 .../samza/operators/spec/OperatorSpecs.java     | 210 +++++++++++++++
 .../operators/spec/PartialJoinOperatorSpec.java |  86 ++++++
 .../samza/operators/spec/SinkOperatorSpec.java  | 116 +++++++++
 .../operators/spec/StreamOperatorSpec.java      |  91 +++++++
 .../operators/spec/WindowOperatorSpec.java      |  72 +++++
 .../samza/operators/spec/WindowState.java       |  81 ++++++
 .../system/RemoteExecutionEnvironment.java      |  37 +++
 .../system/StandaloneExecutionEnvironment.java  |  50 ++++
 .../apache/samza/task/StreamOperatorTask.java   | 111 ++++++++
 .../samza/example/KeyValueStoreExample.java     | 180 +++++++++++++
 .../samza/example/NoContextStreamExample.java   | 151 +++++++++++
 .../samza/example/OrderShipmentJoinExample.java | 188 ++++++++++++++
 .../samza/example/PageViewCounterExample.java   | 129 +++++++++
 .../samza/example/RepartitionExample.java       | 140 ++++++++++
 .../samza/example/TestBasicStreamGraphs.java    |  99 +++++++
 .../samza/example/TestBroadcastExample.java     | 113 ++++++++
 .../apache/samza/example/TestExampleBase.java   |  46 ++++
 .../apache/samza/example/TestJoinExample.java   | 129 +++++++++
 .../apache/samza/example/TestWindowExample.java |  81 ++++++
 .../samza/operators/TestMessageStreamImpl.java  | 204 +++++++++++++++
 .../operators/TestMessageStreamImplUtil.java    |  26 ++
 .../data/JsonIncomingSystemMessageEnvelope.java |  60 +++++
 .../samza/operators/impl/TestOperatorImpl.java  |  73 ++++++
 .../samza/operators/impl/TestOperatorImpls.java | 235 +++++++++++++++++
 .../operators/impl/TestSinkOperatorImpl.java    |  50 ++++
 .../operators/impl/TestStreamOperatorImpl.java  |  60 +++++
 .../samza/operators/spec/TestOperatorSpecs.java | 127 +++++++++
 samza-operator/README.md                        |  17 --
 .../samza/operators/MessageStreamImpl.java      | 182 -------------
 .../apache/samza/operators/StreamGraphImpl.java | 260 -------------------
 .../functions/PartialJoinFunction.java          |  56 ----
 .../samza/operators/impl/OperatorGraph.java     | 164 ------------
 .../samza/operators/impl/OperatorImpl.java      |  67 -----
 .../operators/impl/PartialJoinOperatorImpl.java |  47 ----
 .../samza/operators/impl/RootOperatorImpl.java  |  35 ---
 .../impl/SessionWindowOperatorImpl.java         |  52 ----
 .../samza/operators/impl/SinkOperatorImpl.java  |  44 ----
 .../operators/impl/StreamOperatorImpl.java      |  49 ----
 .../operators/impl/WindowOperatorImpl.java      |  43 ---
 .../samza/operators/spec/OperatorSpec.java      |  62 -----
 .../samza/operators/spec/OperatorSpecs.java     | 210 ---------------
 .../operators/spec/PartialJoinOperatorSpec.java |  86 ------
 .../samza/operators/spec/SinkOperatorSpec.java  | 116 ---------
 .../operators/spec/StreamOperatorSpec.java      |  91 -------
 .../operators/spec/WindowOperatorSpec.java      |  72 -----
 .../samza/operators/spec/WindowState.java       |  81 ------
 .../system/RemoteExecutionEnvironment.java      |  37 ---
 .../system/StandaloneExecutionEnvironment.java  |  50 ----
 .../apache/samza/task/StreamOperatorTask.java   | 111 --------
 .../samza/example/KeyValueStoreExample.java     | 180 -------------
 .../samza/example/NoContextStreamExample.java   | 151 -----------
 .../samza/example/OrderShipmentJoinExample.java | 188 --------------
 .../samza/example/PageViewCounterExample.java   | 129 ---------
 .../samza/example/RepartitionExample.java       | 140 ----------
 .../samza/example/TestBasicStreamGraphs.java    |  99 -------
 .../samza/example/TestBroadcastExample.java     | 113 --------
 .../apache/samza/example/TestExampleBase.java   |  46 ----
 .../apache/samza/example/TestJoinExample.java   | 129 ---------
 .../apache/samza/example/TestWindowExample.java |  81 ------
 .../samza/operators/TestMessageStreamImpl.java  | 204 ---------------
 .../operators/TestMessageStreamImplUtil.java    |  26 --
 .../data/JsonIncomingSystemMessageEnvelope.java |  60 -----
 .../samza/operators/impl/TestOperatorImpl.java  |  73 ------
 .../samza/operators/impl/TestOperatorImpls.java | 235 -----------------
 .../operators/impl/TestSinkOperatorImpl.java    |  50 ----
 .../operators/impl/TestStreamOperatorImpl.java  |  60 -----
 .../samza/operators/spec/TestOperatorSpecs.java | 127 ---------
 settings.gradle                                 |   1 -
 82 files changed, 4007 insertions(+), 4048 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 0d60970..400a913 100644
--- a/build.gradle
+++ b/build.gradle
@@ -160,6 +160,7 @@ project(":samza-core_$scalaVersion") {
     compile "org.codehaus.jackson:jackson-mapper-asl:$jacksonVersion"
     compile "org.eclipse.jetty:jetty-webapp:$jettyVersion"
     compile "com.101tec:zkclient:$zkClientVersion"
+    testCompile project(":samza-api").sourceSets.test.output
     testCompile "junit:junit:$junitVersion"
     testCompile "org.mockito:mockito-all:$mockitoVersion"
     testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion"
@@ -372,29 +373,6 @@ project(":samza-yarn_$scalaVersion") {
   jar.dependsOn("lesscss")
 }
 
-project(":samza-operator") {
-  apply plugin: 'java'
-  apply plugin: 'checkstyle'
-    
-  sourceCompatibility = 1.8
-    
-  dependencies {
-    compile project(':samza-api')
-    compile project(":samza-core_$scalaVersion")
-    // TODO: remove this dependency after refactoring operator implementation 
classes
-    compile "org.reactivestreams:reactive-streams:$reactiveStreamVersion"
-
-    testCompile project(":samza-api").sourceSets.test.output
-    testCompile "junit:junit:$junitVersion"
-    testCompile "org.mockito:mockito-all:$mockitoVersion"
-  }
-  
-  checkstyle {
-    configFile = new File(rootDir, "checkstyle/checkstyle.xml")
-    toolVersion = "$checkstyleVersion"
-  }
-}
-
 project(":samza-shell") {
   apply plugin: 'java'
 

http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/gradle/dependency-versions.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependency-versions.gradle 
b/gradle/dependency-versions.gradle
index 0193b64..0a8542b 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -38,6 +38,5 @@
   commonsCodecVersion = "1.9"
   commonsCollectionVersion = "3.2.1"
   httpClientVersion="4.4.1"
-  reactiveStreamVersion="1.0.0"
   commonsLang3Version="3.4"
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java 
b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
new file mode 100644
index 0000000..830e4a5
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.function.Function;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.functions.FilterFunction;
+import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.operators.functions.PartialJoinFunction;
+import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.OperatorSpecs;
+import org.apache.samza.operators.windows.Window;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.internal.WindowInternal;
+import org.apache.samza.task.TaskContext;
+
+
+/**
+ * The implementation for input/output {@link MessageStream}s to/from the 
operators.
+ * Users use the {@link MessageStream} API methods to describe and chain the 
operator specs.
+ *
+ * @param <M>  type of messages in this {@link MessageStream}
+ */
+public class MessageStreamImpl<M> implements MessageStream<M> {
+  /**
+   * The {@link StreamGraphImpl} object that contains this {@link 
MessageStreamImpl}
+   */
+  private final StreamGraphImpl graph;
+
+  /**
+   * The set of operators that consume the messages in this {@link 
MessageStream}
+   */
+  private final Set<OperatorSpec> registeredOperatorSpecs = new HashSet<>();
+
+  /**
+   * Default constructor
+   *
+   * @param graph the {@link StreamGraphImpl} object that this stream belongs 
to
+   */
+  MessageStreamImpl(StreamGraphImpl graph) {
+    this.graph = graph;
+  }
+
+  @Override public <TM> MessageStream<TM> map(MapFunction<M, TM> mapFn) {
+    OperatorSpec<TM> op = OperatorSpecs.<M, TM>createMapOperatorSpec(mapFn, 
this.graph, new MessageStreamImpl<>(this.graph));
+    this.registeredOperatorSpecs.add(op);
+    return op.getNextStream();
+  }
+
+  @Override public MessageStream<M> filter(FilterFunction<M> filterFn) {
+    OperatorSpec<M> op = OperatorSpecs.<M>createFilterOperatorSpec(filterFn, 
this.graph, new MessageStreamImpl<>(this.graph));
+    this.registeredOperatorSpecs.add(op);
+    return op.getNextStream();
+  }
+
+  @Override
+  public <TM> MessageStream<TM> flatMap(FlatMapFunction<M, TM> flatMapFn) {
+    OperatorSpec<TM> op = OperatorSpecs.createStreamOperatorSpec(flatMapFn, 
this.graph, new MessageStreamImpl<>(this.graph));
+    this.registeredOperatorSpecs.add(op);
+    return op.getNextStream();
+  }
+
+  @Override
+  public void sink(SinkFunction<M> sinkFn) {
+    
this.registeredOperatorSpecs.add(OperatorSpecs.createSinkOperatorSpec(sinkFn, 
this.graph));
+  }
+
+  @Override public void sendTo(OutputStream<M> stream) {
+    
this.registeredOperatorSpecs.add(OperatorSpecs.createSendToOperatorSpec(stream.getSinkFunction(),
 this.graph, stream));
+  }
+
+  @Override
+  public <K, WV> MessageStream<WindowPane<K, WV>> window(Window<M, K, WV> 
window) {
+    OperatorSpec<WindowPane<K, WV>> wndOp = 
OperatorSpecs.createWindowOperatorSpec((WindowInternal<M, K, WV>) window,
+        this.graph, new MessageStreamImpl<>(this.graph));
+    this.registeredOperatorSpecs.add(wndOp);
+    return wndOp.getNextStream();
+  }
+
+  @Override public <K, OM, RM> MessageStream<RM> join(MessageStream<OM> 
otherStream, JoinFunction<K, M, OM, RM> joinFn) {
+    MessageStreamImpl<RM> outputStream = new MessageStreamImpl<>(this.graph);
+
+    PartialJoinFunction<K, M, OM, RM> parJoin1 = new PartialJoinFunction<K, M, 
OM, RM>() {
+      @Override
+      public RM apply(M m1, OM om) {
+        return joinFn.apply(m1, om);
+      }
+
+      @Override
+      public K getKey(M message) {
+        return joinFn.getFirstKey(message);
+      }
+
+      @Override
+      public K getOtherKey(OM message) {
+        return joinFn.getSecondKey(message);
+      }
+
+      @Override
+      public void init(Config config, TaskContext context) {
+        joinFn.init(config, context);
+      }
+    };
+
+    PartialJoinFunction<K, OM, M, RM> parJoin2 = new PartialJoinFunction<K, 
OM, M, RM>() {
+      @Override
+      public RM apply(OM m1, M m) {
+        return joinFn.apply(m, m1);
+      }
+
+      @Override
+      public K getKey(OM message) {
+        return joinFn.getSecondKey(message);
+      }
+
+      @Override
+      public K getOtherKey(M message) {
+        return joinFn.getFirstKey(message);
+      }
+    };
+
+    // TODO: need to add default store functions for the two partial join 
functions
+
+    ((MessageStreamImpl<OM>) otherStream).registeredOperatorSpecs.add(
+        OperatorSpecs.<OM, K, M, RM>createPartialJoinOperatorSpec(parJoin2, 
this.graph, outputStream));
+    this.registeredOperatorSpecs.add(OperatorSpecs.<M, K, OM, 
RM>createPartialJoinOperatorSpec(parJoin1, this.graph, outputStream));
+    return outputStream;
+  }
+
+  @Override
+  public MessageStream<M> merge(Collection<MessageStream<M>> otherStreams) {
+    MessageStreamImpl<M> outputStream = new MessageStreamImpl<>(this.graph);
+
+    otherStreams.add(this);
+    otherStreams.forEach(other -> ((MessageStreamImpl<M>) 
other).registeredOperatorSpecs.
+        add(OperatorSpecs.createMergeOperatorSpec(this.graph, outputStream)));
+    return outputStream;
+  }
+
+  @Override
+  public <K> MessageStream<M> partitionBy(Function<M, K> parKeyExtractor) {
+    MessageStreamImpl<M> intStream = 
this.graph.createIntStream(parKeyExtractor);
+    OutputStream<M> outputStream = this.graph.getOutputStream(intStream);
+    
this.registeredOperatorSpecs.add(OperatorSpecs.createPartitionOperatorSpec(outputStream.getSinkFunction(),
+        this.graph, outputStream));
+    return intStream;
+  }
+  /**
+   * Gets the operator specs registered to consume the output of this {@link 
MessageStream}. This is an internal API and
+   * should not be exposed to users.
+   *
+   * @return  a collection containing all {@link OperatorSpec}s that are 
registered with this {@link MessageStream}.
+   */
+  public Collection<OperatorSpec> getRegisteredOperatorSpecs() {
+    return Collections.unmodifiableSet(this.registeredOperatorSpecs);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java 
b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
new file mode 100644
index 0000000..dca3469
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import java.util.Properties;
+import java.util.function.Function;
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The implementation of {@link StreamGraph} interface. This class provides 
implementation of methods to allow users to
+ * create system input/output/intermediate streams.
+ */
+public class StreamGraphImpl implements StreamGraph {
+
+  /**
+   * Unique identifier for each {@link 
org.apache.samza.operators.spec.OperatorSpec} added to transform the {@link 
MessageEnvelope}
+   * in the input {@link MessageStream}s.
+   */
+  private int opId = 0;
+
+  private class InputStreamImpl<K, V, M extends MessageEnvelope<K, V>> extends 
MessageStreamImpl<M> {
+    final StreamSpec spec;
+    final Serde<K> keySerde;
+    final Serde<V> msgSerde;
+
+    InputStreamImpl(StreamGraphImpl graph, StreamSpec streamSpec, Serde<K> 
keySerde, Serde<V> msgSerde) {
+      super(graph);
+      this.spec = streamSpec;
+      this.keySerde = keySerde;
+      this.msgSerde = msgSerde;
+    }
+
+    StreamSpec getSpec() {
+      return this.spec;
+    }
+
+  }
+
+  private class OutputStreamImpl<K, V, M extends MessageEnvelope<K, V>> 
implements OutputStream<M> {
+    final StreamSpec spec;
+    final Serde<K> keySerde;
+    final Serde<V> msgSerde;
+
+    OutputStreamImpl(StreamGraphImpl graph, StreamSpec streamSpec, Serde<K> 
keySerde, Serde<V> msgSerde) {
+      this.spec = streamSpec;
+      this.keySerde = keySerde;
+      this.msgSerde = msgSerde;
+    }
+
+    StreamSpec getSpec() {
+      return this.spec;
+    }
+
+    @Override
+    public SinkFunction<M> getSinkFunction() {
+      return (M message, MessageCollector mc, TaskCoordinator tc) -> {
+        // TODO: need to find a way to directly pass in the serde class names
+        // mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), 
this.keySerde.getClass().getName(), this.msgSerde.getClass().getName(),
+        //    message.getKey(), message.getKey(), message.getMessage()));
+        mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), 
message.getKey(), message.getMessage()));
+      };
+    }
+  }
+
+  private class IntermediateStreamImpl<PK, K, V, M extends MessageEnvelope<K, 
V>> extends InputStreamImpl<K, V, M> implements OutputStream<M> {
+    final Function<M, PK> parKeyFn;
+
+    /**
+     * Default constructor
+     *
+     * @param graph the {@link StreamGraphImpl} object that this stream 
belongs to
+     */
+    IntermediateStreamImpl(StreamGraphImpl graph, StreamSpec streamSpec, 
Serde<K> keySerde, Serde<V> msgSerde) {
+      this(graph, streamSpec, keySerde, msgSerde, null);
+    }
+
+    IntermediateStreamImpl(StreamGraphImpl graph, StreamSpec streamSpec, 
Serde<K> keySerde, Serde<V> msgSerde, Function<M, PK> parKeyFn) {
+      super(graph, streamSpec, keySerde, msgSerde);
+      this.parKeyFn = parKeyFn;
+    }
+
+    @Override
+    public SinkFunction<M> getSinkFunction() {
+      return (M message, MessageCollector mc, TaskCoordinator tc) -> {
+        // TODO: need to find a way to directly pass in the serde class names
+        // mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), 
this.keySerde.getClass().getName(), this.msgSerde.getClass().getName(),
+        //    message.getKey(), message.getKey(), message.getMessage()));
+        if (this.parKeyFn == null) {
+          mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), 
message.getKey(), message.getMessage()));
+        } else {
+          // apply partition key function
+          mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), 
this.parKeyFn.apply(message), message.getKey(), message.getMessage()));
+        }
+      };
+    }
+  }
+
+  /**
+   * Maps keeping all {@link SystemStream}s that are input and output of 
operators in {@link StreamGraphImpl}
+   */
+  private final Map<SystemStream, MessageStream> inStreams = new HashMap<>();
+  private final Map<SystemStream, OutputStream> outStreams = new HashMap<>();
+
+  private ContextManager contextManager = new ContextManager() { };
+
+  @Override
+  public <K, V, M extends MessageEnvelope<K, V>> MessageStream<M> 
createInStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) {
+    if (!this.inStreams.containsKey(streamSpec.getSystemStream())) {
+      this.inStreams.putIfAbsent(streamSpec.getSystemStream(), new 
InputStreamImpl<K, V, M>(this, streamSpec, keySerde, msgSerde));
+    }
+    return this.inStreams.get(streamSpec.getSystemStream());
+  }
+
+  /**
+   * Helper method to be used by {@link MessageStreamImpl} class
+   *
+   * @param streamSpec  the {@link StreamSpec} object defining the {@link 
SystemStream} as the output
+   * @param <M>  the type of {@link MessageEnvelope}s in the output {@link 
SystemStream}
+   * @return  the {@link OutputStream} object
+   */
+  @Override
+  public <K, V, M extends MessageEnvelope<K, V>> OutputStream<M> 
createOutStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) {
+    if (!this.outStreams.containsKey(streamSpec.getSystemStream())) {
+      this.outStreams.putIfAbsent(streamSpec.getSystemStream(), new 
OutputStreamImpl<K, V, M>(this, streamSpec, keySerde, msgSerde));
+    }
+    return this.outStreams.get(streamSpec.getSystemStream());
+  }
+
+  /**
+   * Helper method to be used by {@link MessageStreamImpl} class
+   *
+   * @param streamSpec  the {@link StreamSpec} object defining the {@link 
SystemStream} as an intermediate {@link SystemStream}
+   * @param <M>  the type of {@link MessageEnvelope}s in the output {@link 
SystemStream}
+   * @return  the {@link OutputStream} object
+   */
+  @Override
+  public <K, V, M extends MessageEnvelope<K, V>> OutputStream<M> 
createIntStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) {
+    if (!this.inStreams.containsKey(streamSpec.getSystemStream())) {
+      this.inStreams.putIfAbsent(streamSpec.getSystemStream(), new 
IntermediateStreamImpl<K, K, V, M>(this, streamSpec, keySerde, msgSerde));
+    }
+    IntermediateStreamImpl<K, K, V, M> intStream = (IntermediateStreamImpl<K, 
K, V, M>) this.inStreams.get(streamSpec.getSystemStream());
+    if (!this.outStreams.containsKey(streamSpec.getSystemStream())) {
+      this.outStreams.putIfAbsent(streamSpec.getSystemStream(), intStream);
+    }
+    return intStream;
+  }
+
+  @Override public Map<StreamSpec, MessageStream> getInStreams() {
+    Map<StreamSpec, MessageStream> inStreamMap = new HashMap<>();
+    this.inStreams.forEach((ss, entry) -> inStreamMap.put(((InputStreamImpl) 
entry).getSpec(), entry));
+    return Collections.unmodifiableMap(inStreamMap);
+  }
+
+  @Override public Map<StreamSpec, OutputStream> getOutStreams() {
+    Map<StreamSpec, OutputStream> outStreamMap = new HashMap<>();
+    this.outStreams.forEach((ss, entry) -> 
outStreamMap.put(((OutputStreamImpl) entry).getSpec(), entry));
+    return Collections.unmodifiableMap(outStreamMap);
+  }
+
+  @Override
+  public StreamGraph withContextManager(ContextManager manager) {
+    this.contextManager = manager;
+    return this;
+  }
+
+  public int getNextOpId() {
+    return this.opId++;
+  }
+
+  public ContextManager getContextManager() {
+    return this.contextManager;
+  }
+
+  /**
+   * Helper method to get the input stream via {@link SystemStream}
+   *
+   * @param systemStream  the {@link SystemStream}
+   * @return  a {@link MessageStreamImpl} object corresponding to the {@code 
systemStream}
+   */
+  public MessageStreamImpl getInputStream(SystemStream systemStream) {
+    if (this.inStreams.containsKey(systemStream)) {
+      return (MessageStreamImpl) this.inStreams.get(systemStream);
+    }
+    return null;
+  }
+
+  <M> OutputStream<M> getOutputStream(MessageStreamImpl<M> intStream) {
+    if (this.outStreams.containsValue(intStream)) {
+      return (OutputStream<M>) intStream;
+    }
+    return null;
+  }
+
+  <M> MessageStream<M> getIntStream(OutputStream<M> outStream) {
+    if (this.inStreams.containsValue(outStream)) {
+      return (MessageStream<M>) outStream;
+    }
+    return null;
+  }
+
+  /**
+   * Method to create intermediate topics for {@link 
MessageStreamImpl#partitionBy(Function)} method.
+   *
+   * @param parKeyFn  the function to extract the partition key from the input 
message
+   * @param <PK>  the type of partition key
+   * @param <M>  the type of input message
+   * @return  the {@link MessageStreamImpl} object for the re-partitioned stream
+   */
+  <PK, M> MessageStreamImpl<M> createIntStream(Function<M, PK> parKeyFn) {
+    // TODO: placeholder to auto-generate intermediate streams via {@link 
StreamSpec}
+    StreamSpec streamSpec = new StreamSpec() {
+      @Override
+      public SystemStream getSystemStream() {
+        // TODO: should auto-generate intermediate stream name here
+        return new SystemStream("intermediate", String.format("par-%d", 
StreamGraphImpl.this.opId));
+      }
+
+      @Override
+      public Properties getProperties() {
+        return null;
+      }
+    };
+
+    if (!this.inStreams.containsKey(streamSpec.getSystemStream())) {
+      this.inStreams.putIfAbsent(streamSpec.getSystemStream(), new 
IntermediateStreamImpl(this, streamSpec, null, null, parKeyFn));
+    }
+    IntermediateStreamImpl intStream = (IntermediateStreamImpl) 
this.inStreams.get(streamSpec.getSystemStream());
+    if (!this.outStreams.containsKey(streamSpec.getSystemStream())) {
+      this.outStreams.putIfAbsent(streamSpec.getSystemStream(), intStream);
+    }
+    return intStream;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java
 
b/samza-core/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java
new file mode 100644
index 0000000..809a70a
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.functions;
+
+import org.apache.samza.annotation.InterfaceStability;
+
+
+/**
+ * This defines the interface of a two-way join function that takes 
input messages from two input
+ * {@link org.apache.samza.operators.MessageStream}s and merges them into a 
single joined message in the join output
+ */
+@InterfaceStability.Unstable
+public interface PartialJoinFunction<K, M, OM, RM> extends InitableFunction {
+
+  /**
+   * Method to perform the join on the two input messages
+   *
+   * @param m1  message from the first input stream
+   * @param om  message from the second input stream
+   * @return  the joined message in the output stream
+   */
+  RM apply(M m1, OM om);
+
+  /**
+   * Method to get the key from the input message
+   *
+   * @param message  the input message from the first stream
+   * @return  the join key in the {@code message}
+   */
+  K getKey(M message);
+
+  /**
+   * Method to get the key from the input message in the other stream
+   *
+   * @param message  the input message from the other stream
+   * @return  the join key in the {@code message}
+   */
+  K getOtherKey(OM message);
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorGraph.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorGraph.java 
b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorGraph.java
new file mode 100644
index 0000000..66336f8
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorGraph.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
+import org.apache.samza.operators.spec.SinkOperatorSpec;
+import org.apache.samza.operators.spec.StreamOperatorSpec;
+import org.apache.samza.operators.spec.WindowOperatorSpec;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.TaskContext;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Instantiates the DAG of {@link OperatorImpl}s corresponding to the {@link 
OperatorSpec}s for a
+ * {@link MessageStreamImpl}
+ */
+public class OperatorGraph {
+
+  /**
+   * A {@link Map} from {@link OperatorSpec} to {@link OperatorImpl}. This map 
registers all {@link OperatorImpl} in the DAG
+   * of {@link OperatorImpl} in a {@link 
org.apache.samza.container.TaskInstance}. Each {@link OperatorImpl} is created
+   * according to a single instance of {@link OperatorSpec}.
+   */
+  private final Map<OperatorSpec, OperatorImpl> operators = new HashMap<>();
+
+  /**
+   * This {@link Map} describes the DAG of {@link OperatorImpl} that are 
chained together to process the input messages.
+   */
+  private final Map<SystemStream, RootOperatorImpl> operatorGraph = new 
HashMap<>();
+
+  /**
+   * Initialize the whole DAG of {@link OperatorImpl}s, based on the input 
{@link MessageStreamImpl} from the {@link 
org.apache.samza.operators.StreamGraph}.
+   * This method will traverse each input {@link 
org.apache.samza.operators.MessageStream} in the {@code inputStreams} and
+   * instantiate the corresponding {@link OperatorImpl} chains that take the 
{@link org.apache.samza.operators.MessageStream} as input.
+   *
+   * @param inputStreams  the map of input {@link 
org.apache.samza.operators.MessageStream}s
+   * @param config  the {@link Config} required to instantiate operators
+   * @param context  the {@link TaskContext} required to instantiate operators
+   */
+  public void init(Map<SystemStream, MessageStreamImpl> inputStreams, Config 
config, TaskContext context) {
+    inputStreams.forEach((ss, mstream) -> this.operatorGraph.put(ss, 
this.createOperatorImpls(mstream, config, context)));
+  }
+
+  /**
+   * Method to get the corresponding {@link RootOperatorImpl}
+   *
+   * @param ss  input {@link SystemStream}
+   * @param <M>  the type of input message
+   * @return  the {@link OperatorImpl} that starts processing the input message
+   */
+  public <M> OperatorImpl<M, M> get(SystemStream ss) {
+    return this.operatorGraph.get(ss);
+  }
+
+  /**
+   * Traverses the DAG of {@link OperatorSpec}s starting from the provided 
{@link MessageStreamImpl},
+   * creates the corresponding DAG of {@link OperatorImpl}s, and returns its 
root {@link RootOperatorImpl} node.
+   *
+   * @param source  the input {@link MessageStreamImpl} to instantiate {@link 
OperatorImpl}s for
+   * @param <M>  the type of messages in the {@code source} {@link 
MessageStreamImpl}
+   * @param config  the {@link Config} required to instantiate operators
+   * @param context  the {@link TaskContext} required to instantiate operators
+   * @return  root node for the {@link OperatorImpl} DAG
+   */
+  private <M> RootOperatorImpl<M> createOperatorImpls(MessageStreamImpl<M> 
source, Config config,
+      TaskContext context) {
+    // since the source message stream might have multiple operator specs 
registered on it,
+    // create a new root node as a single point of entry for the DAG.
+    RootOperatorImpl<M> rootOperator = new RootOperatorImpl<>();
+    // create the pipeline/topology starting from the source
+    source.getRegisteredOperatorSpecs().forEach(registeredOperator -> {
+        // pass in the source and context s.t. stateful stream operators can 
initialize their stores
+        OperatorImpl<M, ?> operatorImpl =
+            this.createAndRegisterOperatorImpl(registeredOperator, source, 
config, context);
+        rootOperator.registerNextOperator(operatorImpl);
+      });
+    return rootOperator;
+  }
+
+  /**
+   * Helper method to recursively traverse the {@link OperatorSpec} DAG and 
instantiate and link the corresponding
+   * {@link OperatorImpl}s.
+   *
+   * @param operatorSpec  the operatorSpec registered with the {@code source}
+   * @param source  the source {@link MessageStreamImpl}
+   * @param <M>  type of input message
+   * @param config  the {@link Config} required to instantiate operators
+   * @param context  the {@link TaskContext} required to instantiate operators
+   * @return  the operator implementation for the operatorSpec
+   */
+  private <M> OperatorImpl<M, ?> createAndRegisterOperatorImpl(OperatorSpec 
operatorSpec,
+      MessageStreamImpl<M> source, Config config, TaskContext context) {
+    if (!operators.containsKey(operatorSpec)) {
+      OperatorImpl<M, ?> operatorImpl = createOperatorImpl(source, 
operatorSpec, config, context);
+      if (operators.putIfAbsent(operatorSpec, operatorImpl) == null) {
+        // this is the first time we've added the operatorImpl corresponding 
to the operatorSpec,
+        // so traverse and initialize and register the rest of the DAG.
+        // initialize the corresponding operator function
+        operatorSpec.init(config, context);
+        MessageStreamImpl nextStream = operatorSpec.getNextStream();
+        if (nextStream != null) {
+          Collection<OperatorSpec> registeredSpecs = 
nextStream.getRegisteredOperatorSpecs();
+          registeredSpecs.forEach(registeredSpec -> {
+              OperatorImpl subImpl = 
this.createAndRegisterOperatorImpl(registeredSpec, nextStream, config, context);
+              operatorImpl.registerNextOperator(subImpl);
+            });
+        }
+        return operatorImpl;
+      }
+    }
+
+    // the implementation corresponding to operatorSpec has already been 
instantiated
+    // and registered, so we do not need to traverse the DAG further.
+    return operators.get(operatorSpec);
+  }
+
+  /**
+   * Creates a new {@link OperatorImpl} instance for the provided {@link 
OperatorSpec}.
+   *
+   * @param source  the source {@link MessageStreamImpl}
+   * @param <M>  type of input message
+   * @param operatorSpec  the immutable {@link OperatorSpec} definition.
+   * @param config  the {@link Config} required to instantiate operators
+   * @param context  the {@link TaskContext} required to instantiate operators
+   * @return  the {@link OperatorImpl} implementation instance
+   */
+  private static <M> OperatorImpl<M, ?> 
createOperatorImpl(MessageStreamImpl<M> source, OperatorSpec operatorSpec, 
Config config, TaskContext context) {
+    if (operatorSpec instanceof StreamOperatorSpec) {
+      StreamOperatorSpec<M, ?> streamOpSpec = (StreamOperatorSpec<M, ?>) 
operatorSpec;
+      return new StreamOperatorImpl<>(streamOpSpec, source, config, context);
+    } else if (operatorSpec instanceof SinkOperatorSpec) {
+      return new SinkOperatorImpl<>((SinkOperatorSpec<M>) operatorSpec, 
config, context);
+    } else if (operatorSpec instanceof WindowOperatorSpec) {
+      return new WindowOperatorImpl<>((WindowOperatorSpec<M, ?, ?>) 
operatorSpec, source, config, context);
+    } else if (operatorSpec instanceof PartialJoinOperatorSpec) {
+      return new PartialJoinOperatorImpl<>((PartialJoinOperatorSpec) 
operatorSpec, source, config, context);
+    }
+    throw new IllegalArgumentException(
+        String.format("Unsupported OperatorSpec: %s", 
operatorSpec.getClass().getName()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java 
b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
new file mode 100644
index 0000000..abb1fa9
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+
+import java.util.HashSet;
+import java.util.Set;
+
+
+/**
+ * Abstract base class for all stream operator implementations.
+ */
+public abstract class OperatorImpl<M, RM> {
+
+  private final Set<OperatorImpl<RM, ?>> nextOperators = new HashSet<>();
+
+  /**
+   * Register the next operator in the chain that this operator should 
propagate its output to.
+   * @param nextOperator  the next operator in the chain.
+   */
+  void registerNextOperator(OperatorImpl<RM, ?> nextOperator) {
+    nextOperators.add(nextOperator);
+  }
+
+  /**
+   * Perform the transformation required for this operator and call the 
downstream operators.
+   *
+   * Must call {@link #propagateResult} to propagate the output to registered 
downstream operators correctly.
+   *
+   * @param message  the input message
+   * @param collector  the {@link MessageCollector} in the context
+   * @param coordinator  the {@link TaskCoordinator} in the context
+   */
+  public abstract void onNext(M message, MessageCollector collector, 
TaskCoordinator coordinator);
+
+  /**
+   * Helper method to propagate the output of this operator to all registered 
downstream operators.
+   *
+   * This method <b>must</b> be called from {@link #onNext} to propagate the 
operator output correctly.
+   *
+   * @param outputMessage  output message
+   * @param collector  the {@link MessageCollector} in the context
+   * @param coordinator  the {@link TaskCoordinator} in the context
+   */
+  void propagateResult(RM outputMessage, MessageCollector collector, 
TaskCoordinator coordinator) {
+    nextOperators.forEach(sub -> sub.onNext(outputMessage, collector, 
coordinator));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
 
b/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
new file mode 100644
index 0000000..c8515e1
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+
+
+/**
+ * Implementation of a {@link PartialJoinOperatorSpec}. This class implements 
a function
+ * that takes in only one input stream among all inputs to the join and 
generates the join output.
+ *
+ * @param <M>  type of messages in the input stream
+ * @param <JM>  type of messages in the stream to join with
+ * @param <RM>  type of messages in the joined stream
+ */
+class PartialJoinOperatorImpl<M, K, JM, RM> extends OperatorImpl<M, RM> {
+
+  PartialJoinOperatorImpl(PartialJoinOperatorSpec<M, K, JM, RM> joinOp, 
MessageStreamImpl<M> source, Config config, TaskContext context) {
+    // TODO: implement PartialJoinOperatorImpl constructor
+  }
+
+  @Override
+  public void onNext(M message, MessageCollector collector, TaskCoordinator 
coordinator) {
+    // TODO: implement PartialJoinOperatorImpl processing logic
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java
 
b/samza-core/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java
new file mode 100644
index 0000000..4b30a5d
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+
+
+/**
+ * A no-op operator implementation that forwards incoming messages to all of 
its subscribers.
+ * @param <M>  type of incoming messages
+ */
+final class RootOperatorImpl<M> extends OperatorImpl<M, M> {
+
+  @Override
+  public void onNext(M message, MessageCollector collector, TaskCoordinator 
coordinator) {
+    this.propagateResult(message, collector, coordinator);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java
 
b/samza-core/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java
new file mode 100644
index 0000000..2bb362c
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.spec.WindowOperatorSpec;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+
+
+/**
+ * Default implementation class of a {@link WindowOperatorSpec} for a session 
window.
+ *
+ * @param <M>  the type of input message
+ * @param <RK>  the type of window key
+ * @param <WV>  the type of window state
+ */
+class SessionWindowOperatorImpl<M, RK, WV> extends OperatorImpl<M, 
WindowPane<RK, WV>> {
+
+  private final WindowOperatorSpec<M, RK, WV> windowSpec;
+
+  SessionWindowOperatorImpl(WindowOperatorSpec<M, RK, WV> windowSpec, 
MessageStreamImpl<M> source, Config config, TaskContext context) {
+    this.windowSpec = windowSpec;
+  }
+
+  @Override
+  public void onNext(M message, MessageCollector collector, TaskCoordinator 
coordinator) {
+  }
+
+  public void onTimer(MessageCollector collector, TaskCoordinator coordinator) 
{
+    // This is to periodically check the timeout triggers to get the list of 
window states to be updated
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
 
b/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
new file mode 100644
index 0000000..41d1778
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.spec.SinkOperatorSpec;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+
+
+/**
+ * Implementation for {@link SinkOperatorSpec}
+ */
+class SinkOperatorImpl<M> extends OperatorImpl<M, M> {
+
+  private final SinkFunction<M> sinkFn;
+
+  SinkOperatorImpl(SinkOperatorSpec<M> sinkOp, Config config, TaskContext 
context) {
+    this.sinkFn = sinkOp.getSinkFn();
+  }
+
+  @Override
+  public void onNext(M message, MessageCollector collector, TaskCoordinator 
coordinator) {
+    this.sinkFn.apply(message, collector, coordinator);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java
 
b/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java
new file mode 100644
index 0000000..644de20
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.spec.StreamOperatorSpec;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+
+
+/**
+ * A StreamOperator that accepts a 1:n transform function and applies it to 
each incoming message.
+ *
+ * @param <M>  type of message in the input stream
+ * @param <RM>  type of message in the output stream
+ */
+class StreamOperatorImpl<M, RM> extends OperatorImpl<M, RM> {
+
+  private final FlatMapFunction<M, RM> transformFn;
+
+  StreamOperatorImpl(StreamOperatorSpec<M, RM> streamOperatorSpec, 
MessageStreamImpl<M> source, Config config, TaskContext context) {
+    this.transformFn = streamOperatorSpec.getTransformFn();
+  }
+
+  @Override
+  public void onNext(M message, MessageCollector collector, TaskCoordinator 
coordinator) {
+    // call the transform function and then for each output call 
propagateResult()
+    this.transformFn.apply(message).forEach(r -> this.propagateResult(r, 
collector, coordinator));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
 
b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
new file mode 100644
index 0000000..af00553
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.spec.WindowOperatorSpec;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.internal.WindowInternal;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+
+public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, 
WindowPane<WK, WV>> {
+
+  private final WindowInternal<M, WK, WV> window;
+
+  public WindowOperatorImpl(WindowOperatorSpec spec, MessageStreamImpl<M> 
source, Config config, TaskContext context) {
+    // source, config, and context are used to initialize the window kv-store
+    window = spec.getWindow();
+  }
+
+  @Override
+  public void onNext(M message, MessageCollector collector, TaskCoordinator 
coordinator) {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java 
b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
new file mode 100644
index 0000000..1444662
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.spec;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.task.TaskContext;
+
+
+/**
+ * A stateless serializable stream operator specification that holds all the 
information required
+ * to transform the input {@link MessageStreamImpl} and produce the output 
{@link MessageStreamImpl}.
+ *
+ * @param <OM>  the type of output message from the operator
+ */
+@InterfaceStability.Unstable
+public interface OperatorSpec<OM> {
+
+  enum OpCode {
+    MAP,
+    FLAT_MAP,
+    FILTER,
+    SINK,
+    SEND_TO,
+    JOIN,
+    WINDOW,
+    MERGE,
+    PARTITION_BY
+  }
+
+
+  /**
+   * Get the output stream containing transformed messages produced by this 
operator.
+   * @return  the output stream containing transformed messages produced by 
this operator.
+   */
+  MessageStreamImpl<OM> getNextStream();
+
+  /**
+   * Init method to initialize the context for this {@link OperatorSpec}. The 
default implementation is NO-OP.
+   *
+   * @param config  the {@link Config} object for this task
+   * @param context  the {@link TaskContext} object for this task
+   */
+  default void init(Config config, TaskContext context) { }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java 
b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
new file mode 100644
index 0000000..d626852
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.spec;
+
+import java.util.Collection;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.functions.FilterFunction;
+import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.operators.functions.PartialJoinFunction;
+import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.internal.WindowInternal;
+
+import java.util.ArrayList;
+import org.apache.samza.task.TaskContext;
+
+
+/**
+ * Factory methods for creating {@link OperatorSpec} instances.
+ *
+ * Every factory obtains the ID of the new operator from {@link StreamGraphImpl#getNextOpId()}.
+ */
+public class OperatorSpecs {
+
+  private OperatorSpecs() {}
+
+  /**
+   * Creates a {@link StreamOperatorSpec} for {@link MapFunction}.
+   *
+   * The map function is adapted to a {@link FlatMapFunction} that emits the mapped message, or nothing
+   * when the map function returns {@code null}.
+   *
+   * @param mapFn  the map function
+   * @param graph  the {@link StreamGraphImpl} object
+   * @param output  the output {@link MessageStreamImpl} object
+   * @param <M>  type of input message
+   * @param <OM>  type of output message
+   * @return  the {@link StreamOperatorSpec}
+   */
+  public static <M, OM> StreamOperatorSpec<M, OM> createMapOperatorSpec(
+      MapFunction<M, OM> mapFn, StreamGraphImpl graph, MessageStreamImpl<OM> output) {
+    return new StreamOperatorSpec<>(new FlatMapFunction<M, OM>() {
+      @Override
+      public Collection<OM> apply(M message) {
+        // Plain ArrayList rather than double-brace initialization: no extra anonymous subclass and
+        // no hidden reference from the returned list back to this function object.
+        Collection<OM> results = new ArrayList<>();
+        OM mapped = mapFn.apply(message);
+        if (mapped != null) {
+          results.add(mapped);
+        }
+        return results;
+      }
+
+      @Override
+      public void init(Config config, TaskContext context) {
+        mapFn.init(config, context);
+      }
+    }, output, OperatorSpec.OpCode.MAP, graph.getNextOpId());
+  }
+
+  /**
+   * Creates a {@link StreamOperatorSpec} for {@link FilterFunction}.
+   *
+   * The filter function is adapted to a {@link FlatMapFunction} that emits the input message when the
+   * filter accepts it and nothing otherwise.
+   *
+   * @param filterFn  the transformation function
+   * @param graph  the {@link StreamGraphImpl} object
+   * @param output  the output {@link MessageStreamImpl} object
+   * @param <M>  type of input message
+   * @return  the {@link StreamOperatorSpec}
+   */
+  public static <M> StreamOperatorSpec<M, M> createFilterOperatorSpec(
+      FilterFunction<M> filterFn, StreamGraphImpl graph, MessageStreamImpl<M> output) {
+    return new StreamOperatorSpec<>(new FlatMapFunction<M, M>() {
+      @Override
+      public Collection<M> apply(M message) {
+        // Plain ArrayList rather than double-brace initialization (see createMapOperatorSpec).
+        Collection<M> results = new ArrayList<>();
+        if (filterFn.apply(message)) {
+          results.add(message);
+        }
+        return results;
+      }
+
+      @Override
+      public void init(Config config, TaskContext context) {
+        filterFn.init(config, context);
+      }
+    }, output, OperatorSpec.OpCode.FILTER, graph.getNextOpId());
+  }
+
+  /**
+   * Creates a {@link StreamOperatorSpec} tagged {@link OperatorSpec.OpCode#FLAT_MAP} that applies the
+   * given function as-is.
+   *
+   * @param transformFn  the transformation function
+   * @param graph  the {@link StreamGraphImpl} object
+   * @param output  the output {@link MessageStreamImpl} object
+   * @param <M>  type of input message
+   * @param <OM>  type of output message
+   * @return  the {@link StreamOperatorSpec}
+   */
+  public static <M, OM> StreamOperatorSpec<M, OM> createStreamOperatorSpec(
+      FlatMapFunction<M, OM> transformFn, StreamGraphImpl graph, MessageStreamImpl<OM> output) {
+    return new StreamOperatorSpec<>(transformFn, output, OperatorSpec.OpCode.FLAT_MAP, graph.getNextOpId());
+  }
+
+  /**
+   * Creates a {@link SinkOperatorSpec} tagged {@link OperatorSpec.OpCode#SINK}, without an output stream.
+   *
+   * @param sinkFn  the sink function
+   * @param <M>  type of input message
+   * @param graph  the {@link StreamGraphImpl} object
+   * @return  the {@link SinkOperatorSpec}
+   */
+  public static <M> SinkOperatorSpec<M> createSinkOperatorSpec(SinkFunction<M> sinkFn, StreamGraphImpl graph) {
+    return new SinkOperatorSpec<>(sinkFn, OperatorSpec.OpCode.SINK, graph.getNextOpId());
+  }
+
+  /**
+   * Creates a {@link SinkOperatorSpec} tagged {@link OperatorSpec.OpCode#SEND_TO}.
+   *
+   * @param sinkFn  the sink function
+   * @param graph  the {@link StreamGraphImpl} object
+   * @param stream  the {@link OutputStream} where the message is sent to
+   * @param <M>  type of input message
+   * @return  the {@link SinkOperatorSpec}
+   */
+  public static <M> SinkOperatorSpec<M> createSendToOperatorSpec(
+      SinkFunction<M> sinkFn, StreamGraphImpl graph, OutputStream<M> stream) {
+    return new SinkOperatorSpec<>(sinkFn, OperatorSpec.OpCode.SEND_TO, graph.getNextOpId(), stream);
+  }
+
+  /**
+   * Creates a {@link SinkOperatorSpec} tagged {@link OperatorSpec.OpCode#PARTITION_BY}.
+   *
+   * @param sinkFn  the sink function
+   * @param graph  the {@link StreamGraphImpl} object
+   * @param stream  the {@link OutputStream} where the message is sent to
+   * @param <M>  type of input message
+   * @return  the {@link SinkOperatorSpec}
+   */
+  public static <M> SinkOperatorSpec<M> createPartitionOperatorSpec(
+      SinkFunction<M> sinkFn, StreamGraphImpl graph, OutputStream<M> stream) {
+    return new SinkOperatorSpec<>(sinkFn, OperatorSpec.OpCode.PARTITION_BY, graph.getNextOpId(), stream);
+  }
+
+  /**
+   * Creates a {@link WindowOperatorSpec}.
+   *
+   * @param window the description of the window.
+   * @param graph  the {@link StreamGraphImpl} object
+   * @param wndOutput  the window output {@link MessageStreamImpl} object
+   * @param <M> the type of input message
+   * @param <WK> the type of key in the {@link WindowPane}
+   * @param <WV> the type of value in the window
+   * @return  the {@link WindowOperatorSpec}
+   */
+  public static <M, WK, WV> WindowOperatorSpec<M, WK, WV> createWindowOperatorSpec(
+      WindowInternal<M, WK, WV> window, StreamGraphImpl graph, MessageStreamImpl<WindowPane<WK, WV>> wndOutput) {
+    return new WindowOperatorSpec<>(window, wndOutput, graph.getNextOpId());
+  }
+
+  /**
+   * Creates a {@link PartialJoinOperatorSpec}.
+   *
+   * @param partialJoinFn  the join function
+   * @param graph  the {@link StreamGraphImpl} object
+   * @param joinOutput  the output {@link MessageStreamImpl}
+   * @param <M>  type of input message
+   * @param <K>  type of join key
+   * @param <JM>  the type of message in the other join stream
+   * @param <OM>  the type of message in the join output
+   * @return  the {@link PartialJoinOperatorSpec}
+   */
+  public static <M, K, JM, OM> PartialJoinOperatorSpec<M, K, JM, OM> createPartialJoinOperatorSpec(
+      PartialJoinFunction<K, M, JM, OM> partialJoinFn, StreamGraphImpl graph, MessageStreamImpl<OM> joinOutput) {
+    return new PartialJoinOperatorSpec<>(partialJoinFn, joinOutput, graph.getNextOpId());
+  }
+
+  /**
+   * Creates a {@link StreamOperatorSpec} with a merger function that forwards every input message
+   * unchanged to the merged output.
+   *
+   * @param graph  the {@link StreamGraphImpl} object
+   * @param mergeOutput  the output {@link MessageStreamImpl} from the merger
+   * @param <M>  the type of input message
+   * @return  the {@link StreamOperatorSpec} for the merge
+   */
+  public static <M> StreamOperatorSpec<M, M> createMergeOperatorSpec(
+      StreamGraphImpl graph, MessageStreamImpl<M> mergeOutput) {
+    return new StreamOperatorSpec<M, M>(message -> {
+        // Plain ArrayList rather than double-brace initialization (see createMapOperatorSpec).
+        Collection<M> passThrough = new ArrayList<>();
+        passThrough.add(message);
+        return passThrough;
+      }, mergeOutput, OperatorSpec.OpCode.MERGE, graph.getNextOpId());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
 
b/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
new file mode 100644
index 0000000..e057c2b
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.spec;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.functions.PartialJoinFunction;
+import org.apache.samza.task.TaskContext;
+
+
+/**
+ * Spec for the partial join operator that takes messages from one input 
stream, joins with buffered
+ * messages from another stream, and produces join results to an output {@link 
MessageStreamImpl}.
+ *
+ * @param <M>  the type of input message
+ * @param <K>  the type of join key
+ * @param <JM>  the type of message in the other join stream
+ * @param <RM>  the type of message in the join output stream
+ */
+public class PartialJoinOperatorSpec<M, K, JM, RM> implements OperatorSpec<RM> 
{
+
+  private final MessageStreamImpl<RM> joinOutput;
+
+  /**
+   * The transformation function of {@link PartialJoinOperatorSpec} that takes 
an input message of
+   * type {@code M}, joins with a stream of buffered messages of type {@code 
JM} from another stream,
+   * and generates a joined result message of type {@code RM}.
+   */
+  private final PartialJoinFunction<K, M, JM, RM> transformFn;
+
+
+  /**
+   * The unique ID for this operator.
+   */
+  private final int opId;
+
+  /**
+   * Default constructor for a {@link PartialJoinOperatorSpec}.
+   *
+   * @param partialJoinFn  partial join function that take type {@code M} of 
input message and join
+   *                       w/ type {@code JM} of buffered message from another 
stream
+   * @param joinOutput  the output {@link MessageStreamImpl} of the join 
results
+   */
+  PartialJoinOperatorSpec(PartialJoinFunction<K, M, JM, RM> partialJoinFn, 
MessageStreamImpl<RM> joinOutput, int opId) {
+    this.joinOutput = joinOutput;
+    this.transformFn = partialJoinFn;
+    this.opId = opId;
+  }
+
+  @Override
+  public MessageStreamImpl<RM> getNextStream() {
+    return this.joinOutput;
+  }
+
+  public PartialJoinFunction<K, M, JM, RM> getTransformFn() {
+    return this.transformFn;
+  }
+
+  public OperatorSpec.OpCode getOpCode() {
+    return OpCode.JOIN;
+  }
+
+  public int getOpId() {
+    return this.opId;
+  }
+
+  @Override public void init(Config config, TaskContext context) {
+    this.transformFn.init(config, context);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
 
b/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
new file mode 100644
index 0000000..ba30d67
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.spec;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.task.TaskContext;
+
+
+/**
+ * The spec for a sink operator that accepts user-defined logic to output a {@link MessageStreamImpl} to an external
+ * system. This is a terminal operator and does not allow further operator chaining.
+ *
+ * @param <M>  the type of input message
+ */
+public class SinkOperatorSpec<M> implements OperatorSpec {
+
+  /**
+   * {@link OpCode} for this {@link SinkOperatorSpec}
+   */
+  private final OperatorSpec.OpCode opCode;
+
+  /**
+   * The unique ID for this operator.
+   */
+  private final int opId;
+
+  /**
+   * The user-defined sink function
+   */
+  private final SinkFunction<M> sinkFn;
+
+  /**
+   * Potential output stream defined by the {@link SinkFunction}; {@code null} when this spec was created
+   * without an output stream.
+   */
+  private final OutputStream<M> outStream;
+
+  /**
+   * Constructor for a {@link SinkOperatorSpec} w/o an output stream (e.g. output is sent to remote database).
+   * Delegates to the four-argument constructor with a {@code null} output stream.
+   *
+   * @param sinkFn  a user defined {@link SinkFunction} that will be called with the output message,
+   *                the output {@link org.apache.samza.task.MessageCollector} and the
+   *                {@link org.apache.samza.task.TaskCoordinator}.
+   * @param opCode  the specific {@link OpCode} for this {@link SinkOperatorSpec}. It could be {@link OpCode#SINK},
+   *                {@link OpCode#SEND_TO}, or {@link OpCode#PARTITION_BY}
+   * @param opId  the unique id of this {@link SinkOperatorSpec} in the {@link org.apache.samza.operators.StreamGraph}
+   */
+  SinkOperatorSpec(SinkFunction<M> sinkFn, OperatorSpec.OpCode opCode, int 
opId) {
+    this(sinkFn, opCode, opId, null);
+  }
+
+  /**
+   * Constructor for a {@link SinkOperatorSpec} that sends the output to an {@link OutputStream}
+   *
+   * @param sinkFn  a user defined {@link SinkFunction} that will be called with the output message,
+   *                the output {@link org.apache.samza.task.MessageCollector} and the
+   *                {@link org.apache.samza.task.TaskCoordinator}.
+   * @param opCode  the specific {@link OpCode} for this {@link SinkOperatorSpec}. It could be {@link OpCode#SINK},
+   *                {@link OpCode#SEND_TO}, or {@link OpCode#PARTITION_BY}
+   * @param opId  the unique id of this {@link SinkOperatorSpec} in the {@link org.apache.samza.operators.StreamGraph}
+   * @param outStream  the {@link OutputStream} for this {@link SinkOperatorSpec}
+   */
+  SinkOperatorSpec(SinkFunction<M> sinkFn, OperatorSpec.OpCode opCode, int 
opId, OutputStream<M> outStream) {
+    this.sinkFn = sinkFn;
+    this.opCode = opCode;
+    this.opId = opId;
+    this.outStream = outStream;
+  }
+
+  /**
+   * This is a terminal operator and doesn't allow further operator chaining.
+   * @return  null
+   */
+  @Override
+  public MessageStreamImpl<M> getNextStream() {
+    return null;
+  }
+
+  public SinkFunction<M> getSinkFn() {
+    return this.sinkFn;
+  }
+
+  public OperatorSpec.OpCode getOpCode() {
+    return this.opCode;
+  }
+
+  public int getOpId() {
+    return this.opId;
+  }
+
+  public OutputStream<M> getOutStream() {
+    return this.outStream;
+  }
+
+  @Override public void init(Config config, TaskContext context) {
+    this.sinkFn.init(config, context);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
 
b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
new file mode 100644
index 0000000..d7813f7
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.spec;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.task.TaskContext;
+
+
+/**
+ * The spec for a linear stream operator that outputs 0 or more messages for 
each input message.
+ *
+ * @param <M>  the type of input message
+ * @param <OM>  the type of output message
+ */
+public class StreamOperatorSpec<M, OM> implements OperatorSpec<OM> {
+
+  /**
+   * {@link OpCode} for this {@link StreamOperatorSpec}
+   */
+  private final OperatorSpec.OpCode opCode;
+
+  /**
+   * The unique ID for this operator.
+   */
+  private final int opId;
+
+  /**
+   * The output {@link MessageStreamImpl} from this {@link StreamOperatorSpec}
+   */
+  private final MessageStreamImpl<OM> outputStream;
+
+  /**
+   * Transformation function applied in this {@link StreamOperatorSpec}
+   */
+  private final FlatMapFunction<M, OM> transformFn;
+
+  /**
+   * Constructor for a {@link StreamOperatorSpec} that accepts an output 
{@link MessageStreamImpl}.
+   *
+   * @param transformFn  the transformation function
+   * @param outputStream  the output {@link MessageStreamImpl}
+   * @param opCode  the {@link OpCode} for this {@link StreamOperatorSpec}
+   * @param opId  the unique id for this {@link StreamOperatorSpec} in a 
{@link org.apache.samza.operators.StreamGraph}
+   */
+  StreamOperatorSpec(FlatMapFunction<M, OM> transformFn, MessageStreamImpl 
outputStream, OperatorSpec.OpCode opCode, int opId) {
+    this.outputStream = outputStream;
+    this.transformFn = transformFn;
+    this.opCode = opCode;
+    this.opId = opId;
+  }
+
+  @Override
+  public MessageStreamImpl<OM> getNextStream() {
+    return this.outputStream;
+  }
+
+  public FlatMapFunction<M, OM> getTransformFn() {
+    return this.transformFn;
+  }
+
+  public OperatorSpec.OpCode getOpCode() {
+    return this.opCode;
+  }
+
+  public int getOpId() {
+    return this.opId;
+  }
+
+  @Override
+  public void init(Config config, TaskContext context) {
+    this.transformFn.init(config, context);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
 
b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
new file mode 100644
index 0000000..46417ed
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.spec;
+
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.internal.WindowInternal;
+
+
+/**
+ * Default window operator spec object
+ *
+ * @param <M>  the type of input message to the window
+ * @param <WK>  the type of key of the window
+ * @param <WV>  the type of aggregated value in the window output {@link 
WindowPane}
+ */
+public class WindowOperatorSpec<M, WK, WV> implements 
OperatorSpec<WindowPane<WK, WV>> {
+
+  private final WindowInternal<M, WK, WV> window;
+
+  private final MessageStreamImpl<WindowPane<WK, WV>> outputStream;
+
+  private final int opId;
+
+
+  /**
+   * Constructor for {@link WindowOperatorSpec}.
+   *
+   * @param window  the window function
+   * @param outputStream  the output {@link MessageStreamImpl} from this 
{@link WindowOperatorSpec}
+   * @param opId  auto-generated unique ID of this operator
+   */
+  WindowOperatorSpec(WindowInternal<M, WK, WV> window, 
MessageStreamImpl<WindowPane<WK, WV>> outputStream, int opId) {
+    this.outputStream = outputStream;
+    this.window = window;
+    this.opId = opId;
+  }
+
+  @Override
+  public MessageStreamImpl<WindowPane<WK, WV>> getNextStream() {
+    return this.outputStream;
+  }
+
+  public WindowInternal getWindow() {
+    return window;
+  }
+
+  public OpCode getOpCode() {
+    return OpCode.WINDOW;
+  }
+
+  public int getOpId() {
+    return this.opId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowState.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowState.java 
b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowState.java
new file mode 100644
index 0000000..53bca2e
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowState.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.spec;
+
+import org.apache.samza.annotation.InterfaceStability;
+
+
+/**
+ * This interface defines the methods a window state class has to implement. Programmers can provide a
+ * customized window state to be stored in window state stores by implementing this interface.
+ *
+ * @param <WV>  the type for window output value
+ */
+@InterfaceStability.Unstable
+public interface WindowState<WV> {
+  /**
+   * Method to get the system time when the first message in the window is received
+   *
+   * @return  the system time, in nanoseconds, at which the first message in the window was received
+   */
+  long getFirstMessageTimeNs();
+
+  /**
+   * Method to get the system time when the last message in the window is received
+   *
+   * @return  the system time, in nanoseconds, at which the last message in the window was received
+   */
+  long getLastMessageTimeNs();
+
+  /**
+   * Method to get the earliest event time in the window
+   *
+   * @return  the earliest event time in the window, in nanoseconds
+   */
+  long getEarliestEventTimeNs();
+
+  /**
+   * Method to get the latest event time in the window
+   *
+   * @return  the latest event time in the window, in nanoseconds
+   */
+  long getLatestEventTimeNs();
+
+  /**
+   * Method to get the total number of messages received in the window
+   *
+   * @return  number of messages in the window
+   */
+  long getNumberMessages();
+
+  /**
+   * Method to get the corresponding window's output value
+   *
+   * @return  the corresponding window's output value
+   */
+  WV getOutputValue();
+
+  /**
+   * Method to set the corresponding window's output value
+   *
+   * @param value  the corresponding window's output value
+   */
+  void setOutputValue(WV value);
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
 
b/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
new file mode 100644
index 0000000..fafa2cb
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system;
+
+import org.apache.samza.operators.StreamGraphBuilder;
+import org.apache.samza.config.Config;
+
+/**
+ * This class implements the {@link ExecutionEnvironment} that runs the applications in YARN environment.
+ *
+ * NOTE: {@link #run(StreamGraphBuilder, Config)} is currently a placeholder; its body contains only
+ * TODO comments and performs no work.
+ */
+public class RemoteExecutionEnvironment implements ExecutionEnvironment {
+
+  @Override public void run(StreamGraphBuilder app, Config config) {
+    // TODO: add description of ProcessContext that is going to create a sub-DAG of the graph (presumably the one built by {@code app} -- confirm)
+    // TODO: actually instantiate the tasks and run the job, i.e.
+    // 1. create all input/output/intermediate topics
+    // 2. create the single job configuration
+    // 3. execute JobRunner to submit the single job for the whole graph
+  }
+
+}

Reply via email to