This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ed490f  [FLINK-11863][table-runtime-blink] Introduce channel to read and write compressed data (#7944)
3ed490f is described below

commit 3ed490f0f4e561f1ef8b3660d4370030db6115dc
Author: Kurt Young <ykt...@gmail.com>
AuthorDate: Sat Mar 9 13:34:56 2019 +0800

    [FLINK-11863][table-runtime-blink] Introduce channel to read and write compressed data (#7944)
---
 .../iomanager/AbstractChannelReaderInputView.java  |  52 +++++
 .../iomanager/AbstractChannelWriterOutputView.java |  63 ++++++
 .../io/disk/iomanager/ChannelReaderInputView.java  | 106 +++++-----
 .../HeaderlessChannelReaderInputView.java          |  59 ++++--
 .../runtime/memory/AbstractPagedInputView.java     |   6 +-
 .../flink/table/runtime/io/ChannelWithMeta.java    |  49 +++++
 .../runtime/io/CompressedBlockChannelReader.java   | 222 +++++++++++++++++++++
 .../runtime/io/CompressedBlockChannelWriter.java   | 187 +++++++++++++++++
 ...CompressedHeaderlessChannelReaderInputView.java | 162 +++++++++++++++
 ...ompressedHeaderlessChannelWriterOutputView.java | 134 +++++++++++++
 .../io/HeaderlessChannelWriterOutputView.java      | 124 ++++++++++++
 .../flink/table/runtime/util/FileChannelUtil.java  | 155 ++++++++++++++
 .../io/CompressedHeaderlessChannelTest.java        |  94 +++++++++
 13 files changed, 1352 insertions(+), 61 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractChannelReaderInputView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractChannelReaderInputView.java
new file mode 100644
index 0000000..b0fb22a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractChannelReaderInputView.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.memory.AbstractPagedInputView;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * A {@link org.apache.flink.core.memory.DataInputView} that is backed by a
+ * {@link FileIOChannel}, making it effectively a data input stream. The view reads its data
+ * in blocks from the underlying channel. The view can only read data that
+ * has been written by a {@link ChannelWriterOutputView}, due to block formatting.
+ */
+public abstract class AbstractChannelReaderInputView extends AbstractPagedInputView {
+
+       public AbstractChannelReaderInputView(int headerLength) {
+               super(headerLength);
+       }
+
+       /**
+        * Closes this InputView, closing the underlying reader and returning all memory segments.
+        *
+        * @return A list containing all memory segments originally supplied to this view.
+        * @throws IOException Thrown, if the underlying reader could not be properly closed.
+        */
+       public abstract List<MemorySegment> close() throws IOException;
+
+       /**
+        * Get the underlying channel.
+        */
+       public abstract FileIOChannel getChannel();
+
+}
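For illustration, a minimal sketch of how a concrete AbstractChannelReaderInputView might be consumed: read through the plain DataInputView interface until EOF, then call close() to reclaim the segments. The class and method names below are hypothetical; only the methods shown in the diff above are assumed.

import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.AbstractChannelReaderInputView;

import java.io.EOFException;
import java.io.IOException;
import java.util.List;

public final class ReaderViewUsage {

    /** Drains a reader view as a plain DataInputView, then reclaims its memory. */
    static long countBytes(AbstractChannelReaderInputView view) throws IOException {
        long numBytes = 0;
        try {
            while (true) {
                view.readByte(); // any DataInputView read method works here
                numBytes++;
            }
        } catch (EOFException eof) {
            // the channel's data is exhausted
        }
        // close() returns the segments originally supplied to the view,
        // so the caller can hand them back to the owning memory manager
        List<MemorySegment> segments = view.close();
        return numBytes;
    }
}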
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractChannelWriterOutputView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractChannelWriterOutputView.java
new file mode 100644
index 0000000..ff2c858
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractChannelWriterOutputView.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+
+import java.io.IOException;
+
+/**
+ * A {@link org.apache.flink.core.memory.DataOutputView} that is backed by a {@link FileIOChannel},
+ * making it effectively a data output stream. The view writes its data in blocks to the underlying
+ * channel.
+ */
+public abstract class AbstractChannelWriterOutputView extends AbstractPagedOutputView {
+
+       public AbstractChannelWriterOutputView(int segmentSize, int headerLength) {
+               super(segmentSize, headerLength);
+       }
+
+       /**
+        * Get the underlying channel.
+        */
+       public abstract FileIOChannel getChannel();
+
+       /**
+        * Closes this OutputView, closing the underlying writer.
+        *
+        * @return the number of bytes in the last memory segment.
+        */
+       public abstract int close() throws IOException;
+
+       /**
+        * Gets the number of blocks used by this view.
+        */
+       public abstract int getBlockCount();
+
+       /**
+        * Gets the number of bytes written.
+        */
+       public abstract long getNumBytes() throws IOException;
+
+       /**
+        * Gets the number of compressed bytes written; returns the raw byte count if there is no compression.
+        */
+       public abstract long getNumCompressedBytes() throws IOException;
+
+}
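A short hedged sketch of the new accounting methods: after close(), getNumBytes() and getNumCompressedBytes() can be compared to compute the achieved compression ratio (the two are equal when no compression is configured). WriterViewUsage is an illustrative name, not part of the commit.

import org.apache.flink.runtime.io.disk.iomanager.AbstractChannelWriterOutputView;

import java.io.IOException;

public final class WriterViewUsage {

    /** Writes some data, closes the view, and returns the achieved compression ratio. */
    static double writeAndReport(AbstractChannelWriterOutputView view) throws IOException {
        for (int i = 0; i < 1_000; i++) {
            view.writeInt(i); // any DataOutputView write method works here
        }
        view.close(); // flushes the final (partial) block to the channel
        long raw = view.getNumBytes();
        long compressed = view.getNumCompressedBytes();
        // getNumCompressedBytes() equals getNumBytes() when no compression is used
        return raw == 0 ? 1.0 : (double) compressed / raw;
    }
}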
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java
index de1ed87..b0dbc16 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java
@@ -33,49 +33,50 @@ import org.apache.flink.runtime.memory.AbstractPagedInputView;
  * stream. The view reads it data in blocks from the underlying channel. The view can only read data that
  * has been written by a {@link ChannelWriterOutputView}, due to block formatting.
  */
-public class ChannelReaderInputView extends AbstractPagedInputView {
-       
+public class ChannelReaderInputView extends AbstractChannelReaderInputView {
+
        protected final BlockChannelReader<MemorySegment> reader;              // the block reader that reads memory segments
-       
+
        protected int numRequestsRemaining;                             // the number of block requests remaining
-       
+
        private final int numSegments;                                  // the number of memory segment the view works with
-       
+
        private final ArrayList<MemorySegment> freeMem; // memory gathered once the work is done
-       
+
        private boolean inLastBlock;                                    // flag indicating whether the view is already in the last block
-       
+
        private boolean closed;                                                 // flag indicating whether the reader is closed
-       
+
        // --------------------------------------------------------------------------------------------
 
        /**
         * Creates a new channel reader that reads from the given channel until the last block
         * (as marked by a {@link ChannelWriterOutputView}) is found.
-        * 
+        *
         * @param reader The reader that reads the data from disk back into memory.
         * @param memory A list of memory segments that the reader uses for reading the data in. If the
         *               list contains more than one segment, the reader will asynchronously pre-fetch
         *               blocks ahead.
         * @param waitForFirstBlock A flag indicating weather this constructor call should block
         *                          until the first block has returned from the asynchronous I/O reader.
-        * 
+        *
         * @throws IOException Thrown, if the read requests for the first blocks fail to be
         *                     served by the reader.
         */
-       public ChannelReaderInputView(BlockChannelReader<MemorySegment> reader, List<MemorySegment> memory, boolean waitForFirstBlock)
-       throws IOException
-       {
+       public ChannelReaderInputView(
+                       BlockChannelReader<MemorySegment> reader,
+                       List<MemorySegment> memory,
+                       boolean waitForFirstBlock) throws IOException {
                this(reader, memory, -1, waitForFirstBlock);
        }
-       
+
        /**
         * Creates a new channel reader that reads from the given channel, 
expecting a specified
         * number of blocks in the channel.
         * <p>
         * WARNING: The reader will lock if the number of blocks given here is actually lower than
         * the actual number of blocks in the channel.
-        * 
+        *
         * @param reader The reader that reads the data from disk back into memory.
         * @param memory A list of memory segments that the reader uses for reading the data in. If the
         *               list contains more than one segment, the reader will asynchronously pre-fetch
@@ -85,23 +86,24 @@ public class ChannelReaderInputView extends AbstractPagedInputView {
         *                  beyond the channel size.
         * @param waitForFirstBlock A flag indicating weather this constructor call should block
         *                          until the first block has returned from the asynchronous I/O reader.
-        * 
+        *
         * @throws IOException Thrown, if the read requests for the first blocks fail to be
         *                     served by the reader.
         */
-       public ChannelReaderInputView(BlockChannelReader<MemorySegment> reader, List<MemorySegment> memory,
-                                                                                                               int numBlocks, boolean waitForFirstBlock)
-       throws IOException
-       {
+       public ChannelReaderInputView(
+                       BlockChannelReader<MemorySegment> reader,
+                       List<MemorySegment> memory,
+                       int numBlocks,
+                       boolean waitForFirstBlock) throws IOException {
                this(reader, memory, numBlocks, ChannelWriterOutputView.HEADER_LENGTH, waitForFirstBlock);
        }
-               
+
        /**
         * Non public constructor to allow subclasses to use this input view with different headers.
         * <p>
         * WARNING: The reader will lock if the number of blocks given here is actually lower than
         * the actual number of blocks in the channel.
-        * 
+        *
         * @param reader The reader that reads the data from disk back into memory.
         * @param memory A list of memory segments that the reader uses for reading the data in. If the
         *               list contains more than one segment, the reader will asynchronously pre-fetch
@@ -114,15 +116,17 @@ public class ChannelReaderInputView extends AbstractPagedInputView {
         *                  so any subclass changing the header length should override that methods as well.
         * @param waitForFirstBlock A flag indicating weather this constructor call should block
         *                          until the first block has returned from the asynchronous I/O reader.
-        * 
+        *
         * @throws IOException
         */
-       ChannelReaderInputView(BlockChannelReader<MemorySegment> reader, List<MemorySegment> memory,
-                               int numBlocks, int headerLen, boolean waitForFirstBlock)
-       throws IOException
-       {
+       ChannelReaderInputView(
+                       BlockChannelReader<MemorySegment> reader,
+                       List<MemorySegment> memory,
+                       int numBlocks,
+                       int headerLen,
+                       boolean waitForFirstBlock) throws IOException {
                super(headerLen);
-               
+
                if (reader == null || memory == null) {
                        throw new NullPointerException();
                }
@@ -132,44 +136,45 @@ public class ChannelReaderInputView extends AbstractPagedInputView {
                if (numBlocks < 1 && numBlocks != -1) {
                        throw new IllegalArgumentException("The number of blocks must be a positive number, or -1, if unknown.");
                }
-               
+
                this.reader = reader;
                this.numRequestsRemaining = numBlocks;
                this.numSegments = memory.size();
                this.freeMem = new ArrayList<MemorySegment>(this.numSegments);
-               
+
                for (int i = 0; i < memory.size(); i++) {
                        sendReadRequest(memory.get(i));
                }
-               
+
                if (waitForFirstBlock) {
                        advance();
                }
        }
-       
+
        public void waitForFirstBlock() throws IOException
        {
                if (getCurrentSegment() == null) {
                        advance();
                }
        }
-       
+
        public boolean isClosed() {
                return this.closed;
        }
-       
+
        /**
         * Closes this InputView, closing the underlying reader and returning all memory segments.
-        * 
+        *
         * @return A list containing all memory segments originally supplied to this view.
         * @throws IOException Thrown, if the underlying reader could not be properly closed.
         */
-       public List<MemorySegment> close() throws IOException { 
+       @Override
+       public List<MemorySegment> close() throws IOException {
                if (this.closed) {
                        throw new IllegalStateException("Already closed.");
                }
                this.closed = true;
-               
+
                // re-collect all memory segments
                ArrayList<MemorySegment> list = this.freeMem;
                final MemorySegment current = getCurrentSegment();
@@ -192,7 +197,12 @@ public class ChannelReaderInputView extends AbstractPagedInputView {
                }
                return list;
        }
-       
+
+       @Override
+       public FileIOChannel getChannel() {
+               return reader;
+       }
+
        // --------------------------------------------------------------------------------------------
        //                                        Utilities
        // --------------------------------------------------------------------------------------------
@@ -203,12 +213,12 @@ public class ChannelReaderInputView extends AbstractPagedInputView {
         * adds the segment to the readers return queue, which thereby effectively collects all memory segments.
         * Secondly, the method fetches the next non-consumed segment
         * returned by the reader. If no further segments are available, this method thrown an {@link EOFException}.
-        * 
+        *
         * @param current The memory segment used for the next request.
         * @return The memory segment to read from next.
-        * 
+        *
         * @throws EOFException Thrown, if no further segments are available.
-        * @throws IOException Thrown, if an I/O error occurred while reading 
+        * @throws IOException Thrown, if an I/O error occurred while reading
         * @see AbstractPagedInputView#nextSegment(org.apache.flink.core.memory.MemorySegment)
         */
        @Override
@@ -217,16 +227,16 @@ public class ChannelReaderInputView extends AbstractPagedInputView {
                if (this.inLastBlock) {
                        throw new EOFException();
                }
-                               
+
                // send a request first. if we have only a single segment, this same segment will be the one obtained in
                // the next lines
                if (current != null) {
                        sendReadRequest(current);
                }
-               
+
                // get the next segment
                final MemorySegment seg = this.reader.getNextReturnedBlock();
-               
+
                // check the header
                if (seg.getShort(0) != ChannelWriterOutputView.HEADER_MAGIC_NUMBER) {
                        throw new IOException("The current block does not belong to a ChannelWriterOutputView / " +
@@ -237,20 +247,20 @@ public class ChannelReaderInputView extends AbstractPagedInputView {
                        this.numRequestsRemaining = 0;
                        this.inLastBlock = true;
                }
-               
+
                return seg;
        }
-       
+
 
        @Override
        protected int getLimitForSegment(MemorySegment segment) {
                return segment.getInt(ChannelWriterOutputView.HEAD_BLOCK_LENGTH_OFFSET);
        }
-       
+
        /**
         * Sends a new read requests, if further requests remain. Otherwise, this method adds the segment
         * directly to the readers return queue.
-        * 
+        *
         * @param seg The segment to use for the read request.
         * @throws IOException Thrown, if the reader is in error.
         */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/HeaderlessChannelReaderInputView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/HeaderlessChannelReaderInputView.java
index 63e86c9..9d45205 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/HeaderlessChannelReaderInputView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/HeaderlessChannelReaderInputView.java
@@ -24,6 +24,8 @@ import java.util.List;
 
 import org.apache.flink.core.memory.MemorySegment;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
  * A {@link org.apache.flink.core.memory.DataInputView} that is backed by a
  * {@link BlockChannelReader}, making it effectively a data input
@@ -31,14 +33,17 @@ import org.apache.flink.core.memory.MemorySegment;
  * a header for each block, giving a direct stream abstraction over sequence of written
  * blocks. It therefore requires specification of the number of blocks and the number of
  * bytes in the last block.
- *
  */
-public class HeaderlessChannelReaderInputView extends ChannelReaderInputView
-{
+public class HeaderlessChannelReaderInputView extends ChannelReaderInputView {
+
        private int numBlocksRemaining;         // the number of blocks not yet consumed
        
        private final int lastBlockBytes;       // the number of valid bytes in the last block
 
+       private long offset;                            // offset to seek after reading the first block
+
+       private boolean isFirstBlock;           // whether the current block is the first block
+
        /**
         * Creates a new channel reader that reads from the given channel, expecting a specified
         * number of blocks in the channel, and returns only a specified number of bytes from
@@ -60,16 +65,36 @@ public class HeaderlessChannelReaderInputView extends ChannelReaderInputView
         * @throws IOException Thrown, if the read requests for the first blocks fail to be
         *                     served by the reader.
         */
-       public HeaderlessChannelReaderInputView(BlockChannelReader<MemorySegment> reader, List<MemorySegment> memory, int numBlocks,
-                       int numBytesInLastBlock, boolean waitForFirstBlock)
-       throws IOException
-       {
-               super(reader, memory, numBlocks, 0, waitForFirstBlock);
-               
+       public HeaderlessChannelReaderInputView(
+                       BlockChannelReader<MemorySegment> reader,
+                       List<MemorySegment> memory,
+                       int numBlocks,
+                       int numBytesInLastBlock,
+                       boolean waitForFirstBlock) throws IOException {
+               this(reader, memory, numBlocks, numBytesInLastBlock, waitForFirstBlock, 0);
+       }
+
+       public HeaderlessChannelReaderInputView(
+                       BlockChannelReader<MemorySegment> reader,
+                       List<MemorySegment> memory,
+                       int numBlocks,
+                       int numBytesInLastBlock,
+                       boolean waitForFirstBlock,
+                       long offset) throws IOException {
+               // postpone waiting for the first block until the offset has been initialized;
+               // otherwise the offset is not yet set and we cannot seek the input
+               super(reader, memory, numBlocks, 0, false);
+
                this.numBlocksRemaining = numBlocks;
                this.lastBlockBytes = numBytesInLastBlock;
+
+               checkArgument(offset >= 0, "`offset` can't be negative!");
+               this.offset = offset;
+
+               if (waitForFirstBlock) {
+                       advance();
+               }
        }
-       
 
        @Override
        protected MemorySegment nextSegment(MemorySegment current) throws IOException {
@@ -84,12 +109,22 @@ public class HeaderlessChannelReaderInputView extends ChannelReaderInputView
                if (current != null) {
                        sendReadRequest(current);
                }
-               
+
+               // check if next segment is the first block
+               isFirstBlock = (current == null);
+
                // get the next segment
                this.numBlocksRemaining--;
                return this.reader.getNextReturnedBlock();
        }
-       
+
+       @Override
+       protected void advance() throws IOException {
+               doAdvance();
+               if (isFirstBlock && offset > 0) {
+                       seekInput(getCurrentSegment(), (int) offset, getCurrentSegmentLimit());
+               }
+       }
 
        @Override
        protected int getLimitForSegment(MemorySegment segment) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/AbstractPagedInputView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/AbstractPagedInputView.java
index 0536e1a..d73aeaa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/AbstractPagedInputView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/AbstractPagedInputView.java
@@ -152,7 +152,11 @@ public abstract class AbstractPagedInputView implements DataInputView {
         * @see #nextSegment(MemorySegment)
         * @see #getLimitForSegment(MemorySegment)
         */
-       protected final void advance() throws IOException {
+       protected void advance() throws IOException {
+               doAdvance();
+       }
+
+       protected void doAdvance() throws IOException {
                // note: this code ensures that in case of EOF, we stay at the same position such that
                // EOF is reproducible (if nextSegment throws a reproducible EOFException)
                this.currentSegment = nextSegment(this.currentSegment);
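The advance()/doAdvance() split above exists so subclasses can run positioning logic after every page switch, which is exactly what HeaderlessChannelReaderInputView now does to seek past an initial offset. A toy single-page subclass might look like the sketch below; it assumes AbstractPagedInputView's usual (initialSegment, initialLimit, headerLength) constructor, which is not shown in this diff.

import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.memory.AbstractPagedInputView;

import java.io.EOFException;
import java.io.IOException;

public final class SinglePageInputView extends AbstractPagedInputView {

    public SinglePageInputView(byte[] data) {
        // seed the view with one segment covering all of `data`, no block headers
        super(MemorySegmentFactory.wrap(data), data.length, 0);
    }

    @Override
    protected void advance() throws IOException {
        doAdvance(); // the original page-switch logic
        // subclass hook: runs after each successful page switch,
        // e.g. to seekInput(...) within the freshly loaded segment
    }

    @Override
    protected MemorySegment nextSegment(MemorySegment current) throws IOException {
        throw new EOFException(); // only one page of data
    }

    @Override
    protected int getLimitForSegment(MemorySegment segment) {
        return segment.size();
    }
}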
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/ChannelWithMeta.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/ChannelWithMeta.java
new file mode 100644
index 0000000..15b82db
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/ChannelWithMeta.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.io;
+
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
+
+/**
+ * A channel id together with the file's block count and the number of bytes in its last block.
+ */
+public class ChannelWithMeta {
+
+       private final FileIOChannel.ID channel;
+       private final int blockCount;
+       private final int numBytesInLastBlock;
+
+       public ChannelWithMeta(FileIOChannel.ID channel, int blockCount, int numBytesInLastBlock) {
+               this.channel = channel;
+               this.blockCount = blockCount;
+               this.numBytesInLastBlock = numBytesInLastBlock;
+       }
+
+       public FileIOChannel.ID getChannel() {
+               return channel;
+       }
+
+       public int getBlockCount() {
+               return blockCount;
+       }
+
+       public int getNumBytesInLastBlock() {
+               return numBytesInLastBlock;
+       }
+}
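A hedged sketch of the intended pairing: ChannelWithMeta captures exactly what a headerless reader needs to reopen a fully written channel. The helper class and method names are illustrative, and IOManager#createBlockChannelReader is assumed from flink-runtime (it is not part of this diff).

import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.HeaderlessChannelReaderInputView;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.table.runtime.io.ChannelWithMeta;

import java.io.IOException;
import java.util.List;

public final class ChannelWithMetaUsage {

    /** Reopens a fully written channel as a headerless reader view. */
    static HeaderlessChannelReaderInputView open(
            IOManager ioManager,
            ChannelWithMeta meta,
            List<MemorySegment> readMemory) throws IOException {
        return new HeaderlessChannelReaderInputView(
                ioManager.createBlockChannelReader(meta.getChannel()),
                readMemory,
                meta.getBlockCount(),
                meta.getNumBytesInLastBlock(),
                true); // block until the first segment has been read
    }
}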
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/CompressedBlockChannelReader.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/CompressedBlockChannelReader.java
new file mode 100644
index 0000000..6be2c2e
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/CompressedBlockChannelReader.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.io;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
+import org.apache.flink.runtime.io.disk.iomanager.BufferFileReader;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.RequestDoneCallback;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.table.runtime.compression.BlockCompressionFactory;
+import org.apache.flink.table.runtime.compression.BlockCompressor;
+import org.apache.flink.table.runtime.compression.BlockDecompressor;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A compressed block channel reader, for use where the MemorySegment-based reader interface must be maintained.
+ */
+public class CompressedBlockChannelReader
+               implements BlockChannelReader<MemorySegment>, RequestDoneCallback<Buffer>, BufferRecycler {
+
+       private final LinkedBlockingQueue<MemorySegment> blockQueue;
+       private final boolean copyCompress;
+       private final BlockDecompressor decompressor;
+       private final BufferFileReader reader;
+       private final AtomicReference<IOException> cause;
+       private final LinkedBlockingQueue<Buffer> retBuffers = new LinkedBlockingQueue<>();
+
+       private byte[] buf;
+       private ByteBuffer bufWrapper;
+       private int offset;
+       private int len;
+
+       public CompressedBlockChannelReader(
+                       IOManager ioManager,
+                       ID channel,
+                       LinkedBlockingQueue<MemorySegment> blockQueue,
+                       BlockCompressionFactory codecFactory,
+                       int preferBlockSize,
+                       int segmentSize) throws IOException {
+               this.reader = ioManager.createBufferFileReader(channel, this);
+               this.blockQueue = blockQueue;
+               copyCompress = preferBlockSize > segmentSize * 2;
+               int blockSize = copyCompress ? preferBlockSize : segmentSize;
+               this.decompressor = codecFactory.getDecompressor();
+               cause = new AtomicReference<>();
+
+               if (copyCompress) {
+                       this.buf = new byte[blockSize];
+                       this.bufWrapper = ByteBuffer.wrap(buf);
+               }
+
+               BlockCompressor compressor = codecFactory.getCompressor();
+               for (int i = 0; i < 2; i++) {
+                       MemorySegment segment = MemorySegmentFactory.wrap(new byte[compressor.getMaxCompressedSize(blockSize)]);
+                       reader.readInto(new NetworkBuffer(segment, this));
+               }
+       }
+
+       @Override
+       public void readBlock(MemorySegment segment) throws IOException {
+               if (cause.get() != null) {
+                       throw cause.get();
+               }
+
+               if (copyCompress) {
+                       int readOffset = 0;
+                       int readLen = segment.size();
+
+                       while (readLen > 0) {
+                               int copy = Math.min(readLen, len - offset);
+                               if (copy == 0) {
+                                       readBuffer();
+                               } else {
+                                       segment.put(readOffset, buf, offset, copy);
+                                       offset += copy;
+                                       readOffset += copy;
+                                       readLen -= copy;
+                               }
+                       }
+               } else {
+                       int len = decompressBuffer(segment.wrap(0, segment.size()));
+                       Preconditions.checkState(len == segment.size());
+               }
+
+               boolean add = blockQueue.add(segment);
+               Preconditions.checkState(add); // LinkedBlockingQueue add never fails.
+       }
+
+       private void readBuffer() throws IOException {
+               len = decompressBuffer(bufWrapper);
+       }
+
+       private int decompressBuffer(ByteBuffer toRead) throws IOException {
+               try {
+                       Buffer buffer;
+                       while ((buffer = retBuffers.poll(11000, TimeUnit.MILLISECONDS)) == null) {
+                               if (cause.get() != null) {
+                                       throw cause.get();
+                               }
+                       }
+
+                       int readLen = decompressor.decompress(
+                                       buffer.getMemorySegment().wrap(0, buffer.getSize()), 0, buffer.getSize(),
+                                       toRead, 0);
+
+                       buffer.recycleBuffer();
+                       return readLen;
+               } catch (InterruptedException e) {
+                       throw new IOException(e);
+               }
+       }
+
+       @Override
+       public void seekToPosition(long position) throws IOException {
+               throw new RuntimeException("Not supported yet!");
+       }
+
+       @Override
+       public MemorySegment getNextReturnedBlock() throws IOException {
+               try {
+                       while (true) {
+                               final MemorySegment next = blockQueue.poll(1000, TimeUnit.MILLISECONDS);
+                               if (next != null) {
+                                       return next;
+                               } else {
+                                       if (reader.isClosed()) {
+                                               throw new IOException("The reader has been closed.");
+                                       }
+                               }
+                       }
+               } catch (InterruptedException e) {
+                       throw new IOException("Reader was interrupted while waiting for the next returning segment.");
+               }
+       }
+
+       @Override
+       public LinkedBlockingQueue<MemorySegment> getReturnQueue() {
+               return blockQueue;
+       }
+
+       @Override
+       public ID getChannelID() {
+               return reader.getChannelID();
+       }
+
+       @Override
+       public long getSize() throws IOException {
+               return reader.getSize();
+       }
+
+       @Override
+       public boolean isClosed() {
+               return reader.isClosed();
+       }
+
+       @Override
+       public void close() throws IOException {
+               reader.close();
+       }
+
+       @Override
+       public void deleteChannel() {
+               reader.deleteChannel();
+       }
+
+       @Override
+       public void closeAndDelete() throws IOException {
+               reader.closeAndDelete();
+       }
+
+       @Override
+       public FileChannel getNioFileChannel() {
+               return reader.getNioFileChannel();
+       }
+
+       @Override
+       public void requestSuccessful(Buffer request) {
+               retBuffers.add(request);
+       }
+
+       @Override
+       public void requestFailed(Buffer buffer, IOException e) {
+               cause.compareAndSet(null, e);
+               throw new RuntimeException(e);
+       }
+
+       @Override
+       public void recycle(MemorySegment segment) {
+               try {
+                       reader.readInto(new NetworkBuffer(segment, this));
+               } catch (IOException e) {
+                       throw new RuntimeException(e);
+               }
+       }
+}
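One detail worth noting in the class above: data is staged through the intermediate byte[] (copyCompress) only when the preferred compression block size exceeds twice the segment size; otherwise each compressed block maps one-to-one onto a memory segment. A construction sketch using only the constructor shown in this diff (helper class and method names are illustrative):

import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.table.runtime.compression.BlockCompressionFactory;
import org.apache.flink.table.runtime.io.CompressedBlockChannelReader;

import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;

public final class CompressedReaderUsage {

    /** Opens a compressed block reader over an already written channel. */
    static CompressedBlockChannelReader open(
            IOManager ioManager,
            FileIOChannel.ID channel,
            BlockCompressionFactory codecFactory,
            int preferBlockSize,
            int segmentSize) throws IOException {
        // segments land here once their (decompressed) read request completes
        LinkedBlockingQueue<MemorySegment> returnQueue = new LinkedBlockingQueue<>();
        // with preferBlockSize <= 2 * segmentSize, blocks map 1:1 onto segments
        return new CompressedBlockChannelReader(
                ioManager, channel, returnQueue, codecFactory, preferBlockSize, segmentSize);
    }
}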
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/CompressedBlockChannelWriter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/CompressedBlockChannelWriter.java
new file mode 100644
index 0000000..76db926
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/CompressedBlockChannelWriter.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.io;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
+import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.table.runtime.compression.BlockCompressionFactory;
+import org.apache.flink.table.runtime.compression.BlockCompressor;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A compressed block channel writer, for use where the MemorySegment-based writer interface must be maintained.
+ */
+public class CompressedBlockChannelWriter
+               implements BlockChannelWriter<MemorySegment>, BufferRecycler {
+
+       private final LinkedBlockingQueue<MemorySegment> blockQueue;
+       private final LinkedBlockingQueue<MemorySegment> compressedBuffers = new LinkedBlockingQueue<>();
+       private final BufferFileWriter writer;
+       private final boolean copyCompress;
+       private final BlockCompressor compressor;
+
+       private byte[] buf;
+       private ByteBuffer bufWrapper;
+       private int count;
+
+       public CompressedBlockChannelWriter(
+                       IOManager ioManager, ID channel,
+                       LinkedBlockingQueue<MemorySegment> blockQueue,
+                       BlockCompressionFactory codecFactory, int preferBlockSize, int segmentSize) throws IOException {
+               this.writer = ioManager.createBufferFileWriter(channel);
+               this.blockQueue = blockQueue;
+               copyCompress = preferBlockSize > segmentSize * 2;
+               int blockSize = copyCompress ? preferBlockSize : segmentSize;
+               this.compressor = codecFactory.getCompressor();
+
+               if (copyCompress) {
+                       this.buf = new byte[blockSize];
+                       this.bufWrapper = ByteBuffer.wrap(buf);
+               }
+
+               for (int i = 0; i < 2; i++) {
+                       compressedBuffers.add(MemorySegmentFactory.wrap(
+                                       new byte[compressor.getMaxCompressedSize(blockSize)]));
+               }
+       }
+
+       @Override
+       public void writeBlock(MemorySegment block) throws IOException {
+               if (copyCompress) {
+                       int offset = 0;
+                       int len = block.size();
+
+                       while (len > 0) {
+                               int copy = Math.min(len, buf.length - count);
+                               if (copy == 0) {
+                                       flushBuffer();
+                               } else {
+                                       block.get(offset, buf, count, copy);
+                                       count += copy;
+                                       offset += copy;
+                                       len -= copy;
+                               }
+                       }
+               } else {
+                       compressBuffer(block.wrap(0, block.size()), block.size());
+               }
+
+               boolean add = blockQueue.add(block);
+               Preconditions.checkState(add); // LinkedBlockingQueue add never fails.
+       }
+
+       private void flushBuffer() throws IOException {
+               compressBuffer(bufWrapper, count);
+               count = 0;
+       }
+
+       private void compressBuffer(ByteBuffer buffer, int len) throws IOException {
+               MemorySegment compressedBuffer;
+               try {
+                       compressedBuffer = compressedBuffers.take();
+               } catch (InterruptedException e) {
+                       throw new IOException(e);
+               }
+               int compressedLen = compressor.compress(
+                               buffer, 0, len,
+                               compressedBuffer.wrap(0, compressedBuffer.size()), 0);
+               NetworkBuffer networkBuffer = new NetworkBuffer(compressedBuffer, this);
+               networkBuffer.setSize(compressedLen);
+               writer.writeBlock(networkBuffer);
+       }
+
+       @Override
+       public ID getChannelID() {
+               return writer.getChannelID();
+       }
+
+       @Override
+       public long getSize() throws IOException {
+               return writer.getSize();
+       }
+
+       @Override
+       public boolean isClosed() {
+               return writer.isClosed();
+       }
+
+       @Override
+       public void close() throws IOException {
+               if (!writer.isClosed()) {
+                       if (copyCompress) {
+                               flushBuffer();
+                       }
+                       this.writer.close();
+               }
+       }
+
+       @Override
+       public void deleteChannel() {
+               writer.deleteChannel();
+       }
+
+       @Override
+       public void closeAndDelete() throws IOException {
+               writer.closeAndDelete();
+       }
+
+       @Override
+       public FileChannel getNioFileChannel() {
+               return writer.getNioFileChannel();
+       }
+
+       @Override
+       public void recycle(MemorySegment memorySegment) {
+               compressedBuffers.add(memorySegment);
+       }
+
+       @Override
+       public MemorySegment getNextReturnedBlock() throws IOException {
+               try {
+                       while (true) {
+                               final MemorySegment next = blockQueue.poll(1000, TimeUnit.MILLISECONDS);
+                               if (next != null) {
+                                       return next;
+                               } else {
+                                       if (writer.isClosed()) {
+                                               throw new IOException("The writer has been closed.");
+                                       }
+                               }
+                       }
+               } catch (InterruptedException e) {
+                       throw new IOException("Writer was interrupted while waiting for the next returning segment.");
+               }
+       }
+
+       @Override
+       public LinkedBlockingQueue<MemorySegment> getReturnQueue() {
+               return blockQueue;
+       }
+}
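The writer mirrors the reader: blocks are compressed (optionally staged through the same copy buffer) and written via a BufferFileWriter, while written segments come back through the return queue. A hedged usage sketch, using only the API visible in this diff (helper names are illustrative):

import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.table.runtime.compression.BlockCompressionFactory;
import org.apache.flink.table.runtime.io.CompressedBlockChannelWriter;

import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;

public final class CompressedWriterUsage {

    /** Writes a batch of segments through the compressing channel writer. */
    static void writeAll(
            IOManager ioManager,
            FileIOChannel.ID channel,
            BlockCompressionFactory codecFactory,
            Iterable<MemorySegment> segments,
            int preferBlockSize,
            int segmentSize) throws IOException {
        LinkedBlockingQueue<MemorySegment> returnQueue = new LinkedBlockingQueue<>();
        CompressedBlockChannelWriter writer = new CompressedBlockChannelWriter(
                ioManager, channel, returnQueue, codecFactory, preferBlockSize, segmentSize);
        for (MemorySegment segment : segments) {
            writer.writeBlock(segment); // the segment is handed back via the return queue
        }
        writer.close(); // flushes any partially filled copy buffer
    }
}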
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelReaderInputView.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelReaderInputView.java
new file mode 100644
index 0000000..7c879b9
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelReaderInputView.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.io;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.disk.iomanager.AbstractChannelReaderInputView;
+import org.apache.flink.runtime.io.disk.iomanager.BufferFileReader;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.RequestDoneCallback;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.table.runtime.compression.BlockCompressionFactory;
+import org.apache.flink.table.runtime.compression.BlockCompressor;
+import org.apache.flink.table.runtime.compression.BlockDecompressor;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A {@link org.apache.flink.core.memory.DataInputView} that is backed by a
+ * {@link BufferFileReader}, making it effectively a data input stream. The view reads its data
+ * in blocks from the underlying channel and decompresses it before returning it to the caller. The view
+ * can only read data that has been written by a {@link CompressedHeaderlessChannelWriterOutputView},
+ * due to block formatting.
+ */
+public class CompressedHeaderlessChannelReaderInputView
+               extends AbstractChannelReaderInputView
+               implements RequestDoneCallback<Buffer>, BufferRecycler {
+
+       private final BlockDecompressor decompressor;
+       private final BufferFileReader reader;
+       private final MemorySegment uncompressedBuffer;
+       private final AtomicReference<IOException> cause;
+
+       private final LinkedBlockingQueue<Buffer> retBuffers = new LinkedBlockingQueue<>();
+
+       private int numBlocksRemaining;
+       private int currentSegmentLimit;
+
+       public CompressedHeaderlessChannelReaderInputView(
+                       FileIOChannel.ID id,
+                       IOManager ioManager,
+                       BlockCompressionFactory compressionCodecFactory,
+                       int compressionBlockSize,
+                       int numBlocks) throws IOException {
+               super(0);
+               this.numBlocksRemaining = numBlocks;
+               this.reader = ioManager.createBufferFileReader(id, this);
+               uncompressedBuffer = MemorySegmentFactory.wrap(new byte[compressionBlockSize]);
+               decompressor = compressionCodecFactory.getDecompressor();
+               cause = new AtomicReference<>();
+
+               BlockCompressor compressor = compressionCodecFactory.getCompressor();
+               for (int i = 0; i < 2; i++) {
+                       MemorySegment segment = MemorySegmentFactory.wrap(new byte[compressor.getMaxCompressedSize(
+                                       compressionBlockSize)]);
+                       reader.readInto(new NetworkBuffer(segment, this));
+               }
+       }
+
+       @Override
+       protected MemorySegment nextSegment(MemorySegment current) throws IOException {
+               if (cause.get() != null) {
+                       throw cause.get();
+               }
+
+               // check for end-of-stream
+               if (this.numBlocksRemaining <= 0) {
+                       this.reader.close();
+                       throw new EOFException();
+               }
+
+               try {
+                       Buffer buffer;
+                       while ((buffer = retBuffers.poll(1, TimeUnit.SECONDS)) == null) {
+                               if (cause.get() != null) {
+                                       throw cause.get();
+                               }
+                       }
+                       this.currentSegmentLimit = decompressor.decompress(
+                                       buffer.getMemorySegment().getArray(), 0, buffer.getSize(),
+                                       uncompressedBuffer.getArray(), 0
+                       );
+
+                       buffer.recycleBuffer();
+                       this.numBlocksRemaining--;
+                       return uncompressedBuffer;
+               }
+               catch (InterruptedException e) {
+                       throw new IOException(e);
+               }
+       }
+
+       public BufferFileReader getReader() {
+               return reader;
+       }
+
+       @Override
+       protected int getLimitForSegment(MemorySegment segment) {
+               return currentSegmentLimit;
+       }
+
+       @Override
+       public List<MemorySegment> close() throws IOException {
+               reader.close();
+               return Collections.emptyList();
+       }
+
+       @Override
+       public FileIOChannel getChannel() {
+               return reader;
+       }
+
+       public boolean isClosed() {
+               return reader.isClosed();
+       }
+
+       @Override
+       public void requestSuccessful(Buffer request) {
+               retBuffers.add(request);
+       }
+
+       @Override
+       public void requestFailed(Buffer buffer, IOException e) {
+               cause.compareAndSet(null, e);
+               throw new RuntimeException(e);
+       }
+
+       @Override
+       public void recycle(MemorySegment segment) {
+               try {
+                       reader.readInto(new NetworkBuffer(segment, this));
+               }
+               catch (IOException e) {
+                       throw new RuntimeException(e);
+               }
+       }
+}
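Since the compressed headerless view carries no per-block headers, the caller must supply the block count recorded at write time (see the writer view in the next file). A hedged reading sketch using only the constructor shown above (helper names are illustrative):

import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.table.runtime.compression.BlockCompressionFactory;
import org.apache.flink.table.runtime.io.CompressedHeaderlessChannelReaderInputView;

import java.io.IOException;

public final class CompressedHeaderlessReaderUsage {

    /** Sums `count` ints previously written to the channel; numBlocks comes from the writer. */
    static long sumInts(
            IOManager ioManager,
            FileIOChannel.ID channel,
            BlockCompressionFactory codec,
            int compressionBlockSize,
            int numBlocks,
            int count) throws IOException {
        CompressedHeaderlessChannelReaderInputView in =
                new CompressedHeaderlessChannelReaderInputView(
                        channel, ioManager, codec, compressionBlockSize, numBlocks);
        long sum = 0;
        for (int i = 0; i < count; i++) {
            sum += in.readInt(); // plain DataInputView reads over decompressed blocks
        }
        in.close();
        return sum;
    }
}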
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelWriterOutputView.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelWriterOutputView.java
new file mode 100644
index 0000000..a67ef65
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelWriterOutputView.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.io;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.disk.iomanager.AbstractChannelWriterOutputView;
+import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.table.runtime.compression.BlockCompressionFactory;
+import org.apache.flink.table.runtime.compression.BlockCompressor;
+
+import java.io.IOException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * A {@link org.apache.flink.core.memory.DataOutputView} that is backed by a {@link FileIOChannel},
+ * making it effectively a data output stream. The view will compress its data before writing it
+ * in blocks to the underlying channel.
+ */
+public final class CompressedHeaderlessChannelWriterOutputView
+               extends AbstractChannelWriterOutputView implements BufferRecycler {
+
+       private final MemorySegment buffer;
+       private final LinkedBlockingQueue<MemorySegment> compressedBuffers = new LinkedBlockingQueue<>();
+       private final BlockCompressor compressor;
+       private final BufferFileWriter writer;
+       private final int compressionBlockSize;
+
+       private int blockCount;
+
+       private long numBytes;
+       private long numCompressedBytes;
+
+       public CompressedHeaderlessChannelWriterOutputView(
+                       BufferFileWriter writer,
+                       BlockCompressionFactory compressionCodecFactory,
+                       int compressionBlockSize) {
+               super(compressionBlockSize, 0);
+
+               this.compressionBlockSize = compressionBlockSize;
+               buffer = MemorySegmentFactory.wrap(new byte[compressionBlockSize]);
+               compressor = compressionCodecFactory.getCompressor();
+               for (int i = 0; i < 2; i++) {
+                       compressedBuffers.add(MemorySegmentFactory.wrap(
+                                       new byte[compressor.getMaxCompressedSize(compressionBlockSize)]));
+               }
+               this.writer = writer;
+
+               try {
+                       advance();
+               } catch (IOException ioex) {
+                       throw new RuntimeException(ioex);
+               }
+       }
+
+       @Override
+       public FileIOChannel getChannel() {
+               return writer;
+       }
+
+       @Override
+       public int close() throws IOException {
+               if (!writer.isClosed()) {
+                       int currentPositionInSegment = getCurrentPositionInSegment();
+                       writeCompressed(buffer, currentPositionInSegment);
+                       clear();
+                       this.writer.close();
+               }
+               return -1;
+       }
+
+       @Override
+       protected MemorySegment nextSegment(MemorySegment current, int positionInCurrent) throws IOException {
+               if (current != null) {
+                       writeCompressed(current, compressionBlockSize);
+               }
+               return buffer;
+       }
+
+       private void writeCompressed(MemorySegment current, int size) throws IOException {
+               MemorySegment compressedBuffer;
+               try {
+                       compressedBuffer = compressedBuffers.take();
+               } catch (InterruptedException e) {
+                       throw new IOException(e);
+               }
+               int compressedLen = compressor.compress(current.getArray(), 0, size, compressedBuffer.getArray(), 0);
+               NetworkBuffer networkBuffer = new NetworkBuffer(compressedBuffer, this);
+               networkBuffer.setSize(compressedLen);
+               writer.writeBlock(networkBuffer);
+               blockCount++;
+               numBytes += size;
+               numCompressedBytes += compressedLen;
+       }
+
+       @Override
+       public long getNumBytes() {
+               return numBytes;
+       }
+
+       @Override
+       public long getNumCompressedBytes() {
+               return numCompressedBytes;
+       }
+
+       @Override
+       public int getBlockCount() {
+               return blockCount;
+       }
+
+       @Override
+       public void recycle(MemorySegment memorySegment) {
+               compressedBuffers.add(memorySegment);
+       }
+}
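
For reference, here is a minimal write-path sketch for the class above. It mirrors the construction used in CompressedHeaderlessChannelTest at the end of this commit; the IOManagerAsync instance, the LZ4 codec, and the 64 KB block size are illustrative choices, not requirements:

    IOManager ioManager = new IOManagerAsync();
    FileIOChannel.ID channel = ioManager.createChannel();
    BufferFileWriter bufferWriter = ioManager.createBufferFileWriter(channel);

    // Each filled 64 KB block is compressed and handed to the buffer writer.
    CompressedHeaderlessChannelWriterOutputView out =
            new CompressedHeaderlessChannelWriterOutputView(
                    bufferWriter, new Lz4BlockCompressionFactory(), 64 * 1024);

    for (int i = 0; i < 1_000_000; i++) {
        out.writeInt(i);
    }
    out.close();                           // flushes the last, partly filled block
    int blockCount = out.getBlockCount();  // must be kept to read the file back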
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/HeaderlessChannelWriterOutputView.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/HeaderlessChannelWriterOutputView.java
new file mode 100644
index 0000000..d581995
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/HeaderlessChannelWriterOutputView.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.io;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.AbstractChannelWriterOutputView;
+import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * A {@link org.apache.flink.core.memory.DataOutputView} that is backed by a
+ * {@link BlockChannelWriter}, making it effectively a data output stream. The view writes its
+ * data in blocks to the underlying channel, but does not expect a header for each block.
+ */
+public final class HeaderlessChannelWriterOutputView extends AbstractChannelWriterOutputView {
+
+       private final BlockChannelWriter<MemorySegment> writer; // the writer to the channel
+
+       private int blockCount; // the number of blocks used
+
+       public HeaderlessChannelWriterOutputView(
+                       BlockChannelWriter<MemorySegment> writer,
+                       List<MemorySegment> memory,
+                       int segmentSize) {
+               super(segmentSize, 0);
+
+               this.writer = Preconditions.checkNotNull(writer);
+
+               Preconditions.checkNotNull(memory);
+
+               // load the segments into the queue
+               final LinkedBlockingQueue<MemorySegment> queue = writer.getReturnQueue();
+               for (int i = memory.size() - 1; i >= 0; --i) {
+                       final MemorySegment seg = memory.get(i);
+                       if (seg.size() != segmentSize) {
+                               throw new IllegalArgumentException("This segment is not of the specified size.");
+                       }
+                       }
+                       queue.add(seg);
+               }
+
+               // get the first segment
+               try {
+                       advance();
+               } catch (IOException ioex) {
+                       throw new RuntimeException(ioex);
+               }
+       }
+
+       @Override
+       public FileIOChannel getChannel() {
+               return writer;
+       }
+
+       /**
+        * Closes this OutputView, closing the underlying writer, and returns the number of
+        * bytes in the last memory segment.
+        */
+       @Override
+       public int close() throws IOException {
+               if (!writer.isClosed()) {
+                       int currentPositionInSegment = getCurrentPositionInSegment();
+                       // write last segment
+                       writer.writeBlock(getCurrentSegment());
+                       clear();
+
+                       writer.getReturnQueue().clear();
+                       this.writer.close();
+
+                       return currentPositionInSegment;
+               }
+               return -1;
+       }
+
+       @Override
+       public int getBlockCount() {
+               return this.blockCount;
+       }
+
+       @Override
+       public long getNumBytes() throws IOException {
+               return writer.getSize();
+       }
+
+       @Override
+       public long getNumCompressedBytes() throws IOException {
+               return writer.getSize();
+       }
+
+       @Override
+       public MemorySegment nextSegment(MemorySegment current, int posInSegment) throws IOException {
+               if (current != null) {
+                       writer.writeBlock(current);
+               }
+
+               final MemorySegment next = this.writer.getNextReturnedBlock();
+               this.blockCount++;
+               return next;
+       }
+}
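
A matching sketch for the uncompressed variant; the two working segments and the 32 KB segment size follow what FileChannelUtil.createOutputView below passes in, and are assumptions rather than requirements:

    IOManager ioManager = new IOManagerAsync();
    FileIOChannel.ID channel = ioManager.createChannel();
    BlockChannelWriter<MemorySegment> blockWriter = ioManager.createBlockChannelWriter(channel);

    // The view cycles through the given segments; every full segment goes out
    // as one block, with no per-block header.
    HeaderlessChannelWriterOutputView out = new HeaderlessChannelWriterOutputView(
            blockWriter,
            Arrays.asList(
                    MemorySegmentFactory.allocateUnpooledSegment(32 * 1024),
                    MemorySegmentFactory.allocateUnpooledSegment(32 * 1024)),
            32 * 1024);

    out.writeLong(42L);
    int bytesInLastSegment = out.close();  // needed later to know where the data ends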
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/FileChannelUtil.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/FileChannelUtil.java
new file mode 100644
index 0000000..780cbfd
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/FileChannelUtil.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.util;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.AbstractChannelReaderInputView;
+import org.apache.flink.runtime.io.disk.iomanager.AbstractChannelWriterOutputView;
+import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
+import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
+import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
+import org.apache.flink.runtime.io.disk.iomanager.HeaderlessChannelReaderInputView;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.table.runtime.compression.BlockCompressionFactory;
+import org.apache.flink.table.runtime.io.ChannelWithMeta;
+import org.apache.flink.table.runtime.io.CompressedBlockChannelReader;
+import org.apache.flink.table.runtime.io.CompressedBlockChannelWriter;
+import org.apache.flink.table.runtime.io.CompressedHeaderlessChannelReaderInputView;
+import org.apache.flink.table.runtime.io.CompressedHeaderlessChannelWriterOutputView;
+import org.apache.flink.table.runtime.io.HeaderlessChannelWriterOutputView;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.apache.flink.core.memory.MemorySegmentFactory.allocateUnpooledSegment;
+
+/**
+ * Utilities for creating file channel readers and writers, with optional block compression.
+ */
+public class FileChannelUtil {
+
+       public static AbstractChannelReaderInputView createInputView(
+                       IOManager ioManager,
+                       ChannelWithMeta channel,
+                       List<FileIOChannel> channels,
+                       boolean compressionEnable,
+                       BlockCompressionFactory compressionCodecFactory,
+                       int compressionBlockSize,
+                       int segmentSize) throws IOException {
+               if (compressionEnable) {
+                       CompressedHeaderlessChannelReaderInputView in =
+                                       new CompressedHeaderlessChannelReaderInputView(
+                                                       channel.getChannel(),
+                                                       ioManager,
+                                                       compressionCodecFactory,
+                                                       compressionBlockSize,
+                                                       channel.getBlockCount()
+                                       );
+                       channels.add(in.getReader());
+                       return in;
+               } else {
+                       BlockChannelReader<MemorySegment> reader =
+                                       ioManager.createBlockChannelReader(channel.getChannel());
+                       channels.add(reader);
+                       return new HeaderlessChannelReaderInputView(
+                                       reader,
+                                       Arrays.asList(
+                                                       allocateUnpooledSegment(segmentSize),
+                                                       allocateUnpooledSegment(segmentSize)
+                                       ),
+                                       channel.getBlockCount(),
+                                       channel.getNumBytesInLastBlock(), false
+                       );
+               }
+       }
+
+       public static AbstractChannelWriterOutputView createOutputView(
+                       IOManager ioManager,
+                       FileIOChannel.ID channel,
+                       boolean compressionEnable,
+                       BlockCompressionFactory compressionCodecFactory,
+                       int compressionBlockSize,
+                       int segmentSize) throws IOException {
+               if (compressionEnable) {
+                       BufferFileWriter bufferWriter = ioManager.createBufferFileWriter(channel);
+                       return new CompressedHeaderlessChannelWriterOutputView(
+                                       bufferWriter,
+                                       compressionCodecFactory,
+                                       compressionBlockSize);
+               } else {
+                       BlockChannelWriter<MemorySegment> blockWriter =
+                                       ioManager.createBlockChannelWriter(channel);
+                       return new HeaderlessChannelWriterOutputView(
+                                       blockWriter,
+                                       Arrays.asList(
+                                                       allocateUnpooledSegment(segmentSize),
+                                                       allocateUnpooledSegment(segmentSize)
+                                       ),
+                                       segmentSize
+                       );
+               }
+       }
+
+       public static BlockChannelWriter<MemorySegment> createBlockChannelWriter(
+                       IOManager ioManager,
+                       FileIOChannel.ID channel,
+                       LinkedBlockingQueue<MemorySegment> bufferReturnQueue,
+                       boolean compressionEnable,
+                       BlockCompressionFactory compressionCodecFactory,
+                       int compressionBlockSize,
+                       int segmentSize) throws IOException {
+               if (compressionEnable) {
+                       return new CompressedBlockChannelWriter(
+                                       ioManager,
+                                       channel,
+                                       bufferReturnQueue,
+                                       compressionCodecFactory,
+                                       compressionBlockSize,
+                                       segmentSize
+                       );
+               } else {
+                       return ioManager.createBlockChannelWriter(channel, bufferReturnQueue);
+               }
+       }
+
+       public static BlockChannelReader<MemorySegment> createBlockChannelReader(
+                       IOManager ioManager,
+                       FileIOChannel.ID channel,
+                       LinkedBlockingQueue<MemorySegment> bufferReturnQueue,
+                       boolean compressionEnable,
+                       BlockCompressionFactory compressionCodecFactory,
+                       int compressionBlockSize,
+                       int segmentSize) throws IOException {
+               if (compressionEnable) {
+                       return new CompressedBlockChannelReader(
+                                       ioManager,
+                                       channel,
+                                       bufferReturnQueue,
+                                       compressionCodecFactory,
+                                       compressionBlockSize,
+                                       segmentSize
+                       );
+               } else {
+                       return ioManager.createBlockChannelReader(channel, bufferReturnQueue);
+               }
+       }
+}
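
To tie the pieces together, a hedged round-trip sketch through FileChannelUtil. Everything here follows the signatures above except the ChannelWithMeta constructor, which is assumed to take (channel, blockCount, numBytesInLastBlock) based on the accessors used in createInputView:

    boolean compress = true;
    BlockCompressionFactory codec = new Lz4BlockCompressionFactory();
    int blockSize = 64 * 1024;
    int segmentSize = 32 * 1024;

    IOManager ioManager = new IOManagerAsync();
    FileIOChannel.ID id = ioManager.createChannel();

    AbstractChannelWriterOutputView out = FileChannelUtil.createOutputView(
            ioManager, id, compress, codec, blockSize, segmentSize);
    out.writeInt(7);
    int bytesInLastBlock = out.close();

    // Assumed constructor; see ChannelWithMeta in this commit for the real one.
    ChannelWithMeta meta = new ChannelWithMeta(id, out.getBlockCount(), bytesInLastBlock);

    List<FileIOChannel> openChannels = new ArrayList<>();
    AbstractChannelReaderInputView in = FileChannelUtil.createInputView(
            ioManager, meta, openChannels, compress, codec, blockSize, segmentSize);
    assert in.readInt() == 7;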
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelTest.java
new file mode 100644
index 0000000..177e83b
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.io;
+
+import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.table.runtime.compression.BlockCompressionFactory;
+import org.apache.flink.table.runtime.compression.Lz4BlockCompressionFactory;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link CompressedHeaderlessChannelReaderInputView} and
+ * {@link CompressedHeaderlessChannelWriterOutputView}.
+ */
+public class CompressedHeaderlessChannelTest {
+       private static final int BUFFER_SIZE = 256;
+
+       private IOManager ioManager;
+
+       private BlockCompressionFactory compressionFactory = new Lz4BlockCompressionFactory();
+
+       public CompressedHeaderlessChannelTest() {
+               ioManager = new IOManagerAsync();
+       }
+
+       @After
+       public void afterTest() {
+               this.ioManager.shutdown();
+               if (!this.ioManager.isProperlyShutDown()) {
+                       Assert.fail("I/O Manager was not properly shut down.");
+               }
+       }
+
+       @Test
+       public void testCompressedView() throws IOException {
+               for (int testTime = 0; testTime < 10; testTime++) {
+                       int testRounds = new Random().nextInt(20000);
+                       FileIOChannel.ID channel = ioManager.createChannel();
+                       BufferFileWriter writer = this.ioManager.createBufferFileWriter(channel);
+                       CompressedHeaderlessChannelWriterOutputView outputView =
+                                       new CompressedHeaderlessChannelWriterOutputView(
+                                                       writer,
+                                                       compressionFactory,
+                                                       BUFFER_SIZE
+                                       );
+
+                       for (int i = 0; i < testRounds; i++) {
+                               outputView.writeInt(i);
+                       }
+                       outputView.close();
+                       int blockCount = outputView.getBlockCount();
+
+                       CompressedHeaderlessChannelReaderInputView inputView =
+                                       new CompressedHeaderlessChannelReaderInputView(
+                                                       channel,
+                                                       ioManager,
+                                                       compressionFactory,
+                                                       BUFFER_SIZE,
+                                                       blockCount
+                                       );
+
+                       for (int i = 0; i < testRounds; i++) {
+                               assertEquals(i, inputView.readInt());
+                       }
+                       inputView.close();
+               }
+       }
+}
