steveloughran commented on a change in pull request #2646:
URL: https://github.com/apache/hadoop/pull/2646#discussion_r570483886



##########
File path: 
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsPread.java
##########
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.Test;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+
+public class ITestAbfsPread extends AbstractAbfsIntegrationTest {
+
+  public ITestAbfsPread() throws Exception {
+  }
+
+  @Test
+  public void testPread() throws IOException {
+    describe("Testing preads in AbfsInputStream");
+    Path dest = path("ITestAbfsPread");
+
+    int dataSize = 100;
+    byte[] data = ContractTestUtils.dataset(dataSize, 'a', 26);
+    ContractTestUtils.writeDataset(getFileSystem(), dest, data, data.length,
+        dataSize, true);
+    int bytesToRead = 10;
+    try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+      assertTrue(
+          "unexpected stream type "
+              + inputStream.getWrappedStream().getClass().getSimpleName(),
+          inputStream.getWrappedStream() instanceof AbfsInputStream);
+      byte[] readBuffer = new byte[bytesToRead];
+      int pos = 0;
+      assertEquals(
+          "AbfsInputStream#read did not read the correct number of bytes",
+          bytesToRead, inputStream.read(pos, readBuffer, 0, bytesToRead));
+      assertTrue("AbfsInputStream#read did not read the correct bytes",
+          Arrays.equals(Arrays.copyOfRange(data, pos, pos + bytesToRead),

Review comment:
       I Think assertJ does even more...

##########
File path: 
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
##########
@@ -135,6 +145,41 @@ public String getPath() {
     return path;
   }
 
+  @Override
+  public int read(long position, byte[] buffer, int offset, int length)
+      throws IOException {
+    // When bufferedPreadDisabled = true, this API do not use any shared 
buffer,

Review comment:
       nit: does

##########
File path: 
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
##########
@@ -634,12 +638,15 @@ public AbfsInputStream openFileForRead(final Path path, 
final FileSystem.Statist
       // Add statistics for InputStream
       return new AbfsInputStream(client, statistics,
               relativePath, contentLength,
-              populateAbfsInputStreamContext(),
+              populateAbfsInputStreamContext(options),
               eTag);
     }
   }
 
-  private AbfsInputStreamContext populateAbfsInputStreamContext() {
+  private AbfsInputStreamContext populateAbfsInputStreamContext(
+      Optional<Configuration> options) {
+    boolean bufferedPreadDisabled = options.isPresent()

Review comment:
       some getOrElse would be easier

##########
File path: 
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java
##########
@@ -399,6 +406,89 @@ public void testActionHttpGetRequest() throws IOException {
     }
   }
 
+  @Test
+  public void testPread() throws IOException {
+    describe("Testing preads in AbfsInputStream");
+
+    AzureBlobFileSystem fs = getFileSystem();
+    AzureBlobFileSystemStore abfss = fs.getAbfsStore();
+    Path readStatPath = path(getMethodName());
+
+    AbfsOutputStream out = null;
+    AbfsInputStream in = null;
+    try {
+      out = createAbfsOutputStreamWithFlushEnabled(fs, readStatPath);
+      /*
+       * Writing 1MB buffer to the file.
+       */
+      out.write(defBuffer);
+      out.hflush();
+
+      in = abfss.openFileForRead(readStatPath, Optional.empty(),

Review comment:
       if you open this before L424, you'd also verify that openFile itself 
isn't doing a buffered read on open, wouldn't you?

##########
File path: 
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsPread.java
##########
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.Test;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+
+public class ITestAbfsPread extends AbstractAbfsIntegrationTest {
+
+  public ITestAbfsPread() throws Exception {
+  }
+
+  @Test
+  public void testPread() throws IOException {
+    describe("Testing preads in AbfsInputStream");
+    Path dest = path("ITestAbfsPread");
+
+    int dataSize = 100;
+    byte[] data = ContractTestUtils.dataset(dataSize, 'a', 26);
+    ContractTestUtils.writeDataset(getFileSystem(), dest, data, data.length,
+        dataSize, true);
+    int bytesToRead = 10;
+    try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+      assertTrue(
+          "unexpected stream type "
+              + inputStream.getWrappedStream().getClass().getSimpleName(),
+          inputStream.getWrappedStream() instanceof AbfsInputStream);
+      byte[] readBuffer = new byte[bytesToRead];
+      int pos = 0;
+      assertEquals(
+          "AbfsInputStream#read did not read the correct number of bytes",
+          bytesToRead, inputStream.read(pos, readBuffer, 0, bytesToRead));
+      assertTrue("AbfsInputStream#read did not read the correct bytes",
+          Arrays.equals(Arrays.copyOfRange(data, pos, pos + bytesToRead),
+              readBuffer));
+      // Read only 10 bytes from offset 0. But by default it will do the seek
+      // and read where the
+      // entire 100 bytes get read into the AbfsInputStream buffer.
+      assertArrayEquals(
+          "AbfsInputStream#read did not read more data into its buffer", data,
+          Arrays.copyOfRange(
+              ((AbfsInputStream) inputStream.getWrappedStream()).getBuffer(), 
0,
+              dataSize));
+    }
+    FutureDataInputStreamBuilder builder = getFileSystem().openFile(dest);
+    builder.opt(ConfigurationKeys.FS_AZURE_BUFFERED_PREAD_DISABLE, true);
+    FSDataInputStream inputStream = null;
+    try {
+      inputStream = builder.build().get();
+    } catch (IllegalArgumentException | UnsupportedOperationException
+        | InterruptedException | ExecutionException e) {
+      throw new IOException(e);
+    }
+    assertNotNull(inputStream);
+    try {
+      AbfsInputStream abfsIs = (AbfsInputStream) 
inputStream.getWrappedStream();
+      byte[] readBuffer = new byte[bytesToRead];
+      int pos = 10;
+      assertEquals(
+          "AbfsInputStream#read did not read the correct number of bytes",
+          bytesToRead, inputStream.read(pos, readBuffer, 0, bytesToRead));
+      assertTrue("AbfsInputStream#read did not read the correct bytes",
+          Arrays.equals(Arrays.copyOfRange(data, pos, pos + bytesToRead),
+              readBuffer));
+      // Read only 10 bytes from offset 10. This time, as buffered pread is
+      // disabled, it will only read the exact bytes as requested and no data
+      // will get read into the AbfsInputStream#buffer. Infact the buffer won't
+      // even get initialized.
+      assertNull("AbfsInputStream pread caused the internal buffer creation",
+          abfsIs.getBuffer());
+      // Now make a seek and read so that internal buffer gets created
+      inputStream.seek(0);
+      inputStream.read(readBuffer);
+      // This read would have fetched all 100 bytes into internal buffer.
+      assertArrayEquals(
+          "AbfsInputStream#read did not read more data into its buffer", data,
+          Arrays.copyOfRange(
+              ((AbfsInputStream) inputStream.getWrappedStream()).getBuffer(), 
0,
+              dataSize));
+      // Now again do pos read and make sure not any extra data being fetched.
+      resetBuffer(abfsIs.getBuffer());
+      pos = 0;
+      assertEquals(
+          "AbfsInputStream#read did not read the correct number of bytes",
+          bytesToRead, inputStream.read(pos, readBuffer, 0, bytesToRead));
+      assertTrue("AbfsInputStream#read did not read the correct bytes",
+          Arrays.equals(Arrays.copyOfRange(data, pos, pos + bytesToRead),
+              readBuffer));
+      assertFalse(
+          "AbfsInputStream#read read more data into its buffer than expected",
+          Arrays.equals(data,
+              Arrays.copyOfRange(abfsIs.getBuffer(), 0, dataSize)));
+    } finally {
+      inputStream.close();
+    }
+  }
+
+  private void resetBuffer(byte[] buf) {
+    for (int i = 0; i < buf.length; i++) {
+      buf[i] = (byte) 0;
+    }
+  }
+}

Review comment:
       nit...newline at EOF

##########
File path: 
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsPread.java
##########
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.Test;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+
+public class ITestAbfsPread extends AbstractAbfsIntegrationTest {
+
+  public ITestAbfsPread() throws Exception {
+  }
+
+  @Test
+  public void testPread() throws IOException {
+    describe("Testing preads in AbfsInputStream");
+    Path dest = path("ITestAbfsPread");
+
+    int dataSize = 100;
+    byte[] data = ContractTestUtils.dataset(dataSize, 'a', 26);
+    ContractTestUtils.writeDataset(getFileSystem(), dest, data, data.length,
+        dataSize, true);
+    int bytesToRead = 10;
+    try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+      assertTrue(
+          "unexpected stream type "
+              + inputStream.getWrappedStream().getClass().getSimpleName(),
+          inputStream.getWrappedStream() instanceof AbfsInputStream);
+      byte[] readBuffer = new byte[bytesToRead];
+      int pos = 0;
+      assertEquals(
+          "AbfsInputStream#read did not read the correct number of bytes",
+          bytesToRead, inputStream.read(pos, readBuffer, 0, bytesToRead));
+      assertTrue("AbfsInputStream#read did not read the correct bytes",
+          Arrays.equals(Arrays.copyOfRange(data, pos, pos + bytesToRead),
+              readBuffer));
+      // Read only 10 bytes from offset 0. But by default it will do the seek
+      // and read where the
+      // entire 100 bytes get read into the AbfsInputStream buffer.
+      assertArrayEquals(
+          "AbfsInputStream#read did not read more data into its buffer", data,
+          Arrays.copyOfRange(
+              ((AbfsInputStream) inputStream.getWrappedStream()).getBuffer(), 
0,
+              dataSize));
+    }
+    FutureDataInputStreamBuilder builder = getFileSystem().openFile(dest);
+    builder.opt(ConfigurationKeys.FS_AZURE_BUFFERED_PREAD_DISABLE, true);
+    FSDataInputStream inputStream = null;
+    try {
+      inputStream = builder.build().get();
+    } catch (IllegalArgumentException | UnsupportedOperationException
+        | InterruptedException | ExecutionException e) {
+      throw new IOException(e);
+    }
+    assertNotNull(inputStream);
+    try {
+      AbfsInputStream abfsIs = (AbfsInputStream) 
inputStream.getWrappedStream();
+      byte[] readBuffer = new byte[bytesToRead];
+      int pos = 10;
+      assertEquals(
+          "AbfsInputStream#read did not read the correct number of bytes",
+          bytesToRead, inputStream.read(pos, readBuffer, 0, bytesToRead));
+      assertTrue("AbfsInputStream#read did not read the correct bytes",
+          Arrays.equals(Arrays.copyOfRange(data, pos, pos + bytesToRead),
+              readBuffer));
+      // Read only 10 bytes from offset 10. This time, as buffered pread is
+      // disabled, it will only read the exact bytes as requested and no data
+      // will get read into the AbfsInputStream#buffer. Infact the buffer won't
+      // even get initialized.
+      assertNull("AbfsInputStream pread caused the internal buffer creation",
+          abfsIs.getBuffer());
+      // Now make a seek and read so that internal buffer gets created
+      inputStream.seek(0);
+      inputStream.read(readBuffer);
+      // This read would have fetched all 100 bytes into internal buffer.
+      assertArrayEquals(
+          "AbfsInputStream#read did not read more data into its buffer", data,
+          Arrays.copyOfRange(
+              ((AbfsInputStream) inputStream.getWrappedStream()).getBuffer(), 
0,
+              dataSize));
+      // Now again do pos read and make sure not any extra data being fetched.
+      resetBuffer(abfsIs.getBuffer());

Review comment:
       need to understand this a bit. 
   Also, make sure that getPos() never changes. 
   
   Can AbfsStreamStatistics be used here to see what is going on? 
inputstream.getIIOStatistics() will get these all the way through 
FSDataInputStream

##########
File path: 
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
##########
@@ -135,6 +145,41 @@ public String getPath() {
     return path;
   }
 
+  @Override
+  public int read(long position, byte[] buffer, int offset, int length)
+      throws IOException {
+    // When bufferedPreadDisabled = true, this API do not use any shared 
buffer,
+    // cursor position etc. So this is implemented as NOT synchronized. HBase
+    // kind of random reads on a shared file input stream will greatly get
+    // benefited by such implementation.
+    // Strict close check at the begin of the API only not for the entire flow.
+    synchronized (this) {
+      if (closed) {
+        throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+      }
+    }
+    LOG.debug("pread requested offset = {} len = {} bufferedPreadDisabled = 
{}",
+        offset, length, bufferedPreadDisabled);
+    if (!bufferedPreadDisabled) {
+      return super.read(position, buffer, offset, length);
+    }
+    validatePositionedReadArgs(position, buffer, offset, length);
+    if (length == 0) {
+      return 0;
+    }
+    if (streamStatistics != null) {
+      streamStatistics.readOperationStarted();
+    }
+    int bytesRead = readRemote(position, buffer, offset, length);

Review comment:
       What about adding some DurationTracking of these operations? can be done 
by extending the ABFS stats to be a DurationTrackerFactory with a duration 
added to the builder.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to