[FLINK-1638] [streaming] Separated AbstractRecordReader for streaming and batch


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c9a39926
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c9a39926
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c9a39926

Branch: refs/heads/master
Commit: c9a399268309768738e65af3c52525560b85cd0c
Parents: cf49ebb
Author: Gyula Fora <gyf...@apache.org>
Authored: Thu Mar 5 19:55:14 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Mar 10 14:58:49 2015 +0100

----------------------------------------------------------------------
 .../api/reader/AbstractRecordReader.java        |  55 +++------
 .../reader/StreamingAbstractRecordReader.java   | 122 +++++++++++++++++++
 .../streaming/io/IndexedMutableReader.java      |   4 +-
 .../io/StreamingMutableRecordReader.java        |  44 +++++++
 4 files changed, 186 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c9a39926/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
index 920792c..e70b6ee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
@@ -18,34 +18,24 @@
 
 package org.apache.flink.runtime.io.network.api.reader;
 
-import java.io.IOException;
-
-
 import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.event.task.AbstractEvent;
-import org.apache.flink.runtime.event.task.StreamingSuperstep;
 import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
 import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.io.network.serialization.SpillingAdaptiveSpanningRecordDeserializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
 
 /**
  * A record-oriented reader.
  * <p>
- * This abstract base class is used by both the mutable and immutable record
- * readers.
- * 
- * @param <T>
- *            The type of the record that can be read with this record reader.
+ * This abstract base class is used by both the mutable and immutable record readers.
+ *
+ * @param <T> The type of the record that can be read with this record reader.
  */
-abstract class AbstractRecordReader<T extends IOReadableWritable> extends AbstractReader implements
-               ReaderBase {
-
-       private static final Logger LOG = LoggerFactory.getLogger(AbstractRecordReader.class);
+abstract class AbstractRecordReader<T extends IOReadableWritable> extends AbstractReader implements ReaderBase {
 
        private final RecordDeserializer<T>[] recordDeserializers;
 
@@ -53,15 +43,11 @@ abstract class AbstractRecordReader<T extends IOReadableWritable> extends Abstra
 
        private boolean isFinished;
 
-       private final BarrierBuffer barrierBuffer;
-
        protected AbstractRecordReader(InputGate inputGate) {
                super(inputGate);
-               barrierBuffer = new BarrierBuffer(inputGate, this);
 
                // Initialize one deserializer per input channel
-               this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate
-                               .getNumberOfInputChannels()];
+               this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];
                for (int i = 0; i < recordDeserializers.length; i++) {
                        recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<T>();
                }
@@ -86,27 +72,22 @@ abstract class AbstractRecordReader<T extends IOReadableWritable> extends Abstra
                                }
                        }
 
-                       final BufferOrEvent bufferOrEvent = barrierBuffer.getNextNonBlocked();
+                       final BufferOrEvent bufferOrEvent = inputGate.getNextBufferOrEvent();

                        if (bufferOrEvent.isBuffer()) {
                                currentRecordDeserializer = recordDeserializers[bufferOrEvent.getChannelIndex()];
                                currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
-                       } else {
-                               // Event received
-                               final AbstractEvent event = bufferOrEvent.getEvent();
-
-                               if (event instanceof StreamingSuperstep) {
-                                       barrierBuffer.processSuperstep(bufferOrEvent);
-                               } else {
-                                       if (handleEvent(event)) {
-                                               if (inputGate.isFinished()) {
-                                                       isFinished = true;
-                                                       return false;
-                                               } else if (hasReachedEndOfSuperstep()) {
-                                                       return false;
-                                               } // else: More data is coming...
-                                       }
+                       }
+                       else if (handleEvent(bufferOrEvent.getEvent())) {
+                               if (inputGate.isFinished()) {
+                                       isFinished = true;
+
+                                       return false;
                                }
+                               else if (hasReachedEndOfSuperstep()) {
+
+                                       return false;
+                               } // else: More data is coming...
                        }
                }
        }
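
For illustration, a minimal driver for the simplified batch path above: a mutable reader built on AbstractRecordReader is typically pumped in a record-reuse loop, with events now forwarded straight to handleEvent(). This is a hypothetical sketch, not part of this commit; the class name, the readAll() helper, and the LongValue record type are placeholders.

    import java.io.IOException;

    import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
    import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
    import org.apache.flink.types.LongValue;

    // Hypothetical sketch: drives the batch getNextRecord() loop through
    // MutableRecordReader.next(), reusing a single record instance per call.
    public class BatchReaderSketch {

        public static void readAll(InputGate inputGate) throws IOException, InterruptedException {
            MutableRecordReader<LongValue> reader = new MutableRecordReader<LongValue>(inputGate);
            LongValue reuse = new LongValue();

            while (reader.next(reuse)) {
                // 'reuse' holds the current record until the next call overwrites it
                System.out.println(reuse.getValue());
            }

            // release buffers still held by the per-channel deserializers
            reader.clearBuffers();
        }
    }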

http://git-wip-us.apache.org/repos/asf/flink/blob/c9a39926/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/StreamingAbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/StreamingAbstractRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/StreamingAbstractRecordReader.java
new file mode 100644
index 0000000..ea2d7a6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/StreamingAbstractRecordReader.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.reader;
+
+import java.io.IOException;
+
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.event.task.StreamingSuperstep;
+import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
+import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.io.network.serialization.SpillingAdaptiveSpanningRecordDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A record-oriented reader.
+ * <p>
+ * This abstract base class is used by both the mutable and immutable record
+ * readers.
+ * 
+ * @param <T>
+ *            The type of the record that can be read with this record reader.
+ */
+public abstract class StreamingAbstractRecordReader<T extends IOReadableWritable> extends AbstractReader implements
+               ReaderBase {
+
+       private static final Logger LOG = LoggerFactory.getLogger(StreamingAbstractRecordReader.class);
+
+       private final RecordDeserializer<T>[] recordDeserializers;
+
+       private RecordDeserializer<T> currentRecordDeserializer;
+
+       private boolean isFinished;
+
+       private final BarrierBuffer barrierBuffer;
+
+       protected StreamingAbstractRecordReader(InputGate inputGate) {
+               super(inputGate);
+               barrierBuffer = new BarrierBuffer(inputGate, this);
+
+               // Initialize one deserializer per input channel
+               this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate
+                               .getNumberOfInputChannels()];
+               for (int i = 0; i < recordDeserializers.length; i++) {
+                       recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<T>();
+               }
+       }
+
+       protected boolean getNextRecord(T target) throws IOException, InterruptedException {
+               if (isFinished) {
+                       return false;
+               }
+
+               while (true) {
+                       if (currentRecordDeserializer != null) {
+                               DeserializationResult result = currentRecordDeserializer.getNextRecord(target);
+
+                               if (result.isBufferConsumed()) {
+                                       currentRecordDeserializer.getCurrentBuffer().recycle();
+                                       currentRecordDeserializer = null;
+                               }
+
+                               if (result.isFullRecord()) {
+                                       return true;
+                               }
+                       }
+
+                       final BufferOrEvent bufferOrEvent = barrierBuffer.getNextNonBlocked();
+
+                       if (bufferOrEvent.isBuffer()) {
+                               currentRecordDeserializer = recordDeserializers[bufferOrEvent.getChannelIndex()];
+                               currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
+                       } else {
+                               // Event received
+                               final AbstractEvent event = bufferOrEvent.getEvent();
+
+                               if (event instanceof StreamingSuperstep) {
+                                       barrierBuffer.processSuperstep(bufferOrEvent);
+                               } else {
+                                       if (handleEvent(event)) {
+                                               if (inputGate.isFinished()) {
+                                                       isFinished = true;
+                                                       return false;
+                                               } else if (hasReachedEndOfSuperstep()) {
+                                                       return false;
+                                               } // else: More data is coming...
+                                       }
+                               }
+                       }
+               }
+       }
+
+       public void clearBuffers() {
+               for (RecordDeserializer<?> deserializer : recordDeserializers) {
+                       Buffer buffer = deserializer.getCurrentBuffer();
+                       if (buffer != null && !buffer.isRecycled()) {
+                               buffer.recycle();
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9a39926/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedMutableReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedMutableReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedMutableReader.java
index 025393d..3c8824b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedMutableReader.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedMutableReader.java
@@ -19,10 +19,10 @@
 package org.apache.flink.streaming.io;
 
 import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 
-public class IndexedMutableReader<T extends IOReadableWritable> extends MutableRecordReader<T> {
+public class IndexedMutableReader<T extends IOReadableWritable> extends
+               StreamingMutableRecordReader<T> {
 
        InputGate reader;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c9a39926/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingMutableRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingMutableRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingMutableRecordReader.java
new file mode 100644
index 0000000..ffa436b
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingMutableRecordReader.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.io;
+
+import java.io.IOException;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.io.network.api.reader.StreamingAbstractRecordReader;
+import org.apache.flink.runtime.io.network.api.reader.MutableReader;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+
+public class StreamingMutableRecordReader<T extends IOReadableWritable> extends
+               StreamingAbstractRecordReader<T> implements MutableReader<T> {
+
+       public StreamingMutableRecordReader(InputGate inputGate) {
+               super(inputGate);
+       }
+
+       @Override
+       public boolean next(final T target) throws IOException, InterruptedException {
+               return getNextRecord(target);
+       }
+
+       @Override
+       public void clearBuffers() {
+               super.clearBuffers();
+       }
+}
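
For illustration, a minimal consumer of the new streaming reader: superstep barriers are aligned internally by the BarrierBuffer in StreamingAbstractRecordReader, so next() only surfaces data and returns false once the input gate finishes or the end of a superstep is reached. This is a hypothetical sketch, not part of this commit; the class name, the consume() helper, and the LongValue record type are placeholders.

    import java.io.IOException;

    import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
    import org.apache.flink.streaming.io.StreamingMutableRecordReader;
    import org.apache.flink.types.LongValue;

    // Hypothetical sketch: consumes records from the streaming reader in the
    // same mutable, instance-reusing style as the batch reader.
    public class StreamingReaderSketch {

        public static void consume(InputGate inputGate) throws IOException, InterruptedException {
            StreamingMutableRecordReader<LongValue> reader =
                    new StreamingMutableRecordReader<LongValue>(inputGate);
            LongValue reuse = new LongValue();

            while (reader.next(reuse)) {
                // hand the record to downstream processing (placeholder)
                System.out.println(reuse.getValue());
            }

            // release buffers still held by the per-channel deserializers
            reader.clearBuffers();
        }
    }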
