[FLINK-1638] [streaming] RegisterState removed from datastream + CoRecordReader barrier test added
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/490fa700 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/490fa700 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/490fa700 Branch: refs/heads/master Commit: 490fa700c6310b0f30decd058f97a38955136566 Parents: 37390d6 Author: Gyula Fora <gyf...@apache.org> Authored: Mon Mar 9 09:12:51 2015 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Tue Mar 10 14:58:49 2015 +0100 ---------------------------------------------------------------------- .../kafka/api/simple/PersistentKafkaSource.java | 11 ++- .../flink/streaming/api/StreamConfig.java | 16 ---- .../apache/flink/streaming/api/StreamGraph.java | 26 +----- .../api/StreamingJobGraphGenerator.java | 1 - .../datastream/SingleOutputStreamOperator.java | 43 ---------- .../api/invokable/operator/co/CoInvokable.java | 6 +- .../api/streamvertex/StreamVertex.java | 68 ++++++--------- .../streamvertex/StreamingRuntimeContext.java | 27 ++++-- .../flink/streaming/io/BarrierBuffer.java | 11 ++- .../flink/streaming/io/CoRecordReader.java | 10 ++- .../flink/streaming/io/BarrierBufferTest.java | 12 +-- .../flink/streaming/io/CoRecordReaderTest.java | 90 ++++++++++++++++++++ 12 files changed, 169 insertions(+), 152 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/490fa700/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java 
index fd428c0..0f980e8 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java @@ -45,17 +45,16 @@ public class PersistentKafkaSource<OUT> extends SimpleKafkaSource<OUT> { this.initialOffset = initialOffset; } + @SuppressWarnings("unchecked") @Override public void open(Configuration parameters) throws InterruptedException { StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext(); - @SuppressWarnings("unchecked") - OperatorState<Long> lastKafkaOffSet = (OperatorState<Long>) context.getState("kafka"); - - if (lastKafkaOffSet.getState() == null) { + + if (context.containsState("kafka")) { + kafkaOffSet = (OperatorState<Long>) context.getState("kafka"); + } else { kafkaOffSet = new OperatorState<Long>(initialOffset); context.registerState("kafka", kafkaOffSet); - } else { - kafkaOffSet = lastKafkaOffSet; } super.open(parameters); http://git-wip-us.apache.org/repos/asf/flink/blob/490fa700/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java index d813a30..ea19a44 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java @@ -32,7 +32,6 @@ import org.apache.flink.streaming.api.invokable.StreamInvokable; import 
org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer; import org.apache.flink.streaming.api.streamvertex.StreamVertexException; import org.apache.flink.streaming.partitioner.StreamPartitioner; -import org.apache.flink.runtime.state.OperatorState; import org.apache.flink.util.InstantiationUtil; public class StreamConfig implements Serializable { @@ -55,7 +54,6 @@ public class StreamConfig implements Serializable { private static final String SERIALIZEDUDF = "serializedudf"; private static final String USER_FUNCTION = "userfunction"; private static final String BUFFER_TIMEOUT = "bufferTimeout"; - private static final String OPERATOR_STATES = "operatorStates"; private static final String TYPE_SERIALIZER_IN_1 = "typeSerializer_in_1"; private static final String TYPE_SERIALIZER_IN_2 = "typeSerializer_in_2"; private static final String TYPE_SERIALIZER_OUT_1 = "typeSerializer_out_1"; @@ -331,20 +329,6 @@ public class StreamConfig implements Serializable { return config.getInteger(INPUT_TYPE + inputNumber, 0); } - public void setOperatorStates(Map<String, OperatorState<?>> states) { - config.setBytes(OPERATOR_STATES, SerializationUtils.serialize((Serializable) states)); - } - - @SuppressWarnings("unchecked") - public Map<String, OperatorState<?>> getOperatorStates(ClassLoader cl) { - try { - return (Map<String, OperatorState<?>>) InstantiationUtil.readObjectFromConfig( - this.config, OPERATOR_STATES, cl); - } catch (Exception e) { - throw new RuntimeException("Could not load operator state"); - } - } - public void setChainedOutputs(List<Integer> chainedOutputs) { config.setBytes(CHAINED_OUTPUTS, SerializationUtils.serialize((Serializable) chainedOutputs)); http://git-wip-us.apache.org/repos/asf/flink/blob/490fa700/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java ---------------------------------------------------------------------- diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java index 8334aa1..f0fbaab 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java @@ -46,7 +46,6 @@ import org.apache.flink.streaming.api.streamvertex.StreamIterationHead; import org.apache.flink.streaming.api.streamvertex.StreamIterationTail; import org.apache.flink.streaming.api.streamvertex.StreamVertex; import org.apache.flink.streaming.partitioner.StreamPartitioner; -import org.apache.flink.runtime.state.OperatorState; import org.apache.sling.commons.json.JSONArray; import org.apache.sling.commons.json.JSONException; import org.apache.sling.commons.json.JSONObject; @@ -85,7 +84,6 @@ public class StreamGraph extends StreamingPlan { private Map<Integer, Integer> iterationIDtoTailID; private Map<Integer, Integer> iterationTailCount; private Map<Integer, Long> iterationTimeouts; - private Map<Integer, Map<String, OperatorState<?>>> operatorStates; private Map<Integer, InputFormat<String, ?>> inputFormatLists; private List<Map<Integer, ?>> containingMaps; @@ -149,10 +147,8 @@ public class StreamGraph extends StreamingPlan { containingMaps.add(iterationTailCount); iterationTimeouts = new HashMap<Integer, Long>(); containingMaps.add(iterationTailCount); - operatorStates = new HashMap<Integer, Map<String, OperatorState<?>>>(); - containingMaps.add(operatorStates); inputFormatLists = new HashMap<Integer, InputFormat<String, ?>>(); - containingMaps.add(operatorStates); + containingMaps.add(inputFormatLists); sources = new HashSet<Integer>(); } @@ -419,22 +415,6 @@ public class StreamGraph extends StreamingPlan { return this.bufferTimeouts.get(vertexID); } - 
public void addOperatorState(Integer vertexName, String stateName, OperatorState<?> state) { - Map<String, OperatorState<?>> states = operatorStates.get(vertexName); - if (states == null) { - states = new HashMap<String, OperatorState<?>>(); - states.put(stateName, state); - } else { - if (states.containsKey(stateName)) { - throw new RuntimeException("State has already been registered with this name: " - + stateName); - } else { - states.put(stateName, state); - } - } - operatorStates.put(vertexName, states); - } - /** * Sets a user defined {@link OutputSelector} for the given operator. Used * for directed emits. @@ -594,10 +574,6 @@ public class StreamGraph extends StreamingPlan { return outputSelectors.get(vertexID); } - public Map<String, OperatorState<?>> getState(Integer vertexID) { - return operatorStates.get(vertexID); - } - public Integer getIterationID(Integer vertexID) { return iterationIds.get(vertexID); } http://git-wip-us.apache.org/repos/asf/flink/blob/490fa700/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java index b50ac25..ecb6455 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java @@ -212,7 +212,6 @@ public class StreamingJobGraphGenerator { config.setUserInvokable(streamGraph.getInvokable(vertexID)); config.setOutputSelectors(streamGraph.getOutputSelector(vertexID)); - 
config.setOperatorStates(streamGraph.getState(vertexID)); config.setNumberOfOutputs(nonChainableOutputs.size()); config.setOutputs(nonChainableOutputs); http://git-wip-us.apache.org/repos/asf/flink/blob/490fa700/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java index b0fc364..16284d4 100755 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java @@ -17,16 +17,10 @@ package org.apache.flink.streaming.api.datastream; -import java.util.Map; -import java.util.Map.Entry; - -import org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.invokable.StreamInvokable; import org.apache.flink.streaming.api.invokable.StreamInvokable.ChainingStrategy; -import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext; -import org.apache.flink.runtime.state.OperatorState; /** * The SingleOutputStreamOperator represents a user defined transformation @@ -99,43 +93,6 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato return this; } - /** - * This is a beta feature </br></br> Register an operator state for this - * operator by the given name. 
This name can be used to retrieve the state - * during runtime using {@link StreamingRuntimeContext#getState(String)}. To - * obtain the {@link StreamingRuntimeContext} from the user-defined function - * use the {@link RichFunction#getRuntimeContext()} method. - * - * @param name - * The name of the operator state. - * @param state - * The state to be registered for this name. - * @return The data stream with state registered. - */ - public SingleOutputStreamOperator<OUT, O> registerState(String name, OperatorState<?> state) { - streamGraph.addOperatorState(getId(), name, state); - return this; - } - - /** - * This is a beta feature </br></br> Register operator states for this - * operator provided in a map. The registered states can be retrieved during - * runtime using {@link StreamingRuntimeContext#getState(String)}. To obtain - * the {@link StreamingRuntimeContext} from the user-defined function use - * the {@link RichFunction#getRuntimeContext()} method. - * - * @param states - * The map containing the states that will be registered. - * @return The data stream with states registered. 
- */ - public SingleOutputStreamOperator<OUT, O> registerState(Map<String, OperatorState<?>> states) { - for (Entry<String, OperatorState<?>> entry : states.entrySet()) { - streamGraph.addOperatorState(getId(), entry.getKey(), entry.getValue()); - } - - return this; - } - @SuppressWarnings("unchecked") public SingleOutputStreamOperator<OUT, O> broadcast() { return (SingleOutputStreamOperator<OUT, O>) super.broadcast(); http://git-wip-us.apache.org/repos/asf/flink/blob/490fa700/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java index 2b407c6..b036829 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java @@ -84,16 +84,14 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<IN1, OU next = recordIterator.next(reuse1, reuse2); } catch (IOException e) { if (isRunning) { - throw new RuntimeException("Could not read next record due to: " - + StringUtils.stringifyException(e)); + throw new RuntimeException("Could not read next record.", e); } else { // Task already cancelled do nothing next = 0; } } catch (IllegalStateException e) { if (isRunning) { - throw new RuntimeException("Could not read next record due to: " - + StringUtils.stringifyException(e)); + throw new RuntimeException("Could not read next record.", e); } else { // Task already cancelled do nothing next = 0; 
http://git-wip-us.apache.org/repos/asf/flink/blob/490fa700/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java index 5ff47d6..eb0d6ed 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.api.streamvertex; import java.io.IOException; +import java.util.HashMap; import java.util.Map; import org.apache.flink.runtime.event.task.TaskEvent; @@ -64,8 +65,6 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa protected ClassLoader userClassLoader; private EventListener<TaskEvent> superstepListener; - - private boolean onRecovery; public StreamVertex() { userInvokable = null; @@ -89,57 +88,38 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa protected void initialize() { this.userClassLoader = getUserCodeClassLoader(); this.configuration = new StreamConfig(getTaskConfiguration()); - if(!onRecovery) - { - this.states = configuration.getOperatorStates(userClassLoader); - } + this.states = new HashMap<String, OperatorState<?>>(); this.context = createRuntimeContext(getEnvironment().getTaskName(), this.states); } - protected <T> void invokeUserFunction(StreamInvokable<?, T> userInvokable) throws Exception { - userInvokable.setRuntimeContext(context); - userInvokable.open(getTaskConfiguration()); - - for (ChainableInvokable<?, ?> invokable : 
outputHandler.chainedInvokables) { - invokable.setRuntimeContext(context); - invokable.open(getTaskConfiguration()); - } - - userInvokable.invoke(); - userInvokable.close(); - - for (ChainableInvokable<?, ?> invokable : outputHandler.chainedInvokables) { - invokable.close(); - } - - } - @Override public void broadcastBarrier(long id) { - //Only called at input vertices + // Only called at input vertices if (LOG.isDebugEnabled()) { LOG.debug("Received barrier from jobmanager: " + id); } actOnBarrier(id); } + /** + * This method is called to confirm that a barrier has been fully processed. + * It sends an acknowledgment to the jobmanager. In the current version if + * there is user state it also checkpoints the state to the jobmanager. + */ @Override public void confirmBarrier(long barrierID) { - - if(configuration.getStateMonitoring() && states != null) - { + + if (configuration.getStateMonitoring() && !states.isEmpty()) { getEnvironment().getJobManager().tell( - new StateBarrierAck(getEnvironment().getJobID(), - getEnvironment().getJobVertexId(), context.getIndexOfThisSubtask(), - barrierID, states), ActorRef.noSender()); - } - else - { + new StateBarrierAck(getEnvironment().getJobID(), getEnvironment() + .getJobVertexId(), context.getIndexOfThisSubtask(), barrierID, states), + ActorRef.noSender()); + } else { getEnvironment().getJobManager().tell( new BarrierAck(getEnvironment().getJobID(), getEnvironment().getJobVertexId(), - context.getIndexOfThisSubtask(), barrierID), ActorRef.noSender()); + context.getIndexOfThisSubtask(), barrierID), ActorRef.noSender()); } - + } public void setInputsOutputs() { @@ -273,10 +253,17 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa return this.superstepListener; } + /** + * Method to be called when a barrier is received from all the input + * channels. It should broadcast the barrier to the output operators, + * checkpoint the state and send an ack. 
+ * + * @param id + */ private void actOnBarrier(long id) { try { outputHandler.broadcastBarrier(id); - //TODO checkpoint state here + // TODO checkpoint state here confirmBarrier(id); if (LOG.isDebugEnabled()) { LOG.debug("Superstep " + id + " processed: " + StreamVertex.this); @@ -293,12 +280,13 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa return configuration.getOperatorName() + " (" + context.getIndexOfThisSubtask() + ")"; } + /** + * Re-injects the user states into the map + */ @Override - public void injectStates(Map<String,OperatorState<?>> states) { - onRecovery = true; + public void injectStates(Map<String, OperatorState<?>> states) { this.states.putAll(states); } - private class SuperstepEventListener implements EventListener<TaskEvent> { http://git-wip-us.apache.org/repos/asf/flink/blob/490fa700/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java index 492d2a0..da083fb 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java @@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.streamvertex; import java.util.Map; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.api.common.functions.RuntimeContext; import 
org.apache.flink.api.common.functions.util.RuntimeUDFContext; import org.apache.flink.configuration.Configuration; @@ -64,19 +65,35 @@ public class StreamingRuntimeContext extends RuntimeUDFContext { throw new RuntimeException("No state has been registered for the name: " + name); } } + } + /** + * Returns whether there is a state stored by the given name + */ + public boolean containsState(String name) { + return operatorStates.containsKey(name); } + /** + * This is a beta feature <br/><br/> Register an operator state for this + * operator by the given name. This name can be used to retrieve the state + * during runtime using {@link StreamingRuntimeContext#getState(String)}. To + * obtain the {@link StreamingRuntimeContext} from the user-defined function + * use the {@link RichFunction#getRuntimeContext()} method. + * + * @param name + * The name of the operator state. + * @param state + * The state to be registered for this name. + * @return The data stream with state registered. + */ public void registerState(String name, OperatorState<?> state) { if (state == null) { throw new RuntimeException("Cannot register null state"); } else { - if(operatorStates.containsKey(name)) - { + if (operatorStates.containsKey(name)) { throw new RuntimeException("State is already registered"); - } - else - { + } else { operatorStates.put(name, state); } } http://git-wip-us.apache.org/repos/asf/flink/blob/490fa700/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java index 7dfccb0..42d4919 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java 
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java @@ -30,6 +30,15 @@ import org.apache.flink.streaming.api.streamvertex.StreamingSuperstep; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * Class encapsulating the functionality that is necessary to sync inputs on + * superstep barriers. Once a barrier is received from an input channel, we + * should not process further buffers from that channel until we have received the + * barrier from all other channels as well. To avoid back-pressuring the + * readers, we buffer up the new data received from the blocked channels until + * the blocks are released. + * + */ public class BarrierBuffer { private static final Logger LOG = LoggerFactory.getLogger(BarrierBuffer.class); @@ -54,7 +63,7 @@ public class BarrierBuffer { } /** - * Starts the next superstep + * Starts the next superstep in the buffer * * @param superstep * The next superstep http://git-wip-us.apache.org/repos/asf/flink/blob/490fa700/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java index 6c91f4d..c32db4e 100755 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java @@ -62,8 +62,8 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable private boolean hasRequestedPartitions; - private CoBarrierBuffer barrierBuffer1; - private CoBarrierBuffer barrierBuffer2; + protected CoBarrierBuffer 
barrierBuffer1; + protected CoBarrierBuffer barrierBuffer2; public CoRecordReader(InputGate inputgate1, InputGate inputgate2) { super(new UnionInputGate(inputgate1, inputgate2)); @@ -195,7 +195,7 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable } } - private int getNextReaderIndexBlocking() throws InterruptedException { + protected int getNextReaderIndexBlocking() throws InterruptedException { Integer nextIndex = 0; @@ -230,6 +230,10 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable @Override public void onEvent(InputGate bufferReader) { + addToAvailable(bufferReader); + } + + protected void addToAvailable(InputGate bufferReader){ if (bufferReader == bufferReader1) { availableRecordReaders.add(1); } else if (bufferReader == bufferReader2) { http://git-wip-us.apache.org/repos/asf/flink/blob/490fa700/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java index 203216b..1b4cc36 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java @@ -44,8 +44,6 @@ public class BarrierBufferTest { input.add(createBuffer(0)); input.add(createBuffer(0)); input.add(createBuffer(0)); - input.add(createBuffer(2)); - input.add(createBuffer(2)); InputGate mockIG = new MockInputGate(1, input); AbstractReader mockAR = new MockReader(mockIG); @@ -55,8 +53,6 @@ public class BarrierBufferTest { assertEquals(input.get(0), bb.getNextNonBlocked()); 
assertEquals(input.get(1), bb.getNextNonBlocked()); assertEquals(input.get(2), bb.getNextNonBlocked()); - assertEquals(input.get(3), bb.getNextNonBlocked()); - assertEquals(input.get(4), bb.getNextNonBlocked()); } @@ -136,7 +132,7 @@ public class BarrierBufferTest { } - private static class MockInputGate implements InputGate { + protected static class MockInputGate implements InputGate { private int numChannels; private Queue<BufferOrEvent> boes; @@ -175,7 +171,7 @@ public class BarrierBufferTest { } - private static class MockReader extends AbstractReader { + protected static class MockReader extends AbstractReader { protected MockReader(InputGate inputGate) { super(inputGate); @@ -183,11 +179,11 @@ public class BarrierBufferTest { } - private static BufferOrEvent createSuperstep(long id, int channel) { + protected static BufferOrEvent createSuperstep(long id, int channel) { return new BufferOrEvent(new StreamingSuperstep(id), channel); } - private static BufferOrEvent createBuffer(int channel) { + protected static BufferOrEvent createBuffer(int channel) { return new BufferOrEvent(new Buffer(new MemorySegment(new byte[] { 1 }), new BufferRecycler() { http://git-wip-us.apache.org/repos/asf/flink/blob/490fa700/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/CoRecordReaderTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/CoRecordReaderTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/CoRecordReaderTest.java new file mode 100644 index 0000000..1e57d14 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/CoRecordReaderTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.io; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; + +import org.apache.flink.core.io.IOReadableWritable; +import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.streaming.io.BarrierBufferTest.MockInputGate; +import org.junit.Test; + +public class CoRecordReaderTest { + + @Test + public void test() throws InterruptedException, IOException { + + List<BufferOrEvent> input1 = new LinkedList<BufferOrEvent>(); + input1.add(BarrierBufferTest.createBuffer(0)); + input1.add(BarrierBufferTest.createSuperstep(1, 0)); + input1.add(BarrierBufferTest.createBuffer(0)); + + InputGate ig1 = new MockInputGate(1, input1); + + List<BufferOrEvent> input2 = new LinkedList<BufferOrEvent>(); + input2.add(BarrierBufferTest.createBuffer(0)); + input2.add(BarrierBufferTest.createBuffer(0)); + input2.add(BarrierBufferTest.createSuperstep(1, 0)); + input2.add(BarrierBufferTest.createBuffer(0)); + + InputGate ig2 = new MockInputGate(1, input2); + + CoRecordReader<?, ?> coReader = new CoRecordReader<IOReadableWritable, IOReadableWritable>( + ig1, ig2); + 
BarrierBuffer b1 = coReader.barrierBuffer1; + BarrierBuffer b2 = coReader.barrierBuffer2; + + coReader.addToAvailable(ig1); + coReader.addToAvailable(ig2); + coReader.addToAvailable(ig2); + coReader.addToAvailable(ig1); + + assertEquals(1, coReader.getNextReaderIndexBlocking()); + b1.getNextNonBlocked(); + + assertEquals(2, coReader.getNextReaderIndexBlocking()); + b2.getNextNonBlocked(); + + assertEquals(2, coReader.getNextReaderIndexBlocking()); + b2.getNextNonBlocked(); + + assertEquals(1, coReader.getNextReaderIndexBlocking()); + b1.getNextNonBlocked(); + b1.processSuperstep(input1.get(1)); + + coReader.addToAvailable(ig1); + coReader.addToAvailable(ig2); + coReader.addToAvailable(ig2); + + assertEquals(2, coReader.getNextReaderIndexBlocking()); + b2.getNextNonBlocked(); + b2.processSuperstep(input2.get(2)); + + assertEquals(1, coReader.getNextReaderIndexBlocking()); + b1.getNextNonBlocked(); + + assertEquals(2, coReader.getNextReaderIndexBlocking()); + b2.getNextNonBlocked(); + } + +}