This is an automated email from the ASF dual-hosted git repository.

stack pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 494a916d1c5566d7dea4476451ea42cca6c850fe
Author: stack <st...@apache.org>
AuthorDate: Tue Jul 9 08:58:16 2019 -0700

    HDFS-14483 Backport HDFS-3246,HDFS-14111 ByteBuffer pread interface to branch-2.9
    
    This is a revert of a revert, i.e. a reapplication of the HDFS-14483
    patch. Original commit was missing the JIRA number.
    
    Revert "Revert "Backport HDFS-3246,HDFS-14111 ByteBuffer pread interface to 
branch-2.9""
    
    This reverts commit b26ad8a4c38d27894bd42d3fba7abff80bd895f4.
---
 .../apache/hadoop/crypto/CryptoInputStream.java    | 103 +++++++-
 .../hadoop/fs/ByteBufferPositionedReadable.java    |  65 +++++
 .../org/apache/hadoop/fs/FSDataInputStream.java    |  12 +-
 .../org/apache/hadoop/fs/StreamCapabilities.java   |  11 +
 .../hadoop/crypto/CryptoStreamsTestBase.java       | 175 ++++++++++++--
 .../apache/hadoop/crypto/TestCryptoStreams.java    |  36 ++-
 .../hadoop/crypto/TestCryptoStreamsForLocalFS.java |  10 +
 .../hadoop/crypto/TestCryptoStreamsNormal.java     |  10 +
 .../org/apache/hadoop/hdfs/DFSInputStream.java     |  14 +-
 .../src/main/native/libhdfs-tests/hdfs_test.h      |  18 ++
 .../main/native/libhdfs-tests/test_libhdfs_ops.c   | 118 ++++++++-
 .../src/main/native/libhdfs/hdfs.c                 | 175 +++++++++++++-
 .../apache/hadoop/hdfs/TestByteBufferPread.java    | 269 +++++++++++++++++++++
 .../shortcircuit/TestShortCircuitLocalRead.java    |  25 +-
 14 files changed, 977 insertions(+), 64 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java
index a2273bf..2ff2be1 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java
@@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.ByteBufferPositionedReadable;
 import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.CanSetDropBehind;
 import org.apache.hadoop.fs.CanSetReadahead;
@@ -62,9 +63,10 @@ import org.apache.hadoop.util.StringUtils;
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class CryptoInputStream extends FilterInputStream implements 
-    Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor, 
-    CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess, 
-    ReadableByteChannel, CanUnbuffer, StreamCapabilities {
+    Seekable, PositionedReadable, ByteBufferReadable,
+    ByteBufferPositionedReadable, HasFileDescriptor, CanSetDropBehind,
+    CanSetReadahead, HasEnhancedByteBufferAccess, ReadableByteChannel,
+    CanUnbuffer, StreamCapabilities {
   private final byte[] oneByteBuf = new byte[1];
   private final CryptoCodec codec;
   private final Decryptor decryptor;
@@ -341,6 +343,24 @@ public class CryptoInputStream extends FilterInputStream implements
           "positioned read.");
     }
   }
+
+  /** Positioned read using ByteBuffers. It is thread-safe */
+  @Override
+  public int read(long position, final ByteBuffer buf) throws IOException {
+    checkStream();
+    try {
+      int pos = buf.position();
+      final int n = ((ByteBufferPositionedReadable) in).read(position, buf);
+      if (n > 0) {
+        // This operation does not change the current offset of the file
+        decrypt(position, buf, n, pos);
+      }
+      return n;
+    } catch (ClassCastException e) {
+      throw new UnsupportedOperationException(
+          "This stream does not support " + "positioned read.");
+    }
+  }
   
   /**
    * Decrypt length bytes in buffer starting at offset. Output is also put 
@@ -375,7 +395,80 @@ public class CryptoInputStream extends FilterInputStream implements
       returnDecryptor(decryptor);
     }
   }
-  
+
+  /**
+   * Decrypts the given {@link ByteBuffer} in place. {@code length} bytes are
+   * decrypted from {@code buf} starting at {@code start}.
+   * {@code buf.position()} and {@code buf.limit()} are unchanged after this
+   * method returns. This method is thread-safe.
+   *
+   * <p>
+   * This method decrypts the input buf chunk-by-chunk and writes the decrypted
+   * output back into the input buf. It uses two local buffers taken from the
+   * {@link #bufferPool} to assist in this process: one is designated as the
+   * input buffer and it stores a single chunk of the given buf, the other is
+   * designated as the output buffer, which stores the output of decrypting the
+   * input buffer. Both buffers are of size {@link #bufferSize}.
+   * </p>
+   *
+   * <p>
+   * Decryption is done by using a {@link Decryptor} and the
+   * {@link #decrypt(Decryptor, ByteBuffer, ByteBuffer, byte)} method. Once the
+   * decrypted data is written into the output buffer, it is copied back into
+   * buf. Both buffers are returned back into the pool once the entire buf is
+   * decrypted.
+   * </p>
+   *
+   * @param filePosition the current position of the file being read
+   * @param buf the {@link ByteBuffer} to decrypt
+   * @param length the number of bytes in {@code buf} to decrypt
+   * @param start the position in {@code buf} to start decrypting data from
+   */
+  private void decrypt(long filePosition, ByteBuffer buf, int length, int start)
+      throws IOException {
+    ByteBuffer localInBuffer = null;
+    ByteBuffer localOutBuffer = null;
+
+    // Duplicate the buffer so we don't have to worry about resetting the
+    // original position and limit at the end of the method
+    buf = buf.duplicate();
+
+    int decryptedBytes = 0;
+    Decryptor localDecryptor = null;
+    try {
+      localInBuffer = getBuffer();
+      localOutBuffer = getBuffer();
+      localDecryptor = getDecryptor();
+      byte[] localIV = initIV.clone();
+      updateDecryptor(localDecryptor, filePosition, localIV);
+      byte localPadding = getPadding(filePosition);
+      // Set proper filePosition for inputdata.
+      localInBuffer.position(localPadding);
+
+      while (decryptedBytes < length) {
+        buf.position(start + decryptedBytes);
+        buf.limit(start + decryptedBytes
+            + Math.min(length - decryptedBytes, localInBuffer.remaining()));
+        localInBuffer.put(buf);
+        // Do decryption
+        try {
+          decrypt(localDecryptor, localInBuffer, localOutBuffer, localPadding);
+          buf.position(start + decryptedBytes);
+          buf.limit(start + length);
+          decryptedBytes += localOutBuffer.remaining();
+          buf.put(localOutBuffer);
+        } finally {
+          localPadding = afterDecryption(localDecryptor, localInBuffer,
+              filePosition + length, localIV);
+        }
+      }
+    } finally {
+      returnBuffer(localInBuffer);
+      returnBuffer(localOutBuffer);
+      returnDecryptor(localDecryptor);
+    }
+  }
+
   /** Positioned read fully. It is thread-safe */
   @Override
   public void readFully(long position, byte[] buffer, int offset, int length)
@@ -740,6 +833,8 @@ public class CryptoInputStream extends FilterInputStream implements
     case StreamCapabilities.READAHEAD:
     case StreamCapabilities.DROPBEHIND:
     case StreamCapabilities.UNBUFFER:
+    case StreamCapabilities.READBYTEBUFFER:
+    case StreamCapabilities.PREADBYTEBUFFER:
       return true;
     default:
       return false;
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferPositionedReadable.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferPositionedReadable.java
new file mode 100644
index 0000000..3506870
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferPositionedReadable.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Implementers of this interface provide a positioned read API that writes to a
+ * {@link ByteBuffer} rather than a {@code byte[]}.
+ *
+ * @see PositionedReadable
+ * @see ByteBufferReadable
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface ByteBufferPositionedReadable {
+  /**
+   * Reads up to {@code buf.remaining()} bytes into buf from a given position in
+   * the file and returns the number of bytes read. Callers should use
+   * {@code buf.limit(...)} to control the size of the desired read and
+   * {@code buf.position(...)} to control the offset into the buffer the data
+   * should be written to.
+   * <p>
+   * After a successful call, {@code buf.position()} will be advanced by the
+   * number of bytes read and {@code buf.limit()} will be unchanged.
+   * <p>
+   * In the case of an exception, the state of the buffer (the contents of the
+   * buffer, the {@code buf.position()}, the {@code buf.limit()}, etc.) is
+   * undefined, and callers should be prepared to recover from this eventuality.
+   * <p>
+   * Callers should use {@link StreamCapabilities#hasCapability(String)} with
+   * {@link StreamCapabilities#PREADBYTEBUFFER} to check if the underlying
+   * stream supports this interface, otherwise they might get a
+   * {@link UnsupportedOperationException}.
+   * <p>
+   * Implementations should treat 0-length requests as legitimate, and must not
+   * signal an error upon their receipt.
+   *
+   * @param position position within file
+   * @param buf the ByteBuffer to receive the results of the read operation.
+   * @return the number of bytes read, possibly zero, or -1 if reached
+   *         end-of-stream
+   * @throws IOException if there is some error performing the read
+   */
+  int read(long position, ByteBuffer buf) throws IOException;
+}
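
For illustration only (not part of this patch): a minimal caller-side sketch of the new interface as exposed through FSDataInputStream. The file path, buffer size, and class name below are placeholders; the read loop follows the contract above, where read() may return fewer bytes than requested and -1 signals end-of-stream.

    import java.io.IOException;
    import java.nio.ByteBuffer;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.StreamCapabilities;

    public class ByteBufferPreadExample {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // Hypothetical file path, used purely for illustration.
        Path file = new Path("/tmp/example.dat");
        ByteBuffer buf = ByteBuffer.allocateDirect(4096);
        try (FSDataInputStream in = fs.open(file)) {
          // Probe for the new capability before attempting a ByteBuffer pread,
          // otherwise read(long, ByteBuffer) may throw
          // UnsupportedOperationException.
          if (!in.hasCapability(StreamCapabilities.PREADBYTEBUFFER)) {
            System.err.println("Stream does not support ByteBuffer pread");
            return;
          }
          long pos = 0;
          int n;
          // Each call may fill only part of the buffer; loop until the buffer
          // is full or EOF (-1). The stream's own file offset is not moved.
          while (buf.hasRemaining() && (n = in.read(pos, buf)) > 0) {
            pos += n;
          }
          buf.flip();
          System.out.println("Read " + buf.remaining() + " bytes via pread");
        }
      }
    }
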
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
index 08d71f1..4868479 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
@@ -38,7 +38,8 @@ import org.apache.hadoop.util.IdentityHashStore;
 public class FSDataInputStream extends DataInputStream
     implements Seekable, PositionedReadable, 
       ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
-      HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities {
+    HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities,
+    ByteBufferPositionedReadable {
   /**
    * Map ByteBuffers that we have handed out to readers to ByteBufferPool 
    * objects
@@ -246,4 +247,13 @@ public class FSDataInputStream extends DataInputStream
   public String toString() {
     return super.toString() + ": " + in;
   }
+
+  @Override
+  public int read(long position, ByteBuffer buf) throws IOException {
+    if (in instanceof ByteBufferPositionedReadable) {
+      return ((ByteBufferPositionedReadable)in).read(position, buf);
+    }
+    throw new UnsupportedOperationException("Byte-buffer pread unsupported " +
+        "by input stream");
+  }
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java
index 3549cdc..e26acd4 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java
@@ -60,6 +60,17 @@ public interface StreamCapabilities {
   String UNBUFFER = "in:unbuffer";
 
   /**
+   * Stream read(ByteBuffer) capability implemented by
+   * {@link ByteBufferReadable#read(java.nio.ByteBuffer)}.
+   */
+  String READBYTEBUFFER = "in:readbytebuffer";
+
+  /**
+   * Stream read(long, ByteBuffer) capability implemented by
+   * {@link ByteBufferPositionedReadable#read(long, java.nio.ByteBuffer)}.
+   */
+  String PREADBYTEBUFFER = "in:preadbytebuffer";
+  /**
    * Capabilities that a stream can support and be queried for.
    */
   @Deprecated
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java
index a0eb105..502abfc 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java
@@ -26,6 +26,7 @@ import java.nio.ByteOrder;
 import java.util.EnumSet;
 import java.util.Random;
 
+import org.apache.hadoop.fs.ByteBufferPositionedReadable;
 import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.CanUnbuffer;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -129,6 +130,32 @@ public abstract class CryptoStreamsTestBase {
     Assert.assertArrayEquals(result, expectedData);
   }
 
+  private int byteBufferPreadAll(ByteBufferPositionedReadable in,
+      ByteBuffer buf) throws IOException {
+    int n = 0;
+    int total = 0;
+    while (n != -1) {
+      total += n;
+      if (!buf.hasRemaining()) {
+        break;
+      }
+      n = in.read(total, buf);
+    }
+
+    return total;
+  }
+
+  private void byteBufferPreadCheck(ByteBufferPositionedReadable in)
+      throws Exception {
+    ByteBuffer result = ByteBuffer.allocate(dataLen);
+    int n = byteBufferPreadAll(in, result);
+
+    Assert.assertEquals(dataLen, n);
+    ByteBuffer expectedData = ByteBuffer.allocate(n);
+    expectedData.put(data, 0, n);
+    Assert.assertArrayEquals(result.array(), expectedData.array());
+  }
+
   protected OutputStream getOutputStream(int bufferSize) throws IOException {
     return getOutputStream(bufferSize, key, iv);
   }
@@ -288,20 +315,36 @@ public abstract class CryptoStreamsTestBase {
     
     return total;
   }
+
+  private int readAll(InputStream in, long pos, ByteBuffer buf)
+      throws IOException {
+    int n = 0;
+    int total = 0;
+    while (n != -1) {
+      total += n;
+      if (!buf.hasRemaining()) {
+        break;
+      }
+      n = ((ByteBufferPositionedReadable) in).read(pos + total, buf);
+    }
+
+    return total;
+  }
   
   /** Test positioned read. */
   @Test(timeout=120000)
   public void testPositionedRead() throws Exception {
-    OutputStream out = getOutputStream(defaultBufferSize);
-    writeData(out);
+    try (OutputStream out = getOutputStream(defaultBufferSize)) {
+      writeData(out);
+    }
     
-    InputStream in = getInputStream(defaultBufferSize);
-    // Pos: 1/3 dataLen
-    positionedReadCheck(in , dataLen / 3);
+    try (InputStream in = getInputStream(defaultBufferSize)) {
+      // Pos: 1/3 dataLen
+      positionedReadCheck(in, dataLen / 3);
 
-    // Pos: 1/2 dataLen
-    positionedReadCheck(in, dataLen / 2);
-    in.close();
+      // Pos: 1/2 dataLen
+      positionedReadCheck(in, dataLen / 2);
+    }
   }
   
   private void positionedReadCheck(InputStream in, int pos) throws Exception {
@@ -315,6 +358,35 @@ public abstract class CryptoStreamsTestBase {
     System.arraycopy(data, pos, expectedData, 0, n);
     Assert.assertArrayEquals(readData, expectedData);
   }
+
+  /** Test positioned read with ByteBuffers. */
+  @Test(timeout=120000)
+  public void testPositionedReadWithByteBuffer() throws Exception {
+    try (OutputStream out = getOutputStream(defaultBufferSize)) {
+      writeData(out);
+    }
+
+    try (InputStream in = getInputStream(defaultBufferSize)) {
+      // Pos: 1/3 dataLen
+      positionedReadCheckWithByteBuffer(in, dataLen / 3);
+
+      // Pos: 1/2 dataLen
+      positionedReadCheckWithByteBuffer(in, dataLen / 2);
+    }
+  }
+
+  private void positionedReadCheckWithByteBuffer(InputStream in, int pos)
+          throws Exception {
+    ByteBuffer result = ByteBuffer.allocate(dataLen);
+    int n = readAll(in, pos, result);
+
+    Assert.assertEquals(dataLen, n + pos);
+    byte[] readData = new byte[n];
+    System.arraycopy(result.array(), 0, readData, 0, n);
+    byte[] expectedData = new byte[n];
+    System.arraycopy(data, pos, expectedData, 0, n);
+    Assert.assertArrayEquals(readData, expectedData);
+  }
   
   /** Test read fully */
   @Test(timeout=120000)
@@ -505,12 +577,40 @@ public abstract class CryptoStreamsTestBase {
     System.arraycopy(data, 0, expectedData, 0, n);
     Assert.assertArrayEquals(readData, expectedData);
   }
+
+  private void byteBufferPreadCheck(InputStream in, ByteBuffer buf,
+      int bufPos) throws Exception {
+    // Test reading from position 0
+    buf.position(bufPos);
+    int n = ((ByteBufferPositionedReadable) in).read(0, buf);
+    Assert.assertEquals(bufPos + n, buf.position());
+    byte[] readData = new byte[n];
+    buf.rewind();
+    buf.position(bufPos);
+    buf.get(readData);
+    byte[] expectedData = new byte[n];
+    System.arraycopy(data, 0, expectedData, 0, n);
+    Assert.assertArrayEquals(readData, expectedData);
+
+    // Test reading from half way through the data
+    buf.position(bufPos);
+    n = ((ByteBufferPositionedReadable) in).read(dataLen / 2, buf);
+    Assert.assertEquals(bufPos + n, buf.position());
+    readData = new byte[n];
+    buf.rewind();
+    buf.position(bufPos);
+    buf.get(readData);
+    expectedData = new byte[n];
+    System.arraycopy(data, dataLen / 2, expectedData, 0, n);
+    Assert.assertArrayEquals(readData, expectedData);
+  }
   
   /** Test byte buffer read with different buffer size. */
   @Test(timeout=120000)
   public void testByteBufferRead() throws Exception {
-    OutputStream out = getOutputStream(defaultBufferSize);
-    writeData(out);
+    try (OutputStream out = getOutputStream(defaultBufferSize)) {
+      writeData(out);
+    }
     
     // Default buffer size, initial buffer position is 0
     InputStream in = getInputStream(defaultBufferSize);
@@ -560,6 +660,53 @@ public abstract class CryptoStreamsTestBase {
     byteBufferReadCheck(in, buf, 11);
     in.close();
   }
+
+  /** Test byte buffer pread with different buffer size. */
+  @Test(timeout=120000)
+  public void testByteBufferPread() throws Exception {
+    try (OutputStream out = getOutputStream(defaultBufferSize)) {
+      writeData(out);
+    }
+
+    try (InputStream defaultBuf = getInputStream(defaultBufferSize);
+         InputStream smallBuf = getInputStream(smallBufferSize)) {
+
+      ByteBuffer buf = ByteBuffer.allocate(dataLen + 100);
+
+      // Default buffer size, initial buffer position is 0
+      byteBufferPreadCheck(defaultBuf, buf, 0);
+
+      // Default buffer size, initial buffer position is not 0
+      buf.clear();
+      byteBufferPreadCheck(defaultBuf, buf, 11);
+
+      // Small buffer size, initial buffer position is 0
+      buf.clear();
+      byteBufferPreadCheck(smallBuf, buf, 0);
+
+      // Small buffer size, initial buffer position is not 0
+      buf.clear();
+      byteBufferPreadCheck(smallBuf, buf, 11);
+
+      // Test with direct ByteBuffer
+      buf = ByteBuffer.allocateDirect(dataLen + 100);
+
+      // Direct buffer, default buffer size, initial buffer position is 0
+      byteBufferPreadCheck(defaultBuf, buf, 0);
+
+      // Direct buffer, default buffer size, initial buffer position is not 0
+      buf.clear();
+      byteBufferPreadCheck(defaultBuf, buf, 11);
+
+      // Direct buffer, small buffer size, initial buffer position is 0
+      buf.clear();
+      byteBufferPreadCheck(smallBuf, buf, 0);
+
+      // Direct buffer, small buffer size, initial buffer position is not 0
+      buf.clear();
+      byteBufferPreadCheck(smallBuf, buf, 11);
+    }
+  }
   
   @Test(timeout=120000)
   public void testCombinedOp() throws Exception {
@@ -782,15 +929,15 @@ public abstract class CryptoStreamsTestBase {
 
     // Test pread
     try (InputStream in = getInputStream(smallBufferSize)) {
-      if (in instanceof PositionedReadable) {
-        PositionedReadable pin = (PositionedReadable) in;
+      if (in instanceof ByteBufferPositionedReadable) {
+        ByteBufferPositionedReadable bbpin = (ByteBufferPositionedReadable) in;
 
         // Test unbuffer after pread
-        preadCheck(pin);
+        byteBufferPreadCheck(bbpin);
         ((CanUnbuffer) in).unbuffer();
 
         // Test pread again after unbuffer
-        preadCheck(pin);
+        byteBufferPreadCheck(bbpin);
 
         // Test close after unbuffer
         ((CanUnbuffer) in).unbuffer();
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java
index 28bacc6..d524fcf 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java
@@ -26,6 +26,7 @@ import java.nio.ByteBuffer;
 import java.util.EnumSet;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ByteBufferPositionedReadable;
 import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.CanSetDropBehind;
 import org.apache.hadoop.fs.CanSetReadahead;
@@ -184,9 +185,9 @@ public class TestCryptoStreams extends CryptoStreamsTestBase {
   
   static class FakeInputStream extends InputStream
       implements Seekable, PositionedReadable, ByteBufferReadable,
-                 HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
-                 HasEnhancedByteBufferAccess, CanUnbuffer,
-                 StreamCapabilities {
+      ByteBufferPositionedReadable, HasFileDescriptor, CanSetDropBehind,
+      CanSetReadahead, HasEnhancedByteBufferAccess, CanUnbuffer,
+      StreamCapabilities {
     private final byte[] oneByteBuf = new byte[1];
     private int pos = 0;
     private final byte[] data;
@@ -310,6 +311,31 @@ public class TestCryptoStreams extends CryptoStreamsTestBase {
     }
 
     @Override
+    public int read(long position, ByteBuffer buf) throws IOException {
+      if (buf == null) {
+        throw new NullPointerException();
+      } else if (!buf.hasRemaining()) {
+        return 0;
+      }
+
+      if (position > length) {
+        throw new IOException("Cannot read after EOF.");
+      }
+      if (position < 0) {
+        throw new IOException("Cannot read to negative offset.");
+      }
+
+      checkStream();
+
+      if (position < length) {
+        int n = (int) Math.min(buf.remaining(), length - position);
+        buf.put(data, (int) position, n);
+        return n;
+      }
+      return -1;
+    }
+
+    @Override
     public void readFully(long position, byte[] b, int off, int len)
         throws IOException {
       if (b == null) {
@@ -384,6 +410,8 @@ public class TestCryptoStreams extends CryptoStreamsTestBase {
       case StreamCapabilities.READAHEAD:
       case StreamCapabilities.DROPBEHIND:
       case StreamCapabilities.UNBUFFER:
+      case StreamCapabilities.READBYTEBUFFER:
+      case StreamCapabilities.PREADBYTEBUFFER:
         return true;
       default:
         return false;
@@ -438,6 +466,8 @@ public class TestCryptoStreams extends CryptoStreamsTestBase {
     assertTrue(cis.hasCapability(StreamCapabilities.DROPBEHIND));
     assertTrue(cis.hasCapability(StreamCapabilities.READAHEAD));
     assertTrue(cis.hasCapability(StreamCapabilities.UNBUFFER));
+    assertTrue(cis.hasCapability(StreamCapabilities.READBYTEBUFFER));
+    assertTrue(cis.hasCapability(StreamCapabilities.PREADBYTEBUFFER));
     assertFalse(cis.hasCapability(StreamCapabilities.HFLUSH));
     assertFalse(cis.hasCapability(StreamCapabilities.HSYNC));
   }
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsForLocalFS.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsForLocalFS.java
index bb3fd7a..e7d922e 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsForLocalFS.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsForLocalFS.java
@@ -90,11 +90,21 @@ public class TestCryptoStreamsForLocalFS extends CryptoStreamsTestBase {
   @Override
   @Test(timeout=10000)
   public void testByteBufferRead() throws Exception {}
+
+  @Ignore("Wrapped stream doesn't support ByteBufferPositionedReadable")
+  @Override
+  @Test(timeout=10000)
+  public void testPositionedReadWithByteBuffer() throws IOException {}
   
   @Ignore("ChecksumFSOutputSummer doesn't support Syncable")
   @Override
   @Test(timeout=10000)
   public void testSyncable() throws IOException {}
+
+  @Ignore("Wrapped stream doesn't support ByteBufferPositionedReadable")
+  @Override
+  @Test(timeout=10000)
+  public void testByteBufferPread() throws IOException {}
   
   @Ignore("ChecksumFSInputChecker doesn't support ByteBuffer read")
   @Override
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsNormal.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsNormal.java
index 7e30077..036706f 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsNormal.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsNormal.java
@@ -91,6 +91,11 @@ public class TestCryptoStreamsNormal extends CryptoStreamsTestBase {
   @Test(timeout=10000)
   public void testPositionedRead() throws IOException {}
 
+  @Ignore("Wrapped stream doesn't support ByteBufferPositionedReadable")
+  @Override
+  @Test(timeout=10000)
+  public void testPositionedReadWithByteBuffer() throws IOException {}
+
   @Ignore("Wrapped stream doesn't support ReadFully")
   @Override
   @Test(timeout=10000)
@@ -105,6 +110,11 @@ public class TestCryptoStreamsNormal extends CryptoStreamsTestBase {
   @Override
   @Test(timeout=10000)
   public void testByteBufferRead() throws IOException {}
+
+  @Ignore("Wrapped stream doesn't support ByteBufferPositionedReadable")
+  @Override
+  @Test(timeout=10000)
+  public void testByteBufferPread() throws IOException {}
   
   @Ignore("Wrapped stream doesn't support ByteBufferRead, Seek")
   @Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index e7b6849..51f21f5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -50,6 +50,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import com.google.common.base.Preconditions;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.ByteBufferPositionedReadable;
 import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.ByteBufferUtil;
 import org.apache.hadoop.fs.CanSetDropBehind;
@@ -100,7 +101,8 @@ import javax.annotation.Nonnull;
 @InterfaceAudience.Private
 public class DFSInputStream extends FSInputStream
     implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
-               HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities {
+    HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities,
+    ByteBufferPositionedReadable {
   @VisibleForTesting
   public static boolean tcpReadsDisabledForTesting = false;
   private long hedgedReadOpsLoopNumForTesting = 0;
@@ -1761,6 +1763,14 @@ public class DFSInputStream extends FSInputStream
     throw new IOException("Mark/reset not supported");
   }
 
+  @Override
+  public int read(long position, final ByteBuffer buf) throws IOException {
+    if (!buf.hasRemaining()) {
+      return 0;
+    }
+    return pread(position, buf);
+  }
+
   /** Utility class to encapsulate data node info and its address. */
   static final class DNAddrPair {
     final DatanodeInfo info;
@@ -1983,6 +1993,8 @@ public class DFSInputStream extends FSInputStream
     case StreamCapabilities.READAHEAD:
     case StreamCapabilities.DROPBEHIND:
     case StreamCapabilities.UNBUFFER:
+    case StreamCapabilities.READBYTEBUFFER:
+    case StreamCapabilities.PREADBYTEBUFFER:
       return true;
     default:
       return false;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/hdfs_test.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/hdfs_test.h
index 0eab9a6..f003263 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/hdfs_test.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/hdfs_test.h
@@ -50,6 +50,24 @@ extern  "C" {
     void hdfsFileDisableDirectRead(struct hdfsFile_internal *file);
 
     /**
+    * Determine if a file is using the "direct pread" optimization.
+    *
+    * @param file     The HDFS file
+    * @return         1 if the file is using the direct pread optimization,
+    *                 0 otherwise.
+    */
+    int hdfsFileUsesDirectPread(struct hdfsFile_internal *file);
+
+    /**
+     * Disable the direct pread optimization for a file.
+     *
+     * This is mainly provided for unit testing purposes.
+     *
+     * @param file     The HDFS file
+     */
+    void hdfsFileDisableDirectPread(struct hdfsFile_internal *file);
+
+    /**
      * Disable domain socket security checks.
      *
      * @param          0 if domain socket security was disabled;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_ops.c b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_ops.c
index d69aa37..2647563 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_ops.c
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_ops.c
@@ -75,9 +75,9 @@ int main(int argc, char **argv) {
     const char *userPath = "/tmp/usertestfile.txt";
 
     char buffer[32], buffer2[256], rdbuffer[32];
-    tSize num_written_bytes, num_read_bytes;
+    tSize num_written_bytes, num_read_bytes, num_pread_bytes;
     hdfsFS fs, lfs;
-    hdfsFile writeFile, readFile, localFile, appendFile, userFile;
+    hdfsFile writeFile, readFile, preadFile, localFile, appendFile, userFile;
     tOffset currentPos, seekPos;
     int exists, totalResult, result, numEntries, i, j;
     const char *resp;
@@ -206,6 +206,7 @@ int main(int argc, char **argv) {
         }
         fprintf(stderr, "Read (direct) following %d bytes:\n%s\n",
                 num_read_bytes, buffer);
+        memset(buffer, 0, strlen(fileContents + 1));
         if (hdfsSeek(fs, readFile, 0L)) {
             fprintf(stderr, "Failed to seek to file start!\n");
             exit(-1);
@@ -215,18 +216,17 @@ int main(int argc, char **argv) {
         // read path
         hdfsFileDisableDirectRead(readFile);
 
-        num_read_bytes = hdfsRead(fs, readFile, (void*)buffer, 
+        num_read_bytes = hdfsRead(fs, readFile, (void*)buffer,
                 sizeof(buffer));
-        fprintf(stderr, "Read following %d bytes:\n%s\n", 
+        if (strncmp(fileContents, buffer, strlen(fileContents)) != 0) {
+            fprintf(stderr, "Failed to read. Expected %s but got %s (%d 
bytes)\n",
+                    fileContents, buffer, num_read_bytes);
+            exit(-1);
+        }
+        fprintf(stderr, "Read following %d bytes:\n%s\n",
                 num_read_bytes, buffer);
-
         memset(buffer, 0, strlen(fileContents + 1));
 
-        num_read_bytes = hdfsPread(fs, readFile, 0, (void*)buffer, 
-                sizeof(buffer));
-        fprintf(stderr, "Read following %d bytes:\n%s\n", 
-                num_read_bytes, buffer);
-
         hdfsCloseFile(fs, readFile);
 
         // Test correct behaviour for unsupported filesystems
@@ -251,6 +251,104 @@ int main(int argc, char **argv) {
         hdfsCloseFile(lfs, localFile);
     }
 
+    {
+        // Pread tests
+
+        exists = hdfsExists(fs, readPath);
+
+        if (exists) {
+            fprintf(stderr, "Failed to validate existence of %s\n", readPath);
+            exit(-1);
+        }
+
+        preadFile = hdfsOpenFile(fs, readPath, O_RDONLY, 0, 0, 0);
+        if (!preadFile) {
+            fprintf(stderr, "Failed to open %s for reading!\n", readPath);
+            exit(-1);
+        }
+
+        if (!hdfsFileIsOpenForRead(preadFile)) {
+            fprintf(stderr, "hdfsFileIsOpenForRead: we just opened a file "
+                            "with O_RDONLY, and it did not show up as 'open 
for "
+                            "read'\n");
+            exit(-1);
+        }
+
+        fprintf(stderr, "hdfsAvailable: %d\n", hdfsAvailable(fs, preadFile));
+
+        num_pread_bytes = hdfsPread(fs, preadFile, 0, (void*)buffer, sizeof(buffer));
+        if (strncmp(fileContents, buffer, strlen(fileContents)) != 0) {
+            fprintf(stderr, "Failed to pread (direct). Expected %s but got %s 
(%d bytes)\n",
+                    fileContents, buffer, num_read_bytes);
+            exit(-1);
+        }
+        fprintf(stderr, "Pread (direct) following %d bytes:\n%s\n",
+                num_pread_bytes, buffer);
+        memset(buffer, 0, strlen(fileContents + 1));
+        if (hdfsTell(fs, preadFile) != 0) {
+            fprintf(stderr, "Pread changed position of file\n");
+            exit(-1);
+        }
+
+        // Test pread midway through the file rather than at the beginning
+        const char *fileContentsChunk = "World!";
+        num_pread_bytes = hdfsPread(fs, preadFile, 7, (void*)buffer, sizeof(buffer));
+        if (strncmp(fileContentsChunk, buffer, strlen(fileContentsChunk)) != 0) {
+            fprintf(stderr, "Failed to pread (direct). Expected %s but got %s 
(%d bytes)\n",
+                    fileContentsChunk, buffer, num_read_bytes);
+            exit(-1);
+        }
+        fprintf(stderr, "Pread (direct) following %d bytes:\n%s\n", 
num_pread_bytes, buffer);
+        memset(buffer, 0, strlen(fileContents + 1));
+        if (hdfsTell(fs, preadFile) != 0) {
+            fprintf(stderr, "Pread changed position of file\n");
+            exit(-1);
+        }
+
+        // Disable the direct pread path so that we really go through the slow
+        // read path
+        hdfsFileDisableDirectPread(preadFile);
+
+        num_pread_bytes = hdfsPread(fs, preadFile, 0, (void*)buffer, sizeof(buffer));
+        if (strncmp(fileContents, buffer, strlen(fileContents)) != 0) {
+            fprintf(stderr, "Failed to pread. Expected %s but got %s (%d 
bytes)\n",
+                    fileContents, buffer, num_pread_bytes);
+            exit(-1);
+        }
+        fprintf(stderr, "Pread following %d bytes:\n%s\n", num_pread_bytes, 
buffer);
+        memset(buffer, 0, strlen(fileContents + 1));
+        if (hdfsTell(fs, preadFile) != 0) {
+            fprintf(stderr, "Pread changed position of file\n");
+            exit(-1);
+        }
+
+        num_pread_bytes = hdfsPread(fs, preadFile, 7, (void*)buffer, sizeof(buffer));
+        if (strncmp(fileContentsChunk, buffer, strlen(fileContentsChunk)) != 0) {
+            fprintf(stderr, "Failed to pread (direct). Expected %s but got %s 
(%d bytes)\n",
+                    fileContentsChunk, buffer, num_read_bytes);
+            exit(-1);
+        }
+        fprintf(stderr, "Pread (direct) following %d bytes:\n%s\n", 
num_pread_bytes, buffer);
+        memset(buffer, 0, strlen(fileContents + 1));
+        if (hdfsTell(fs, preadFile) != 0) {
+            fprintf(stderr, "Pread changed position of file\n");
+            exit(-1);
+        }
+
+        hdfsCloseFile(fs, preadFile);
+
+        // Test correct behaviour for unsupported filesystems
+        localFile = hdfsOpenFile(lfs, writePath, O_RDONLY, 0, 0, 0);
+
+        if (hdfsFileUsesDirectPread(localFile)) {
+            fprintf(stderr, "Direct pread support incorrectly detected for 
local "
+                            "filesystem\n");
+            exit(-1);
+        }
+
+        hdfsCloseFile(lfs, localFile);
+    }
+
     totalResult = 0;
     result = 0;
     {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/hdfs.c b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/hdfs.c
index ac4bca6..e3cd580 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/hdfs.c
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/hdfs.c
@@ -38,6 +38,7 @@
 #define HADOOP_OSTRM    "org/apache/hadoop/fs/FSDataOutputStream"
 #define HADOOP_STAT     "org/apache/hadoop/fs/FileStatus"
 #define HADOOP_FSPERM   "org/apache/hadoop/fs/permission/FsPermission"
+#define HADOOP_FS_DATA_INPUT_STREAM   "org/apache/hadoop/fs/FSDataInputStream"
 #define JAVA_NET_ISA    "java/net/InetSocketAddress"
 #define JAVA_NET_URI    "java/net/URI"
 #define JAVA_STRING     "java/lang/String"
@@ -56,8 +57,23 @@
 
 // Bit fields for hdfsFile_internal flags
 #define HDFS_FILE_SUPPORTS_DIRECT_READ (1<<0)
+#define HDFS_FILE_SUPPORTS_DIRECT_PREAD (1<<1)
 
+/**
+ * Reads bytes using the read(ByteBuffer) API. By using Java
+ * DirectByteBuffers we can avoid copying the bytes from kernel space into
+ * user space.
+ */
 tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length);
+
+/**
+ * Reads bytes using the read(long, ByteBuffer) API. By using Java
+ * DirectByteBuffers we can avoid copying the bytes from kernel space into
+ * user space.
+ */
+tSize preadDirect(hdfsFS fs, hdfsFile file, tOffset position, void* buffer,
+                  tSize length);
+
 static void hdfsFreeFileInfoEntry(hdfsFileInfo *hdfsFileInfo);
 
 /**
@@ -235,6 +251,16 @@ void hdfsFileDisableDirectRead(hdfsFile file)
     file->flags &= ~HDFS_FILE_SUPPORTS_DIRECT_READ;
 }
 
+int hdfsFileUsesDirectPread(hdfsFile file)
+{
+    return !!(file->flags & HDFS_FILE_SUPPORTS_DIRECT_PREAD);
+}
+
+void hdfsFileDisableDirectPread(hdfsFile file)
+{
+    file->flags &= ~HDFS_FILE_SUPPORTS_DIRECT_PREAD;
+}
+
 int hdfsDisableDomainSocketSecurity(void)
 {
     jthrowable jthr;
@@ -922,6 +948,62 @@ int hdfsStreamBuilderSetDefaultBlockSize(struct hdfsStreamBuilder *bld,
     return 0;
 }
 
+/**
+ * Delegates to FsDataInputStream#hasCapability(String). Used to check if a
+ * given input stream supports certain methods, such as
+ * ByteBufferReadable#read(ByteBuffer).
+ *
+ * @param jFile the FsDataInputStream to call hasCapability on
+ * @param capability the name of the capability to query; for a full list of
+ *        possible values see StreamCapabilities
+ *
+ * @return true if the given jFile has the given capability, false otherwise
+ *
+ * @see org.apache.hadoop.fs.StreamCapabilities
+ */
+static int hdfsHasStreamCapability(jobject jFile,
+        const char *capability) {
+    int ret = 0;
+    jthrowable jthr = NULL;
+    jvalue jVal;
+    jstring jCapabilityString = NULL;
+
+    /* Get the JNIEnv* corresponding to current thread */
+    JNIEnv* env = getJNIEnv();
+    if (env == NULL) {
+        errno = EINTERNAL;
+        return 0;
+    }
+
+    jthr = newJavaStr(env, capability, &jCapabilityString);
+    if (jthr) {
+        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                "hdfsHasStreamCapability(%s): newJavaStr", capability);
+        goto done;
+    }
+    jthr = invokeMethod(env, &jVal, INSTANCE, jFile,
+            HADOOP_FS_DATA_INPUT_STREAM, "hasCapability", 
"(Ljava/lang/String;)Z",
+            jCapabilityString);
+    if (jthr) {
+        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                "hdfsHasStreamCapability(%s): FSDataInputStream#hasCapability",
+                capability);
+        goto done;
+    }
+
+done:
+    destroyLocalReference(env, jthr);
+    destroyLocalReference(env, jCapabilityString);
+    if (ret) {
+        errno = ret;
+        return 0;
+    }
+    if (jVal.z == JNI_TRUE) {
+        return 1;
+    }
+    return 0;
+}
+
 static hdfsFile hdfsOpenFileImpl(hdfsFS fs, const char *path, int flags,
                   int32_t bufferSize, int16_t replication, int64_t blockSize)
 {
@@ -932,7 +1014,7 @@ static hdfsFile hdfsOpenFileImpl(hdfsFS fs, const char *path, int flags,
        return f{is|os};
     */
     int accmode = flags & O_ACCMODE;
-    jstring jStrBufferSize = NULL, jStrReplication = NULL;
+    jstring jStrBufferSize = NULL, jStrReplication = NULL, jCapabilityString = NULL;
     jobject jConfiguration = NULL, jPath = NULL, jFile = NULL;
     jobject jFS = (jobject)fs;
     jthrowable jthr;
@@ -1090,16 +1172,16 @@ static hdfsFile hdfsOpenFileImpl(hdfsFS fs, const char *path, int flags,
     file->flags = 0;
 
     if ((flags & O_WRONLY) == 0) {
-        // Try a test read to see if we can do direct reads
-        char buf;
-        if (readDirect(fs, file, &buf, 0) == 0) {
-            // Success - 0-byte read should return 0
+        // Check the StreamCapabilities of jFile to see if we can do direct
+        // reads
+        if (hdfsHasStreamCapability(jFile, "in:readbytebuffer")) {
             file->flags |= HDFS_FILE_SUPPORTS_DIRECT_READ;
-        } else if (errno != ENOTSUP) {
-            // Unexpected error. Clear it, don't set the direct flag.
-            fprintf(stderr,
-                  "hdfsOpenFile(%s): WARN: Unexpected error %d when testing "
-                  "for direct read compatibility\n", path, errno);
+        }
+
+        // Check the StreamCapabilities of jFile to see if we can do direct
+        // preads
+        if (hdfsHasStreamCapability(jFile, "in:preadbytebuffer")) {
+            file->flags |= HDFS_FILE_SUPPORTS_DIRECT_PREAD;
         }
     }
     ret = 0;
@@ -1109,7 +1191,8 @@ done:
     destroyLocalReference(env, jStrReplication);
     destroyLocalReference(env, jConfiguration); 
     destroyLocalReference(env, jPath); 
-    destroyLocalReference(env, jFile); 
+    destroyLocalReference(env, jFile);
+    destroyLocalReference(env, jCapabilityString);
     if (ret) {
         if (file) {
             if (file->file) {
@@ -1311,6 +1394,12 @@ static int readPrepare(JNIEnv* env, hdfsFS fs, hdfsFile f,
     return 0;
 }
 
+/**
+ * If the underlying stream supports the ByteBufferReadable interface then
+ * this method will transparently use read(ByteBuffer). This can help
+ * improve performance as it avoids unnecessary copies between the kernel
+ * space, the Java process space, and the C process space.
+ */
 tSize hdfsRead(hdfsFS fs, hdfsFile f, void* buffer, tSize length)
 {
     jobject jInputStream;
@@ -1423,6 +1512,12 @@ tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length)
     return (jVal.i < 0) ? 0 : jVal.i;
 }
 
+/**
+ * If the underlying stream supports the ByteBufferPositionedReadable
+ * interface then this method will transparently use read(long, ByteBuffer).
+ * This can help improve performance as it avoids unnecessary copies between
+ * the kernel space, the Java process space, and the C process space.
+ */
 tSize hdfsPread(hdfsFS fs, hdfsFile f, tOffset position,
                 void* buffer, tSize length)
 {
@@ -1442,6 +1537,10 @@ tSize hdfsPread(hdfsFS fs, hdfsFile f, tOffset position,
         return -1;
     }
 
+    if (f->flags & HDFS_FILE_SUPPORTS_DIRECT_PREAD) {
+      return preadDirect(fs, f, position, buffer, length);
+    }
+
     env = getJNIEnv();
     if (env == NULL) {
       errno = EINTERNAL;
@@ -1491,6 +1590,60 @@ tSize hdfsPread(hdfsFS fs, hdfsFile f, tOffset position,
     return jVal.i;
 }
 
+tSize preadDirect(hdfsFS fs, hdfsFile f, tOffset position, void* buffer,
+                  tSize length)
+{
+    // JAVA EQUIVALENT:
+    //  ByteBuffer buf = ByteBuffer.allocateDirect(length) // wraps C buffer
+    //  fis.read(position, buf);
+
+    jvalue jVal;
+    jthrowable jthr;
+    jobject bb;
+
+    //Get the JNIEnv* corresponding to current thread
+    JNIEnv* env = getJNIEnv();
+    if (env == NULL) {
+      errno = EINTERNAL;
+      return -1;
+    }
+
+    //Error checking... make sure that this file is 'readable'
+    if (f->type != HDFS_STREAM_INPUT) {
+        fprintf(stderr, "Cannot read from a non-InputStream object!\n");
+        errno = EINVAL;
+        return -1;
+    }
+
+    //Read the requisite bytes
+    bb = (*env)->NewDirectByteBuffer(env, buffer, length);
+    if (bb == NULL) {
+        errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
+            "readDirect: NewDirectByteBuffer");
+        return -1;
+    }
+
+    jthr = invokeMethod(env, &jVal, INSTANCE, f->file,
+            HADOOP_FS_DATA_INPUT_STREAM, "read", "(JLjava/nio/ByteBuffer;)I",
+            position, bb);
+    destroyLocalReference(env, bb);
+    if (jthr) {
+       errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+           "preadDirect: FSDataInputStream#read");
+       return -1;
+    }
+    // Reached EOF, return 0
+    if (jVal.i < 0) {
+        return 0;
+    }
+    // 0 bytes read, return error
+    if (jVal.i == 0) {
+        errno = EINTR;
+        return -1;
+    }
+    return jVal.i;
+}
+
 tSize hdfsWrite(hdfsFS fs, hdfsFile f, const void* buffer, tSize length)
 {
     // JAVA EQUIVALENT
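
For illustration only (not part of this patch): a minimal libhdfs caller sketch. With this change, hdfsPread() takes the direct ByteBuffer-based path transparently whenever the open stream reports the in:preadbytebuffer capability, so a caller like the one below needs no changes. The namenode specifier and file path are placeholders.

    /* Minimal libhdfs pread example (illustrative; connection details and
     * path are placeholders). */
    #include <fcntl.h>
    #include <stdio.h>
    #include "hdfs.h"

    int main(void) {
        /* "default" connects to the fs.defaultFS from the loaded config. */
        hdfsFS fs = hdfsConnect("default", 0);
        if (!fs) {
            fprintf(stderr, "hdfsConnect failed\n");
            return 1;
        }
        hdfsFile f = hdfsOpenFile(fs, "/tmp/example.dat", O_RDONLY, 0, 0, 0);
        if (!f) {
            fprintf(stderr, "hdfsOpenFile failed\n");
            hdfsDisconnect(fs);
            return 1;
        }
        char buf[128];
        /* Read up to 128 bytes at offset 0 without moving the file position;
         * the direct pread path is used automatically when supported. */
        tSize n = hdfsPread(fs, f, 0, buf, sizeof(buf));
        if (n < 0) {
            fprintf(stderr, "hdfsPread failed\n");
        } else {
            fprintf(stderr, "pread returned %d bytes\n", (int) n);
        }
        hdfsCloseFile(fs, f);
        hdfsDisconnect(fs);
        return 0;
    }
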
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteBufferPread.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteBufferPread.java
new file mode 100644
index 0000000..4547db1
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteBufferPread.java
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * This class tests the DFS positional read functionality on a single node
+ * mini-cluster. These tests are inspired from {@link TestPread}. The tests
+ * are much less comprehensive than other pread tests because pread already
+ * internally uses {@link ByteBuffer}s.
+ */
+public class TestByteBufferPread {
+
+  private static MiniDFSCluster cluster;
+  private static FileSystem fs;
+  private static byte[] fileContents;
+  private static Path testFile;
+  private static Random rand;
+
+  private static final long SEED = 0xDEADBEEFL;
+  private static final int BLOCK_SIZE = 4096;
+  private static final int FILE_SIZE = 12 * BLOCK_SIZE;
+
+  @BeforeClass
+  public static void setup() throws IOException {
+    // Setup the cluster with a small block size so we can create small files
+    // that span multiple blocks
+    Configuration conf = new Configuration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    fs = cluster.getFileSystem();
+
+    // Create a test file that spans 12 blocks, and contains a bunch of random
+    // bytes
+    fileContents = new byte[FILE_SIZE];
+    rand = new Random(SEED);
+    rand.nextBytes(fileContents);
+    testFile = new Path("/byte-buffer-pread-test.dat");
+    try (FSDataOutputStream out = fs.create(testFile, (short) 3)) {
+      out.write(fileContents);
+    }
+  }
+
+  /**
+   * Test preads with {@link java.nio.HeapByteBuffer}s.
+   */
+  @Test
+  public void testPreadWithHeapByteBuffer() throws IOException {
+    testPreadWithByteBuffer(ByteBuffer.allocate(FILE_SIZE));
+    testPreadWithFullByteBuffer(ByteBuffer.allocate(FILE_SIZE));
+    testPreadWithPositionedByteBuffer(ByteBuffer.allocate(FILE_SIZE));
+    testPreadWithLimitedByteBuffer(ByteBuffer.allocate(FILE_SIZE));
+    testPositionedPreadWithByteBuffer(ByteBuffer.allocate(FILE_SIZE));
+  }
+
+  /**
+   * Test preads with {@link java.nio.DirectByteBuffer}s.
+   */
+  @Test
+  public void testPreadWithDirectByteBuffer() throws IOException {
+    testPreadWithByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE));
+    testPreadWithFullByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE));
+    testPreadWithPositionedByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE));
+    testPreadWithLimitedByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE));
+    testPositionedPreadWithByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE));
+  }
+
+  /**
+   * Reads the entire testFile using the pread API and validates that its
+   * contents are properly loaded into the supplied {@link ByteBuffer}.
+   */
+  private void testPreadWithByteBuffer(ByteBuffer buffer) throws IOException {
+    int bytesRead;
+    int totalBytesRead = 0;
+    try (FSDataInputStream in = fs.open(testFile)) {
+      while ((bytesRead = in.read(totalBytesRead, buffer)) > 0) {
+        totalBytesRead += bytesRead;
+        // Check that each call to read changes the position of the ByteBuffer
+        // correctly
+        assertEquals(totalBytesRead, buffer.position());
+      }
+
+      // Make sure the buffer is full
+      assertFalse(buffer.hasRemaining());
+      // Make sure the contents of the read buffer equal the contents of the
+      // file
+      buffer.position(0);
+      byte[] bufferContents = new byte[FILE_SIZE];
+      buffer.get(bufferContents);
+      assertArrayEquals(bufferContents, fileContents);
+      buffer.position(buffer.limit());
+    }
+  }
+
+  /**
+   * Attempts to read the testFile into a {@link ByteBuffer} that is already
+   * full, and validates that doing so does not change the contents of the
+   * supplied {@link ByteBuffer}.
+   */
+  private void testPreadWithFullByteBuffer(ByteBuffer buffer)
+          throws IOException {
+    // Load some dummy data into the buffer
+    byte[] existingBufferBytes = new byte[FILE_SIZE];
+    rand.nextBytes(existingBufferBytes);
+    buffer.put(existingBufferBytes);
+    // Make sure the buffer is full
+    assertFalse(buffer.hasRemaining());
+
+    try (FSDataInputStream in = fs.open(testFile)) {
+      // Attempt to read into the buffer, 0 bytes should be read since the
+      // buffer is full
+      assertEquals(0, in.read(buffer));
+
+      // Double check the buffer is still full and its contents have not
+      // changed
+      assertFalse(buffer.hasRemaining());
+      buffer.position(0);
+      byte[] bufferContents = new byte[FILE_SIZE];
+      buffer.get(bufferContents);
+      assertArrayEquals(bufferContents, existingBufferBytes);
+    }
+  }
+
+  /**
+   * Reads half of the testFile into the {@link ByteBuffer} by setting a
+   * {@link ByteBuffer#limit} on the buffer. Validates that only half of the
+   * testFile is loaded into the buffer.
+   */
+  private void testPreadWithLimitedByteBuffer(
+          ByteBuffer buffer) throws IOException {
+    int bytesRead;
+    int totalBytesRead = 0;
+    // Set the buffer limit to half the size of the file
+    buffer.limit(FILE_SIZE / 2);
+
+    try (FSDataInputStream in = fs.open(testFile)) {
+      while ((bytesRead = in.read(totalBytesRead, buffer)) > 0) {
+        totalBytesRead += bytesRead;
+        // Check that each call to read changes the position of the ByteBuffer
+        // correctly
+        assertEquals(totalBytesRead, buffer.position());
+      }
+
+      // Since we set the buffer limit to half the size of the file, we should
+      // have only read half of the file into the buffer
+      assertEquals(totalBytesRead, FILE_SIZE / 2);
+      // Check that the buffer is full and the contents equal the first half of
+      // the file
+      assertFalse(buffer.hasRemaining());
+      buffer.position(0);
+      byte[] bufferContents = new byte[FILE_SIZE / 2];
+      buffer.get(bufferContents);
+      assertArrayEquals(bufferContents,
+              Arrays.copyOfRange(fileContents, 0, FILE_SIZE / 2));
+    }
+  }
+
+  /**
+   * Reads half of the testFile into the {@link ByteBuffer} by setting the
+   * {@link ByteBuffer#position} to half the size of the file. Validates that
+   * only half of the testFile is loaded into the buffer.
+   */
+  private void testPreadWithPositionedByteBuffer(
+          ByteBuffer buffer) throws IOException {
+    int bytesRead;
+    int totalBytesRead = 0;
+    // Set the buffer position to half the size of the file
+    buffer.position(FILE_SIZE / 2);
+
+    try (FSDataInputStream in = fs.open(testFile)) {
+      while ((bytesRead = in.read(totalBytesRead, buffer)) > 0) {
+        totalBytesRead += bytesRead;
+        // Check that each call to read changes the position of the ByteBuffer
+        // correctly
+        assertEquals(totalBytesRead + FILE_SIZE / 2, buffer.position());
+      }
+
+      // Since we set the buffer position to half the size of the file, we
+      // should have only read half of the file into the buffer
+      assertEquals(totalBytesRead, FILE_SIZE / 2);
+      // Check that the buffer is full and the contents equal the first half of
+      // the file
+      assertFalse(buffer.hasRemaining());
+      buffer.position(FILE_SIZE / 2);
+      byte[] bufferContents = new byte[FILE_SIZE / 2];
+      buffer.get(bufferContents);
+      assertArrayEquals(bufferContents,
+              Arrays.copyOfRange(fileContents, 0, FILE_SIZE / 2));
+    }
+  }
+
+  /**
+   * Reads half of the testFile into the {@link ByteBuffer} by specifying a
+   * position for the pread API that is half of the file size. Validates that
+   * only half of the testFile is loaded into the buffer.
+   */
+  private void testPositionedPreadWithByteBuffer(
+          ByteBuffer buffer) throws IOException {
+    int bytesRead;
+    int totalBytesRead = 0;
+
+    try (FSDataInputStream in = fs.open(testFile)) {
+      // Start reading from halfway through the file
+      while ((bytesRead = in.read(totalBytesRead + FILE_SIZE / 2,
+              buffer)) > 0) {
+        totalBytesRead += bytesRead;
+        // Check that each call to read changes the position of the ByteBuffer
+        // correctly
+        assertEquals(totalBytesRead, buffer.position());
+      }
+
+      // Since we started reading halfway through the file, the buffer should
+      // only be half full
+      assertEquals(totalBytesRead, FILE_SIZE / 2);
+      assertEquals(buffer.position(), FILE_SIZE / 2);
+      assertTrue(buffer.hasRemaining());
+      // Check that the buffer contents equal the second half of the file
+      buffer.position(0);
+      byte[] bufferContents = new byte[FILE_SIZE / 2];
+      buffer.get(bufferContents);
+      assertArrayEquals(bufferContents,
+              Arrays.copyOfRange(fileContents, FILE_SIZE / 2, FILE_SIZE));
+    }
+  }
+
+  @AfterClass
+  public static void shutdown() throws IOException {
+    try {
+      fs.delete(testFile, false);
+      fs.close();
+    } finally {
+      cluster.shutdown(true);
+    }
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitLocalRead.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitLocalRead.java
index 93486c0..d39b9be 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitLocalRead.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitLocalRead.java
@@ -622,12 +622,11 @@ public class TestShortCircuitLocalRead {
     stm.write(fileData);
     stm.close();
     try {
-      checkFileContent(uri, file1, fileData, readOffset, shortCircuitUser, 
-          conf, shortCircuitFails);
-      //RemoteBlockReader have unsupported method read(ByteBuffer bf)
-      assertTrue(
-          "RemoteBlockReader unsupported method read(ByteBuffer bf) error",
-          checkUnsupportedMethod(fs, file1, fileData, readOffset));
+      checkFileContent(uri, file1, fileData, readOffset, shortCircuitUser, conf,
+          shortCircuitFails);
+      assertFalse(true);
+    } catch (UnsupportedOperationException unex) {
+      // RemoteBlockReader have unsupported method read(ByteBuffer bf)
     } catch(IOException e) {
       throw new IOException(
           "doTestShortCircuitReadWithRemoteBlockReader ex error ", e);
@@ -639,18 +638,4 @@ public class TestShortCircuitLocalRead {
     }
   }
 
-  private boolean checkUnsupportedMethod(FileSystem fs, Path file,
-      byte[] expected, int readOffset) throws IOException {
-    HdfsDataInputStream stm = (HdfsDataInputStream)fs.open(file);
-    ByteBuffer actual =
-        ByteBuffer.allocateDirect(expected.length - readOffset);
-    IOUtils.skipFully(stm, readOffset);
-    try {
-      stm.read(actual);
-    } catch(UnsupportedOperationException unex) {
-      return true;
-    }
-    return false;
-  }
-
 }

