This is an automated email from the ASF dual-hosted git repository. hangxiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new a4c71c8d021 [FLINK-35175][filesystem] Avoid reading with ByteBuffer on hadoop version below 3.3.0 (#24691) a4c71c8d021 is described below commit a4c71c8d021f5c07c81e69369139d4455da475ca Author: Hangxiang Yu <master...@gmail.com> AuthorDate: Sat Apr 20 01:16:27 2024 +0800 [FLINK-35175][filesystem] Avoid reading with ByteBuffer on hadoop version below 3.3.0 (#24691) --- .../runtime/fs/hdfs/HadoopDataInputStream.java | 68 ++++++++++------------ 1 file changed, 30 insertions(+), 38 deletions(-) diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java index 8f4e64c5535..e76aaf993a1 100644 --- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java +++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java @@ -21,8 +21,6 @@ package org.apache.flink.runtime.fs.hdfs; import org.apache.flink.core.fs.ByteBufferReadable; import org.apache.flink.core.fs.FSDataInputStream; -import org.apache.hadoop.fs.StreamCapabilities; - import javax.annotation.Nonnull; import java.io.IOException; @@ -147,53 +145,47 @@ public final class HadoopDataInputStream extends FSDataInputStream implements By @Override public int read(ByteBuffer byteBuffer) throws IOException { - // Not all internal stream supports ByteBufferReadable - if (fsDataInputStream.hasCapability(StreamCapabilities.READBYTEBUFFER)) { - return fsDataInputStream.read(byteBuffer); + // TODO: Use org.apache.hadoop.fs.FSDataInputStream#read(ByteBuffer) to improve the + // performance after updating hadoop version to 3.3.0 and above. + if (byteBuffer.hasArray()) { + int len = byteBuffer.remaining(); + fsDataInputStream.readFully(byteBuffer.array(), byteBuffer.arrayOffset(), len); + return len; } else { - if (byteBuffer.hasArray()) { - int len = byteBuffer.remaining(); - fsDataInputStream.readFully(byteBuffer.array(), byteBuffer.arrayOffset(), len); - return len; - } else { - // Fallback to read byte then put - int c = read(); + // Fallback to read byte then put + int c = read(); + if (c == -1) { + return -1; + } + byteBuffer.put((byte) c); + + int n = 1, len = byteBuffer.remaining() + 1; + for (; n < len; n++) { + c = read(); if (c == -1) { - return -1; + break; } byteBuffer.put((byte) c); - - int n = 1, len = byteBuffer.remaining() + 1; - for (; n < len; n++) { - c = read(); - if (c == -1) { - break; - } - byteBuffer.put((byte) c); - } - return n; } + return n; } } @Override public int read(long position, ByteBuffer byteBuffer) throws IOException { - // Not all internal stream supports ByteBufferPositionedReadable - if (fsDataInputStream.hasCapability(StreamCapabilities.PREADBYTEBUFFER)) { - return fsDataInputStream.read(position, byteBuffer); + // TODO: Use org.apache.hadoop.fs.FSDataInputStream#read(long, ByteBuffer) to improve the + // performance after updating hadoop version to 3.3.0 and above. + if (byteBuffer.hasArray()) { + int len = byteBuffer.remaining(); + fsDataInputStream.readFully( + position, byteBuffer.array(), byteBuffer.arrayOffset(), len); + return len; } else { - if (byteBuffer.hasArray()) { - int len = byteBuffer.remaining(); - fsDataInputStream.readFully( - position, byteBuffer.array(), byteBuffer.arrayOffset(), len); - return len; - } else { - // Fallback to positionable read bytes then put - byte[] tmp = new byte[byteBuffer.remaining()]; - fsDataInputStream.readFully(position, tmp, 0, tmp.length); - byteBuffer.put(tmp); - return tmp.length; - } + // Fallback to positionable read bytes then put + byte[] tmp = new byte[byteBuffer.remaining()]; + fsDataInputStream.readFully(position, tmp, 0, tmp.length); + byteBuffer.put(tmp); + return tmp.length; } } }