[ 
https://issues.apache.org/jira/browse/HADOOP-19767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18050595#comment-18050595
 ] 

ASF GitHub Bot commented on HADOOP-19767:
-----------------------------------------

bhattmanish98 commented on code in PR #8153:
URL: https://github.com/apache/hadoop/pull/8153#discussion_r2671828821


##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java:
##########
@@ -21,6 +21,7 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Options;

Review Comment:
   instead of importing entire Options class, we can just import 
OpenFileOptions class and directly mention OpenFileOptions class below in 
comments.
   import org.apache.hadoop.fs.Options.OpenFileOptions;
   



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java:
##########
@@ -946,12 +950,30 @@ public AbfsInputStream openFileForRead(Path path,
 
       perfInfo.registerSuccess(true);
 
-      // Add statistics for InputStream
-      return new AbfsInputStream(getClient(), statistics, relativePath,
-          contentLength, populateAbfsInputStreamContext(
-          parameters.map(OpenFileParameters::getOptions),
-          contextEncryptionAdapter),
-          eTag, tracingContext);
+      AbfsReadPolicy inputPolicy = 
AbfsReadPolicy.getAbfsReadPolicy(getAbfsConfiguration().getAbfsReadPolicy());

Review Comment:
   This method contains lot of lines already. Instead of defining this if else 
case here, it would be better to define a new method so that tomorrow if new 
read pattern is introduced, we can just update that method.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsReadPolicy.java:
##########
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.util.Locale;
+
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_COLUMNAR;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ORC;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_PARQUET;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
+
+/**
+ * Enum for ABFS Input Policies.
+ * Each policy maps to a particular implementation of {@link AbfsInputStream}
+ */
+public enum AbfsReadPolicy {
+
+  SEQUENTIAL(FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL),
+  RANDOM(FS_OPTION_OPENFILE_READ_POLICY_RANDOM),
+  ADAPTIVE(FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE);
+
+  private final String readPolicy;
+
+  AbfsReadPolicy(String readPolicy) {
+    this.readPolicy = readPolicy;
+  }
+
+  @Override
+  public String toString() {
+    return readPolicy;
+  }
+
+  /**
+   * Get the enum constant from the string name.
+   * @param name policy name as configured by user
+   * @return the corresponding AbsInputPolicy to be used
+   */
+  public static AbfsReadPolicy getAbfsReadPolicy(String name) {
+    String trimmed = name.trim().toLowerCase(Locale.ENGLISH);

Review Comment:
   This variable can be renamed to something else. 
   Like readPolicyStr



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAdaptiveInputStream.java:
##########
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.azurebfs.constants.ReadType;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+
+import static java.lang.Math.max;
+
+/**
+ * Input stream implementation optimized for adaptive read patterns.
+ * This is the default implementation used for cases where user does not 
specify any input policy.
+ * It switches between sequential and random read optimizations based on the 
detected read pattern.
+ * It also keeps footer read and small file optimizations enabled.
+ */
+public class AbfsAdaptiveInputStream extends AbfsInputStream {
+
+  public AbfsAdaptiveInputStream(
+      final AbfsClient client,
+      final FileSystem.Statistics statistics,
+      final String path,
+      final long contentLength,
+      final AbfsInputStreamContext abfsInputStreamContext,
+      final String eTag,
+      TracingContext tracingContext) {
+    super(client, statistics, path, contentLength,
+        abfsInputStreamContext, eTag, tracingContext);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  protected int readOneBlock(final byte[] b, final int off, final int len) 
throws IOException {
+    if (len == 0) {
+      return 0;
+    }
+    if (!validate(b, off, len)) {
+      return -1;
+    }
+    //If buffer is empty, then fill the buffer.

Review Comment:
   Same thing is present in AbfsRandomInputStream and AbfsPrefetchInputStream 
as well.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAdaptiveInputStream.java:
##########
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.azurebfs.constants.ReadType;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+
+import static java.lang.Math.max;
+
+/**
+ * Input stream implementation optimized for adaptive read patterns.
+ * This is the default implementation used for cases where user does not 
specify any input policy.
+ * It switches between sequential and random read optimizations based on the 
detected read pattern.
+ * It also keeps footer read and small file optimizations enabled.
+ */
+public class AbfsAdaptiveInputStream extends AbfsInputStream {
+
+  public AbfsAdaptiveInputStream(
+      final AbfsClient client,
+      final FileSystem.Statistics statistics,
+      final String path,
+      final long contentLength,
+      final AbfsInputStreamContext abfsInputStreamContext,
+      final String eTag,
+      TracingContext tracingContext) {
+    super(client, statistics, path, contentLength,
+        abfsInputStreamContext, eTag, tracingContext);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  protected int readOneBlock(final byte[] b, final int off, final int len) 
throws IOException {
+    if (len == 0) {
+      return 0;
+    }
+    if (!validate(b, off, len)) {
+      return -1;
+    }
+    //If buffer is empty, then fill the buffer.

Review Comment:
   Space missing between // and If.
   Same thing at multiple other places as well, please correct it at all other 
places as well.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -801,6 +802,10 @@ byte[] getBuffer() {
     return buffer;
   }
 
+  protected void setBuffer(byte[] buffer) {

Review Comment:
   Java doc missing



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAdaptiveInputStream.java:
##########
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.azurebfs.constants.ReadType;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+
+/**
+ * Input stream implementation optimized for adaptive read patterns.
+ * This is the default implementation used for cases where user does not 
specify any input policy.
+ * It switches between sequential and random read optimizations based on the 
detected read pattern.
+ * It also keeps footer read and small file optimizations enabled.
+ */
+public class AbfsAdaptiveInputStream extends AbfsInputStream {
+
+  /**
+   * Constructs AbfsAdaptiveInputStream
+   * @param client AbfsClient to be used for read operations
+   * @param statistics to recordinput stream statistics
+   * @param path file path
+   * @param contentLength file content length
+   * @param abfsInputStreamContext input stream context
+   * @param eTag file eTag
+   * @param tracingContext tracing context to trace the read operations
+   */
+  public AbfsAdaptiveInputStream(
+      final AbfsClient client,
+      final FileSystem.Statistics statistics,
+      final String path,
+      final long contentLength,
+      final AbfsInputStreamContext abfsInputStreamContext,
+      final String eTag,
+      TracingContext tracingContext) {
+    super(client, statistics, path, contentLength,
+        abfsInputStreamContext, eTag, tracingContext);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  protected int readOneBlock(final byte[] b, final int off, final int len) 
throws IOException {
+    if (len == 0) {
+      return 0;
+    }
+    if (!validate(b, off, len)) {
+      return -1;
+    }
+    //If buffer is empty, then fill the buffer.
+    if (getBCursor() == getLimit()) {
+      //If EOF, then return -1
+      if (getFCursor() >= getContentLength()) {
+        return -1;
+      }
+
+      long bytesRead = 0;
+      //reset buffer to initial state - i.e., throw away existing data

Review Comment:
   Same as above



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java:
##########
@@ -215,6 +216,12 @@ public final class ConfigurationKeys {
   public static final String FS_AZURE_READ_AHEAD_QUEUE_DEPTH = 
"fs.azure.readaheadqueue.depth";
   public static final String FS_AZURE_ALWAYS_READ_BUFFER_SIZE = 
"fs.azure.read.alwaysReadBufferSize";
   public static final String FS_AZURE_READ_AHEAD_BLOCK_SIZE = 
"fs.azure.read.readahead.blocksize";
+  /**
+   * Provides hint for the read workload pattern.
+   * Possible Values Exposed in {@link Options.OpenFileOptions}

Review Comment:
   Should it be OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICIES?
   OpenFileOptions has other values as well.
   If so, we can do
   import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICIES;
   and mention FS_OPTION_OPENFILE_READ_POLICIES only



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAdaptiveInputStream.java:
##########
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.azurebfs.constants.ReadType;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+
+/**
+ * Input stream implementation optimized for adaptive read patterns.
+ * This is the default implementation used for cases where user does not 
specify any input policy.
+ * It switches between sequential and random read optimizations based on the 
detected read pattern.
+ * It also keeps footer read and small file optimizations enabled.
+ */
+public class AbfsAdaptiveInputStream extends AbfsInputStream {
+
+  /**
+   * Constructs AbfsAdaptiveInputStream
+   * @param client AbfsClient to be used for read operations
+   * @param statistics to recordinput stream statistics
+   * @param path file path
+   * @param contentLength file content length
+   * @param abfsInputStreamContext input stream context
+   * @param eTag file eTag
+   * @param tracingContext tracing context to trace the read operations
+   */
+  public AbfsAdaptiveInputStream(
+      final AbfsClient client,
+      final FileSystem.Statistics statistics,
+      final String path,
+      final long contentLength,
+      final AbfsInputStreamContext abfsInputStreamContext,
+      final String eTag,
+      TracingContext tracingContext) {
+    super(client, statistics, path, contentLength,
+        abfsInputStreamContext, eTag, tracingContext);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  protected int readOneBlock(final byte[] b, final int off, final int len) 
throws IOException {
+    if (len == 0) {
+      return 0;
+    }
+    if (!validate(b, off, len)) {
+      return -1;
+    }
+    //If buffer is empty, then fill the buffer.
+    if (getBCursor() == getLimit()) {
+      //If EOF, then return -1

Review Comment:
   Same as above





> ABFS: [Read] Introduce Abfs Input Policy for detecting read patterns
> --------------------------------------------------------------------
>
>                 Key: HADOOP-19767
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19767
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/azure
>    Affects Versions: 3.4.2
>            Reporter: Anuj Modi
>            Assignee: Anuj Modi
>            Priority: Major
>              Labels: pull-request-available
>
> Since the onset of ABFS Driver, there has been a single implementation of 
> AbfsInputStream. Different kinds of workloads require different heuristics to 
> give the best performance for that type of workload. For example: 
>  # Sequential Read Workloads like DFSIO and DistCP gain performance 
> improvement from prefetched 
>  # Random Read Workloads on other hand do not need Prefetches and enabling 
> prefetches for them is an overhead and TPS heavy 
>  # Query Workloads involving Parquet/ORC files benefit from improvements like 
> Footer Read and Small Files Reads
> To accomodate this we need to determine the pattern and accordingly create 
> Input Streams implemented for that particular pattern.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to