[FLINK-1638] [streaming] Separated AbstractRecordReader for streaming and batch
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c9a39926 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c9a39926 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c9a39926 Branch: refs/heads/master Commit: c9a399268309768738e65af3c52525560b85cd0c Parents: cf49ebb Author: Gyula Fora <gyf...@apache.org> Authored: Thu Mar 5 19:55:14 2015 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Tue Mar 10 14:58:49 2015 +0100 ---------------------------------------------------------------------- .../api/reader/AbstractRecordReader.java | 55 +++------ .../reader/StreamingAbstractRecordReader.java | 122 +++++++++++++++++++ .../streaming/io/IndexedMutableReader.java | 4 +- .../io/StreamingMutableRecordReader.java | 44 +++++++ 4 files changed, 186 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c9a39926/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java index 920792c..e70b6ee 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java @@ -18,34 +18,24 @@ package org.apache.flink.runtime.io.network.api.reader; -import java.io.IOException; - - import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.runtime.event.task.AbstractEvent; -import org.apache.flink.runtime.event.task.StreamingSuperstep; import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer; 
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; import org.apache.flink.runtime.io.network.serialization.SpillingAdaptiveSpanningRecordDeserializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import java.io.IOException; /** * A record-oriented reader. * <p> - * This abstract base class is used by both the mutable and immutable record - * readers. - * - * @param <T> - * The type of the record that can be read with this record reader. + * This abstract base class is used by both the mutable and immutable record readers. + * + * @param <T> The type of the record that can be read with this record reader. */ -abstract class AbstractRecordReader<T extends IOReadableWritable> extends AbstractReader implements - ReaderBase { - - private static final Logger LOG = LoggerFactory.getLogger(AbstractRecordReader.class); +abstract class AbstractRecordReader<T extends IOReadableWritable> extends AbstractReader implements ReaderBase { private final RecordDeserializer<T>[] recordDeserializers; @@ -53,15 +43,11 @@ abstract class AbstractRecordReader<T extends IOReadableWritable> extends Abstra private boolean isFinished; - private final BarrierBuffer barrierBuffer; - protected AbstractRecordReader(InputGate inputGate) { super(inputGate); - barrierBuffer = new BarrierBuffer(inputGate, this); // Initialize one deserializer per input channel - this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate - .getNumberOfInputChannels()]; + this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()]; for (int i = 0; i < recordDeserializers.length; i++) { recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<T>(); } @@ 
-86,27 +72,22 @@ abstract class AbstractRecordReader<T extends IOReadableWritable> extends Abstra } } - final BufferOrEvent bufferOrEvent = barrierBuffer.getNextNonBlocked(); + final BufferOrEvent bufferOrEvent = inputGate.getNextBufferOrEvent(); if (bufferOrEvent.isBuffer()) { currentRecordDeserializer = recordDeserializers[bufferOrEvent.getChannelIndex()]; currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer()); - } else { - // Event received - final AbstractEvent event = bufferOrEvent.getEvent(); - - if (event instanceof StreamingSuperstep) { - barrierBuffer.processSuperstep(bufferOrEvent); - } else { - if (handleEvent(event)) { - if (inputGate.isFinished()) { - isFinished = true; - return false; - } else if (hasReachedEndOfSuperstep()) { - return false; - } // else: More data is coming... - } + } + else if (handleEvent(bufferOrEvent.getEvent())) { + if (inputGate.isFinished()) { + isFinished = true; + + return false; } + else if (hasReachedEndOfSuperstep()) { + + return false; + } // else: More data is coming... } } } http://git-wip-us.apache.org/repos/asf/flink/blob/c9a39926/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/StreamingAbstractRecordReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/StreamingAbstractRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/StreamingAbstractRecordReader.java new file mode 100644 index 0000000..ea2d7a6 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/StreamingAbstractRecordReader.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.api.reader; + +import java.io.IOException; + + +import org.apache.flink.core.io.IOReadableWritable; +import org.apache.flink.runtime.event.task.AbstractEvent; +import org.apache.flink.runtime.event.task.StreamingSuperstep; +import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer; +import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.io.network.serialization.SpillingAdaptiveSpanningRecordDeserializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A record-oriented reader. + * <p> + * This abstract base class is used by both the mutable and immutable record + * readers. + * + * @param <T> + * The type of the record that can be read with this record reader. 
+ */ +public abstract class StreamingAbstractRecordReader<T extends IOReadableWritable> extends AbstractReader implements + ReaderBase { + + private static final Logger LOG = LoggerFactory.getLogger(StreamingAbstractRecordReader.class); + + private final RecordDeserializer<T>[] recordDeserializers; + + private RecordDeserializer<T> currentRecordDeserializer; + + private boolean isFinished; + + private final BarrierBuffer barrierBuffer; + + protected StreamingAbstractRecordReader(InputGate inputGate) { + super(inputGate); + barrierBuffer = new BarrierBuffer(inputGate, this); + + // Initialize one deserializer per input channel + this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate + .getNumberOfInputChannels()]; + for (int i = 0; i < recordDeserializers.length; i++) { + recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<T>(); + } + } + + protected boolean getNextRecord(T target) throws IOException, InterruptedException { + if (isFinished) { + return false; + } + + while (true) { + if (currentRecordDeserializer != null) { + DeserializationResult result = currentRecordDeserializer.getNextRecord(target); + + if (result.isBufferConsumed()) { + currentRecordDeserializer.getCurrentBuffer().recycle(); + currentRecordDeserializer = null; + } + + if (result.isFullRecord()) { + return true; + } + } + + final BufferOrEvent bufferOrEvent = barrierBuffer.getNextNonBlocked(); + + if (bufferOrEvent.isBuffer()) { + currentRecordDeserializer = recordDeserializers[bufferOrEvent.getChannelIndex()]; + currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer()); + } else { + // Event received + final AbstractEvent event = bufferOrEvent.getEvent(); + + if (event instanceof StreamingSuperstep) { + barrierBuffer.processSuperstep(bufferOrEvent); + } else { + if (handleEvent(event)) { + if (inputGate.isFinished()) { + isFinished = true; + return false; + } else if (hasReachedEndOfSuperstep()) { + return false; + } // else: More data 
is coming... + } + } + } + } + } + + public void clearBuffers() { + for (RecordDeserializer<?> deserializer : recordDeserializers) { + Buffer buffer = deserializer.getCurrentBuffer(); + if (buffer != null && !buffer.isRecycled()) { + buffer.recycle(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c9a39926/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedMutableReader.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedMutableReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedMutableReader.java index 025393d..3c8824b 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedMutableReader.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedMutableReader.java @@ -19,10 +19,10 @@ package org.apache.flink.streaming.io; import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; -public class IndexedMutableReader<T extends IOReadableWritable> extends MutableRecordReader<T> { +public class IndexedMutableReader<T extends IOReadableWritable> extends + StreamingMutableRecordReader<T> { InputGate reader; http://git-wip-us.apache.org/repos/asf/flink/blob/c9a39926/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingMutableRecordReader.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingMutableRecordReader.java 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingMutableRecordReader.java new file mode 100644 index 0000000..ffa436b --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingMutableRecordReader.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.io; + +import java.io.IOException; + +import org.apache.flink.core.io.IOReadableWritable; +import org.apache.flink.runtime.io.network.api.reader.StreamingAbstractRecordReader; +import org.apache.flink.runtime.io.network.api.reader.MutableReader; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; + +public class StreamingMutableRecordReader<T extends IOReadableWritable> extends + StreamingAbstractRecordReader<T> implements MutableReader<T> { + + public StreamingMutableRecordReader(InputGate inputGate) { + super(inputGate); + } + + @Override + public boolean next(final T target) throws IOException, InterruptedException { + return getNextRecord(target); + } + + @Override + public void clearBuffers() { + super.clearBuffers(); + } +}