This is an automated email from the ASF dual-hosted git repository. kurt pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 3ed490f [FLINK-11863][table-runtime-blink] Introduce channel to read and write compressed data (#7944) 3ed490f is described below commit 3ed490f0f4e561f1ef8b3660d4370030db6115dc Author: Kurt Young <ykt...@gmail.com> AuthorDate: Sat Mar 9 13:34:56 2019 +0800 [FLINK-11863][table-runtime-blink] Introduce channel to read and write compressed data (#7944) --- .../iomanager/AbstractChannelReaderInputView.java | 52 +++++ .../iomanager/AbstractChannelWriterOutputView.java | 63 ++++++ .../io/disk/iomanager/ChannelReaderInputView.java | 106 +++++----- .../HeaderlessChannelReaderInputView.java | 59 ++++-- .../runtime/memory/AbstractPagedInputView.java | 6 +- .../flink/table/runtime/io/ChannelWithMeta.java | 49 +++++ .../runtime/io/CompressedBlockChannelReader.java | 222 +++++++++++++++++++++ .../runtime/io/CompressedBlockChannelWriter.java | 187 +++++++++++++++++ ...CompressedHeaderlessChannelReaderInputView.java | 162 +++++++++++++++ ...ompressedHeaderlessChannelWriterOutputView.java | 134 +++++++++++++ .../io/HeaderlessChannelWriterOutputView.java | 124 ++++++++++++ .../flink/table/runtime/util/FileChannelUtil.java | 155 ++++++++++++++ .../io/CompressedHeaderlessChannelTest.java | 94 +++++++++ 13 files changed, 1352 insertions(+), 61 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractChannelReaderInputView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractChannelReaderInputView.java new file mode 100644 index 0000000..b0fb22a --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractChannelReaderInputView.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.disk.iomanager; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.memory.AbstractPagedInputView; + +import java.io.IOException; +import java.util.List; + +/** + * A {@link org.apache.flink.core.memory.DataInputView} that is backed by a + * {@link FileIOChannel}, making it effectively a data input stream. The view reads it data + * in blocks from the underlying channel. The view can only read data that + * has been written by a {@link ChannelWriterOutputView}, due to block formatting. + */ +public abstract class AbstractChannelReaderInputView extends AbstractPagedInputView { + + public AbstractChannelReaderInputView(int headerLength) { + super(headerLength); + } + + /** + * Closes this InputView, closing the underlying reader and returning all memory segments. + * + * @return A list containing all memory segments originally supplied to this view. + * @throws IOException Thrown, if the underlying reader could not be properly closed. + */ + public abstract List<MemorySegment> close() throws IOException; + + /** + * Get the underlying channel. + */ + public abstract FileIOChannel getChannel(); + +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractChannelWriterOutputView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractChannelWriterOutputView.java new file mode 100644 index 0000000..ff2c858 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractChannelWriterOutputView.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.disk.iomanager; + +import org.apache.flink.runtime.memory.AbstractPagedOutputView; + +import java.io.IOException; + +/** + * A {@link org.apache.flink.core.memory.DataOutputView} that is backed by a {@link FileIOChannel}, + * making it effectively a data output stream. The view writes it data in blocks to the underlying + * channel. + */ +public abstract class AbstractChannelWriterOutputView extends AbstractPagedOutputView { + + public AbstractChannelWriterOutputView(int segmentSize, int headerLength) { + super(segmentSize, headerLength); + } + + /** + * Get the underlying channel. + */ + public abstract FileIOChannel getChannel(); + + /** + * Closes this OutputView, closing the underlying writer + * + * @return the number of bytes in last memory segment. + */ + public abstract int close() throws IOException; + + /** + * Gets the number of blocks used by this view. + */ + public abstract int getBlockCount(); + + /** + * Get output bytes. + */ + public abstract long getNumBytes() throws IOException; + + /** + * Get output compressed bytes, return num bytes if there is no compression. + */ + public abstract long getNumCompressedBytes() throws IOException; + +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java index de1ed87..b0dbc16 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java @@ -33,49 +33,50 @@ import org.apache.flink.runtime.memory.AbstractPagedInputView; * stream. The view reads it data in blocks from the underlying channel. The view can only read data that * has been written by a {@link ChannelWriterOutputView}, due to block formatting. */ -public class ChannelReaderInputView extends AbstractPagedInputView { - +public class ChannelReaderInputView extends AbstractChannelReaderInputView { + protected final BlockChannelReader<MemorySegment> reader; // the block reader that reads memory segments - + protected int numRequestsRemaining; // the number of block requests remaining - + private final int numSegments; // the number of memory segment the view works with - + private final ArrayList<MemorySegment> freeMem; // memory gathered once the work is done - + private boolean inLastBlock; // flag indicating whether the view is already in the last block - + private boolean closed; // flag indicating whether the reader is closed - + // -------------------------------------------------------------------------------------------- /** * Creates a new channel reader that reads from the given channel until the last block * (as marked by a {@link ChannelWriterOutputView}) is found. - * + * * @param reader The reader that reads the data from disk back into memory. * @param memory A list of memory segments that the reader uses for reading the data in. If the * list contains more than one segment, the reader will asynchronously pre-fetch * blocks ahead. * @param waitForFirstBlock A flag indicating weather this constructor call should block * until the first block has returned from the asynchronous I/O reader. - * + * * @throws IOException Thrown, if the read requests for the first blocks fail to be * served by the reader. */ - public ChannelReaderInputView(BlockChannelReader<MemorySegment> reader, List<MemorySegment> memory, boolean waitForFirstBlock) - throws IOException - { + public ChannelReaderInputView( + BlockChannelReader<MemorySegment> reader, + List<MemorySegment> memory, + boolean waitForFirstBlock) throws IOException { this(reader, memory, -1, waitForFirstBlock); } - + /** * Creates a new channel reader that reads from the given channel, expecting a specified * number of blocks in the channel. * <p> * WARNING: The reader will lock if the number of blocks given here is actually lower than * the actual number of blocks in the channel. - * + * * @param reader The reader that reads the data from disk back into memory. * @param memory A list of memory segments that the reader uses for reading the data in. If the * list contains more than one segment, the reader will asynchronously pre-fetch @@ -85,23 +86,24 @@ public class ChannelReaderInputView extends AbstractPagedInputView { * beyond the channel size. * @param waitForFirstBlock A flag indicating weather this constructor call should block * until the first block has returned from the asynchronous I/O reader. - * + * * @throws IOException Thrown, if the read requests for the first blocks fail to be * served by the reader. */ - public ChannelReaderInputView(BlockChannelReader<MemorySegment> reader, List<MemorySegment> memory, - int numBlocks, boolean waitForFirstBlock) - throws IOException - { + public ChannelReaderInputView( + BlockChannelReader<MemorySegment> reader, + List<MemorySegment> memory, + int numBlocks, + boolean waitForFirstBlock) throws IOException { this(reader, memory, numBlocks, ChannelWriterOutputView.HEADER_LENGTH, waitForFirstBlock); } - + /** * Non public constructor to allow subclasses to use this input view with different headers. * <p> * WARNING: The reader will lock if the number of blocks given here is actually lower than * the actual number of blocks in the channel. - * + * * @param reader The reader that reads the data from disk back into memory. * @param memory A list of memory segments that the reader uses for reading the data in. If the * list contains more than one segment, the reader will asynchronously pre-fetch @@ -114,15 +116,17 @@ public class ChannelReaderInputView extends AbstractPagedInputView { * so any subclass changing the header length should override that methods as well. * @param waitForFirstBlock A flag indicating weather this constructor call should block * until the first block has returned from the asynchronous I/O reader. - * + * * @throws IOException */ - ChannelReaderInputView(BlockChannelReader<MemorySegment> reader, List<MemorySegment> memory, - int numBlocks, int headerLen, boolean waitForFirstBlock) - throws IOException - { + ChannelReaderInputView( + BlockChannelReader<MemorySegment> reader, + List<MemorySegment> memory, + int numBlocks, + int headerLen, + boolean waitForFirstBlock) throws IOException { super(headerLen); - + if (reader == null || memory == null) { throw new NullPointerException(); } @@ -132,44 +136,45 @@ public class ChannelReaderInputView extends AbstractPagedInputView { if (numBlocks < 1 && numBlocks != -1) { throw new IllegalArgumentException("The number of blocks must be a positive number, or -1, if unknown."); } - + this.reader = reader; this.numRequestsRemaining = numBlocks; this.numSegments = memory.size(); this.freeMem = new ArrayList<MemorySegment>(this.numSegments); - + for (int i = 0; i < memory.size(); i++) { sendReadRequest(memory.get(i)); } - + if (waitForFirstBlock) { advance(); } } - + public void waitForFirstBlock() throws IOException { if (getCurrentSegment() == null) { advance(); } } - + public boolean isClosed() { return this.closed; } - + /** * Closes this InputView, closing the underlying reader and returning all memory segments. - * + * * @return A list containing all memory segments originally supplied to this view. * @throws IOException Thrown, if the underlying reader could not be properly closed. */ - public List<MemorySegment> close() throws IOException { + @Override + public List<MemorySegment> close() throws IOException { if (this.closed) { throw new IllegalStateException("Already closed."); } this.closed = true; - + // re-collect all memory segments ArrayList<MemorySegment> list = this.freeMem; final MemorySegment current = getCurrentSegment(); @@ -192,7 +197,12 @@ public class ChannelReaderInputView extends AbstractPagedInputView { } return list; } - + + @Override + public FileIOChannel getChannel() { + return reader; + } + // -------------------------------------------------------------------------------------------- // Utilities // -------------------------------------------------------------------------------------------- @@ -203,12 +213,12 @@ public class ChannelReaderInputView extends AbstractPagedInputView { * adds the segment to the readers return queue, which thereby effectively collects all memory segments. * Secondly, the method fetches the next non-consumed segment * returned by the reader. If no further segments are available, this method thrown an {@link EOFException}. - * + * * @param current The memory segment used for the next request. * @return The memory segment to read from next. - * + * * @throws EOFException Thrown, if no further segments are available. - * @throws IOException Thrown, if an I/O error occurred while reading + * @throws IOException Thrown, if an I/O error occurred while reading * @see AbstractPagedInputView#nextSegment(org.apache.flink.core.memory.MemorySegment) */ @Override @@ -217,16 +227,16 @@ public class ChannelReaderInputView extends AbstractPagedInputView { if (this.inLastBlock) { throw new EOFException(); } - + // send a request first. if we have only a single segment, this same segment will be the one obtained in // the next lines if (current != null) { sendReadRequest(current); } - + // get the next segment final MemorySegment seg = this.reader.getNextReturnedBlock(); - + // check the header if (seg.getShort(0) != ChannelWriterOutputView.HEADER_MAGIC_NUMBER) { throw new IOException("The current block does not belong to a ChannelWriterOutputView / " + @@ -237,20 +247,20 @@ public class ChannelReaderInputView extends AbstractPagedInputView { this.numRequestsRemaining = 0; this.inLastBlock = true; } - + return seg; } - + @Override protected int getLimitForSegment(MemorySegment segment) { return segment.getInt(ChannelWriterOutputView.HEAD_BLOCK_LENGTH_OFFSET); } - + /** * Sends a new read requests, if further requests remain. Otherwise, this method adds the segment * directly to the readers return queue. - * + * * @param seg The segment to use for the read request. * @throws IOException Thrown, if the reader is in error. */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/HeaderlessChannelReaderInputView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/HeaderlessChannelReaderInputView.java index 63e86c9..9d45205 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/HeaderlessChannelReaderInputView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/HeaderlessChannelReaderInputView.java @@ -24,6 +24,8 @@ import java.util.List; import org.apache.flink.core.memory.MemorySegment; +import static org.apache.flink.util.Preconditions.checkArgument; + /** * A {@link org.apache.flink.core.memory.DataInputView} that is backed by a * {@link BlockChannelReader}, making it effectively a data input @@ -31,14 +33,17 @@ import org.apache.flink.core.memory.MemorySegment; * a header for each block, giving a direct stream abstraction over sequence of written * blocks. It therefore requires specification of the number of blocks and the number of * bytes in the last block. - * */ -public class HeaderlessChannelReaderInputView extends ChannelReaderInputView -{ +public class HeaderlessChannelReaderInputView extends ChannelReaderInputView { + private int numBlocksRemaining; // the number of blocks not yet consumed private final int lastBlockBytes; // the number of valid bytes in the last block + private long offset; // offset to seek after reading the first block + + private boolean isFirstBlock; // if current block is the first block + /** * Creates a new channel reader that reads from the given channel, expecting a specified * number of blocks in the channel, and returns only a specified number of bytes from @@ -60,16 +65,36 @@ public class HeaderlessChannelReaderInputView extends ChannelReaderInputView * @throws IOException Thrown, if the read requests for the first blocks fail to be * served by the reader. */ - public HeaderlessChannelReaderInputView(BlockChannelReader<MemorySegment> reader, List<MemorySegment> memory, int numBlocks, - int numBytesInLastBlock, boolean waitForFirstBlock) - throws IOException - { - super(reader, memory, numBlocks, 0, waitForFirstBlock); - + public HeaderlessChannelReaderInputView( + BlockChannelReader<MemorySegment> reader, + List<MemorySegment> memory, + int numBlocks, + int numBytesInLastBlock, + boolean waitForFirstBlock) throws IOException { + this(reader, memory, numBlocks, numBytesInLastBlock, waitForFirstBlock, 0); + } + + public HeaderlessChannelReaderInputView( + BlockChannelReader<MemorySegment> reader, + List<MemorySegment> memory, + int numBlocks, + int numBytesInLastBlock, + boolean waitForFirstBlock, + long offset) throws IOException { + // postpone wait for first block after initializing offset + // otherwise the offset is not set, and we can't seek input + super(reader, memory, numBlocks, 0, false); + this.numBlocksRemaining = numBlocks; this.lastBlockBytes = numBytesInLastBlock; + + checkArgument(offset >= 0, "`offset` can't be negative!"); + this.offset = offset; + + if (waitForFirstBlock) { + advance(); + } } - @Override protected MemorySegment nextSegment(MemorySegment current) throws IOException { @@ -84,12 +109,22 @@ public class HeaderlessChannelReaderInputView extends ChannelReaderInputView if (current != null) { sendReadRequest(current); } - + + // check if next segment is the first block + isFirstBlock = (current == null); + // get the next segment this.numBlocksRemaining--; return this.reader.getNextReturnedBlock(); } - + + @Override + protected void advance() throws IOException { + doAdvance(); + if (isFirstBlock && offset > 0) { + seekInput(getCurrentSegment(), (int) offset, getCurrentSegmentLimit()); + } + } @Override protected int getLimitForSegment(MemorySegment segment) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/AbstractPagedInputView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/AbstractPagedInputView.java index 0536e1a..d73aeaa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/AbstractPagedInputView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/AbstractPagedInputView.java @@ -152,7 +152,11 @@ public abstract class AbstractPagedInputView implements DataInputView { * @see #nextSegment(MemorySegment) * @see #getLimitForSegment(MemorySegment) */ - protected final void advance() throws IOException { + protected void advance() throws IOException { + doAdvance(); + } + + protected void doAdvance() throws IOException { // note: this code ensures that in case of EOF, we stay at the same position such that // EOF is reproducible (if nextSegment throws a reproducible EOFException) this.currentSegment = nextSegment(this.currentSegment); diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/ChannelWithMeta.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/ChannelWithMeta.java new file mode 100644 index 0000000..15b82db --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/ChannelWithMeta.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.io; + +import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel; + +/** + * Channel with block count and numBytesInLastBlock of file. + */ +public class ChannelWithMeta { + + private final FileIOChannel.ID channel; + private final int blockCount; + private final int numBytesInLastBlock; + + public ChannelWithMeta(FileIOChannel.ID channel, int blockCount, int numBytesInLastBlock) { + this.channel = channel; + this.blockCount = blockCount; + this.numBytesInLastBlock = numBytesInLastBlock; + } + + public FileIOChannel.ID getChannel() { + return channel; + } + + public int getBlockCount() { + return blockCount; + } + + public int getNumBytesInLastBlock() { + return numBytesInLastBlock; + } +} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/CompressedBlockChannelReader.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/CompressedBlockChannelReader.java new file mode 100644 index 0000000..6be2c2e --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/CompressedBlockChannelReader.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.io; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader; +import org.apache.flink.runtime.io.disk.iomanager.BufferFileReader; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.disk.iomanager.RequestDoneCallback; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; +import org.apache.flink.table.runtime.compression.BlockCompressionFactory; +import org.apache.flink.table.runtime.compression.BlockCompressor; +import org.apache.flink.table.runtime.compression.BlockDecompressor; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Compressed block channel reader provides a scenario where MemorySegment must be maintained. + */ +public class CompressedBlockChannelReader + implements BlockChannelReader<MemorySegment>, RequestDoneCallback<Buffer>, BufferRecycler { + + private final LinkedBlockingQueue<MemorySegment> blockQueue; + private final boolean copyCompress; + private final BlockDecompressor decompressor; + private final BufferFileReader reader; + private final AtomicReference<IOException> cause; + private final LinkedBlockingQueue<Buffer> retBuffers = new LinkedBlockingQueue<>(); + + private byte[] buf; + private ByteBuffer bufWrapper; + private int offset; + private int len; + + public CompressedBlockChannelReader( + IOManager ioManager, + ID channel, + LinkedBlockingQueue<MemorySegment> blockQueue, + BlockCompressionFactory codecFactory, + int preferBlockSize, + int segmentSize) throws IOException { + this.reader = ioManager.createBufferFileReader(channel, this); + this.blockQueue = blockQueue; + copyCompress = preferBlockSize > segmentSize * 2; + int blockSize = copyCompress ? preferBlockSize : segmentSize; + this.decompressor = codecFactory.getDecompressor(); + cause = new AtomicReference<>(); + + if (copyCompress) { + this.buf = new byte[blockSize]; + this.bufWrapper = ByteBuffer.wrap(buf); + } + + BlockCompressor compressor = codecFactory.getCompressor(); + for (int i = 0; i < 2; i++) { + MemorySegment segment = MemorySegmentFactory.wrap(new byte[compressor.getMaxCompressedSize(blockSize)]); + reader.readInto(new NetworkBuffer(segment, this)); + } + } + + @Override + public void readBlock(MemorySegment segment) throws IOException { + if (cause.get() != null) { + throw cause.get(); + } + + if (copyCompress) { + int readOffset = 0; + int readLen = segment.size(); + + while (readLen > 0) { + int copy = Math.min(readLen, len - offset); + if (copy == 0) { + readBuffer(); + } else { + segment.put(readOffset, buf, offset, copy); + offset += copy; + readOffset += copy; + readLen -= copy; + } + } + } else { + int len = decompressBuffer(segment.wrap(0, segment.size())); + Preconditions.checkState(len == segment.size()); + } + + boolean add = blockQueue.add(segment); + Preconditions.checkState(add); // LinkedBlockingQueue never add fail. + } + + private void readBuffer() throws IOException { + len = decompressBuffer(bufWrapper); + } + + private int decompressBuffer(ByteBuffer toRead) throws IOException { + try { + Buffer buffer; + while ((buffer = retBuffers.poll(11000, TimeUnit.MILLISECONDS)) == null) { + if (cause.get() != null) { + throw cause.get(); + } + } + + int readLen = decompressor.decompress( + buffer.getMemorySegment().wrap(0, buffer.getSize()), 0, buffer.getSize(), + toRead, 0); + + buffer.recycleBuffer(); + return readLen; + } catch (InterruptedException e) { + throw new IOException(e); + } + } + + @Override + public void seekToPosition(long position) throws IOException { + throw new RuntimeException("Not support yet!"); + } + + @Override + public MemorySegment getNextReturnedBlock() throws IOException { + try { + while (true) { + final MemorySegment next = blockQueue.poll(1000, TimeUnit.MILLISECONDS); + if (next != null) { + return next; + } else { + if (reader.isClosed()) { + throw new IOException("The writer has been closed."); + } + } + } + } catch (InterruptedException e) { + throw new IOException("Writer was interrupted while waiting for the next returning segment."); + } + } + + @Override + public LinkedBlockingQueue<MemorySegment> getReturnQueue() { + return blockQueue; + } + + @Override + public ID getChannelID() { + return reader.getChannelID(); + } + + @Override + public long getSize() throws IOException { + return reader.getSize(); + } + + @Override + public boolean isClosed() { + return reader.isClosed(); + } + + @Override + public void close() throws IOException { + reader.close(); + } + + @Override + public void deleteChannel() { + reader.deleteChannel(); + } + + @Override + public void closeAndDelete() throws IOException { + reader.closeAndDelete(); + } + + @Override + public FileChannel getNioFileChannel() { + return reader.getNioFileChannel(); + } + + @Override + public void requestSuccessful(Buffer request) { + retBuffers.add(request); + } + + @Override + public void requestFailed(Buffer buffer, IOException e) { + cause.compareAndSet(null, e); + throw new RuntimeException(e); + } + + @Override + public void recycle(MemorySegment segment) { + try { + reader.readInto(new NetworkBuffer(segment, this)); + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/CompressedBlockChannelWriter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/CompressedBlockChannelWriter.java new file mode 100644 index 0000000..76db926 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/CompressedBlockChannelWriter.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.io; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter; +import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.network.buffer.BufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; +import org.apache.flink.table.runtime.compression.BlockCompressionFactory; +import org.apache.flink.table.runtime.compression.BlockCompressor; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * Compressed block channel writer provides a scenario where MemorySegment must be maintained. + */ +public class CompressedBlockChannelWriter + implements BlockChannelWriter<MemorySegment>, BufferRecycler { + + private final LinkedBlockingQueue<MemorySegment> blockQueue; + private final LinkedBlockingQueue<MemorySegment> compressedBuffers = new LinkedBlockingQueue<>(); + private final BufferFileWriter writer; + private final boolean copyCompress; + private final BlockCompressor compressor; + + private byte[] buf; + private ByteBuffer bufWrapper; + private int count; + + public CompressedBlockChannelWriter( + IOManager ioManager, ID channel, + LinkedBlockingQueue<MemorySegment> blockQueue, + BlockCompressionFactory codecFactory, int preferBlockSize, int segmentSize) throws IOException { + this.writer = ioManager.createBufferFileWriter(channel); + this.blockQueue = blockQueue; + copyCompress = preferBlockSize > segmentSize * 2; + int blockSize = copyCompress ? preferBlockSize : segmentSize; + this.compressor = codecFactory.getCompressor(); + + if (copyCompress) { + this.buf = new byte[blockSize]; + this.bufWrapper = ByteBuffer.wrap(buf); + } + + for (int i = 0; i < 2; i++) { + compressedBuffers.add(MemorySegmentFactory.wrap( + new byte[compressor.getMaxCompressedSize(blockSize)])); + } + } + + @Override + public void writeBlock(MemorySegment block) throws IOException { + if (copyCompress) { + int offset = 0; + int len = block.size(); + + while (len > 0) { + int copy = Math.min(len, buf.length - count); + if (copy == 0) { + flushBuffer(); + } else { + block.get(offset, buf, count, copy); + count += copy; + offset += copy; + len -= copy; + } + } + } else { + compressBuffer(block.wrap(0, block.size()), block.size()); + } + + boolean add = blockQueue.add(block); + Preconditions.checkState(add); // LinkedBlockingQueue never add fail. + } + + private void flushBuffer() throws IOException { + compressBuffer(bufWrapper, count); + count = 0; + } + + private void compressBuffer(ByteBuffer buffer, int len) throws IOException { + MemorySegment compressedBuffer; + try { + compressedBuffer = compressedBuffers.take(); + } catch (InterruptedException e) { + throw new IOException(e); + } + int compressedLen = compressor.compress( + buffer, 0, len, + compressedBuffer.wrap(0, compressedBuffer.size()), 0); + NetworkBuffer networkBuffer = new NetworkBuffer(compressedBuffer, this); + networkBuffer.setSize(compressedLen); + writer.writeBlock(networkBuffer); + } + + @Override + public ID getChannelID() { + return writer.getChannelID(); + } + + @Override + public long getSize() throws IOException { + return writer.getSize(); + } + + @Override + public boolean isClosed() { + return writer.isClosed(); + } + + @Override + public void close() throws IOException { + if (!writer.isClosed()) { + if (copyCompress) { + flushBuffer(); + } + this.writer.close(); + } + } + + @Override + public void deleteChannel() { + writer.deleteChannel(); + } + + @Override + public void closeAndDelete() throws IOException { + writer.closeAndDelete(); + } + + @Override + public FileChannel getNioFileChannel() { + return writer.getNioFileChannel(); + } + + @Override + public void recycle(MemorySegment memorySegment) { + compressedBuffers.add(memorySegment); + } + + @Override + public MemorySegment getNextReturnedBlock() throws IOException { + try { + while (true) { + final MemorySegment next = blockQueue.poll(1000, TimeUnit.MILLISECONDS); + if (next != null) { + return next; + } else { + if (writer.isClosed()) { + throw new IOException("The writer has been closed."); + } + } + } + } catch (InterruptedException e) { + throw new IOException("Writer was interrupted while waiting for the next returning segment."); + } + } + + @Override + public LinkedBlockingQueue<MemorySegment> getReturnQueue() { + return blockQueue; + } +} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelReaderInputView.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelReaderInputView.java new file mode 100644 index 0000000..7c879b9 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelReaderInputView.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.io; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.disk.iomanager.AbstractChannelReaderInputView; +import org.apache.flink.runtime.io.disk.iomanager.BufferFileReader; +import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.disk.iomanager.RequestDoneCallback; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; +import org.apache.flink.table.runtime.compression.BlockCompressionFactory; +import org.apache.flink.table.runtime.compression.BlockCompressor; +import org.apache.flink.table.runtime.compression.BlockDecompressor; + +import java.io.EOFException; +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +/** + * A {@link org.apache.flink.core.memory.DataInputView} that is backed by a + * {@link BufferFileReader}, making it effectively a data input stream. The view reads it data + * in blocks from the underlying channel and decompress it before returning to caller. The view + * can only read data that has been written by {@link CompressedHeaderlessChannelWriterOutputView}, + * due to block formatting. + */ +public class CompressedHeaderlessChannelReaderInputView + extends AbstractChannelReaderInputView + implements RequestDoneCallback<Buffer>, BufferRecycler { + + private final BlockDecompressor decompressor; + private final BufferFileReader reader; + private final MemorySegment uncompressedBuffer; + private final AtomicReference<IOException> cause; + + private final LinkedBlockingQueue<Buffer> retBuffers = new LinkedBlockingQueue<>(); + + private int numBlocksRemaining; + private int currentSegmentLimit; + + public CompressedHeaderlessChannelReaderInputView( + FileIOChannel.ID id, + IOManager ioManager, + BlockCompressionFactory compressionCodecFactory, + int compressionBlockSize, + int numBlocks) throws IOException { + super(0); + this.numBlocksRemaining = numBlocks; + this.reader = ioManager.createBufferFileReader(id, this); + uncompressedBuffer = MemorySegmentFactory.wrap(new byte[compressionBlockSize]); + decompressor = compressionCodecFactory.getDecompressor(); + cause = new AtomicReference<>(); + + BlockCompressor compressor = compressionCodecFactory.getCompressor(); + for (int i = 0; i < 2; i++) { + MemorySegment segment = MemorySegmentFactory.wrap(new byte[compressor.getMaxCompressedSize( + compressionBlockSize)]); + reader.readInto(new NetworkBuffer(segment, this)); + } + } + + @Override + protected MemorySegment nextSegment(MemorySegment current) throws IOException { + if (cause.get() != null) { + throw cause.get(); + } + + // check for end-of-stream + if (this.numBlocksRemaining <= 0) { + this.reader.close(); + throw new EOFException(); + } + + try { + Buffer buffer; + while ((buffer = retBuffers.poll(1, TimeUnit.SECONDS)) == null) { + if (cause.get() != null) { + throw cause.get(); + } + } + this.currentSegmentLimit = decompressor.decompress( + buffer.getMemorySegment().getArray(), 0, buffer.getSize(), + uncompressedBuffer.getArray(), 0 + ); + + buffer.recycleBuffer(); + this.numBlocksRemaining--; + return uncompressedBuffer; + } + catch (InterruptedException e) { + throw new IOException(e); + } + } + + public BufferFileReader getReader() { + return reader; + } + + @Override + protected int getLimitForSegment(MemorySegment segment) { + return currentSegmentLimit; + } + + @Override + public List<MemorySegment> close() throws IOException { + reader.close(); + return Collections.emptyList(); + } + + @Override + public FileIOChannel getChannel() { + return reader; + } + + public boolean isClosed() { + return reader.isClosed(); + } + + @Override + public void requestSuccessful(Buffer request) { + retBuffers.add(request); + } + + @Override + public void requestFailed(Buffer buffer, IOException e) { + cause.compareAndSet(null, e); + throw new RuntimeException(e); + } + + @Override + public void recycle(MemorySegment segment) { + try { + reader.readInto(new NetworkBuffer(segment, this)); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelWriterOutputView.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelWriterOutputView.java new file mode 100644 index 0000000..a67ef65 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelWriterOutputView.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.io; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.disk.iomanager.AbstractChannelWriterOutputView; +import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter; +import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel; +import org.apache.flink.runtime.io.network.buffer.BufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; +import org.apache.flink.table.runtime.compression.BlockCompressionFactory; +import org.apache.flink.table.runtime.compression.BlockCompressor; + +import java.io.IOException; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * A {@link org.apache.flink.core.memory.DataOutputView} that is backed by a {@link FileIOChannel}, + * making it effectively a data output stream. The view will compress its data before writing it + * in blocks to the underlying channel. + */ +public final class CompressedHeaderlessChannelWriterOutputView + extends AbstractChannelWriterOutputView implements BufferRecycler { + + private final MemorySegment buffer; + private final LinkedBlockingQueue<MemorySegment> compressedBuffers = new LinkedBlockingQueue<>(); + private final BlockCompressor compressor; + private final BufferFileWriter writer; + private final int compressionBlockSize; + + private int blockCount; + + private long numBytes; + private long numCompressedBytes; + + public CompressedHeaderlessChannelWriterOutputView( + BufferFileWriter writer, + BlockCompressionFactory compressionCodecFactory, + int compressionBlockSize) { + super(compressionBlockSize, 0); + + this.compressionBlockSize = compressionBlockSize; + buffer = MemorySegmentFactory.wrap(new byte[compressionBlockSize]); + compressor = compressionCodecFactory.getCompressor(); + for (int i = 0; i < 2; i++) { + compressedBuffers.add(MemorySegmentFactory.wrap( + new byte[compressor.getMaxCompressedSize(compressionBlockSize)])); + } + this.writer = writer; + + try { + advance(); + } catch (IOException ioex) { + throw new RuntimeException(ioex); + } + } + + @Override + public FileIOChannel getChannel() { + return writer; + } + + @Override + public int close() throws IOException { + if (!writer.isClosed()) { + int currentPositionInSegment = getCurrentPositionInSegment(); + writeCompressed(buffer, currentPositionInSegment); + clear(); + this.writer.close(); + } + return -1; + } + + @Override + protected MemorySegment nextSegment(MemorySegment current, int positionInCurrent) throws IOException { + if (current != null) { + writeCompressed(current, compressionBlockSize); + } + return buffer; + } + + private void writeCompressed(MemorySegment current, int size) throws IOException { + MemorySegment compressedBuffer; + try { + compressedBuffer = compressedBuffers.take(); + } catch (InterruptedException e) { + throw new IOException(e); + } + int compressedLen = compressor.compress(current.getArray(), 0, size, compressedBuffer.getArray(), 0); + NetworkBuffer networkBuffer = new NetworkBuffer(compressedBuffer, this); + networkBuffer.setSize(compressedLen); + writer.writeBlock(networkBuffer); + blockCount++; + numBytes += size; + numCompressedBytes += compressedLen; + } + + @Override + public long getNumBytes() { + return numBytes; + } + + @Override + public long getNumCompressedBytes() { + return numCompressedBytes; + } + + @Override + public int getBlockCount() { + return blockCount; + } + + @Override + public void recycle(MemorySegment memorySegment) { + compressedBuffers.add(memorySegment); + } +} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/HeaderlessChannelWriterOutputView.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/HeaderlessChannelWriterOutputView.java new file mode 100644 index 0000000..d581995 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/HeaderlessChannelWriterOutputView.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.io; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.disk.iomanager.AbstractChannelWriterOutputView; +import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter; +import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * A {@link org.apache.flink.core.memory.DataOutputView} that is backed by a + * {@link BlockChannelWriter}, making it effectively a data output stream. The view writes its + * data in blocks to the underlying channel, but does not expect header for each block. + */ +public final class HeaderlessChannelWriterOutputView extends AbstractChannelWriterOutputView { + + private final BlockChannelWriter<MemorySegment> writer; // the writer to the channel + + private int blockCount; // the number of blocks used + + public HeaderlessChannelWriterOutputView( + BlockChannelWriter<MemorySegment> writer, + List<MemorySegment> memory, + int segmentSize) { + super(segmentSize, 0); + + if (writer == null) { + throw new NullPointerException(); + } + + this.writer = writer; + + Preconditions.checkNotNull(memory); + + // load the segments into the queue + final LinkedBlockingQueue<MemorySegment> queue = writer.getReturnQueue(); + for (int i = memory.size() - 1; i >= 0; --i) { + final MemorySegment seg = memory.get(i); + if (seg.size() != segmentSize) { + throw new IllegalArgumentException("This segment are not of the specified size."); + } + queue.add(seg); + } + + // get the first segment + try { + advance(); + } catch (IOException ioex) { + throw new RuntimeException(ioex); + } + } + + @Override + public FileIOChannel getChannel() { + return writer; + } + + /** + * Closes this OutputView, closing the underlying writer. And return number bytes in last + * memory segment. + */ + @Override + public int close() throws IOException { + if (!writer.isClosed()) { + int currentPositionInSegment = getCurrentPositionInSegment(); + // write last segment + writer.writeBlock(getCurrentSegment()); + clear(); + + writer.getReturnQueue().clear(); + this.writer.close(); + + return currentPositionInSegment; + } + return -1; + } + + @Override + public int getBlockCount() { + return this.blockCount; + } + + @Override + public long getNumBytes() throws IOException { + return writer.getSize(); + } + + @Override + public long getNumCompressedBytes() throws IOException { + return writer.getSize(); + } + + @Override + public MemorySegment nextSegment(MemorySegment current, int posInSegment) throws IOException { + if (current != null) { + writer.writeBlock(current); + } + + final MemorySegment next = this.writer.getNextReturnedBlock(); + this.blockCount++; + return next; + } +} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/FileChannelUtil.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/FileChannelUtil.java new file mode 100644 index 0000000..780cbfd --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/FileChannelUtil.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.util; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.disk.iomanager.AbstractChannelReaderInputView; +import org.apache.flink.runtime.io.disk.iomanager.AbstractChannelWriterOutputView; +import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader; +import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter; +import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter; +import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel; +import org.apache.flink.runtime.io.disk.iomanager.HeaderlessChannelReaderInputView; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.table.runtime.compression.BlockCompressionFactory; +import org.apache.flink.table.runtime.io.ChannelWithMeta; +import org.apache.flink.table.runtime.io.CompressedBlockChannelReader; +import org.apache.flink.table.runtime.io.CompressedBlockChannelWriter; +import org.apache.flink.table.runtime.io.CompressedHeaderlessChannelReaderInputView; +import org.apache.flink.table.runtime.io.CompressedHeaderlessChannelWriterOutputView; +import org.apache.flink.table.runtime.io.HeaderlessChannelWriterOutputView; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; + +import static org.apache.flink.core.memory.MemorySegmentFactory.allocateUnpooledSegment; + +/** + * File channel util for runtime. + */ +public class FileChannelUtil { + + public static AbstractChannelReaderInputView createInputView( + IOManager ioManager, + ChannelWithMeta channel, + List<FileIOChannel> channels, + boolean compressionEnable, + BlockCompressionFactory compressionCodecFactory, + int compressionBlockSize, + int segmentSize) throws IOException { + if (compressionEnable) { + CompressedHeaderlessChannelReaderInputView in = + new CompressedHeaderlessChannelReaderInputView( + channel.getChannel(), + ioManager, + compressionCodecFactory, + compressionBlockSize, + channel.getBlockCount() + ); + channels.add(in.getReader()); + return in; + } else { + BlockChannelReader<MemorySegment> reader = + ioManager.createBlockChannelReader(channel.getChannel()); + channels.add(reader); + return new HeaderlessChannelReaderInputView( + reader, + Arrays.asList( + allocateUnpooledSegment(segmentSize), + allocateUnpooledSegment(segmentSize) + ), + channel.getBlockCount(), + channel.getNumBytesInLastBlock(), false + ); + } + } + + public static AbstractChannelWriterOutputView createOutputView( + IOManager ioManager, + FileIOChannel.ID channel, + boolean compressionEnable, + BlockCompressionFactory compressionCodecFactory, + int compressionBlockSize, + int segmentSize) throws IOException { + if (compressionEnable) { + BufferFileWriter bufferWriter = ioManager.createBufferFileWriter(channel); + return new CompressedHeaderlessChannelWriterOutputView( + bufferWriter, + compressionCodecFactory, + compressionBlockSize); + } else { + BlockChannelWriter<MemorySegment> blockWriter = + ioManager.createBlockChannelWriter(channel); + return new HeaderlessChannelWriterOutputView( + blockWriter, + Arrays.asList( + allocateUnpooledSegment(segmentSize), + allocateUnpooledSegment(segmentSize) + ), + segmentSize + ); + } + } + + public static BlockChannelWriter<MemorySegment> createBlockChannelWriter( + IOManager ioManager, + FileIOChannel.ID channel, + LinkedBlockingQueue<MemorySegment> bufferReturnQueue, + boolean compressionEnable, + BlockCompressionFactory compressionCodecFactory, + int compressionBlockSize, + int segmentSize) throws IOException { + if (compressionEnable) { + return new CompressedBlockChannelWriter( + ioManager, + channel, + bufferReturnQueue, + compressionCodecFactory, + compressionBlockSize, + segmentSize + ); + } else { + return ioManager.createBlockChannelWriter(channel, bufferReturnQueue); + } + } + + public static BlockChannelReader<MemorySegment> createBlockChannelReader( + IOManager ioManager, + FileIOChannel.ID channel, + LinkedBlockingQueue<MemorySegment> bufferReturnQueue, + boolean compressionEnable, + BlockCompressionFactory compressionCodecFactory, + int compressionBlockSize, + int segmentSize) throws IOException { + if (compressionEnable) { + return new CompressedBlockChannelReader( + ioManager, + channel, + bufferReturnQueue, + compressionCodecFactory, + compressionBlockSize, + segmentSize + ); + } else { + return ioManager.createBlockChannelReader(channel, bufferReturnQueue); + } + } +} diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelTest.java new file mode 100644 index 0000000..177e83b --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelTest.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.io; + +import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter; +import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; +import org.apache.flink.table.runtime.compression.BlockCompressionFactory; +import org.apache.flink.table.runtime.compression.Lz4BlockCompressionFactory; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.Random; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link CompressedHeaderlessChannelReaderInputView} and + * {@link CompressedHeaderlessChannelWriterOutputView}. + */ +public class CompressedHeaderlessChannelTest { + private static final int BUFFER_SIZE = 256; + + private IOManager ioManager; + + private BlockCompressionFactory compressionFactory = new Lz4BlockCompressionFactory(); + + public CompressedHeaderlessChannelTest() { + ioManager = new IOManagerAsync(); + } + + @After + public void afterTest() { + this.ioManager.shutdown(); + if (!this.ioManager.isProperlyShutDown()) { + Assert.fail("I/O Manager was not properly shut down."); + } + } + + @Test + public void testCompressedView() throws IOException { + for (int testTime = 0; testTime < 10; testTime++) { + int testRounds = new Random().nextInt(20000); + FileIOChannel.ID channel = ioManager.createChannel(); + BufferFileWriter writer = this.ioManager.createBufferFileWriter(channel); + CompressedHeaderlessChannelWriterOutputView outputView = + new CompressedHeaderlessChannelWriterOutputView( + writer, + compressionFactory, + BUFFER_SIZE + ); + + for (int i = 0; i < testRounds; i++) { + outputView.writeInt(i); + } + outputView.close(); + int blockCount = outputView.getBlockCount(); + + CompressedHeaderlessChannelReaderInputView inputView = + new CompressedHeaderlessChannelReaderInputView( + channel, + ioManager, + compressionFactory, + BUFFER_SIZE, + blockCount + ); + + for (int i = 0; i < testRounds; i++) { + assertEquals(i, inputView.readInt()); + } + inputView.close(); + } + } +}