[FLINK-1638] [streaming] RegisterState removed from datastream + CoRecordReader 
barrier test added


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/490fa700
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/490fa700
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/490fa700

Branch: refs/heads/master
Commit: 490fa700c6310b0f30decd058f97a38955136566
Parents: 37390d6
Author: Gyula Fora <gyf...@apache.org>
Authored: Mon Mar 9 09:12:51 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Mar 10 14:58:49 2015 +0100

----------------------------------------------------------------------
 .../kafka/api/simple/PersistentKafkaSource.java | 11 ++-
 .../flink/streaming/api/StreamConfig.java       | 16 ----
 .../apache/flink/streaming/api/StreamGraph.java | 26 +-----
 .../api/StreamingJobGraphGenerator.java         |  1 -
 .../datastream/SingleOutputStreamOperator.java  | 43 ----------
 .../api/invokable/operator/co/CoInvokable.java  |  6 +-
 .../api/streamvertex/StreamVertex.java          | 68 ++++++---------
 .../streamvertex/StreamingRuntimeContext.java   | 27 ++++--
 .../flink/streaming/io/BarrierBuffer.java       | 11 ++-
 .../flink/streaming/io/CoRecordReader.java      | 10 ++-
 .../flink/streaming/io/BarrierBufferTest.java   | 12 +--
 .../flink/streaming/io/CoRecordReaderTest.java  | 90 ++++++++++++++++++++
 12 files changed, 169 insertions(+), 152 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/490fa700/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
 
b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
index fd428c0..0f980e8 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
@@ -45,17 +45,16 @@ public class PersistentKafkaSource<OUT> extends 
SimpleKafkaSource<OUT> {
                this.initialOffset = initialOffset;
        }
 
