This is an automated email from the ASF dual-hosted git repository.

hangxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a4c71c8d021 [FLINK-35175][filesystem] Avoid reading with ByteBuffer on Hadoop version below 3.3.0 (#24691)
a4c71c8d021 is described below

commit a4c71c8d021f5c07c81e69369139d4455da475ca
Author: Hangxiang Yu <master...@gmail.com>
AuthorDate: Sat Apr 20 01:16:27 2024 +0800

    [FLINK-35175][filesystem] Avoid reading with ByteBuffer on Hadoop version below 3.3.0 (#24691)
---
 .../runtime/fs/hdfs/HadoopDataInputStream.java     | 68 ++++++++++------------
 1 file changed, 30 insertions(+), 38 deletions(-)

diff --git 
a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java
 
b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java
index 8f4e64c5535..e76aaf993a1 100644
--- 
a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java
+++ 
b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java
@@ -21,8 +21,6 @@ package org.apache.flink.runtime.fs.hdfs;
 import org.apache.flink.core.fs.ByteBufferReadable;
 import org.apache.flink.core.fs.FSDataInputStream;
 
-import org.apache.hadoop.fs.StreamCapabilities;
-
 import javax.annotation.Nonnull;
 
 import java.io.IOException;
@@ -147,53 +145,47 @@ public final class HadoopDataInputStream extends 
FSDataInputStream implements By
 
     @Override
     public int read(ByteBuffer byteBuffer) throws IOException {
-        // Not all internal stream supports ByteBufferReadable
-        if 
(fsDataInputStream.hasCapability(StreamCapabilities.READBYTEBUFFER)) {
-            return fsDataInputStream.read(byteBuffer);
+        // TODO: Use org.apache.hadoop.fs.FSDataInputStream#read(ByteBuffer) 
to improve the
+        //  performance after updating hadoop version to 3.3.0 and above.
+        if (byteBuffer.hasArray()) {
+            int len = byteBuffer.remaining();
+            fsDataInputStream.readFully(byteBuffer.array(), 
byteBuffer.arrayOffset(), len);
+            return len;
         } else {
-            if (byteBuffer.hasArray()) {
-                int len = byteBuffer.remaining();
-                fsDataInputStream.readFully(byteBuffer.array(), 
byteBuffer.arrayOffset(), len);
-                return len;
-            } else {
-                // Fallback to read byte then put
-                int c = read();
+            // Fallback to read byte then put
+            int c = read();
+            if (c == -1) {
+                return -1;
+            }
+            byteBuffer.put((byte) c);
+
+            int n = 1, len = byteBuffer.remaining() + 1;
+            for (; n < len; n++) {
+                c = read();
                 if (c == -1) {
-                    return -1;
+                    break;
                 }
                 byteBuffer.put((byte) c);
-
-                int n = 1, len = byteBuffer.remaining() + 1;
-                for (; n < len; n++) {
-                    c = read();
-                    if (c == -1) {
-                        break;
-                    }
-                    byteBuffer.put((byte) c);
-                }
-                return n;
             }
+            return n;
         }
     }
 
     @Override
     public int read(long position, ByteBuffer byteBuffer) throws IOException {
-        // Not all internal stream supports ByteBufferPositionedReadable
-        if 
(fsDataInputStream.hasCapability(StreamCapabilities.PREADBYTEBUFFER)) {
-            return fsDataInputStream.read(position, byteBuffer);
+        // TODO: Use org.apache.hadoop.fs.FSDataInputStream#read(long, 
ByteBuffer) to improve the
+        //  performance after updating hadoop version to 3.3.0 and above.
+        if (byteBuffer.hasArray()) {
+            int len = byteBuffer.remaining();
+            fsDataInputStream.readFully(
+                    position, byteBuffer.array(), byteBuffer.arrayOffset(), 
len);
+            return len;
         } else {
-            if (byteBuffer.hasArray()) {
-                int len = byteBuffer.remaining();
-                fsDataInputStream.readFully(
-                        position, byteBuffer.array(), 
byteBuffer.arrayOffset(), len);
-                return len;
-            } else {
-                // Fallback to positionable read bytes then put
-                byte[] tmp = new byte[byteBuffer.remaining()];
-                fsDataInputStream.readFully(position, tmp, 0, tmp.length);
-                byteBuffer.put(tmp);
-                return tmp.length;
-            }
+            // Fallback to positionable read bytes then put
+            byte[] tmp = new byte[byteBuffer.remaining()];
+            fsDataInputStream.readFully(position, tmp, 0, tmp.length);
+            byteBuffer.put(tmp);
+            return tmp.length;
         }
     }
 }

Reply via email to