[FLINK-1638] [streaming] Barrier sync added to CoRecordReader, barrier tests
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5327d56d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5327d56d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5327d56d Branch: refs/heads/master Commit: 5327d56dc6f6f49a07054d89efcf30c894c85eca Parents: c9a3992 Author: Gyula Fora <gyf...@apache.org> Authored: Thu Mar 5 22:04:49 2015 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Tue Mar 10 14:58:49 2015 +0100 ---------------------------------------------------------------------- .../io/network/api/reader/BarrierBuffer.java | 143 ------------- .../reader/StreamingAbstractRecordReader.java | 122 ----------- .../connectors/kafka/api/KafkaSource.java | 5 +- .../api/invokable/operator/co/CoInvokable.java | 11 +- .../flink/streaming/io/BarrierBuffer.java | 155 ++++++++++++++ .../flink/streaming/io/CoRecordReader.java | 108 ++++++++-- .../io/StreamingAbstractRecordReader.java | 123 ++++++++++++ .../io/StreamingMutableRecordReader.java | 1 - .../streaming/state/PartitionableState.java | 8 +- .../streaming/api/WindowCrossJoinTest.java | 4 +- .../flink/streaming/io/BarrierBufferTest.java | 200 +++++++++++++++++++ 11 files changed, 589 insertions(+), 291 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5327d56d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BarrierBuffer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BarrierBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BarrierBuffer.java deleted file mode 100644 index ee317cd..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BarrierBuffer.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.io.network.api.reader; - -import java.io.IOException; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.Queue; -import java.util.Set; - -import org.apache.flink.runtime.event.task.StreamingSuperstep; -import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; -import org.apache.flink.runtime.io.network.partition.consumer.InputGate; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class BarrierBuffer { - - private static final Logger LOG = LoggerFactory.getLogger(BarrierBuffer.class); - - private Queue<BufferOrEvent> bufferOrEvents = new LinkedList<BufferOrEvent>(); - private Queue<BufferOrEvent> unprocessed = new LinkedList<BufferOrEvent>(); - - private Set<Integer> blockedChannels = new HashSet<Integer>(); - private int totalNumberOfInputChannels; - - private StreamingSuperstep currentSuperstep; - private boolean receivedSuperstep; - - private boolean blockAll = false; - - private AbstractReader reader; - - private InputGate inputGate; - - public BarrierBuffer(InputGate inputGate, AbstractReader reader) { - this.inputGate = inputGate; - totalNumberOfInputChannels = inputGate.getNumberOfInputChannels(); - this.reader = reader; - } - - private void startSuperstep(StreamingSuperstep superstep) { - this.currentSuperstep = superstep; - this.receivedSuperstep = true; - if (LOG.isDebugEnabled()) { - LOG.debug("Superstep started with id: " + superstep.getId()); - } - } - - private void store(BufferOrEvent bufferOrEvent) { - bufferOrEvents.add(bufferOrEvent); - } - - private BufferOrEvent getNonProcessed() { - return unprocessed.poll(); - } - - private boolean isBlocked(int channelIndex) { - return blockAll || blockedChannels.contains(channelIndex); - } - - private boolean containsNonprocessed() { - return !unprocessed.isEmpty(); - } - - private boolean receivedSuperstep() { - return receivedSuperstep; - } - - public BufferOrEvent getNextNonBlocked() throws IOException, - InterruptedException { - BufferOrEvent bufferOrEvent = null; - - if (containsNonprocessed()) { - bufferOrEvent = getNonProcessed(); - } else { - while (bufferOrEvent == null) { - BufferOrEvent nextBufferOrEvent = inputGate.getNextBufferOrEvent(); - if (isBlocked(nextBufferOrEvent.getChannelIndex())) { - store(nextBufferOrEvent); - } else { - bufferOrEvent = nextBufferOrEvent; - } - } - } - return bufferOrEvent; - } - - private void blockChannel(int channelIndex) { - if (!blockedChannels.contains(channelIndex)) { - blockedChannels.add(channelIndex); - if (LOG.isDebugEnabled()) { - LOG.debug("Channel blocked with index: " + channelIndex); - } - if (blockedChannels.size() == totalNumberOfInputChannels) { - reader.publish(currentSuperstep); - unprocessed.addAll(bufferOrEvents); - bufferOrEvents.clear(); - blockedChannels.clear(); - receivedSuperstep = false; - if (LOG.isDebugEnabled()) { - LOG.debug("All barriers received, blocks released"); - } - } - - } else { - throw new RuntimeException("Tried to block an already blocked channel"); - } - } - - public String toString() { - return blockedChannels.toString(); - } - - public void processSuperstep(BufferOrEvent bufferOrEvent) { - int channelIndex = bufferOrEvent.getChannelIndex(); - if (isBlocked(channelIndex)) { - store(bufferOrEvent); - } else { - StreamingSuperstep superstep = (StreamingSuperstep) bufferOrEvent.getEvent(); - if (!receivedSuperstep()) { - startSuperstep(superstep); - } - blockChannel(channelIndex); - } - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/5327d56d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/StreamingAbstractRecordReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/StreamingAbstractRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/StreamingAbstractRecordReader.java deleted file mode 100644 index ea2d7a6..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/StreamingAbstractRecordReader.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.io.network.api.reader; - -import java.io.IOException; - - -import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.runtime.event.task.AbstractEvent; -import org.apache.flink.runtime.event.task.StreamingSuperstep; -import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer; -import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult; -import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; -import org.apache.flink.runtime.io.network.partition.consumer.InputGate; -import org.apache.flink.runtime.io.network.serialization.SpillingAdaptiveSpanningRecordDeserializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A record-oriented reader. - * <p> - * This abstract base class is used by both the mutable and immutable record - * readers. - * - * @param <T> - * The type of the record that can be read with this record reader. - */ -public abstract class StreamingAbstractRecordReader<T extends IOReadableWritable> extends AbstractReader implements - ReaderBase { - - private static final Logger LOG = LoggerFactory.getLogger(StreamingAbstractRecordReader.class); - - private final RecordDeserializer<T>[] recordDeserializers; - - private RecordDeserializer<T> currentRecordDeserializer; - - private boolean isFinished; - - private final BarrierBuffer barrierBuffer; - - protected StreamingAbstractRecordReader(InputGate inputGate) { - super(inputGate); - barrierBuffer = new BarrierBuffer(inputGate, this); - - // Initialize one deserializer per input channel - this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate - .getNumberOfInputChannels()]; - for (int i = 0; i < recordDeserializers.length; i++) { - recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<T>(); - } - } - - protected boolean getNextRecord(T target) throws IOException, InterruptedException { - if (isFinished) { - return false; - } - - while (true) { - if (currentRecordDeserializer != null) { - DeserializationResult result = currentRecordDeserializer.getNextRecord(target); - - if (result.isBufferConsumed()) { - currentRecordDeserializer.getCurrentBuffer().recycle(); - currentRecordDeserializer = null; - } - - if (result.isFullRecord()) { - return true; - } - } - - final BufferOrEvent bufferOrEvent = barrierBuffer.getNextNonBlocked(); - - if (bufferOrEvent.isBuffer()) { - currentRecordDeserializer = recordDeserializers[bufferOrEvent.getChannelIndex()]; - currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer()); - } else { - // Event received - final AbstractEvent event = bufferOrEvent.getEvent(); - - if (event instanceof StreamingSuperstep) { - barrierBuffer.processSuperstep(bufferOrEvent); - } else { - if (handleEvent(event)) { - if (inputGate.isFinished()) { - isFinished = true; - return false; - } else if (hasReachedEndOfSuperstep()) { - return false; - } // else: More data is coming... - } - } - } - } - } - - public void clearBuffers() { - for (RecordDeserializer<?> deserializer : recordDeserializers) { - Buffer buffer = deserializer.getCurrentBuffer(); - if (buffer != null && !buffer.isRecycled()) { - buffer.recycle(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/5327d56d/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java index 4349081..0c6cd4a 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java @@ -78,8 +78,9 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> { } public KafkaSource(String zookeeperHost, String topicId, - DeserializationSchema<OUT> deserializationSchema, long zookeeperSyncTimeMillis){ - this(zookeeperHost, topicId, DEFAULT_GROUP_ID, deserializationSchema, ZOOKEEPER_DEFAULT_SYNC_TIME); + DeserializationSchema<OUT> deserializationSchema, long zookeeperSyncTimeMillis) { + this(zookeeperHost, topicId, DEFAULT_GROUP_ID, deserializationSchema, + ZOOKEEPER_DEFAULT_SYNC_TIME); } public KafkaSource(String zookeeperHost, String topicId, http://git-wip-us.apache.org/repos/asf/flink/blob/5327d56d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java index b41dbbb..2b407c6 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java @@ -84,7 +84,16 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<IN1, OU next = recordIterator.next(reuse1, reuse2); } catch (IOException e) { if (isRunning) { - throw e; + throw new RuntimeException("Could not read next record due to: " + + StringUtils.stringifyException(e)); + } else { + // Task already cancelled do nothing + next = 0; + } + } catch (IllegalStateException e) { + if (isRunning) { + throw new RuntimeException("Could not read next record due to: " + + StringUtils.stringifyException(e)); } else { // Task already cancelled do nothing next = 0; http://git-wip-us.apache.org/repos/asf/flink/blob/5327d56d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java new file mode 100644 index 0000000..3ff718a --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.io; + +import java.io.IOException; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.Queue; +import java.util.Set; + +import org.apache.flink.runtime.event.task.StreamingSuperstep; +import org.apache.flink.runtime.io.network.api.reader.AbstractReader; +import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BarrierBuffer { + + private static final Logger LOG = LoggerFactory.getLogger(BarrierBuffer.class); + + private Queue<BufferOrEvent> nonprocessed = new LinkedList<BufferOrEvent>(); + private Queue<BufferOrEvent> blockedNonprocessed = new LinkedList<BufferOrEvent>(); + + private Set<Integer> blockedChannels = new HashSet<Integer>(); + private int totalNumberOfInputChannels; + + private StreamingSuperstep currentSuperstep; + private boolean superstepStarted; + + private AbstractReader reader; + + private InputGate inputGate; + + public BarrierBuffer(InputGate inputGate, AbstractReader reader) { + this.inputGate = inputGate; + totalNumberOfInputChannels = inputGate.getNumberOfInputChannels(); + this.reader = reader; + } + + protected void startSuperstep(StreamingSuperstep superstep) { + this.currentSuperstep = superstep; + this.superstepStarted = true; + if (LOG.isDebugEnabled()) { + LOG.debug("Superstep started with id: " + superstep.getId()); + } + } + + protected void store(BufferOrEvent bufferOrEvent) { + nonprocessed.add(bufferOrEvent); + } + + protected BufferOrEvent getNonProcessed() { + BufferOrEvent nextNonprocessed = null; + while (nextNonprocessed == null && !nonprocessed.isEmpty()) { + nextNonprocessed = nonprocessed.poll(); + if (isBlocked(nextNonprocessed.getChannelIndex())) { + blockedNonprocessed.add(nextNonprocessed); + nextNonprocessed = null; + } + } + return nextNonprocessed; + } + + protected boolean isBlocked(int channelIndex) { + return blockedChannels.contains(channelIndex); + } + + protected boolean isAllBlocked() { + return blockedChannels.size() == totalNumberOfInputChannels; + } + + public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException { + // If there are non-processed buffers from the previously blocked ones, + // we get the next + BufferOrEvent bufferOrEvent = getNonProcessed(); + + if (bufferOrEvent != null) { + return bufferOrEvent; + } else { + // If no non-processed, get new from input + while (true) { + // We read the next buffer from the inputgate + bufferOrEvent = inputGate.getNextBufferOrEvent(); + if (isBlocked(bufferOrEvent.getChannelIndex())) { + // If channel blocked we just store it + store(bufferOrEvent); + } else { + return bufferOrEvent; + } + } + } + } + + protected void blockChannel(int channelIndex) { + if (!blockedChannels.contains(channelIndex)) { + blockedChannels.add(channelIndex); + if (LOG.isDebugEnabled()) { + LOG.debug("Channel blocked with index: " + channelIndex); + } + if (isAllBlocked()) { + actOnAllBlocked(); + } + + } else { + throw new RuntimeException("Tried to block an already blocked channel"); + } + } + + protected void releaseBlocks() { + nonprocessed.addAll(blockedNonprocessed); + blockedChannels.clear(); + blockedNonprocessed.clear(); + superstepStarted = false; + if (LOG.isDebugEnabled()) { + LOG.debug("All barriers received, blocks released"); + } + } + + protected void actOnAllBlocked() { + if (LOG.isDebugEnabled()) { + LOG.debug("Publishing barrier to the vertex"); + } + reader.publish(currentSuperstep); + releaseBlocks(); + } + + public String toString() { + return blockedChannels.toString(); + } + + public void processSuperstep(BufferOrEvent bufferOrEvent) { + StreamingSuperstep superstep = (StreamingSuperstep) bufferOrEvent.getEvent(); + if (!superstepStarted) { + startSuperstep(superstep); + } + blockChannel(bufferOrEvent.getChannelIndex()); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/5327d56d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java index 79f09c4..6a1f624 100755 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java @@ -18,10 +18,12 @@ package org.apache.flink.streaming.io; import java.io.IOException; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.LinkedList; +import java.util.Queue; +import java.util.concurrent.LinkedBlockingDeque; import org.apache.flink.core.io.IOReadableWritable; +import org.apache.flink.runtime.event.task.StreamingSuperstep; import org.apache.flink.runtime.io.network.api.reader.AbstractReader; import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader; import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer; @@ -44,7 +46,9 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable private final InputGate bufferReader2; - private final BlockingQueue<Integer> availableRecordReaders = new LinkedBlockingQueue<Integer>(); + private final LinkedBlockingDeque<Integer> availableRecordReaders = new LinkedBlockingDeque<Integer>(); + + private LinkedList<Integer> processed = new LinkedList<Integer>(); private AdaptiveSpanningRecordDeserializer[] reader1RecordDeserializers; @@ -59,15 +63,20 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable private boolean hasRequestedPartitions; - public CoRecordReader(InputGate bufferReader1, InputGate bufferReader2) { - super(new UnionInputGate(bufferReader1, bufferReader2)); + private CoBarrierBuffer barrierBuffer1; + private CoBarrierBuffer barrierBuffer2; + + private Queue<Integer> unprocessedIndices = new LinkedList<Integer>(); + + public CoRecordReader(InputGate inputgate1, InputGate inputgate2) { + super(new UnionInputGate(inputgate1, inputgate2)); - this.bufferReader1 = bufferReader1; - this.bufferReader2 = bufferReader2; + this.bufferReader1 = inputgate1; + this.bufferReader2 = inputgate2; - this.reader1RecordDeserializers = new AdaptiveSpanningRecordDeserializer[bufferReader1 + this.reader1RecordDeserializers = new AdaptiveSpanningRecordDeserializer[inputgate1 .getNumberOfInputChannels()]; - this.reader2RecordDeserializers = new AdaptiveSpanningRecordDeserializer[bufferReader2 + this.reader2RecordDeserializers = new AdaptiveSpanningRecordDeserializer[inputgate2 .getNumberOfInputChannels()]; for (int i = 0; i < reader1RecordDeserializers.length; i++) { @@ -78,8 +87,14 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable reader2RecordDeserializers[i] = new AdaptiveSpanningRecordDeserializer<T2>(); } - bufferReader1.registerListener(this); - bufferReader2.registerListener(this); + inputgate1.registerListener(this); + inputgate2.registerListener(this); + + barrierBuffer1 = new CoBarrierBuffer(inputgate1, this); + barrierBuffer2 = new CoBarrierBuffer(inputgate2, this); + + barrierBuffer1.setOtherBarrierBuffer(barrierBuffer2); + barrierBuffer2.setOtherBarrierBuffer(barrierBuffer1); } public void requestPartitionsOnce() throws IOException, InterruptedException { @@ -94,15 +109,16 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable @SuppressWarnings("unchecked") protected int getNextRecord(T1 target1, T2 target2) throws IOException, InterruptedException { - requestPartitionsOnce(); + requestPartitionsOnce(); while (true) { if (currentReaderIndex == 0) { if ((bufferReader1.isFinished() && bufferReader2.isFinished())) { return 0; } - + currentReaderIndex = getNextReaderIndexBlocking(); + } if (currentReaderIndex == 1) { @@ -123,12 +139,17 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable } } else { - final BufferOrEvent boe = bufferReader1.getNextBufferOrEvent(); + final BufferOrEvent boe = barrierBuffer1.getNextNonBlocked(); if (boe.isBuffer()) { reader1currentRecordDeserializer = reader1RecordDeserializers[boe .getChannelIndex()]; reader1currentRecordDeserializer.setNextBuffer(boe.getBuffer()); + } else if (boe.getEvent() instanceof StreamingSuperstep) { + barrierBuffer1.processSuperstep(boe); + currentReaderIndex = 0; + + break; } else if (handleEvent(boe.getEvent())) { currentReaderIndex = 0; @@ -153,12 +174,17 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable return 2; } } else { - final BufferOrEvent boe = bufferReader2.getNextBufferOrEvent(); + final BufferOrEvent boe = barrierBuffer2.getNextNonBlocked(); if (boe.isBuffer()) { reader2currentRecordDeserializer = reader2RecordDeserializers[boe .getChannelIndex()]; reader2currentRecordDeserializer.setNextBuffer(boe.getBuffer()); + } else if (boe.getEvent() instanceof StreamingSuperstep) { + barrierBuffer2.processSuperstep(boe); + currentReaderIndex = 0; + + break; } else if (handleEvent(boe.getEvent())) { currentReaderIndex = 0; @@ -173,7 +199,32 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable } private int getNextReaderIndexBlocking() throws InterruptedException { - return availableRecordReaders.take(); + + Integer nextIndex = 0; + + while (processed.contains(nextIndex = availableRecordReaders.take())) { + processed.remove(nextIndex); + } + + if (nextIndex == 1) { + if (barrierBuffer1.isAllBlocked()) { + availableRecordReaders.addFirst(1); + processed.add(2); + return 2; + } else { + return 1; + } + } else { + if (barrierBuffer2.isAllBlocked()) { + availableRecordReaders.addFirst(2); + processed.add(1); + return 1; + } else { + return 2; + } + + } + } // ------------------------------------------------------------------------ @@ -183,8 +234,10 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable @Override public void onEvent(InputGate bufferReader) { if (bufferReader == bufferReader1) { + System.out.println("Added 1"); availableRecordReaders.add(1); } else if (bufferReader == bufferReader2) { + System.out.println("Added 2"); availableRecordReaders.add(2); } } @@ -203,4 +256,27 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable } } } + + private class CoBarrierBuffer extends BarrierBuffer { + + private CoBarrierBuffer otherBuffer; + + public CoBarrierBuffer(InputGate inputGate, AbstractReader reader) { + super(inputGate, reader); + } + + public void setOtherBarrierBuffer(CoBarrierBuffer other) { + this.otherBuffer = other; + } + + @Override + protected void actOnAllBlocked() { + if (otherBuffer.isAllBlocked()) { + super.actOnAllBlocked(); + otherBuffer.releaseBlocks(); + } + } + + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/5327d56d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingAbstractRecordReader.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingAbstractRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingAbstractRecordReader.java new file mode 100644 index 0000000..811c48a --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingAbstractRecordReader.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.io; + +import java.io.IOException; + +import org.apache.flink.core.io.IOReadableWritable; +import org.apache.flink.runtime.event.task.AbstractEvent; +import org.apache.flink.runtime.event.task.StreamingSuperstep; +import org.apache.flink.runtime.io.network.api.reader.AbstractReader; +import org.apache.flink.runtime.io.network.api.reader.ReaderBase; +import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer; +import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.io.network.serialization.SpillingAdaptiveSpanningRecordDeserializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A record-oriented reader. + * <p> + * This abstract base class is used by both the mutable and immutable record + * readers. + * + * @param <T> + * The type of the record that can be read with this record reader. + */ +public abstract class StreamingAbstractRecordReader<T extends IOReadableWritable> extends AbstractReader implements + ReaderBase { + + private static final Logger LOG = LoggerFactory.getLogger(StreamingAbstractRecordReader.class); + + private final RecordDeserializer<T>[] recordDeserializers; + + private RecordDeserializer<T> currentRecordDeserializer; + + private boolean isFinished; + + private final BarrierBuffer barrierBuffer; + + protected StreamingAbstractRecordReader(InputGate inputGate) { + super(inputGate); + barrierBuffer = new BarrierBuffer(inputGate, this); + + // Initialize one deserializer per input channel + this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate + .getNumberOfInputChannels()]; + for (int i = 0; i < recordDeserializers.length; i++) { + recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<T>(); + } + } + + protected boolean getNextRecord(T target) throws IOException, InterruptedException { + if (isFinished) { + return false; + } + + while (true) { + if (currentRecordDeserializer != null) { + DeserializationResult result = currentRecordDeserializer.getNextRecord(target); + + if (result.isBufferConsumed()) { + currentRecordDeserializer.getCurrentBuffer().recycle(); + currentRecordDeserializer = null; + } + + if (result.isFullRecord()) { + return true; + } + } + + final BufferOrEvent bufferOrEvent = barrierBuffer.getNextNonBlocked(); + + if (bufferOrEvent.isBuffer()) { + currentRecordDeserializer = recordDeserializers[bufferOrEvent.getChannelIndex()]; + currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer()); + } else { + // Event received + final AbstractEvent event = bufferOrEvent.getEvent(); + + if (event instanceof StreamingSuperstep) { + barrierBuffer.processSuperstep(bufferOrEvent); + } else { + if (handleEvent(event)) { + if (inputGate.isFinished()) { + isFinished = true; + return false; + } else if (hasReachedEndOfSuperstep()) { + return false; + } // else: More data is coming... + } + } + } + } + } + + public void clearBuffers() { + for (RecordDeserializer<?> deserializer : recordDeserializers) { + Buffer buffer = deserializer.getCurrentBuffer(); + if (buffer != null && !buffer.isRecycled()) { + buffer.recycle(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/5327d56d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingMutableRecordReader.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingMutableRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingMutableRecordReader.java index ffa436b..e868879 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingMutableRecordReader.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingMutableRecordReader.java @@ -21,7 +21,6 @@ package org.apache.flink.streaming.io; import java.io.IOException; import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.runtime.io.network.api.reader.StreamingAbstractRecordReader; import org.apache.flink.runtime.io.network.api.reader.MutableReader; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; http://git-wip-us.apache.org/repos/asf/flink/blob/5327d56d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java index a5e67ab..1c67c9e 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java @@ -22,18 +22,18 @@ import org.apache.flink.runtime.state.OperatorState; /** * Base class for representing operator states that can be repartitioned for * state state and load balancing. - * + * * @param <T> * The type of the operator state. */ public abstract class PartitionableState<T> extends OperatorState<T> { - public PartitionableState(T initialState) { + private static final long serialVersionUID = 1L; + + PartitionableState(T initialState) { super(initialState); } - private static final long serialVersionUID = 1L; - /** * Repartitions(divides) the current state into the given number of new * partitions. The created partitions will be used to redistribute then http://git-wip-us.apache.org/repos/asf/flink/blob/5327d56d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java index e14e281..bd97917 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java @@ -143,7 +143,7 @@ public class WindowCrossJoinTest implements Serializable { public void invoke(Tuple2<Tuple2<Integer, String>, Tuple1<Integer>> value) { joinResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(value.f0, value.f1.f0)); } - + @Override public void cancel() { } @@ -157,7 +157,7 @@ public class WindowCrossJoinTest implements Serializable { public void invoke(Tuple2<Tuple2<Integer, String>, Tuple1<Integer>> value) { crossResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(value.f0, value.f1.f0)); } - + @Override public void cancel() { } http://git-wip-us.apache.org/repos/asf/flink/blob/5327d56d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java new file mode 100644 index 0000000..e7a03d9 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.io; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.event.task.StreamingSuperstep; +import org.apache.flink.runtime.event.task.TaskEvent; +import org.apache.flink.runtime.io.network.api.reader.AbstractReader; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferRecycler; +import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.util.event.EventListener; +import org.junit.Test; + +public class BarrierBufferTest { + + @Test + public void testWithoutBarriers() throws IOException, InterruptedException { + + List<BufferOrEvent> input = new LinkedList<BufferOrEvent>(); + input.add(createBuffer(0)); + input.add(createBuffer(0)); + input.add(createBuffer(0)); + input.add(createBuffer(2)); + input.add(createBuffer(2)); + + InputGate mockIG = new MockInputGate(1, input); + AbstractReader mockAR = new MockReader(mockIG); + + BarrierBuffer bb = new BarrierBuffer(mockIG, mockAR); + + assertEquals(input.get(0), bb.getNextNonBlocked()); + assertEquals(input.get(1), bb.getNextNonBlocked()); + assertEquals(input.get(2), bb.getNextNonBlocked()); + assertEquals(input.get(3), bb.getNextNonBlocked()); + assertEquals(input.get(4), bb.getNextNonBlocked()); + + } + + @Test + public void testOneChannelBarrier() throws IOException, InterruptedException { + + List<BufferOrEvent> input = new LinkedList<BufferOrEvent>(); + input.add(createBuffer(0)); + input.add(createBuffer(0)); + input.add(createSuperstep(1, 0)); + input.add(createBuffer(0)); + input.add(createBuffer(0)); + input.add(createSuperstep(2, 0)); + input.add(createBuffer(0)); + + InputGate mockIG = new MockInputGate(1, input); + AbstractReader mockAR = new MockReader(mockIG); + + BarrierBuffer bb = new BarrierBuffer(mockIG, mockAR); + BufferOrEvent nextBoe; + + assertEquals(input.get(0), nextBoe = bb.getNextNonBlocked()); + assertEquals(input.get(1), nextBoe = bb.getNextNonBlocked()); + assertEquals(input.get(2), nextBoe = bb.getNextNonBlocked()); + bb.processSuperstep(nextBoe); + assertEquals(input.get(3), nextBoe = bb.getNextNonBlocked()); + assertEquals(input.get(4), nextBoe = bb.getNextNonBlocked()); + assertEquals(input.get(5), nextBoe = bb.getNextNonBlocked()); + bb.processSuperstep(nextBoe); + assertEquals(input.get(6), nextBoe = bb.getNextNonBlocked()); + + } + + @Test + public void testMultiChannelBarrier() throws IOException, InterruptedException { + + List<BufferOrEvent> input = new LinkedList<BufferOrEvent>(); + input.add(createBuffer(0)); + input.add(createBuffer(1)); + input.add(createSuperstep(1, 0)); + input.add(createSuperstep(2, 0)); + input.add(createBuffer(0)); + input.add(createSuperstep(3, 0)); + input.add(createBuffer(0)); + input.add(createBuffer(1)); + input.add(createSuperstep(1, 1)); + input.add(createBuffer(0)); + input.add(createBuffer(1)); + input.add(createSuperstep(2, 1)); + input.add(createSuperstep(3, 1)); + + InputGate mockIG1 = new MockInputGate(2, input); + AbstractReader mockAR1 = new MockReader(mockIG1); + + BarrierBuffer bb = new BarrierBuffer(mockIG1, mockAR1); + BufferOrEvent nextBoe; + + assertEquals(input.get(0), nextBoe = bb.getNextNonBlocked()); + assertEquals(input.get(1), nextBoe = bb.getNextNonBlocked()); + assertEquals(input.get(2), nextBoe = bb.getNextNonBlocked()); + bb.processSuperstep(nextBoe); + assertEquals(input.get(7), nextBoe = bb.getNextNonBlocked()); + assertEquals(input.get(8), nextBoe = bb.getNextNonBlocked()); + bb.processSuperstep(nextBoe); + assertEquals(input.get(3), nextBoe = bb.getNextNonBlocked()); + bb.processSuperstep(nextBoe); + assertEquals(input.get(10), nextBoe = bb.getNextNonBlocked()); + assertEquals(input.get(11), nextBoe = bb.getNextNonBlocked()); + bb.processSuperstep(nextBoe); + assertEquals(input.get(4), nextBoe = bb.getNextNonBlocked()); + assertEquals(input.get(5), nextBoe = bb.getNextNonBlocked()); + bb.processSuperstep(nextBoe); + assertEquals(input.get(12), nextBoe = bb.getNextNonBlocked()); + bb.processSuperstep(nextBoe); + assertEquals(input.get(6), nextBoe = bb.getNextNonBlocked()); + assertEquals(input.get(9), nextBoe = bb.getNextNonBlocked()); + + } + + private static class MockInputGate implements InputGate { + + private int numChannels; + private Queue<BufferOrEvent> boes; + + public MockInputGate(int numChannels, List<BufferOrEvent> boes) { + this.numChannels = numChannels; + this.boes = new LinkedList<BufferOrEvent>(boes); + } + + @Override + public int getNumberOfInputChannels() { + return numChannels; + } + + @Override + public boolean isFinished() { + return boes.isEmpty(); + } + + @Override + public void requestPartitions() throws IOException, InterruptedException { + } + + @Override + public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException { + return boes.remove(); + } + + @Override + public void sendTaskEvent(TaskEvent event) throws IOException { + } + + @Override + public void registerListener(EventListener<InputGate> listener) { + } + + } + + private static class MockReader extends AbstractReader { + + protected MockReader(InputGate inputGate) { + super(inputGate); + } + + } + + private static BufferOrEvent createSuperstep(long id, int channel) { + return new BufferOrEvent(new StreamingSuperstep(id), channel); + } + + private static BufferOrEvent createBuffer(int channel) { + return new BufferOrEvent(new Buffer(new MemorySegment(new byte[] { 1 }), + new BufferRecycler() { + + @Override + public void recycle(MemorySegment memorySegment) { + } + }), channel); + } + +}