+       @SuppressWarnings("unchecked")
        @Override
        public void open(Configuration parameters) throws InterruptedException {
                StreamingRuntimeContext context = (StreamingRuntimeContext) 
getRuntimeContext();
-               @SuppressWarnings("unchecked")
-               OperatorState<Long> lastKafkaOffSet = (OperatorState<Long>) 
context.getState("kafka");
-
-               if (lastKafkaOffSet.getState() == null) {
+               
+               if (context.containsState("kafka")) {
+                       kafkaOffSet = (OperatorState<Long>) 
context.getState("kafka");
+               } else {
                        kafkaOffSet = new OperatorState<Long>(initialOffset);
                        context.registerState("kafka", kafkaOffSet);
-               } else {
-                       kafkaOffSet = lastKafkaOffSet;
                }
 
                super.open(parameters);

http://git-wip-us.apache.org/repos/asf/flink/blob/490fa700/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index d813a30..ea19a44 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -32,7 +32,6 @@ import 
org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.api.streamvertex.StreamVertexException;
 import org.apache.flink.streaming.partitioner.StreamPartitioner;
-import org.apache.flink.runtime.state.OperatorState;
 import org.apache.flink.util.InstantiationUtil;
 
 public class StreamConfig implements Serializable {
@@ -55,7 +54,6 @@ public class StreamConfig implements Serializable {
        private static final String SERIALIZEDUDF = "serializedudf";
        private static final String USER_FUNCTION = "userfunction";
        private static final String BUFFER_TIMEOUT = "bufferTimeout";
-       private static final String OPERATOR_STATES = "operatorStates";
        private static final String TYPE_SERIALIZER_IN_1 = 
"typeSerializer_in_1";
        private static final String TYPE_SERIALIZER_IN_2 = 
"typeSerializer_in_2";
        private static final String TYPE_SERIALIZER_OUT_1 = 
"typeSerializer_out_1";
@@ -331,20 +329,6 @@ public class StreamConfig implements Serializable {
                return config.getInteger(INPUT_TYPE + inputNumber, 0);
        }
 
-       public void setOperatorStates(Map<String, OperatorState<?>> states) {
-               config.setBytes(OPERATOR_STATES, 
SerializationUtils.serialize((Serializable) states));
-       }
-
-       @SuppressWarnings("unchecked")
-       public Map<String, OperatorState<?>> getOperatorStates(ClassLoader cl) {
-               try {
-                       return (Map<String, OperatorState<?>>) 
InstantiationUtil.readObjectFromConfig(
-                                       this.config, OPERATOR_STATES, cl);
-               } catch (Exception e) {
-                       throw new RuntimeException("Could not load operator 
state");
-               }
-       }
-
        public void setChainedOutputs(List<Integer> chainedOutputs) {
                config.setBytes(CHAINED_OUTPUTS,
                                SerializationUtils.serialize((Serializable) 
chainedOutputs));

http://git-wip-us.apache.org/repos/asf/flink/blob/490fa700/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
index 8334aa1..f0fbaab 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
@@ -46,7 +46,6 @@ import 
org.apache.flink.streaming.api.streamvertex.StreamIterationHead;
 import org.apache.flink.streaming.api.streamvertex.StreamIterationTail;
 import org.apache.flink.streaming.api.streamvertex.StreamVertex;
 import org.apache.flink.streaming.partitioner.StreamPartitioner;
-import org.apache.flink.runtime.state.OperatorState;
 import org.apache.sling.commons.json.JSONArray;
 import org.apache.sling.commons.json.JSONException;
 import org.apache.sling.commons.json.JSONObject;
@@ -85,7 +84,6 @@ public class StreamGraph extends StreamingPlan {
        private Map<Integer, Integer> iterationIDtoTailID;
        private Map<Integer, Integer> iterationTailCount;
        private Map<Integer, Long> iterationTimeouts;
-       private Map<Integer, Map<String, OperatorState<?>>> operatorStates;
        private Map<Integer, InputFormat<String, ?>> inputFormatLists;
        private List<Map<Integer, ?>> containingMaps;
 
@@ -149,10 +147,8 @@ public class StreamGraph extends StreamingPlan {
                containingMaps.add(iterationTailCount);
                iterationTimeouts = new HashMap<Integer, Long>();
                containingMaps.add(iterationTailCount);
-               operatorStates = new HashMap<Integer, Map<String, 
OperatorState<?>>>();
-               containingMaps.add(operatorStates);
                inputFormatLists = new HashMap<Integer, InputFormat<String, 
?>>();
-               containingMaps.add(operatorStates);
+               containingMaps.add(inputFormatLists);
                sources = new HashSet<Integer>();
        }
 
@@ -419,22 +415,6 @@ public class StreamGraph extends StreamingPlan {
                return this.bufferTimeouts.get(vertexID);
        }
 
-       public void addOperatorState(Integer vertexName, String stateName, 
OperatorState<?> state) {
-               Map<String, OperatorState<?>> states = 
operatorStates.get(vertexName);
-               if (states == null) {
-                       states = new HashMap<String, OperatorState<?>>();
-                       states.put(stateName, state);
-               } else {
-                       if (states.containsKey(stateName)) {
-                               throw new RuntimeException("State has already 
been registered with this name: "
-                                               + stateName);
-                       } else {
-                               states.put(stateName, state);
-                       }
-               }
-               operatorStates.put(vertexName, states);
-       }
-
        /**
         * Sets a user defined {@link OutputSelector} for the given operator. 
Used
         * for directed emits.
@@ -594,10 +574,6 @@ public class StreamGraph extends StreamingPlan {
                return outputSelectors.get(vertexID);
        }
 
-       public Map<String, OperatorState<?>> getState(Integer vertexID) {
-               return operatorStates.get(vertexID);
-       }
-
        public Integer getIterationID(Integer vertexID) {
                return iterationIds.get(vertexID);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/490fa700/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
index b50ac25..ecb6455 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
@@ -212,7 +212,6 @@ public class StreamingJobGraphGenerator {
 
                config.setUserInvokable(streamGraph.getInvokable(vertexID));
                
config.setOutputSelectors(streamGraph.getOutputSelector(vertexID));
-               config.setOperatorStates(streamGraph.getState(vertexID));
 
                config.setNumberOfOutputs(nonChainableOutputs.size());
                config.setOutputs(nonChainableOutputs);

http://git-wip-us.apache.org/repos/asf/flink/blob/490fa700/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index b0fc364..16284d4 100755
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -17,16 +17,10 @@
 
 package org.apache.flink.streaming.api.datastream;
 
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import 
org.apache.flink.streaming.api.invokable.StreamInvokable.ChainingStrategy;
-import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext;
-import org.apache.flink.runtime.state.OperatorState;
 
 /**
  * The SingleOutputStreamOperator represents a user defined transformation
@@ -99,43 +93,6 @@ public class SingleOutputStreamOperator<OUT, O extends 
SingleOutputStreamOperato
                return this;
        }
 
-       /**
-        * This is a beta feature </br></br> Register an operator state for this
-        * operator by the given name. This name can be used to retrieve the 
state
-        * during runtime using {@link 
StreamingRuntimeContext#getState(String)}. To
-        * obtain the {@link StreamingRuntimeContext} from the user-defined 
function
-        * use the {@link RichFunction#getRuntimeContext()} method.
-        * 
-        * @param name
-        *            The name of the operator state.
-        * @param state
-        *            The state to be registered for this name.
-        * @return The data stream with state registered.
-        */
-       public SingleOutputStreamOperator<OUT, O> registerState(String name, 
OperatorState<?> state) {
-               streamGraph.addOperatorState(getId(), name, state);
-               return this;
-       }
-
-       /**
-        * This is a beta feature </br></br> Register operator states for this
-        * operator provided in a map. The registered states can be retrieved 
during
-        * runtime using {@link StreamingRuntimeContext#getState(String)}. To 
obtain
-        * the {@link StreamingRuntimeContext} from the user-defined function 
use
-        * the {@link RichFunction#getRuntimeContext()} method.
-        * 
-        * @param states
-        *            The map containing the states that will be registered.
-        * @return The data stream with states registered.
-        */
-       public SingleOutputStreamOperator<OUT, O> registerState(Map<String, 
OperatorState<?>> states) {
-               for (Entry<String, OperatorState<?>> entry : states.entrySet()) 
{
-                       streamGraph.addOperatorState(getId(), entry.getKey(), 
entry.getValue());
-               }
-
-               return this;
-       }
-
        @SuppressWarnings("unchecked")
        public SingleOutputStreamOperator<OUT, O> broadcast() {
                return (SingleOutputStreamOperator<OUT, O>) super.broadcast();

http://git-wip-us.apache.org/repos/asf/flink/blob/490fa700/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
index 2b407c6..b036829 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
@@ -84,16 +84,14 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends 
StreamInvokable<IN1, OU
                                next = recordIterator.next(reuse1, reuse2);
                        } catch (IOException e) {
                                if (isRunning) {
-                                       throw new RuntimeException("Could not 
read next record due to: "
-                                                       + 
StringUtils.stringifyException(e));
+                                       throw new RuntimeException("Could not 
read next record.", e);
                                } else {
                                        // Task already cancelled do nothing
                                        next = 0;
                                }
                        } catch (IllegalStateException e) {
                                if (isRunning) {
-                                       throw new RuntimeException("Could not 
read next record due to: "
-                                                       + 
StringUtils.stringifyException(e));
+                                       throw new RuntimeException("Could not 
read next record.", e);
                                } else {
                                        // Task already cancelled do nothing
                                        next = 0;

http://git-wip-us.apache.org/repos/asf/flink/blob/490fa700/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
index 5ff47d6..eb0d6ed 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.api.streamvertex;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.flink.runtime.event.task.TaskEvent;
@@ -64,8 +65,6 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable 
implements StreamTa
        protected ClassLoader userClassLoader;
 
        private EventListener<TaskEvent> superstepListener;
-       
-       private boolean onRecovery;
 
        public StreamVertex() {
                userInvokable = null;
@@ -89,57 +88,38 @@ public class StreamVertex<IN, OUT> extends 
AbstractInvokable implements StreamTa
        protected void initialize() {
                this.userClassLoader = getUserCodeClassLoader();
                this.configuration = new StreamConfig(getTaskConfiguration());
-               if(!onRecovery)
-               {
-                       this.states = 
configuration.getOperatorStates(userClassLoader);
-               }
+               this.states = new HashMap<String, OperatorState<?>>();
                this.context = 
createRuntimeContext(getEnvironment().getTaskName(), this.states);
        }
 
-       protected <T> void invokeUserFunction(StreamInvokable<?, T> 
userInvokable) throws Exception {
-               userInvokable.setRuntimeContext(context);
-               userInvokable.open(getTaskConfiguration());
-
-               for (ChainableInvokable<?, ?> invokable : 
outputHandler.chainedInvokables) {
-                       invokable.setRuntimeContext(context);
-                       invokable.open(getTaskConfiguration());
-               }
-
-               userInvokable.invoke();
-               userInvokable.close();
-
-               for (ChainableInvokable<?, ?> invokable : 
outputHandler.chainedInvokables) {
-                       invokable.close();
-               }
-
-       }
-
        @Override
        public void broadcastBarrier(long id) {
-               //Only called at input vertices
+               // Only called at input vertices
                if (LOG.isDebugEnabled()) {
                        LOG.debug("Received barrier from jobmanager: " + id);
                }
                actOnBarrier(id);
        }
 
+       /**
+        * This method is called to confirm that a barrier has been fully 
processed.
+        * It sends an acknowledgment to the jobmanager. In the current version 
if
+        * there is user state it also checkpoints the state to the jobmanager.
+        */
        @Override
        public void confirmBarrier(long barrierID) {
-               
-               if(configuration.getStateMonitoring() && states != null)
-               {
+
+               if (configuration.getStateMonitoring() && !states.isEmpty()) {
                        getEnvironment().getJobManager().tell(
-                                       new 
StateBarrierAck(getEnvironment().getJobID(), 
-                                                       
getEnvironment().getJobVertexId(), context.getIndexOfThisSubtask(), 
-                                                       barrierID, states), 
ActorRef.noSender());
-               }
-               else
-               {
+                                       new 
StateBarrierAck(getEnvironment().getJobID(), getEnvironment()
+                                                       .getJobVertexId(), 
context.getIndexOfThisSubtask(), barrierID, states),
+                                       ActorRef.noSender());
+               } else {
                        getEnvironment().getJobManager().tell(
                                        new 
BarrierAck(getEnvironment().getJobID(), getEnvironment().getJobVertexId(),
-                                                       
context.getIndexOfThisSubtask(), barrierID), ActorRef.noSender());      
+                                                       
context.getIndexOfThisSubtask(), barrierID), ActorRef.noSender());
                }
-               
+
        }
 
        public void setInputsOutputs() {
@@ -273,10 +253,17 @@ public class StreamVertex<IN, OUT> extends 
AbstractInvokable implements StreamTa
                return this.superstepListener;
        }
 
+       /**
+        * Method to be called when a barrier is received from all the input
+        * channels. It should broadcast the barrier to the output operators,
+        * checkpoint the state and send an ack.
+        * 
+        * @param id
+        */
        private void actOnBarrier(long id) {
                try {
                        outputHandler.broadcastBarrier(id);
-                       //TODO checkpoint state here
+                       // TODO checkpoint state here
                        confirmBarrier(id);
                        if (LOG.isDebugEnabled()) {
                                LOG.debug("Superstep " + id + " processed: " + 
StreamVertex.this);
@@ -293,12 +280,13 @@ public class StreamVertex<IN, OUT> extends 
AbstractInvokable implements StreamTa
                return configuration.getOperatorName() + " (" + 
context.getIndexOfThisSubtask() + ")";
        }
 
+       /**
+        * Re-injects the user states into the map
+        */
        @Override
-       public void injectStates(Map<String,OperatorState<?>> states) {
-               onRecovery = true;
+       public void injectStates(Map<String, OperatorState<?>> states) {
                this.states.putAll(states);
        }
-       
 
        private class SuperstepEventListener implements 
EventListener<TaskEvent> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/490fa700/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
index 492d2a0..da083fb 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.streamvertex;
 import java.util.Map;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
 import org.apache.flink.configuration.Configuration;
@@ -64,19 +65,35 @@ public class StreamingRuntimeContext extends 
RuntimeUDFContext {
                                throw new RuntimeException("No state has been 
registered for the name: " + name);
                        }
                }
+       }
 
+       /**
+        * Returns whether there is a state stored by the given name
+        */
+       public boolean containsState(String name) {
+               return operatorStates.containsKey(name);
        }
 
+       /**
+        * This is a beta feature </br></br> Register an operator state for this
+        * operator by the given name. This name can be used to retrieve the 
state
+        * during runtime using {@link 
StreamingRuntimeContext#getState(String)}. To
+        * obtain the {@link StreamingRuntimeContext} from the user-defined 
function
+        * use the {@link RichFunction#getRuntimeContext()} method.
+        * 
+        * @param name
+        *            The name of the operator state.
+        * @param state
+        *            The state to be registered for this name.
+        * @throws RuntimeException if the state is null or the name is already registered.
+        */
        public void registerState(String name, OperatorState<?> state) {
                if (state == null) {
                        throw new RuntimeException("Cannot register null 
state");
                } else {
-                       if(operatorStates.containsKey(name))
-                       {
+                       if (operatorStates.containsKey(name)) {
                                throw new RuntimeException("State is already 
registered");
-                       }
-                       else
-                       {
+                       } else {
                                operatorStates.put(name, state);
                        }
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/490fa700/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java
index 7dfccb0..42d4919 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java
@@ -30,6 +30,15 @@ import 
org.apache.flink.streaming.api.streamvertex.StreamingSuperstep;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * Class encapsulating the functionality that is necessary to sync inputs on
+ * superstep barriers. Once a barrier is received from an input channel, we
+ * should not process further buffers from that channel until we received the
+ * barrier from all other channels as well. To avoid back-pressuring the
+ * readers, we buffer up the new data received from the blocked channels until
+ * the blocks are released.
+ * 
+ */
 public class BarrierBuffer {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(BarrierBuffer.class);
@@ -54,7 +63,7 @@ public class BarrierBuffer {
        }
 
        /**
-        * Starts the next superstep
+        * Starts the next superstep in the buffer
         * 
         * @param superstep
         *            The next superstep

http://git-wip-us.apache.org/repos/asf/flink/blob/490fa700/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
index 6c91f4d..c32db4e 100755
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
@@ -62,8 +62,8 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 
extends IOReadable
 
        private boolean hasRequestedPartitions;
 
-       private CoBarrierBuffer barrierBuffer1;
-       private CoBarrierBuffer barrierBuffer2;
+       protected CoBarrierBuffer barrierBuffer1;
+       protected CoBarrierBuffer barrierBuffer2;
 
        public CoRecordReader(InputGate inputgate1, InputGate inputgate2) {
                super(new UnionInputGate(inputgate1, inputgate2));
@@ -195,7 +195,7 @@ public class CoRecordReader<T1 extends IOReadableWritable, 
T2 extends IOReadable
                }
        }
 
-       private int getNextReaderIndexBlocking() throws InterruptedException {
+       protected int getNextReaderIndexBlocking() throws InterruptedException {
 
                Integer nextIndex = 0;
 
@@ -230,6 +230,10 @@ public class CoRecordReader<T1 extends IOReadableWritable, 
T2 extends IOReadable
 
        @Override
        public void onEvent(InputGate bufferReader) {
+               addToAvailable(bufferReader);
+       }
+       
+       protected void addToAvailable(InputGate bufferReader){
                if (bufferReader == bufferReader1) {
                        availableRecordReaders.add(1);
                } else if (bufferReader == bufferReader2) {

http://git-wip-us.apache.org/repos/asf/flink/blob/490fa700/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java
index 203216b..1b4cc36 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java
@@ -44,8 +44,6 @@ public class BarrierBufferTest {
                input.add(createBuffer(0));
                input.add(createBuffer(0));
                input.add(createBuffer(0));
-               input.add(createBuffer(2));
-               input.add(createBuffer(2));
 
                InputGate mockIG = new MockInputGate(1, input);
                AbstractReader mockAR = new MockReader(mockIG);
@@ -55,8 +53,6 @@ public class BarrierBufferTest {
                assertEquals(input.get(0), bb.getNextNonBlocked());
                assertEquals(input.get(1), bb.getNextNonBlocked());
                assertEquals(input.get(2), bb.getNextNonBlocked());
-               assertEquals(input.get(3), bb.getNextNonBlocked());
-               assertEquals(input.get(4), bb.getNextNonBlocked());
 
        }
 
@@ -136,7 +132,7 @@ public class BarrierBufferTest {
 
        }
 
-       private static class MockInputGate implements InputGate {
+       protected static class MockInputGate implements InputGate {
 
                private int numChannels;
                private Queue<BufferOrEvent> boes;
@@ -175,7 +171,7 @@ public class BarrierBufferTest {
 
        }
 
-       private static class MockReader extends AbstractReader {
+       protected static class MockReader extends AbstractReader {
 
                protected MockReader(InputGate inputGate) {
                        super(inputGate);
@@ -183,11 +179,11 @@ public class BarrierBufferTest {
 
        }
 
-       private static BufferOrEvent createSuperstep(long id, int channel) {
+       protected static BufferOrEvent createSuperstep(long id, int channel) {
                return new BufferOrEvent(new StreamingSuperstep(id), channel);
        }
 
-       private static BufferOrEvent createBuffer(int channel) {
+       protected static BufferOrEvent createBuffer(int channel) {
                return new BufferOrEvent(new Buffer(new MemorySegment(new byte[] { 1 }),
                                new BufferRecycler() {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/490fa700/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/CoRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/CoRecordReaderTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/CoRecordReaderTest.java
new file mode 100644
index 0000000..1e57d14
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/CoRecordReaderTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.io;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.streaming.io.BarrierBufferTest.MockInputGate;
+import org.junit.Test;
+
+public class CoRecordReaderTest {
+
+       @Test
+       public void test() throws InterruptedException, IOException {
+
+               List<BufferOrEvent> input1 = new LinkedList<BufferOrEvent>();
+               input1.add(BarrierBufferTest.createBuffer(0));
+               input1.add(BarrierBufferTest.createSuperstep(1, 0));
+               input1.add(BarrierBufferTest.createBuffer(0));
+
+               InputGate ig1 = new MockInputGate(1, input1);
+
+               List<BufferOrEvent> input2 = new LinkedList<BufferOrEvent>();
+               input2.add(BarrierBufferTest.createBuffer(0));
+               input2.add(BarrierBufferTest.createBuffer(0));
+               input2.add(BarrierBufferTest.createSuperstep(1, 0));
+               input2.add(BarrierBufferTest.createBuffer(0));
+
+               InputGate ig2 = new MockInputGate(1, input2);
+
+               CoRecordReader<?, ?> coReader = new CoRecordReader<IOReadableWritable, IOReadableWritable>(
+                               ig1, ig2);
+               BarrierBuffer b1 = coReader.barrierBuffer1;
+               BarrierBuffer b2 = coReader.barrierBuffer2;
+
+               coReader.addToAvailable(ig1);
+               coReader.addToAvailable(ig2);
+               coReader.addToAvailable(ig2);
+               coReader.addToAvailable(ig1);
+
+               assertEquals(1, coReader.getNextReaderIndexBlocking());
+               b1.getNextNonBlocked();
+
+               assertEquals(2, coReader.getNextReaderIndexBlocking());
+               b2.getNextNonBlocked();
+
+               assertEquals(2, coReader.getNextReaderIndexBlocking());
+               b2.getNextNonBlocked();
+
+               assertEquals(1, coReader.getNextReaderIndexBlocking());
+               b1.getNextNonBlocked();
+               b1.processSuperstep(input1.get(1));
+
+               coReader.addToAvailable(ig1);
+               coReader.addToAvailable(ig2);
+               coReader.addToAvailable(ig2);
+
+               assertEquals(2, coReader.getNextReaderIndexBlocking());
+               b2.getNextNonBlocked();
+               b2.processSuperstep(input2.get(2));
+
+               assertEquals(1, coReader.getNextReaderIndexBlocking());
+               b1.getNextNonBlocked();
+
+               assertEquals(2, coReader.getNextReaderIndexBlocking());
+               b2.getNextNonBlocked();
+       }
+
+}

Reply via email to