[
https://issues.apache.org/jira/browse/HADOOP-19767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18050595#comment-18050595
]
ASF GitHub Bot commented on HADOOP-19767:
-----------------------------------------
bhattmanish98 commented on code in PR #8153:
URL: https://github.com/apache/hadoop/pull/8153#discussion_r2671828821
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java:
##########
@@ -21,6 +21,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Options;
Review Comment:
Instead of importing the entire Options class, we can import just the
OpenFileOptions class and reference OpenFileOptions directly in the
comments below:
import org.apache.hadoop.fs.Options.OpenFileOptions;
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java:
##########
@@ -946,12 +950,30 @@ public AbfsInputStream openFileForRead(Path path,
perfInfo.registerSuccess(true);
- // Add statistics for InputStream
- return new AbfsInputStream(getClient(), statistics, relativePath,
- contentLength, populateAbfsInputStreamContext(
- parameters.map(OpenFileParameters::getOptions),
- contextEncryptionAdapter),
- eTag, tracingContext);
+ AbfsReadPolicy inputPolicy = AbfsReadPolicy.getAbfsReadPolicy(getAbfsConfiguration().getAbfsReadPolicy());
Review Comment:
This method already contains a lot of lines. Instead of defining this
if-else here, it would be better to define a new method, so that if a new
read pattern is introduced tomorrow we can just update that method.
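A minimal sketch of such an extracted factory method (class names mirror the PR, but the simplified return type — a String naming the implementation — is a stand-in for the real AbfsInputStream construction):

```java
// Illustrative only: one factory method owns the policy -> stream mapping,
// so adding a new read pattern only touches this switch.
enum AbfsReadPolicy { SEQUENTIAL, RANDOM, ADAPTIVE }

final class AbfsInputStreamFactory {
  // Returns the stream implementation name for a policy; the real code
  // would construct the matching AbfsInputStream subclass instead.
  static String streamImplFor(AbfsReadPolicy policy) {
    switch (policy) {
      case SEQUENTIAL:
        return "AbfsPrefetchInputStream";
      case RANDOM:
        return "AbfsRandomInputStream";
      default:
        return "AbfsAdaptiveInputStream";
    }
  }
}
```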
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsReadPolicy.java:
##########
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.util.Locale;
+
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_COLUMNAR;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ORC;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_PARQUET;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
+
+/**
+ * Enum for ABFS Input Policies.
+ * Each policy maps to a particular implementation of {@link AbfsInputStream}
+ */
+public enum AbfsReadPolicy {
+
+ SEQUENTIAL(FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL),
+ RANDOM(FS_OPTION_OPENFILE_READ_POLICY_RANDOM),
+ ADAPTIVE(FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE);
+
+ private final String readPolicy;
+
+ AbfsReadPolicy(String readPolicy) {
+ this.readPolicy = readPolicy;
+ }
+
+ @Override
+ public String toString() {
+ return readPolicy;
+ }
+
+ /**
+ * Get the enum constant from the string name.
+ * @param name policy name as configured by user
+ * @return the corresponding AbfsReadPolicy to be used
+ */
+ public static AbfsReadPolicy getAbfsReadPolicy(String name) {
+ String trimmed = name.trim().toLowerCase(Locale.ENGLISH);
Review Comment:
This variable could be renamed to something more descriptive, like
readPolicyStr.
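A sketch of the lookup with the suggested rename applied (the fallback to ADAPTIVE for unrecognized names is an assumption about the intended default, not confirmed by the PR):

```java
import java.util.Locale;

// Simplified version of the enum under review, with the suggested
// readPolicyStr name for the normalized input.
enum AbfsReadPolicy {
  SEQUENTIAL("sequential"),
  RANDOM("random"),
  ADAPTIVE("adaptive");

  private final String readPolicy;

  AbfsReadPolicy(String readPolicy) {
    this.readPolicy = readPolicy;
  }

  public static AbfsReadPolicy getAbfsReadPolicy(String name) {
    // Normalize the configured name before matching.
    String readPolicyStr = name.trim().toLowerCase(Locale.ENGLISH);
    for (AbfsReadPolicy policy : values()) {
      if (policy.readPolicy.equals(readPolicyStr)) {
        return policy;
      }
    }
    // Assumed default for names that map to no dedicated implementation.
    return ADAPTIVE;
  }
}
```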
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAdaptiveInputStream.java:
##########
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.azurebfs.constants.ReadType;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+
+import static java.lang.Math.max;
+
+/**
+ * Input stream implementation optimized for adaptive read patterns.
+ * This is the default implementation used for cases where user does not specify any input policy.
+ * It switches between sequential and random read optimizations based on the detected read pattern.
+ * It also keeps footer read and small file optimizations enabled.
+ */
+public class AbfsAdaptiveInputStream extends AbfsInputStream {
+
+ public AbfsAdaptiveInputStream(
+ final AbfsClient client,
+ final FileSystem.Statistics statistics,
+ final String path,
+ final long contentLength,
+ final AbfsInputStreamContext abfsInputStreamContext,
+ final String eTag,
+ TracingContext tracingContext) {
+ super(client, statistics, path, contentLength,
+ abfsInputStreamContext, eTag, tracingContext);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected int readOneBlock(final byte[] b, final int off, final int len) throws IOException {
+ if (len == 0) {
+ return 0;
+ }
+ if (!validate(b, off, len)) {
+ return -1;
+ }
+ //If buffer is empty, then fill the buffer.
Review Comment:
The same code is present in AbfsRandomInputStream and
AbfsPrefetchInputStream as well.
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAdaptiveInputStream.java:
##########
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.azurebfs.constants.ReadType;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+
+import static java.lang.Math.max;
+
+/**
+ * Input stream implementation optimized for adaptive read patterns.
+ * This is the default implementation used for cases where user does not specify any input policy.
+ * It switches between sequential and random read optimizations based on the detected read pattern.
+ * It also keeps footer read and small file optimizations enabled.
+ */
+public class AbfsAdaptiveInputStream extends AbfsInputStream {
+
+ public AbfsAdaptiveInputStream(
+ final AbfsClient client,
+ final FileSystem.Statistics statistics,
+ final String path,
+ final long contentLength,
+ final AbfsInputStreamContext abfsInputStreamContext,
+ final String eTag,
+ TracingContext tracingContext) {
+ super(client, statistics, path, contentLength,
+ abfsInputStreamContext, eTag, tracingContext);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected int readOneBlock(final byte[] b, final int off, final int len) throws IOException {
+ if (len == 0) {
+ return 0;
+ }
+ if (!validate(b, off, len)) {
+ return -1;
+ }
+ //If buffer is empty, then fill the buffer.
Review Comment:
Space missing between // and If.
The same issue exists at multiple other places; please correct it
everywhere.
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -801,6 +802,10 @@ byte[] getBuffer() {
return buffer;
}
+ protected void setBuffer(byte[] buffer) {
Review Comment:
Javadoc missing.
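A sketch of the setter with the missing Javadoc filled in — BufferHolder is a minimal stand-in for AbfsInputStream, and the wording is a suggestion, not the author's:

```java
// Minimal stand-in class to show the setter with its Javadoc.
class BufferHolder {
  private byte[] buffer;

  byte[] getBuffer() {
    return buffer;
  }

  /**
   * Sets the internal read buffer, replacing any existing reference.
   *
   * @param buffer the byte array to use as this stream's read buffer
   */
  protected void setBuffer(byte[] buffer) {
    this.buffer = buffer;
  }
}
```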
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAdaptiveInputStream.java:
##########
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.azurebfs.constants.ReadType;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+
+/**
+ * Input stream implementation optimized for adaptive read patterns.
+ * This is the default implementation used for cases where user does not specify any input policy.
+ * It switches between sequential and random read optimizations based on the detected read pattern.
+ * It also keeps footer read and small file optimizations enabled.
+ */
+public class AbfsAdaptiveInputStream extends AbfsInputStream {
+
+ /**
+ * Constructs AbfsAdaptiveInputStream
+ * @param client AbfsClient to be used for read operations
+ * @param statistics to record input stream statistics
+ * @param path file path
+ * @param contentLength file content length
+ * @param abfsInputStreamContext input stream context
+ * @param eTag file eTag
+ * @param tracingContext tracing context to trace the read operations
+ */
+ public AbfsAdaptiveInputStream(
+ final AbfsClient client,
+ final FileSystem.Statistics statistics,
+ final String path,
+ final long contentLength,
+ final AbfsInputStreamContext abfsInputStreamContext,
+ final String eTag,
+ TracingContext tracingContext) {
+ super(client, statistics, path, contentLength,
+ abfsInputStreamContext, eTag, tracingContext);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected int readOneBlock(final byte[] b, final int off, final int len) throws IOException {
+ if (len == 0) {
+ return 0;
+ }
+ if (!validate(b, off, len)) {
+ return -1;
+ }
+ //If buffer is empty, then fill the buffer.
+ if (getBCursor() == getLimit()) {
+ //If EOF, then return -1
+ if (getFCursor() >= getContentLength()) {
+ return -1;
+ }
+
+ long bytesRead = 0;
+ //reset buffer to initial state - i.e., throw away existing data
Review Comment:
Same as above
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java:
##########
@@ -215,6 +216,12 @@ public final class ConfigurationKeys {
public static final String FS_AZURE_READ_AHEAD_QUEUE_DEPTH = "fs.azure.readaheadqueue.depth";
public static final String FS_AZURE_ALWAYS_READ_BUFFER_SIZE = "fs.azure.read.alwaysReadBufferSize";
public static final String FS_AZURE_READ_AHEAD_BLOCK_SIZE = "fs.azure.read.readahead.blocksize";
+ /**
+ * Provides hint for the read workload pattern.
+ * Possible Values Exposed in {@link Options.OpenFileOptions}
Review Comment:
Should it be OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICIES?
OpenFileOptions has other values as well.
If so, we can do
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICIES;
and mention only FS_OPTION_OPENFILE_READ_POLICIES.
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAdaptiveInputStream.java:
##########
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.azurebfs.constants.ReadType;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+
+/**
+ * Input stream implementation optimized for adaptive read patterns.
+ * This is the default implementation used for cases where user does not specify any input policy.
+ * It switches between sequential and random read optimizations based on the detected read pattern.
+ * It also keeps footer read and small file optimizations enabled.
+ */
+public class AbfsAdaptiveInputStream extends AbfsInputStream {
+
+ /**
+ * Constructs AbfsAdaptiveInputStream
+ * @param client AbfsClient to be used for read operations
+ * @param statistics to record input stream statistics
+ * @param path file path
+ * @param contentLength file content length
+ * @param abfsInputStreamContext input stream context
+ * @param eTag file eTag
+ * @param tracingContext tracing context to trace the read operations
+ */
+ public AbfsAdaptiveInputStream(
+ final AbfsClient client,
+ final FileSystem.Statistics statistics,
+ final String path,
+ final long contentLength,
+ final AbfsInputStreamContext abfsInputStreamContext,
+ final String eTag,
+ TracingContext tracingContext) {
+ super(client, statistics, path, contentLength,
+ abfsInputStreamContext, eTag, tracingContext);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected int readOneBlock(final byte[] b, final int off, final int len) throws IOException {
+ if (len == 0) {
+ return 0;
+ }
+ if (!validate(b, off, len)) {
+ return -1;
+ }
+ //If buffer is empty, then fill the buffer.
+ if (getBCursor() == getLimit()) {
+ //If EOF, then return -1
Review Comment:
Same as above
> ABFS: [Read] Introduce Abfs Input Policy for detecting read patterns
> --------------------------------------------------------------------
>
> Key: HADOOP-19767
> URL: https://issues.apache.org/jira/browse/HADOOP-19767
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/azure
> Affects Versions: 3.4.2
> Reporter: Anuj Modi
> Assignee: Anuj Modi
> Priority: Major
> Labels: pull-request-available
>
> Since the onset of ABFS Driver, there has been a single implementation of
> AbfsInputStream. Different kinds of workloads require different heuristics to
> give the best performance for that type of workload. For example:
> # Sequential Read Workloads like DFSIO and DistCP gain performance
> improvement from prefetches
> # Random Read Workloads, on the other hand, do not need prefetches;
> enabling prefetches for them is an overhead and TPS heavy
> # Query Workloads involving Parquet/ORC files benefit from improvements like
> Footer Read and Small Files Reads
> To accommodate this, we need to determine the pattern and accordingly
> create Input Streams implemented for that particular pattern.
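The pattern detection the issue describes could be sketched as follows — assume the workload is sequential until a read arrives that does not continue from the previous read's end, then treat it as random. The heuristic and class name are illustrative, not taken from the PR:

```java
// Illustrative read-pattern detector: flips to "random" once a
// non-contiguous read is observed.
class ReadPatternDetector {
  private long nextExpectedOffset = -1;
  private boolean randomDetected = false;

  // Record a read at `offset` of `length` bytes; a read that does not
  // start where the previous read ended is treated as a random access.
  void onRead(long offset, int length) {
    if (nextExpectedOffset >= 0 && offset != nextExpectedOffset) {
      randomDetected = true;
    }
    nextExpectedOffset = offset + length;
  }

  boolean isRandomPattern() {
    return randomDetected;
  }
}
```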
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]