[FLINK-2754] Fixed the FixedLengthRecordSorter issue when writing to multiple
memory pages, and added more unit tests.

This closes #1178


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/68912126
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/68912126
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/68912126

Branch: refs/heads/master
Commit: 68912126d73b92a07d15ec3f21f9ac922744fb45
Parents: e727355
Author: chengxiang li <[email protected]>
Authored: Thu Sep 24 11:20:10 2015 +0800
Committer: Stephan Ewen <[email protected]>
Committed: Tue Sep 29 12:18:49 2015 +0200

----------------------------------------------------------------------
 .../operators/sort/FixedLengthRecordSorter.java |   4 +-
 .../sort/FixedLengthRecordSorterTest.java       | 109 +++++++++++++++++++
 2 files changed, 112 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/68912126/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
index da96b17..3a44ab5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
@@ -447,11 +447,13 @@ public final class FixedLengthRecordSorter<T> implements 
InMemorySorter<T> {
                                num -= recordsPerSegment;
                        } else {
                                // partially filled segment
-                               for (; num > 0; num--) {
+                               for (; num > 0 && offset <= 
this.lastEntryOffset; num--, offset += this.recordSize) {
                                        record = 
comparator.readWithKeyDenormalization(record, inView);
                                        serializer.serialize(record, output);
                                }
                        }
+
+                       offset = 0;
                }
        }
        

http://git-wip-us.apache.org/repos/asf/flink/blob/68912126/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java
index 517bec3..288e86d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java
@@ -25,6 +25,14 @@ import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator;
+import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
+import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
+import org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView;
+import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.runtime.operators.testutils.RandomIntPairGenerator;
@@ -48,6 +56,8 @@ public class FixedLengthRecordSorterTest {
        private static final int MEMORY_PAGE_SIZE = 32 * 1024; 
 
        private MemoryManager memoryManager;
+
+       private IOManager ioManager;
        
        private TypeSerializer<IntPair> serializer;
        
@@ -57,6 +67,7 @@ public class FixedLengthRecordSorterTest {
        @Before
        public void beforeTest() {
                this.memoryManager = new MemoryManager(MEMORY_SIZE, 1, 
MEMORY_PAGE_SIZE, MemoryType.HEAP, true);
+               this.ioManager = new IOManagerAsync();
                this.serializer = new IntPairSerializer();
                this.comparator = new IntPairComparator();
        }
@@ -368,4 +379,102 @@ public class FixedLengthRecordSorterTest {
                sorter.dispose();
                this.memoryManager.release(memory);
        }
+
+       @Test
+       public void testFlushFullMemoryPage() throws Exception {
+               // Insert IntPair which would fill 2 memory pages.
+               final int NUM_RECORDS = 2 * MEMORY_PAGE_SIZE / 8;
+               final List<MemorySegment> memory = 
this.memoryManager.allocatePages(new DummyInvokable(), 3);
+
+               FixedLengthRecordSorter<IntPair> sorter = newSortBuffer(memory);
+               UniformIntPairGenerator generator = new 
UniformIntPairGenerator(Integer.MAX_VALUE, 1, false);
+
+               // write the records
+               IntPair record = new IntPair();
+               int num = -1;
+               do {
+                       generator.next(record);
+                       num++;
+               }
+               while (sorter.write(record) && num < NUM_RECORDS);
+
+               FileIOChannel.ID channelID = 
this.ioManager.createChannelEnumerator().next();
+               BlockChannelWriter<MemorySegment> blockChannelWriter = 
this.ioManager.createBlockChannelWriter(channelID);
+               final List<MemorySegment> writeBuffer = 
this.memoryManager.allocatePages(new DummyInvokable(), 3);
+               ChannelWriterOutputView outputView = new 
ChannelWriterOutputView(blockChannelWriter, writeBuffer, 
writeBuffer.get(0).size());
+
+               sorter.writeToOutput(outputView, 0, NUM_RECORDS);
+
+               this.memoryManager.release(outputView.close());
+
+               BlockChannelReader<MemorySegment> blockChannelReader = 
this.ioManager.createBlockChannelReader(channelID);
+               final List<MemorySegment> readBuffer = 
this.memoryManager.allocatePages(new DummyInvokable(), 3);
+               ChannelReaderInputView readerInputView = new 
ChannelReaderInputView(blockChannelReader, readBuffer, false);
+               final List<MemorySegment> dataBuffer = 
this.memoryManager.allocatePages(new DummyInvokable(), 3);
+               ChannelReaderInputViewIterator<IntPair> iterator = new 
ChannelReaderInputViewIterator(readerInputView, dataBuffer, this.serializer);
+
+               record = iterator.next(record);
+               int i =0;
+               while (record != null) {
+                       Assert.assertEquals(i, record.getKey());
+                       record = iterator.next(record);
+                       i++;
+               }
+
+               Assert.assertEquals(NUM_RECORDS, i);
+
+               this.memoryManager.release(dataBuffer);
+               // release the memory occupied by the buffers
+               sorter.dispose();
+               this.memoryManager.release(memory);
+       }
+
+       @Test
+       public void testFlushPartialMemoryPage() throws Exception {
+               // Insert IntPair which would fill 2 memory pages.
+               final int NUM_RECORDS = 2 * MEMORY_PAGE_SIZE / 8;
+               final List<MemorySegment> memory = 
this.memoryManager.allocatePages(new DummyInvokable(), 3);
+
+               FixedLengthRecordSorter<IntPair> sorter = newSortBuffer(memory);
+               UniformIntPairGenerator generator = new 
UniformIntPairGenerator(Integer.MAX_VALUE, 1, false);
+
+               // write the records
+               IntPair record = new IntPair();
+               int num = -1;
+               do {
+                       generator.next(record);
+                       num++;
+               }
+               while (sorter.write(record) && num < NUM_RECORDS);
+
+               FileIOChannel.ID channelID = 
this.ioManager.createChannelEnumerator().next();
+               BlockChannelWriter<MemorySegment> blockChannelWriter = 
this.ioManager.createBlockChannelWriter(channelID);
+               final List<MemorySegment> writeBuffer = 
this.memoryManager.allocatePages(new DummyInvokable(), 3);
+               ChannelWriterOutputView outputView = new 
ChannelWriterOutputView(blockChannelWriter, writeBuffer, 
writeBuffer.get(0).size());
+
+               sorter.writeToOutput(outputView, 1, NUM_RECORDS - 1);
+
+               this.memoryManager.release(outputView.close());
+
+               BlockChannelReader<MemorySegment> blockChannelReader = 
this.ioManager.createBlockChannelReader(channelID);
+               final List<MemorySegment> readBuffer = 
this.memoryManager.allocatePages(new DummyInvokable(), 3);
+               ChannelReaderInputView readerInputView = new 
ChannelReaderInputView(blockChannelReader, readBuffer, false);
+               final List<MemorySegment> dataBuffer = 
this.memoryManager.allocatePages(new DummyInvokable(), 3);
+               ChannelReaderInputViewIterator<IntPair> iterator = new 
ChannelReaderInputViewIterator(readerInputView, dataBuffer, this.serializer);
+
+               record = iterator.next(record);
+               int i =1;
+               while (record != null) {
+                       Assert.assertEquals(i, record.getKey());
+                       record = iterator.next(record);
+                       i++;
+               }
+
+               Assert.assertEquals(NUM_RECORDS, i);
+
+               this.memoryManager.release(dataBuffer);
+               // release the memory occupied by the buffers
+               sorter.dispose();
+               this.memoryManager.release(memory);
+       }
 }

Reply via email to