http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java
index 247fe25..9bf4eb4 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java
@@ -1,41 +1,41 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
  * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
 import java.util.concurrent.BlockingQueue;
 
 import org.apache.flink.runtime.iterative.concurrent.Broker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-@SuppressWarnings("rawtypes")
-public class BlockingQueueBroker extends Broker<BlockingQueue<StreamRecord>> {
-       /**
-        * Singleton instance
-        */
-       private static final BlockingQueueBroker INSTANCE = new 
BlockingQueueBroker();
-
-       private BlockingQueueBroker() {
-       }
-
-       /**
-        * retrieve singleton instance
-        */
-       public static Broker<BlockingQueue<StreamRecord>> instance() {
-               return INSTANCE;
-       }
-}
+
+/**
+ * A {@link Broker} whose brokered element type is a {@link BlockingQueue} of
+ * {@link StreamRecord}s, accessed through a single shared instance.
+ *
+ * <p>{@code StreamRecord} is referenced as a raw type in the superclass type
+ * argument, which is why the {@code rawtypes} warning is suppressed on the class.
+ */
+@SuppressWarnings("rawtypes")
+public class BlockingQueueBroker extends Broker<BlockingQueue<StreamRecord>> {
+       /**
+        * The one and only instance of this broker, created when the class is initialized.
+        */
+       private static final BlockingQueueBroker INSTANCE = new 
BlockingQueueBroker();
+
+       // Private so that no instance other than INSTANCE can be created from outside.
+       private BlockingQueueBroker() {
+       }
+
+       /**
+        * Returns the shared broker instance.
+        *
+        * @return the singleton, typed as its {@link Broker} supertype
+        */
+       public static Broker<BlockingQueue<StreamRecord>> instance() {
+               return INSTANCE;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoReaderIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoReaderIterator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoReaderIterator.java
deleted file mode 100644
index 4358810..0000000
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoReaderIterator.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.plugable.DeserializationDelegate;
-import org.apache.flink.runtime.plugable.ReusingDeserializationDelegate;
-
-/**
- * A CoReaderIterator wraps a {@link CoRecordReader} producing records of two
- * input types.
- */
-public class CoReaderIterator<T1, T2> {
-
-       private final CoRecordReader<DeserializationDelegate<T1>, 
DeserializationDelegate<T2>> reader; // the
-                                                                               
                                                                                
                                        // source
-
-       protected final ReusingDeserializationDelegate<T1> delegate1;
-       protected final ReusingDeserializationDelegate<T2> delegate2;
-
-       public CoReaderIterator(
-                       CoRecordReader<DeserializationDelegate<T1>, 
DeserializationDelegate<T2>> reader,
-                       TypeSerializer<T1> serializer1, TypeSerializer<T2> 
serializer2) {
-               this.reader = reader;
-               this.delegate1 = new 
ReusingDeserializationDelegate<T1>(serializer1);
-               this.delegate2 = new 
ReusingDeserializationDelegate<T2>(serializer2);
-       }
-
-       public int next(T1 target1, T2 target2) throws IOException {
-               this.delegate1.setInstance(target1);
-               this.delegate2.setInstance(target2);
-
-               try {
-                       return this.reader.getNextRecord(this.delegate1, 
this.delegate2);
-
-               } catch (InterruptedException e) {
-                       throw new IOException("Reader interrupted.", e);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoRecordReader.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoRecordReader.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoRecordReader.java
deleted file mode 100644
index a7139b6..0000000
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoRecordReader.java
+++ /dev/null
@@ -1,300 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.concurrent.LinkedBlockingDeque;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
-import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
-import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
-import 
org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
-import 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.streaming.runtime.tasks.StreamingSuperstep;
-
-/**
- * A CoRecordReader wraps {@link MutableRecordReader}s of two different input
- * types to read records effectively.
- */
-@SuppressWarnings("rawtypes")
-public class CoRecordReader<T1 extends IOReadableWritable, T2 extends 
IOReadableWritable> extends
-               AbstractReader implements EventListener<InputGate>, 
StreamingReader {
-
-       private final InputGate bufferReader1;
-
-       private final InputGate bufferReader2;
-
-       private final LinkedBlockingDeque<Integer> availableRecordReaders = new 
LinkedBlockingDeque<Integer>();
-
-       private LinkedList<Integer> processed = new LinkedList<Integer>();
-
-       private AdaptiveSpanningRecordDeserializer[] reader1RecordDeserializers;
-
-       private RecordDeserializer<T1> reader1currentRecordDeserializer;
-
-       private AdaptiveSpanningRecordDeserializer[] reader2RecordDeserializers;
-
-       private RecordDeserializer<T2> reader2currentRecordDeserializer;
-
-       // 0 => none, 1 => reader (T1), 2 => reader (T2)
-       private int currentReaderIndex;
-
-       private boolean hasRequestedPartitions;
-
-       protected CoBarrierBuffer barrierBuffer1;
-       protected CoBarrierBuffer barrierBuffer2;
-
-       public CoRecordReader(InputGate inputgate1, InputGate inputgate2) {
-               super(new UnionInputGate(inputgate1, inputgate2));
-
-               this.bufferReader1 = inputgate1;
-               this.bufferReader2 = inputgate2;
-
-               this.reader1RecordDeserializers = new 
AdaptiveSpanningRecordDeserializer[inputgate1
-                               .getNumberOfInputChannels()];
-               this.reader2RecordDeserializers = new 
AdaptiveSpanningRecordDeserializer[inputgate2
-                               .getNumberOfInputChannels()];
-
-               for (int i = 0; i < reader1RecordDeserializers.length; i++) {
-                       reader1RecordDeserializers[i] = new 
AdaptiveSpanningRecordDeserializer<T1>();
-               }
-
-               for (int i = 0; i < reader2RecordDeserializers.length; i++) {
-                       reader2RecordDeserializers[i] = new 
AdaptiveSpanningRecordDeserializer<T2>();
-               }
-
-               inputgate1.registerListener(this);
-               inputgate2.registerListener(this);
-
-               barrierBuffer1 = new CoBarrierBuffer(inputgate1, this);
-               barrierBuffer2 = new CoBarrierBuffer(inputgate2, this);
-
-               barrierBuffer1.setOtherBarrierBuffer(barrierBuffer2);
-               barrierBuffer2.setOtherBarrierBuffer(barrierBuffer1);
-       }
-
-       public void requestPartitionsOnce() throws IOException, 
InterruptedException {
-               if (!hasRequestedPartitions) {
-                       bufferReader1.requestPartitions();
-                       bufferReader2.requestPartitions();
-
-                       hasRequestedPartitions = true;
-               }
-       }
-
-       @SuppressWarnings("unchecked")
-       protected int getNextRecord(T1 target1, T2 target2) throws IOException, 
InterruptedException {
-
-               requestPartitionsOnce();
-
-               while (true) {
-                       if (currentReaderIndex == 0) {
-                               if ((bufferReader1.isFinished() && 
bufferReader2.isFinished())) {
-                                       return 0;
-                               }
-
-                               currentReaderIndex = 
getNextReaderIndexBlocking();
-
-                       }
-
-                       if (currentReaderIndex == 1) {
-                               while (true) {
-                                       if (reader1currentRecordDeserializer != 
null) {
-                                               
RecordDeserializer.DeserializationResult result = 
reader1currentRecordDeserializer
-                                                               
.getNextRecord(target1);
-
-                                               if (result.isBufferConsumed()) {
-                                                       
reader1currentRecordDeserializer.getCurrentBuffer().recycle();
-                                                       
reader1currentRecordDeserializer = null;
-
-                                                       currentReaderIndex = 0;
-                                               }
-
-                                               if (result.isFullRecord()) {
-                                                       return 1;
-                                               }
-                                       } else {
-
-                                               final BufferOrEvent boe = 
barrierBuffer1.getNextNonBlocked();
-
-                                               if (boe.isBuffer()) {
-                                                       
reader1currentRecordDeserializer = reader1RecordDeserializers[boe
-                                                                       
.getChannelIndex()];
-                                                       
reader1currentRecordDeserializer.setNextBuffer(boe.getBuffer());
-                                               } else if (boe.getEvent() 
instanceof StreamingSuperstep) {
-                                                       
barrierBuffer1.processSuperstep(boe);
-                                                       currentReaderIndex = 0;
-
-                                                       break;
-                                               } else if 
(handleEvent(boe.getEvent())) {
-                                                       currentReaderIndex = 0;
-
-                                                       break;
-                                               }
-                                       }
-                               }
-                       } else if (currentReaderIndex == 2) {
-                               while (true) {
-                                       if (reader2currentRecordDeserializer != 
null) {
-                                               
RecordDeserializer.DeserializationResult result = 
reader2currentRecordDeserializer
-                                                               
.getNextRecord(target2);
-
-                                               if (result.isBufferConsumed()) {
-                                                       
reader2currentRecordDeserializer.getCurrentBuffer().recycle();
-                                                       
reader2currentRecordDeserializer = null;
-
-                                                       currentReaderIndex = 0;
-                                               }
-
-                                               if (result.isFullRecord()) {
-                                                       return 2;
-                                               }
-                                       } else {
-                                               final BufferOrEvent boe = 
barrierBuffer2.getNextNonBlocked();
-
-                                               if (boe.isBuffer()) {
-                                                       
reader2currentRecordDeserializer = reader2RecordDeserializers[boe
-                                                                       
.getChannelIndex()];
-                                                       
reader2currentRecordDeserializer.setNextBuffer(boe.getBuffer());
-                                               } else if (boe.getEvent() 
instanceof StreamingSuperstep) {
-                                                       
barrierBuffer2.processSuperstep(boe);
-                                                       currentReaderIndex = 0;
-
-                                                       break;
-                                               } else if 
(handleEvent(boe.getEvent())) {
-                                                       currentReaderIndex = 0;
-
-                                                       break;
-                                               }
-                                       }
-                               }
-                       } else {
-                               throw new IllegalStateException("Bug: 
unexpected current reader index.");
-                       }
-               }
-       }
-
-       protected int getNextReaderIndexBlocking() throws InterruptedException {
-
-               Integer nextIndex = 0;
-
-               while (processed.contains(nextIndex = 
availableRecordReaders.take())) {
-                       processed.remove(nextIndex);
-               }
-
-               if (nextIndex == 1) {
-                       if (barrierBuffer1.isAllBlocked()) {
-                               availableRecordReaders.addFirst(1);
-                               processed.add(2);
-                               return 2;
-                       } else {
-                               return 1;
-                       }
-               } else {
-                       if (barrierBuffer2.isAllBlocked()) {
-                               availableRecordReaders.addFirst(2);
-                               processed.add(1);
-                               return 1;
-                       } else {
-                               return 2;
-                       }
-
-               }
-
-       }
-
-       // 
------------------------------------------------------------------------
-       // Data availability notifications
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public void onEvent(InputGate bufferReader) {
-               addToAvailable(bufferReader);
-       }
-
-       protected void addToAvailable(InputGate bufferReader) {
-               if (bufferReader == bufferReader1) {
-                       availableRecordReaders.add(1);
-               } else if (bufferReader == bufferReader2) {
-                       availableRecordReaders.add(2);
-               }
-       }
-
-       public void clearBuffers() {
-               for (RecordDeserializer<?> deserializer : 
reader1RecordDeserializers) {
-                       Buffer buffer = deserializer.getCurrentBuffer();
-                       if (buffer != null && !buffer.isRecycled()) {
-                               buffer.recycle();
-                       }
-               }
-               for (RecordDeserializer<?> deserializer : 
reader2RecordDeserializers) {
-                       Buffer buffer = deserializer.getCurrentBuffer();
-                       if (buffer != null && !buffer.isRecycled()) {
-                               buffer.recycle();
-                       }
-               }
-       }
-
-       @Override
-       public void setReporter(AccumulatorRegistry.Reporter reporter) {
-               for (AdaptiveSpanningRecordDeserializer serializer : 
reader1RecordDeserializers) {
-                       serializer.setReporter(reporter);
-               }
-               for (AdaptiveSpanningRecordDeserializer serializer : 
reader2RecordDeserializers) {
-                       serializer.setReporter(reporter);
-               }
-       }
-
-       private class CoBarrierBuffer extends BarrierBuffer {
-
-               private CoBarrierBuffer otherBuffer;
-
-               public CoBarrierBuffer(InputGate inputGate, AbstractReader 
reader) {
-                       super(inputGate, reader);
-               }
-
-               public void setOtherBarrierBuffer(CoBarrierBuffer other) {
-                       this.otherBuffer = other;
-               }
-
-               @Override
-               protected void actOnAllBlocked() {
-                       if (otherBuffer.isAllBlocked()) {
-                               super.actOnAllBlocked();
-                               otherBuffer.releaseBlocks();
-                       }
-               }
-
-       }
-
-       public void cleanup() throws IOException {
-               try {
-                       barrierBuffer1.cleanup();
-               } finally {
-                       barrierBuffer2.cleanup();
-               }
-
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CollectorWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CollectorWrapper.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CollectorWrapper.java
new file mode 100644
index 0000000..2f9d1d6
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CollectorWrapper.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
+import org.apache.flink.streaming.api.graph.StreamEdge;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * An {@link Output} that fans records and watermarks out to a set of registered
+ * downstream outputs.
+ *
+ * <p>A record is forwarded only to the outputs that the {@link OutputSelectorWrapper}
+ * selects for the record's value, whereas a watermark is forwarded to every
+ * registered output.
+ *
+ * @param <OUT> type of the values carried by the collected {@link StreamRecord}s
+ */
+public class CollectorWrapper<OUT> implements Output<StreamRecord<OUT>> {
+
+       /** Chooses, per record value, which registered outputs receive the record. */
+       private final OutputSelectorWrapper<OUT> outputSelectorWrapper;
+
+       /** Every output registered via {@code addCollector}; all of them receive watermarks. */
+       private final List<Output<OUT>> allOutputs;
+
+       /**
+        * Creates a wrapper that has no registered outputs yet.
+        *
+        * @param outputSelectorWrapper selector consulted for every collected record
+        */
+       public CollectorWrapper(OutputSelectorWrapper<OUT> outputSelectorWrapper) {
+               this.outputSelectorWrapper = outputSelectorWrapper;
+               this.allOutputs = new ArrayList<Output<OUT>>();
+       }
+
+       /**
+        * Registers a downstream output with the selector and for watermark forwarding.
+        *
+        * @param output downstream collector; it must also be an {@link Output}, otherwise
+        *               the cast in this method fails with a {@link ClassCastException}
+        * @param edge stream edge passed to the selector together with the output
+        */
+       @SuppressWarnings({"unchecked", "rawtypes"})
+       public void addCollector(Collector<StreamRecord<?>> output, StreamEdge edge) {
+               outputSelectorWrapper.addCollector(output, edge);
+               // Deliberately raw: the collector's element type cannot be expressed against
+               // the declared element type of allOutputs, so the warning is suppressed on
+               // this method only rather than on the whole class.
+               allOutputs.add((Output) output);
+       }
+
+       /**
+        * Forwards the record to each output the selector returns for the record's value.
+        *
+        * @param record record to forward; the same instance is passed to every selected output
+        */
+       @Override
+       public void collect(StreamRecord<OUT> record) {
+               for (Collector<StreamRecord<OUT>> output :
+                               outputSelectorWrapper.getSelectedOutputs(record.getValue())) {
+                       output.collect(record);
+               }
+       }
+
+       /**
+        * Forwards the watermark to all registered outputs, regardless of any selection.
+        *
+        * @param mark watermark to forward
+        */
+       @Override
+       public void emitWatermark(Watermark mark) {
+               for (Output<OUT> output : allOutputs) {
+                       output.emitWatermark(mark);
+               }
+       }
+
+       /**
+        * Does nothing; the registered outputs are not closed by this wrapper.
+        */
+       @Override
+       public void close() {
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/IndexedMutableReader.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/IndexedMutableReader.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/IndexedMutableReader.java
deleted file mode 100644
index 7f2a9c5..0000000
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/IndexedMutableReader.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-
-public class IndexedMutableReader<T extends IOReadableWritable> extends
-               StreamingMutableRecordReader<T> {
-
-       InputGate reader;
-
-       public IndexedMutableReader(InputGate reader) {
-               super(reader);
-               this.reader = reader;
-       }
-
-       public int getNumberOfInputChannels() {
-               return reader.getNumberOfInputChannels();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/IndexedReaderIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/IndexedReaderIterator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/IndexedReaderIterator.java
deleted file mode 100644
index 2050e27..0000000
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/IndexedReaderIterator.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.operators.util.ReaderIterator;
-import org.apache.flink.runtime.plugable.DeserializationDelegate;
-
-public class IndexedReaderIterator<T> extends ReaderIterator<T> {
-
-       public IndexedReaderIterator(
-                       IndexedMutableReader<DeserializationDelegate<T>> reader,
-                       TypeSerializer<T> serializer) {
-
-               super(reader, serializer);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/InputGateFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/InputGateFactory.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/InputGateFactory.java
deleted file mode 100644
index 7883251..0000000
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/InputGateFactory.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import java.util.Collection;
-
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
-
-public class InputGateFactory {
-
-       public static InputGate createInputGate(Collection<InputGate> 
inputGates) {
-               return createInputGate(inputGates.toArray(new 
InputGate[inputGates.size()]));
-       }
-
-       public static InputGate createInputGate(InputGate[] inputGates) {
-               if (inputGates.length <= 0) {
-                       throw new RuntimeException("No such input gate.");
-               }
-
-               if (inputGates.length < 2) {
-                       return inputGates[0];
-               } else {
-                       return new UnionInputGate(inputGates);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/InputGateUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/InputGateUtil.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/InputGateUtil.java
new file mode 100644
index 0000000..01e16fb
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/InputGateUtil.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
+
+/**
+ * Helper that reduces one or more {@link InputGate input gates} to a single gate.
+ *
+ * <p>A lone {@link InputGate} is handed back unchanged; two or more are wrapped
+ * in a {@link UnionInputGate}.
+ */
+public class InputGateUtil {
+
+       /**
+        * Combines the gates of two inputs into one gate. The gates of
+        * {@code inputGates1} are placed before those of {@code inputGates2}.
+        *
+        * @param inputGates1 gates of the first input
+        * @param inputGates2 gates of the second input
+        * @return the only gate if exactly one was given, otherwise a {@link UnionInputGate}
+        * @throws RuntimeException if both collections are empty
+        */
+       public static InputGate createInputGate(Collection<InputGate> inputGates1, Collection<InputGate> inputGates2) {
+               int expectedSize = inputGates1.size() + inputGates2.size();
+               List<InputGate> combined = new ArrayList<InputGate>(expectedSize);
+               combined.addAll(inputGates1);
+               combined.addAll(inputGates2);
+
+               InputGate[] asArray = combined.toArray(new InputGate[combined.size()]);
+               return createInputGate(asArray);
+       }
+
+       /**
+        * Returns a single gate standing for all given gates.
+        *
+        * @param inputGates the gates to combine
+        * @return the only gate if exactly one was given, otherwise a {@link UnionInputGate}
+        * @throws RuntimeException if the array is empty
+        */
+       public static InputGate createInputGate(InputGate[] inputGates) {
+               // An array length is never negative, so "empty" is the only failure case.
+               if (inputGates.length == 0) {
+                       throw new RuntimeException("No such input gate.");
+               }
+
+               return inputGates.length == 1 ? inputGates[0] : new UnionInputGate(inputGates);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
new file mode 100644
index 0000000..e9cbb7d
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import java.io.IOException;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import 
org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of {@link Output} that sends data using a {@link RecordWriter}.
+ *
+ * <p>Elements and watermarks are both written through one shared
+ * {@link SerializationDelegate}, which is pointed at each outgoing object in turn.
+ *
+ * @param <OUT> type of the values wrapped by the emitted {@link StreamRecord}s
+ */
+public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> {
+
+       private static final Logger LOG = LoggerFactory.getLogger(RecordWriterOutput.class);
+
+       private final RecordWriter<SerializationDelegate> recordWriter;
+       private SerializationDelegate serializationDelegate;
+
+       /**
+        * Creates an output on top of the given writer.
+        *
+        * @param recordWriter writer that ships the serialized data; must not be null
+        * @param outSerializer serializer for the element type; if null, no delegate
+        *        is created and every later emit fails
+        * @param enableWatermarkMultiplexing if true a
+        *        {@link MultiplexingStreamRecordSerializer} is used, otherwise a plain
+        *        {@link StreamRecordSerializer}
+        */
+       @SuppressWarnings("unchecked")
+       public RecordWriterOutput(
+                       RecordWriter<SerializationDelegate> recordWriter,
+                       TypeSerializer<OUT> outSerializer,
+                       boolean enableWatermarkMultiplexing) {
+               Preconditions.checkNotNull(recordWriter);
+
+               this.recordWriter = recordWriter;
+
+               StreamRecordSerializer<OUT> outRecordSerializer;
+               if (enableWatermarkMultiplexing) {
+                       outRecordSerializer = new MultiplexingStreamRecordSerializer<OUT>(outSerializer);
+               } else {
+                       outRecordSerializer = new StreamRecordSerializer<OUT>(outSerializer);
+               }
+
+               if (outSerializer != null) {
+                       serializationDelegate = new SerializationDelegate(outRecordSerializer);
+               }
+       }
+
+       /**
+        * Emits one record through the record writer.
+        *
+        * @param record the record to send
+        * @throws RuntimeException wrapping any failure reported by the writer
+        */
+       @Override
+       @SuppressWarnings("unchecked")
+       public void collect(StreamRecord<OUT> record) {
+               serializationDelegate.setInstance(record);
+
+               try {
+                       recordWriter.emit(serializationDelegate);
+               } catch (Exception e) {
+                       // A trailing Throwable is logged as the exception itself; a "{}"
+                       // placeholder for it would show up literally in the message.
+                       LOG.error("Emit failed", e);
+                       throw new RuntimeException("Element emission failed.", e);
+               }
+       }
+
+       /**
+        * Broadcasts a watermark through the record writer.
+        *
+        * @param mark the watermark to send
+        * @throws RuntimeException wrapping any failure reported by the writer
+        */
+       @Override
+       @SuppressWarnings("unchecked")
+       public void emitWatermark(Watermark mark) {
+               serializationDelegate.setInstance(mark);
+               try {
+                       recordWriter.broadcastEmit(serializationDelegate);
+               } catch (Exception e) {
+                       LOG.error("Watermark emit failed", e);
+                       throw new RuntimeException("Watermark emission failed.", e);
+               }
+       }
+
+       /**
+        * Closes the writer if it is a {@link StreamRecordWriter}; any other writer is
+        * only flushed. A failing flush is logged and not propagated.
+        */
+       @Override
+       public void close() {
+               if (recordWriter instanceof StreamRecordWriter) {
+                       ((StreamRecordWriter) recordWriter).close();
+               } else {
+                       try {
+                               recordWriter.flush();
+                       } catch (IOException e) {
+                               // Closing stays best-effort, but the failure goes to the
+                               // class logger instead of stderr.
+                               LOG.error("Flushing the record writer on close failed", e);
+                       }
+               }
+       }
+
+       /** Delegates to {@link RecordWriter#clearBuffers()}. */
+       public void clearBuffers() {
+               recordWriter.clearBuffers();
+       }
+
+       /**
+        * Broadcasts the given event through the record writer.
+        *
+        * @param barrier the event to broadcast
+        * @throws IOException if the writer reports an I/O failure
+        * @throws InterruptedException if the writer is interrupted
+        */
+       public void broadcastEvent(TaskEvent barrier) throws IOException, InterruptedException {
+               recordWriter.broadcastEvent(barrier);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
new file mode 100644
index 0000000..e665710
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import java.io.IOException;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
+import org.apache.flink.runtime.io.network.api.reader.ReaderBase;
+import 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
+import 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
+import 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.plugable.DeserializationDelegate;
+import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import 
org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Input reader for {@link org.apache.flink.streaming.runtime.tasks.OneInputStreamTask}.
+ *
+ * <p>
+ * Besides records, this reader remembers the highest {@link Watermark} received on
+ * each input channel and hands a {@link Watermark} to the operator whenever the
+ * minimum over all channels rises above the one handed over last.
+ *
+ * @param <IN> The type of the record that can be read with this record reader.
+ */
+public class StreamInputProcessor<IN> extends AbstractReader implements ReaderBase, StreamingReader {
+
+       @SuppressWarnings("unused")
+       private static final Logger LOG = LoggerFactory.getLogger(StreamInputProcessor.class);
+
+       // One deserializer per input channel, indexed by channel index.
+       private final RecordDeserializer<DeserializationDelegate>[] recordDeserializers;
+
+       // Deserializer of the buffer being drained; null when the next buffer must be fetched.
+       private RecordDeserializer<DeserializationDelegate> currentRecordDeserializer;
+
+       // Channel the current buffer arrived on, so that watermarks can be
+       // attributed to the right input channel.
+       int currentChannel = -1;
+
+       private boolean isFinished;
+
+       private final BarrierBuffer barrierBuffer;
+
+       // Highest watermark seen so far on each channel.
+       private long[] watermarks;
+       private long lastEmittedWatermark;
+
+       private DeserializationDelegate deserializationDelegate;
+
+       /**
+        * Creates a processor that reads from the given gates through a single gate.
+        *
+        * @param inputGates the gates to read from
+        * @param inputSerializer serializer for the element type
+        * @param enableWatermarkMultiplexing if true a
+        *        {@link MultiplexingStreamRecordSerializer} is used, otherwise a plain
+        *        {@link StreamRecordSerializer}
+        */
+       @SuppressWarnings("unchecked")
+       public StreamInputProcessor(InputGate[] inputGates, TypeSerializer<IN> inputSerializer, boolean enableWatermarkMultiplexing) {
+               super(InputGateUtil.createInputGate(inputGates));
+
+               barrierBuffer = new BarrierBuffer(inputGate, this);
+
+               StreamRecordSerializer<IN> inputRecordSerializer = enableWatermarkMultiplexing
+                               ? new MultiplexingStreamRecordSerializer<IN>(inputSerializer)
+                               : new StreamRecordSerializer<IN>(inputSerializer);
+               this.deserializationDelegate = new NonReusingDeserializationDelegate(inputRecordSerializer);
+
+               final int numChannels = inputGate.getNumberOfInputChannels();
+
+               // Initialize one deserializer per input channel
+               this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[numChannels];
+               for (int channel = 0; channel < numChannels; channel++) {
+                       recordDeserializers[channel] = new SpillingAdaptiveSpanningRecordDeserializer<DeserializationDelegate>();
+               }
+
+               // No channel has reported a watermark yet.
+               watermarks = new long[numChannels];
+               java.util.Arrays.fill(watermarks, Long.MIN_VALUE);
+               lastEmittedWatermark = Long.MIN_VALUE;
+       }
+
+       /**
+        * Reads input until one element has been passed to the operator.
+        *
+        * @param streamOperator operator receiving elements and watermarks
+        * @return true after one element was processed; false once the input is
+        *         finished or the end of a superstep was reached
+        * @throws Exception if reading fails or the operator throws
+        */
+       @SuppressWarnings("unchecked")
+       public boolean processInput(OneInputStreamOperator<IN, ?> streamOperator) throws Exception {
+               if (isFinished) {
+                       return false;
+               }
+
+               while (true) {
+                       if (currentRecordDeserializer != null) {
+                               DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);
+
+                               if (result.isBufferConsumed()) {
+                                       currentRecordDeserializer.getCurrentBuffer().recycle();
+                                       currentRecordDeserializer = null;
+                               }
+
+                               if (result.isFullRecord()) {
+                                       Object recordOrWatermark = deserializationDelegate.getInstance();
+
+                                       if (recordOrWatermark instanceof Watermark) {
+                                               handleWatermark(streamOperator, (Watermark) recordOrWatermark);
+                                               continue;
+                                       }
+
+                                       // now we can do the actual processing
+                                       StreamRecord<IN> record = (StreamRecord<IN>) recordOrWatermark;
+                                       StreamingRuntimeContext ctx = streamOperator.getRuntimeContext();
+                                       if (ctx != null) {
+                                               ctx.setNextInput(record);
+                                       }
+                                       streamOperator.processElement(record);
+                                       return true;
+                               }
+                       }
+
+                       final BufferOrEvent bufferOrEvent = barrierBuffer.getNextNonBlocked();
+
+                       if (bufferOrEvent.isBuffer()) {
+                               currentChannel = bufferOrEvent.getChannelIndex();
+                               currentRecordDeserializer = recordDeserializers[currentChannel];
+                               currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
+                               continue;
+                       }
+
+                       // Event received
+                       final AbstractEvent event = bufferOrEvent.getEvent();
+
+                       if (event instanceof CheckpointBarrier) {
+                               barrierBuffer.processBarrier(bufferOrEvent);
+                       } else if (handleEvent(event)) {
+                               if (inputGate.isFinished()) {
+                                       if (!barrierBuffer.isEmpty()) {
+                                               throw new RuntimeException("BarrierBuffer should be empty at this point");
+                                       }
+                                       isFinished = true;
+                                       return false;
+                               } else if (hasReachedEndOfSuperstep()) {
+                                       return false;
+                               } // else: More data is coming...
+                       }
+               }
+       }
+
+       /**
+        * Records a watermark for the current channel and forwards the new minimum over
+        * all channels to the operator if it exceeds the one forwarded last.
+        */
+       private void handleWatermark(OneInputStreamOperator<IN, ?> streamOperator, Watermark mark) throws Exception {
+               long watermarkMillis = mark.getTimestamp();
+               if (watermarkMillis <= watermarks[currentChannel]) {
+                       // Nothing new for this channel.
+                       return;
+               }
+               watermarks[currentChannel] = watermarkMillis;
+
+               long minAcrossChannels = Long.MAX_VALUE;
+               for (long channelWatermark : watermarks) {
+                       minAcrossChannels = Math.min(minAcrossChannels, channelWatermark);
+               }
+
+               if (minAcrossChannels > lastEmittedWatermark) {
+                       lastEmittedWatermark = minAcrossChannels;
+                       streamOperator.processWatermark(new Watermark(lastEmittedWatermark));
+               }
+       }
+
+       /** Passes the reporter on to every per-channel deserializer. */
+       @Override
+       public void setReporter(AccumulatorRegistry.Reporter reporter) {
+               for (RecordDeserializer<?> deserializer : recordDeserializers) {
+                       deserializer.setReporter(reporter);
+               }
+       }
+
+       /** Recycles the current buffer of every deserializer that still holds an unrecycled one. */
+       public void clearBuffers() {
+               for (RecordDeserializer<?> deserializer : recordDeserializers) {
+                       Buffer held = deserializer.getCurrentBuffer();
+                       if (held == null || held.isRecycled()) {
+                               continue;
+                       }
+                       held.recycle();
+               }
+       }
+
+       /**
+        * Cleans up the barrier buffer.
+        *
+        * @throws IOException if the barrier buffer cleanup fails
+        */
+       public void cleanup() throws IOException {
+               barrierBuffer.cleanup();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
index c212346..abae9a4 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
@@ -61,6 +61,14 @@ public class StreamRecordWriter<T extends 
IOReadableWritable> extends RecordWrit
                }
        }
 
+       /**
+        * Broadcasts the record through the superclass implementation and, if
+        * {@code flushAlways} is set, flushes immediately afterwards.
+        */
+       @Override
+       public void broadcastEmit(T record) throws IOException, 
InterruptedException {
+               super.broadcastEmit(record);
+               if (flushAlways) {
+                       flush();
+               }
+       }
+
        public void close() {
                try {
                        if (outputFlusher != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
new file mode 100644
index 0000000..1fe98bb
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
+import org.apache.flink.runtime.io.network.api.reader.ReaderBase;
+import 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
+import 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
+import 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.plugable.DeserializationDelegate;
+import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import 
org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * Input reader for {@link 
org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask}.
+ *
+ * <p>
+ * This also keeps track of {@link 
org.apache.flink.streaming.api.watermark.Watermark} events and forwards them to 
event subscribers
+ * once the {@link org.apache.flink.streaming.api.watermark.Watermark} from 
all inputs advances.
+ *
+ * @param <IN1> The type of the records that arrive on the first input
+ * @param <IN2> The type of the records that arrive on the second input
+ */
+public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader 
implements ReaderBase, StreamingReader {
+
+       @SuppressWarnings("unused")
+       private static final Logger LOG = 
LoggerFactory.getLogger(StreamTwoInputProcessor.class);
+
+       // One deserializer per channel of the combined input gate, indexed by channel index.
+       private final RecordDeserializer[] recordDeserializers;
+
+       // Deserializer of the buffer being read; null when the next buffer must be fetched.
+       private RecordDeserializer currentRecordDeserializer;
+
+       // We need to keep track of the channel from which a buffer came, so that we can
+       // appropriately map the watermarks to input channels
+       int currentChannel = -1;
+
+       // Set once the input gate has finished; processInput then returns false right away.
+       private boolean isFinished;
+
+       private final BarrierBuffer barrierBuffer;
+
+       // Per-channel watermarks of the first input and the last minimum passed to the operator.
+       private long[] watermarks1;
+       private long lastEmittedWatermark1;
+
+       // Counterparts for the second input.
+       private long[] watermarks2;
+       private long lastEmittedWatermark2;
+
+       // Channel counts per input; channel indices below numInputChannels1 belong to the first input.
+       private int numInputChannels1;
+       private int numInputChannels2;
+
+       // Delegates used when deserializing from the first and the second input, respectively.
+       private DeserializationDelegate deserializationDelegate1;
+       private DeserializationDelegate deserializationDelegate2;
+
+       /**
+        * Creates a processor that reads both inputs through one combined gate. The
+        * channels of {@code inputGates1} come first, followed by those of
+        * {@code inputGates2}.
+        *
+        * @param inputGates1 gates of the first input
+        * @param inputGates2 gates of the second input
+        * @param inputSerializer1 serializer for elements of the first input
+        * @param inputSerializer2 serializer for elements of the second input
+        * @param enableWatermarkMultiplexing if true
+        *        {@link MultiplexingStreamRecordSerializer}s are used, otherwise plain
+        *        {@link StreamRecordSerializer}s
+        */
+       @SuppressWarnings("unchecked")
+       public StreamTwoInputProcessor(
+                       Collection<InputGate> inputGates1,
+                       Collection<InputGate> inputGates2,
+                       TypeSerializer<IN1> inputSerializer1,
+                       TypeSerializer<IN2> inputSerializer2,
+                       boolean enableWatermarkMultiplexing) {
+               super(InputGateUtil.createInputGate(inputGates1, inputGates2));
+
+               barrierBuffer = new BarrierBuffer(inputGate, this);
+
+               StreamRecordSerializer<IN1> firstSerializer = enableWatermarkMultiplexing
+                               ? new MultiplexingStreamRecordSerializer<IN1>(inputSerializer1)
+                               : new StreamRecordSerializer<IN1>(inputSerializer1);
+               this.deserializationDelegate1 = new NonReusingDeserializationDelegate(firstSerializer);
+
+               StreamRecordSerializer<IN2> secondSerializer = enableWatermarkMultiplexing
+                               ? new MultiplexingStreamRecordSerializer<IN2>(inputSerializer2)
+                               : new StreamRecordSerializer<IN2>(inputSerializer2);
+               this.deserializationDelegate2 = new NonReusingDeserializationDelegate(secondSerializer);
+
+               final int totalChannels = inputGate.getNumberOfInputChannels();
+
+               // Initialize one deserializer per input channel
+               this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[totalChannels];
+               for (int channel = 0; channel < totalChannels; channel++) {
+                       recordDeserializers[channel] = new SpillingAdaptiveSpanningRecordDeserializer();
+               }
+
+               // determine which unioned channels belong to input 1 and which belong to input 2
+               int channelsOfFirstInput = 0;
+               for (InputGate gate : inputGates1) {
+                       channelsOfFirstInput += gate.getNumberOfInputChannels();
+               }
+               numInputChannels1 = channelsOfFirstInput;
+               numInputChannels2 = totalChannels - numInputChannels1;
+
+               // No channel of either input has reported a watermark yet.
+               watermarks1 = new long[numInputChannels1];
+               java.util.Arrays.fill(watermarks1, Long.MIN_VALUE);
+               lastEmittedWatermark1 = Long.MIN_VALUE;
+
+               watermarks2 = new long[numInputChannels2];
+               java.util.Arrays.fill(watermarks2, Long.MIN_VALUE);
+               lastEmittedWatermark2 = Long.MIN_VALUE;
+       }
+
+       /**
+        * Reads input until one element has been passed to the operator.
+        *
+        * <p>Channels with an index below {@code numInputChannels1} are read as the
+        * first input; all other channels are read as the second input.
+        *
+        * @param streamOperator operator receiving elements and watermarks of both inputs
+        * @return true after one element was processed; false once the input is
+        *         finished or the end of a superstep was reached
+        * @throws Exception if reading fails or the operator throws
+        */
+       @SuppressWarnings("unchecked")
+       public boolean processInput(TwoInputStreamOperator<IN1, IN2, ?> streamOperator) throws Exception {
+               if (isFinished) {
+                       return false;
+               }
+
+               while (true) {
+                       if (currentRecordDeserializer != null) {
+                               // The channel index decides which input, and so which delegate, applies.
+                               final boolean fromFirstInput = currentChannel < numInputChannels1;
+                               final DeserializationDelegate delegate =
+                                               fromFirstInput ? deserializationDelegate1 : deserializationDelegate2;
+
+                               DeserializationResult result = currentRecordDeserializer.getNextRecord(delegate);
+
+                               if (result.isBufferConsumed()) {
+                                       currentRecordDeserializer.getCurrentBuffer().recycle();
+                                       currentRecordDeserializer = null;
+                               }
+
+                               if (result.isFullRecord()) {
+                                       Object recordOrWatermark = delegate.getInstance();
+
+                                       if (recordOrWatermark instanceof Watermark) {
+                                               handleWatermark(streamOperator, (Watermark) recordOrWatermark, currentChannel);
+                                               continue;
+                                       }
+
+                                       if (fromFirstInput) {
+                                               streamOperator.processElement1((StreamRecord<IN1>) recordOrWatermark);
+                                       } else {
+                                               streamOperator.processElement2((StreamRecord<IN2>) recordOrWatermark);
+                                       }
+                                       return true;
+                               }
+                       }
+
+                       final BufferOrEvent bufferOrEvent = barrierBuffer.getNextNonBlocked();
+
+                       if (bufferOrEvent.isBuffer()) {
+                               currentChannel = bufferOrEvent.getChannelIndex();
+                               currentRecordDeserializer = recordDeserializers[currentChannel];
+                               currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
+                               continue;
+                       }
+
+                       // Event received
+                       final AbstractEvent event = bufferOrEvent.getEvent();
+
+                       if (event instanceof CheckpointBarrier) {
+                               barrierBuffer.processBarrier(bufferOrEvent);
+                       } else if (handleEvent(event)) {
+                               if (inputGate.isFinished()) {
+                                       if (!barrierBuffer.isEmpty()) {
+                                               throw new RuntimeException("BarrierBuffer should be empty at this point");
+                                       }
+                                       isFinished = true;
+                                       return false;
+                               } else if (hasReachedEndOfSuperstep()) {
+                                       return false;
+                               } // else: More data is coming...
+                       }
+               }
+       }
+
+       private void handleWatermark(TwoInputStreamOperator<IN1, IN2, ?> 
operator, Watermark mark, int channelIndex) throws Exception {
+               if (channelIndex < numInputChannels1) {
+                       long watermarkMillis = mark.getTimestamp();
+                       if (watermarkMillis > watermarks1[channelIndex]) {
+                               watermarks1[channelIndex] = watermarkMillis;
+                               long newMinWatermark = Long.MAX_VALUE;
+                               for (long aWatermarks1 : watermarks1) {
+                                       if (aWatermarks1 < newMinWatermark) {
+                                               newMinWatermark = aWatermarks1;
+                                       }
+                               }
+                               if (newMinWatermark > lastEmittedWatermark1) {
+                                       lastEmittedWatermark1 = newMinWatermark;
+                                       operator.processWatermark1(new 
Watermark(lastEmittedWatermark1));
+                               }
+                       }
+               } else {
+                       channelIndex = channelIndex - numInputChannels1;
+                       long watermarkMillis = mark.getTimestamp();
+                       if (watermarkMillis > watermarks2[channelIndex]) {
+                               watermarks2[channelIndex] = watermarkMillis;
+                               long newMinWatermark = Long.MAX_VALUE;
+                               for (long aWatermarks2 : watermarks2) {
+                                       if (aWatermarks2 < newMinWatermark) {
+                                               newMinWatermark = aWatermarks2;
+                                       }
+                               }
+                               if (newMinWatermark > lastEmittedWatermark2) {
+                                       lastEmittedWatermark2 = newMinWatermark;
+                                       operator.processWatermark2(new 
Watermark(lastEmittedWatermark2));
+                               }
+                       }
+               }
+
+       }
+
+       @Override
+       public void setReporter(AccumulatorRegistry.Reporter reporter) {
+               for (RecordDeserializer<?> deserializer : recordDeserializers) {
+                       deserializer.setReporter(reporter);
+               }
+       }
+
+       public void clearBuffers() {
+               for (RecordDeserializer<?> deserializer : recordDeserializers) {
+                       Buffer buffer = deserializer.getCurrentBuffer();
+                       if (buffer != null && !buffer.isRecycled()) {
+                               buffer.recycle();
+                       }
+               }
+       }
+
+       public void cleanup() throws IOException {
+               barrierBuffer.cleanup();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingAbstractRecordReader.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingAbstractRecordReader.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingAbstractRecordReader.java
deleted file mode 100644
index 44f9a86..0000000
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingAbstractRecordReader.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import java.io.IOException;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
-import org.apache.flink.runtime.event.task.AbstractEvent;
-import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
-import org.apache.flink.runtime.io.network.api.reader.ReaderBase;
-import 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
-import 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
-import 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.streaming.runtime.tasks.StreamingSuperstep;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A record-oriented reader.
- * <p>
- * This abstract base class is used by both the mutable and immutable record
- * readers.
- * 
- * @param <T>
- *            The type of the record that can be read with this record reader.
- */
-public abstract class StreamingAbstractRecordReader<T extends 
IOReadableWritable> extends
-               AbstractReader implements ReaderBase, StreamingReader {
-
-       @SuppressWarnings("unused")
-       private static final Logger LOG = 
LoggerFactory.getLogger(StreamingAbstractRecordReader.class);
-
-       private final RecordDeserializer<T>[] recordDeserializers;
-
-       private RecordDeserializer<T> currentRecordDeserializer;
-
-       private boolean isFinished;
-
-       private final BarrierBuffer barrierBuffer;
-
-
-       @SuppressWarnings("unchecked")
-       protected StreamingAbstractRecordReader(InputGate inputGate) {
-               super(inputGate);
-               barrierBuffer = new BarrierBuffer(inputGate, this);
-
-               // Initialize one deserializer per input channel
-               this.recordDeserializers = new 
SpillingAdaptiveSpanningRecordDeserializer[inputGate
-                               .getNumberOfInputChannels()];
-               for (int i = 0; i < recordDeserializers.length; i++) {
-                       recordDeserializers[i] = new 
SpillingAdaptiveSpanningRecordDeserializer<T>();
-               }
-       }
-
-       protected boolean getNextRecord(T target) throws IOException, 
InterruptedException {
-               if (isFinished) {
-                       return false;
-               }
-
-               while (true) {
-                       if (currentRecordDeserializer != null) {
-                               DeserializationResult result = 
currentRecordDeserializer.getNextRecord(target);
-
-                               if (result.isBufferConsumed()) {
-                                       Buffer currentBuffer = 
currentRecordDeserializer.getCurrentBuffer();
-                                       currentBuffer.recycle();
-                                       currentRecordDeserializer = null;
-                               }
-
-                               if (result.isFullRecord()) {
-                                       return true;
-                               }
-                       }
-
-                       final BufferOrEvent bufferOrEvent = 
barrierBuffer.getNextNonBlocked();
-
-                       if (bufferOrEvent.isBuffer()) {
-                               currentRecordDeserializer = 
recordDeserializers[bufferOrEvent.getChannelIndex()];
-                               
currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
-                       } else {
-                               // Event received
-                               final AbstractEvent event = 
bufferOrEvent.getEvent();
-
-                               if (event instanceof StreamingSuperstep) {
-                                       
barrierBuffer.processSuperstep(bufferOrEvent);
-                               } else {
-                                       if (handleEvent(event)) {
-                                               if (inputGate.isFinished()) {
-                                                       if 
(!barrierBuffer.isEmpty()) {
-                                                               throw new 
RuntimeException(
-                                                                               
"BarrierBuffer should be empty at this point");
-                                                       }
-                                                       isFinished = true;
-                                                       return false;
-                                               } else if 
(hasReachedEndOfSuperstep()) {
-                                                       return false;
-                                               } // else: More data is 
coming...
-                                       }
-                               }
-                       }
-               }
-       }
-
-       public void clearBuffers() {
-               for (RecordDeserializer<?> deserializer : recordDeserializers) {
-                       Buffer buffer = deserializer.getCurrentBuffer();
-                       if (buffer != null && !buffer.isRecycled()) {
-                               buffer.recycle();
-                       }
-               }
-       }
-
-       public void cleanup() throws IOException {
-               barrierBuffer.cleanup();
-       }
-
-       @Override
-       public void setReporter(AccumulatorRegistry.Reporter reporter) {
-               for (RecordDeserializer<?> deserializer : recordDeserializers) {
-                       deserializer.setReporter(reporter);
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingMutableRecordReader.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingMutableRecordReader.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingMutableRecordReader.java
deleted file mode 100644
index 1356af5..0000000
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingMutableRecordReader.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import java.io.IOException;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.api.reader.MutableReader;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-
-public class StreamingMutableRecordReader<T extends IOReadableWritable> extends
-               StreamingAbstractRecordReader<T> implements MutableReader<T> {
-
-       public StreamingMutableRecordReader(InputGate inputGate) {
-               super(inputGate);
-       }
-
-       @Override
-       public boolean next(final T target) throws IOException, 
InterruptedException {
-               return getNextRecord(target);
-       }
-
-       @Override
-       public void clearBuffers() {
-               super.clearBuffers();
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java
index 75867cd..6c40c03 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java
@@ -44,10 +44,14 @@ public class CustomPartitionerWrapper<K, T> extends 
StreamPartitioner<T> {
        }
 
        @Override
-       public int[] selectChannels(SerializationDelegate<StreamRecord<T>> 
record,
-                       int numberOfOutputChannels) {
-
-               K key = record.getInstance().getKey(keySelector);
+       public int[] selectChannels(SerializationDelegate<StreamRecord<T>> 
record, int numberOfOutputChannels) {
+
+               K key = null;
+               try {
+                       key = 
keySelector.getKey(record.getInstance().getValue());
+               } catch (Exception e) {
+                       throw new RuntimeException("Could not extract key from 
" + record.getInstance(), e);
+               }
 
                returnArray[0] = partitioner.partition(key,
                                numberOfOutputChannels);

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitioner.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitioner.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitioner.java
index 08c431b..7026d45 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitioner.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitioner.java
@@ -42,8 +42,13 @@ public class FieldsPartitioner<T> extends 
StreamPartitioner<T> {
        @Override
        public int[] selectChannels(SerializationDelegate<StreamRecord<T>> 
record,
                        int numberOfOutputChannels) {
-               returnArray[0] = 
Math.abs(record.getInstance().getKey(keySelector).hashCode()
-                               % numberOfOutputChannels);
+               Object key;
+               try {
+                       key = 
keySelector.getKey(record.getInstance().getValue());
+               } catch (Exception e) {
+                       throw new RuntimeException("Could not extract key from 
" + record.getInstance().getValue(), e);
+               }
+               returnArray[0] = Math.abs(key.hashCode() % 
numberOfOutputChannels);
 
                return returnArray;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
new file mode 100644
index 0000000..715f0d2
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 
or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.streamrecord;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+import java.io.IOException;
+
+/**
+ * Serializer for {@link StreamRecord} and {@link Watermark}. This does not 
behave like a normal
+ * {@link TypeSerializer}, instead, this is only used at the
+ * {@link org.apache.flink.streaming.runtime.tasks.StreamTask} level for 
transmitting
+ * {@link StreamRecord StreamRecords} and {@link Watermark Watermarks}. This 
serializer
+ * can handle both of them; it therefore returns {@link Object}, and the result has
+ * to be cast to the correct type.
+ *
+ * @param <T> The type of value in the {@link 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord}
+ */
+public final class MultiplexingStreamRecordSerializer<T> extends 
StreamRecordSerializer<T> {
+
+       private final long IS_WATERMARK = Long.MIN_VALUE;
+
+       private static final long serialVersionUID = 1L;
+
+       public MultiplexingStreamRecordSerializer(TypeSerializer<T> serializer) 
{
+               super(serializer);
+               if (serializer instanceof MultiplexingStreamRecordSerializer) {
+                       throw new RuntimeException("StreamRecordSerializer 
given to StreamRecordSerializer as value TypeSerializer: " + serializer);
+               }
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       public Object copy(Object from) {
+               // the timestamp is a primitive long, so it is simply carried over to the copy
+               if (from instanceof StreamRecord) {
+                       StreamRecord<T> fromRecord = (StreamRecord<T>) from;
+                       return new 
StreamRecord<T>(typeSerializer.copy(fromRecord.getValue()), 
fromRecord.getTimestamp());
+               } else if (from instanceof Watermark) {
+                       // is immutable
+                       return from;
+               } else {
+                       throw new RuntimeException("Cannot copy " + from);
+               }
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       public Object copy(Object from, Object reuse) {
+               if (from instanceof StreamRecord && reuse instanceof 
StreamRecord) {
+                       StreamRecord<T> fromRecord = (StreamRecord<T>) from;
+                       StreamRecord<T> reuseRecord = (StreamRecord<T>) reuse;
+
+                       
reuseRecord.replace(typeSerializer.copy(fromRecord.getValue(), 
reuseRecord.getValue()), fromRecord.getTimestamp());
+                       return reuse;
+               } else if (from instanceof Watermark) {
+                       // is immutable
+                       return from;
+               } else {
+                       throw new RuntimeException("Cannot copy " + from);
+               }
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       public void serialize(Object value, DataOutputView target) throws 
IOException {
+               if (value instanceof StreamRecord) {
+                       StreamRecord<T> record = (StreamRecord<T>) value;
+                       target.writeLong(record.getTimestamp());
+                       typeSerializer.serialize(record.getValue(), target);
+               } else if (value instanceof Watermark) {
+                       target.writeLong(IS_WATERMARK);
+                       target.writeLong(((Watermark) value).getTimestamp());
+               }
+       }
+       
+       @Override
+       public Object deserialize(DataInputView source) throws IOException {
+               long millis = source.readLong();
+
+               if (millis == IS_WATERMARK) {
+                       return new Watermark(source.readLong());
+               } else {
+                       T element = typeSerializer.deserialize(source);
+                       return new StreamRecord<T>(element, millis);
+               }
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       public Object deserialize(Object reuse, DataInputView source) throws 
IOException {
+               long millis = source.readLong();
+
+               if (millis == IS_WATERMARK) {
+                       return new Watermark(source.readLong());
+
+               } else {
+                       StreamRecord<T> reuseRecord = (StreamRecord<T>) reuse;
+                       T element = 
typeSerializer.deserialize(reuseRecord.getValue(), source);
+                       reuseRecord.replace(element, millis);
+                       return reuse;
+               }
+       }
+
+       @Override
+       public void copy(DataInputView source, DataOutputView target) throws 
IOException {
+               long millis = source.readLong();
+               target.writeLong(millis);
+
+               if (millis == IS_WATERMARK) {
+                       target.writeLong(source.readLong());
+               } else {
+                       typeSerializer.copy(source, target);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
index 66a64b3..aff030e 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
@@ -17,87 +17,106 @@
 
 package org.apache.flink.streaming.runtime.streamrecord;
 
-import java.io.Serializable;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple;
-
 /**
- * Object for wrapping a tuple or other object with ID used for sending records
- * between streaming task in Apache Flink stream processing.
+ * One value in a data stream. This stores the value and the associated 
timestamp.
  */
-public class StreamRecord<T> implements Serializable {
-       private static final long serialVersionUID = 1L;
+public class StreamRecord<T> {
 
-       private T streamObject;
-       public boolean isTuple;
+       // We store it as Object so that we can reuse a StreamRecord for 
emitting
+       // elements of a different type while still reusing the timestamp.
+       private Object value;
+       private long timestamp;
 
        /**
-        * Creates an empty StreamRecord
+        * Creates a new {@link StreamRecord} wrapping the given value. The 
timestamp is set to
+        * {@code Long.MIN_VALUE + 1}.
         */
-       public StreamRecord() {
+       public StreamRecord(T value) {
+               this(value, Long.MIN_VALUE + 1);
+               // be careful to set it to MIN_VALUE + 1, because MIN_VALUE is 
reserved as the
+               // special tag to signify that a transmitted element is a 
Watermark in MultiplexingStreamRecordSerializer
        }
 
        /**
-        * Gets the wrapped object from the StreamRecord
-        * 
-        * @return The object wrapped
+        * Creates a new {@link StreamRecord} wrapping the given value. The 
timestamp is set to the
+        * given timestamp.
+        *
+        * @param value The value to wrap in this {@link StreamRecord}
+        * @param timestamp The timestamp in milliseconds
         */
-       public T getObject() {
-               return streamObject;
+       public StreamRecord(T value, long timestamp) {
+               this.value = value;
+               this.timestamp = timestamp;
        }
 
        /**
-        * Gets the field of the contained object at the given position. If a 
tuple
-        * is wrapped then the getField method is invoked. If the StreamRecord
-        * contains and object of Basic types only position 0 could be returned.
-        * 
-        * @param pos
-        *            Position of the field to get.
-        * @return Returns the object contained in the position.
+        * Returns the value wrapped in this stream value.
         */
-       public Object getField(int pos) {
-               if (isTuple) {
-                       return ((Tuple) streamObject).getField(pos);
-               } else {
-                       if (pos == 0) {
-                               return streamObject;
-                       } else {
-                               throw new IndexOutOfBoundsException();
-                       }
-               }
+       @SuppressWarnings("unchecked")
+       public T getValue() {
+               return (T) value;
        }
 
        /**
-        * Extracts key for the stored object using the keySelector provided.
-        * 
-        * @param keySelector
-        *            KeySelector for extracting the key
-        * @return The extracted key
+        * Returns the timestamp associated with this stream value in 
milliseconds.
         */
-       public <R> R getKey(KeySelector<T, R> keySelector) {
-               try {
-                       return keySelector.getKey(streamObject);
-               } catch (Exception e) {
-                       throw new RuntimeException("Failed to extract key: " + 
e.getMessage());
-               }
+       public long getTimestamp() {
+               return timestamp;
+       }
+
+       /**
+        * Replace the currently stored value by the given new value. This 
returns a StreamRecord
+        * with the generic type parameter that matches the new value while 
keeping the old
+        * timestamp.
+        *
+        * @param element Element to set in this stream value
+        * @return Returns the StreamRecord with replaced value
+        */
+       @SuppressWarnings("unchecked")
+       public <X> StreamRecord<X> replace(X element) {
+               this.value = element;
+               return (StreamRecord<X>) this;
        }
 
        /**
-        * Sets the object stored
-        * 
-        * @param object
-        *            Object to set
-        * @return Returns the StreamRecord object
+        * Replace the currently stored value by the given new value and the 
currently stored
+        * timestamp with the new timestamp. This returns a StreamRecord with 
the generic type
+        * parameter that matches the new value.
+        *
+        * @param value The new value to wrap in this {@link StreamRecord}
+        * @param timestamp The new timestamp in milliseconds
+        * @return Returns the StreamRecord with replaced value
         */
-       public StreamRecord<T> setObject(T object) {
-               this.streamObject = object;
-               return this;
+       @SuppressWarnings("unchecked")
+       public <X> StreamRecord<X> replace(X value, long timestamp) {
+               this.timestamp = timestamp;
+               this.value = value;
+               return (StreamRecord<X>) this;
        }
 
        @Override
-       public String toString() {
-               return streamObject.toString();
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+
+               StreamRecord that = (StreamRecord) o;
+
+               return value.equals(that.value) && timestamp == that.timestamp;
        }
 
+       @Override
+       public int hashCode() {
+               int result = value != null ? value.hashCode() : 0;
+               result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
+               return result;
+       }
+
+       @Override
+       public String toString() {
+               return "Record{" + value + "; " + timestamp + '}';
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
index 4499499..b05eb36 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
@@ -20,26 +20,35 @@ package org.apache.flink.streaming.runtime.streamrecord;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
-public final class StreamRecordSerializer<T> extends 
TypeSerializer<StreamRecord<T>> {
+/**
+ * Serializer for {@link StreamRecord}. This version ignores timestamps and 
only deals with
+ * the element.
+ *
+ * <p>
+ * {@link MultiplexingStreamRecordSerializer} is a version that deals with 
timestamps and also
+ * multiplexes {@link org.apache.flink.streaming.api.watermark.Watermark 
Watermarks} in the same
+ * stream with {@link StreamRecord StreamRecords}.
+ *
+ * @see MultiplexingStreamRecordSerializer
+ *
+ * @param <T> The type of value in the {@link StreamRecord}
+ */
+public class StreamRecordSerializer<T> extends TypeSerializer<Object> {
 
        private static final long serialVersionUID = 1L;
 
-       private final TypeSerializer<T> typeSerializer;
-       private final boolean isTuple;
+       protected final TypeSerializer<T> typeSerializer;
 
-       public StreamRecordSerializer(TypeInformation<T> typeInfo, 
ExecutionConfig executionConfig) {
-               this.typeSerializer = 
typeInfo.createSerializer(executionConfig);
-               this.isTuple = typeInfo.isTupleType();
-       }
-
-       public TypeSerializer<T> getObjectSerializer() {
-               return typeSerializer;
+       public StreamRecordSerializer(TypeSerializer<T> serializer) {
+               if (serializer instanceof StreamRecordSerializer) {
+                       throw new RuntimeException("StreamRecordSerializer 
given to StreamRecordSerializer as value TypeSerializer: " + serializer);
+               }
+               this.typeSerializer = Preconditions.checkNotNull(serializer);
        }
 
        @Override
@@ -48,34 +57,34 @@ public final class StreamRecordSerializer<T> extends 
TypeSerializer<StreamRecord
        }
 
        @Override
-       public StreamRecordSerializer<T> duplicate() {
+       @SuppressWarnings("unchecked")
+       public TypeSerializer duplicate() {
                return this;
        }
 
        @Override
-       public StreamRecord<T> createInstance() {
+       public Object createInstance() {
                try {
-                       StreamRecord<T> t = new StreamRecord<T>();
-                       t.isTuple = isTuple;
-                       t.setObject(typeSerializer.createInstance());
-                       return t;
+                       return new 
StreamRecord<T>(typeSerializer.createInstance());
                } catch (Exception e) {
                        throw new RuntimeException("Cannot instantiate 
StreamRecord.", e);
                }
        }
        
        @Override
-       public StreamRecord<T> copy(StreamRecord<T> from) {
-               StreamRecord<T> rec = new StreamRecord<T>();
-               rec.isTuple = from.isTuple;
-               rec.setObject(typeSerializer.copy(from.getObject()));
-               return rec;
+       @SuppressWarnings("unchecked")
+       public Object copy(Object from) {
+               StreamRecord<T> fromRecord = (StreamRecord<T>) from;
+               return new 
StreamRecord<T>(typeSerializer.copy(fromRecord.getValue()), 
fromRecord.getTimestamp());
        }
 
        @Override
-       public StreamRecord<T> copy(StreamRecord<T> from, StreamRecord<T> 
reuse) {
-               reuse.isTuple = from.isTuple;
-               reuse.setObject(typeSerializer.copy(from.getObject(), 
reuse.getObject()));
+       @SuppressWarnings("unchecked")
+       public Object copy(Object from, Object reuse) {
+               StreamRecord<T> fromRecord = (StreamRecord<T>) from;
+               StreamRecord<T> reuseRecord = (StreamRecord<T>) reuse;
+
+               // carry the timestamp over, exactly like copy(Object) does, instead of resetting it to 0
+               T valueCopy = typeSerializer.copy(fromRecord.getValue(), reuseRecord.getValue());
+               reuseRecord.replace(valueCopy, fromRecord.getTimestamp());
                return reuse;
        }
 
@@ -85,26 +94,29 @@ public final class StreamRecordSerializer<T> extends 
TypeSerializer<StreamRecord
        }
 
        @Override
-       public void serialize(StreamRecord<T> value, DataOutputView target) 
throws IOException {
-               typeSerializer.serialize(value.getObject(), target);
+       @SuppressWarnings("unchecked")
+       public void serialize(Object value, DataOutputView target) throws 
IOException {
+               StreamRecord<T> record = (StreamRecord<T>) value;
+               typeSerializer.serialize(record.getValue(), target);
        }
        
        @Override
-       public StreamRecord<T> deserialize(DataInputView source) throws 
IOException {
-               StreamRecord<T> record = new StreamRecord<T>();
-               record.isTuple = this.isTuple;
-               record.setObject(typeSerializer.deserialize(source));
-               return record;
+       public Object deserialize(DataInputView source) throws IOException {
+               T element = typeSerializer.deserialize(source);
+               return new StreamRecord<T>(element, 0);
        }
 
        @Override
-       public StreamRecord<T> deserialize(StreamRecord<T> reuse, DataInputView 
source) throws IOException {
-               reuse.setObject(typeSerializer.deserialize(reuse.getObject(), 
source));
+       @SuppressWarnings("unchecked")
+       public Object deserialize(Object reuse, DataInputView source) throws 
IOException {
+               StreamRecord<T> reuseRecord = (StreamRecord<T>) reuse;
+               T element = typeSerializer.deserialize(reuseRecord.getValue(), 
source);
+               reuseRecord.replace(element, 0);
                return reuse;
        }
 
        @Override
        public void copy(DataInputView source, DataOutputView target) throws 
IOException {
-               // Needs to be implemented
+               typeSerializer.copy(source, target);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointBarrier.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointBarrier.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointBarrier.java
new file mode 100644
index 0000000..d94b5b4
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointBarrier.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import java.io.IOException;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.event.task.TaskEvent;
+
+/**
+ * Checkpoint barriers are used to synchronize checkpoints throughout the 
streaming topology. The
+ * barriers are emitted by the sources when instructed to do so by the 
JobManager. When an
+ * operator receives a {@link CheckpointBarrier} on one of its inputs it must 
block processing
+ * of further elements on this input until all inputs received the checkpoint 
barrier
+ * corresponding to that checkpoint. Once all inputs received the 
checkpoint barrier for
+ * a checkpoint the operator is to perform the checkpoint and then broadcast 
the barrier to
+ * downstream operators.
+ *
+ * <p>
+ * The checkpoint barrier IDs are advancing. Once an operator receives a 
{@link CheckpointBarrier}
+ * for a checkpoint with a higher id it is to discard all barriers that it 
received from previous
+ * checkpoints and unblock all other inputs.
+ */
+public class CheckpointBarrier extends TaskEvent {
+
+       protected long id;
+       protected long timestamp;
+
+       public CheckpointBarrier() {}
+
+       public CheckpointBarrier(long id, long timestamp) {
+               this.id = id;
+               this.timestamp = timestamp;
+       }
+
+       public long getId() {
+               return id;
+       }
+
+       /**
+        * Returns the timestamp that was passed to the constructor (or read from the wire),
+        * as opposed to the checkpoint id returned by {@link #getId()}.
+        */
+       public long getTimestamp() {
+               return timestamp;
+       }
+
+       // 
------------------------------------------------------------------------
+       
+       @Override
+       public void write(DataOutputView out) throws IOException {
+               out.writeLong(id);
+               out.writeLong(timestamp);
+       }
+
+       @Override
+       public void read(DataInputView in) throws IOException {
+               id = in.readLong();
+               timestamp = in.readLong();
+       }
+       
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public int hashCode() {
+               return (int) (id ^ (id >>> 32) ^ timestamp ^(timestamp >>> 32));
+       }
+
+       @Override
+       public boolean equals(Object other) {
+               if (other == null || !(other instanceof CheckpointBarrier)) {
+                       return false;
+               }
+               else {
+                       CheckpointBarrier that = (CheckpointBarrier) other;
+                       return that.id == this.id && that.timestamp == 
this.timestamp;
+               }
+       }
+
+       @Override
+       public String toString() {
+               return String.format("CheckpointBarrier %d @ %d", id, 
timestamp);
+       }
+}

Reply via email to