bhattmanish98 commented on code in PR #8153:
URL: https://github.com/apache/hadoop/pull/8153#discussion_r2671828821
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java:
##########
@@ -21,6 +21,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Options;
Review Comment:
Instead of importing the entire Options class, we can import just the
OpenFileOptions class and refer to OpenFileOptions directly in the comments
below:
import org.apache.hadoop.fs.Options.OpenFileOptions;
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java:
##########
@@ -946,12 +950,30 @@ public AbfsInputStream openFileForRead(Path path,
perfInfo.registerSuccess(true);
- // Add statistics for InputStream
- return new AbfsInputStream(getClient(), statistics, relativePath,
- contentLength, populateAbfsInputStreamContext(
- parameters.map(OpenFileParameters::getOptions),
- contextEncryptionAdapter),
- eTag, tracingContext);
+ AbfsReadPolicy inputPolicy =
AbfsReadPolicy.getAbfsReadPolicy(getAbfsConfiguration().getAbfsReadPolicy());
Review Comment:
This method is already quite long. Instead of adding this if/else block
here, it would be better to extract it into a new method, so that if a new
read pattern is introduced later, we only need to update that method.
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsReadPolicy.java:
##########
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.util.Locale;
+
+import static
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE;
+import static
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_COLUMNAR;
+import static
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ORC;
+import static
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_PARQUET;
+import static
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
+import static
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+import static
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
+
+/**
+ * Enum for ABFS Input Policies.
+ * Each policy maps to a particular implementation of {@link AbfsInputStream}
+ */
+public enum AbfsReadPolicy {
+
+ SEQUENTIAL(FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL),
+ RANDOM(FS_OPTION_OPENFILE_READ_POLICY_RANDOM),
+ ADAPTIVE(FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE);
+
+ private final String readPolicy;
+
+ AbfsReadPolicy(String readPolicy) {
+ this.readPolicy = readPolicy;
+ }
+
+ @Override
+ public String toString() {
+ return readPolicy;
+ }
+
+ /**
+ * Get the enum constant from the string name.
+ * @param name policy name as configured by user
+ * @return the corresponding AbsInputPolicy to be used
+ */
+ public static AbfsReadPolicy getAbfsReadPolicy(String name) {
+ String trimmed = name.trim().toLowerCase(Locale.ENGLISH);
Review Comment:
This variable could be renamed to something more descriptive,
e.g. readPolicyStr.
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAdaptiveInputStream.java:
##########
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.azurebfs.constants.ReadType;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+
+import static java.lang.Math.max;
+
+/**
+ * Input stream implementation optimized for adaptive read patterns.
+ * This is the default implementation used for cases where user does not
specify any input policy.
+ * It switches between sequential and random read optimizations based on the
detected read pattern.
+ * It also keeps footer read and small file optimizations enabled.
+ */
+public class AbfsAdaptiveInputStream extends AbfsInputStream {
+
+ public AbfsAdaptiveInputStream(
+ final AbfsClient client,
+ final FileSystem.Statistics statistics,
+ final String path,
+ final long contentLength,
+ final AbfsInputStreamContext abfsInputStreamContext,
+ final String eTag,
+ TracingContext tracingContext) {
+ super(client, statistics, path, contentLength,
+ abfsInputStreamContext, eTag, tracingContext);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected int readOneBlock(final byte[] b, final int off, final int len)
throws IOException {
+ if (len == 0) {
+ return 0;
+ }
+ if (!validate(b, off, len)) {
+ return -1;
+ }
+ //If buffer is empty, then fill the buffer.
Review Comment:
The same thing is present in AbfsRandomInputStream and
AbfsPrefetchInputStream as well.
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAdaptiveInputStream.java:
##########
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.azurebfs.constants.ReadType;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+
+import static java.lang.Math.max;
+
+/**
+ * Input stream implementation optimized for adaptive read patterns.
+ * This is the default implementation used for cases where user does not
specify any input policy.
+ * It switches between sequential and random read optimizations based on the
detected read pattern.
+ * It also keeps footer read and small file optimizations enabled.
+ */
+public class AbfsAdaptiveInputStream extends AbfsInputStream {
+
+ public AbfsAdaptiveInputStream(
+ final AbfsClient client,
+ final FileSystem.Statistics statistics,
+ final String path,
+ final long contentLength,
+ final AbfsInputStreamContext abfsInputStreamContext,
+ final String eTag,
+ TracingContext tracingContext) {
+ super(client, statistics, path, contentLength,
+ abfsInputStreamContext, eTag, tracingContext);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected int readOneBlock(final byte[] b, final int off, final int len)
throws IOException {
+ if (len == 0) {
+ return 0;
+ }
+ if (!validate(b, off, len)) {
+ return -1;
+ }
+ //If buffer is empty, then fill the buffer.
Review Comment:
A space is missing between // and If.
The same issue occurs in multiple other places; please correct it
everywhere.
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -801,6 +802,10 @@ byte[] getBuffer() {
return buffer;
}
+ protected void setBuffer(byte[] buffer) {
Review Comment:
Javadoc is missing for this method.
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAdaptiveInputStream.java:
##########
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.azurebfs.constants.ReadType;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+
+/**
+ * Input stream implementation optimized for adaptive read patterns.
+ * This is the default implementation used for cases where user does not
specify any input policy.
+ * It switches between sequential and random read optimizations based on the
detected read pattern.
+ * It also keeps footer read and small file optimizations enabled.
+ */
+public class AbfsAdaptiveInputStream extends AbfsInputStream {
+
+ /**
+ * Constructs AbfsAdaptiveInputStream
+ * @param client AbfsClient to be used for read operations
+ * @param statistics to recordinput stream statistics
+ * @param path file path
+ * @param contentLength file content length
+ * @param abfsInputStreamContext input stream context
+ * @param eTag file eTag
+ * @param tracingContext tracing context to trace the read operations
+ */
+ public AbfsAdaptiveInputStream(
+ final AbfsClient client,
+ final FileSystem.Statistics statistics,
+ final String path,
+ final long contentLength,
+ final AbfsInputStreamContext abfsInputStreamContext,
+ final String eTag,
+ TracingContext tracingContext) {
+ super(client, statistics, path, contentLength,
+ abfsInputStreamContext, eTag, tracingContext);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected int readOneBlock(final byte[] b, final int off, final int len)
throws IOException {
+ if (len == 0) {
+ return 0;
+ }
+ if (!validate(b, off, len)) {
+ return -1;
+ }
+ //If buffer is empty, then fill the buffer.
+ if (getBCursor() == getLimit()) {
+ //If EOF, then return -1
+ if (getFCursor() >= getContentLength()) {
+ return -1;
+ }
+
+ long bytesRead = 0;
+ //reset buffer to initial state - i.e., throw away existing data
Review Comment:
Same as above (a space is missing after //).
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java:
##########
@@ -215,6 +216,12 @@ public final class ConfigurationKeys {
public static final String FS_AZURE_READ_AHEAD_QUEUE_DEPTH =
"fs.azure.readaheadqueue.depth";
public static final String FS_AZURE_ALWAYS_READ_BUFFER_SIZE =
"fs.azure.read.alwaysReadBufferSize";
public static final String FS_AZURE_READ_AHEAD_BLOCK_SIZE =
"fs.azure.read.readahead.blocksize";
+ /**
+ * Provides hint for the read workload pattern.
+ * Possible Values Exposed in {@link Options.OpenFileOptions}
Review Comment:
Should this be OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICIES?
OpenFileOptions contains other values as well.
If so, we can add
import static
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICIES;
and mention only FS_OPTION_OPENFILE_READ_POLICIES.
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAdaptiveInputStream.java:
##########
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.azurebfs.constants.ReadType;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+
+/**
+ * Input stream implementation optimized for adaptive read patterns.
+ * This is the default implementation used for cases where user does not
specify any input policy.
+ * It switches between sequential and random read optimizations based on the
detected read pattern.
+ * It also keeps footer read and small file optimizations enabled.
+ */
+public class AbfsAdaptiveInputStream extends AbfsInputStream {
+
+ /**
+ * Constructs AbfsAdaptiveInputStream
+ * @param client AbfsClient to be used for read operations
+ * @param statistics to recordinput stream statistics
+ * @param path file path
+ * @param contentLength file content length
+ * @param abfsInputStreamContext input stream context
+ * @param eTag file eTag
+ * @param tracingContext tracing context to trace the read operations
+ */
+ public AbfsAdaptiveInputStream(
+ final AbfsClient client,
+ final FileSystem.Statistics statistics,
+ final String path,
+ final long contentLength,
+ final AbfsInputStreamContext abfsInputStreamContext,
+ final String eTag,
+ TracingContext tracingContext) {
+ super(client, statistics, path, contentLength,
+ abfsInputStreamContext, eTag, tracingContext);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected int readOneBlock(final byte[] b, final int off, final int len)
throws IOException {
+ if (len == 0) {
+ return 0;
+ }
+ if (!validate(b, off, len)) {
+ return -1;
+ }
+ //If buffer is empty, then fill the buffer.
+ if (getBCursor() == getLimit()) {
+ //If EOF, then return -1
Review Comment:
Same as above (a space is missing after //).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]