Re: [PR] HDDS-13660. Ozone client support readVectored. [ozone]
github-actions[bot] commented on PR #9402: URL: https://github.com/apache/ozone/pull/9402#issuecomment-4043009886 Thank you for your contribution. This PR is being closed due to inactivity. Please contact a maintainer if you would like to reopen it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] - To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
Re: [PR] HDDS-13660. Ozone client support readVectored. [ozone]
github-actions[bot] closed pull request #9402: HDDS-13660. Ozone client support readVectored. URL: https://github.com/apache/ozone/pull/9402
Re: [PR] HDDS-13660. Ozone client support readVectored. [ozone]
github-actions[bot] commented on PR #9402: URL: https://github.com/apache/ozone/pull/9402#issuecomment-3994341889 This PR has been marked as stale due to 21 days of inactivity. Please comment or remove the stale label to keep it open. Otherwise, it will be automatically closed in 7 days.
Re: [PR] HDDS-13660. Ozone client support readVectored. [ozone]
yandrey321 commented on code in PR #9402:
URL: https://github.com/apache/ozone/pull/9402#discussion_r2782869871
##
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/MultipartInputStream.java:
##
@@ -261,6 +264,95 @@ public synchronized long skip(long n) throws IOException {
return toSkip;
}
+ /**
+ * Implements vectored read for multipart input stream.
+ * This method reads multiple byte ranges asynchronously, potentially
+ * from different underlying part streams.
+ *
+ * @param ranges list of file ranges to read
+ * @param allocate function to allocate ByteBuffer for each range
+ * @throws IOException if there is an error performing the reads
+ * @apiNote This method is synchronized to prevent race conditions from
+ * concurrent readVectored() calls on the same stream instance.
+ */
+ public synchronized void readVectored(
+ List<? extends FileRange> ranges,
+ IntFunction<ByteBuffer> allocate
+ ) throws IOException {
+checkOpen();
+if (!initialized) {
+ initialize();
+}
+
+// Save the initial position
+final long initialPosition = getPos();
+
+// Use common vectored read implementation
+VectoredReadUtils.performVectoredRead(
+ranges,
+allocate,
+(offset, buffer) -> readRangeData(offset, buffer, initialPosition)
+);
+
+// Restore position
+seek(initialPosition);
+ }
+
+ /**
+ * Helper method to read data for a specific range.
+ * Uses synchronized seeks to read data from the correct position.
+ * Reads data fully, handling partial reads in a loop.
+ *
+ * @param offset the starting offset in the stream
+ * @param buffer the buffer to read data into
+ * @param initialPosition the initial position to restore after reading
+ * @throws IOException if there is an error reading data
+ */
+ private void readRangeData(long offset, ByteBuffer buffer, long initialPosition)
+ throws IOException {
+synchronized (this) {
Review Comment:
is there a reason for not allowing concurrent reads?
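As background for this question, a minimal sketch using plain java.nio rather than the Ozone classes: FileChannel.read(ByteBuffer, long) is a positional read that does not modify the channel's position, and the JDK documents that operations taking an explicit position may proceed concurrently. Whether Ozone's input streams can give the same guarantee without a lock is an assumption this sketch does not check; the class ConcurrentPreadSketch and its methods are hypothetical.

```java
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical sketch: lock-free concurrent range reads via positional I/O. */
public final class ConcurrentPreadSketch {
  private ConcurrentPreadSketch() {
  }

  /** Fills buf starting at position; the channel's own position is never changed. */
  static void preadFully(FileChannel ch, long position, ByteBuffer buf) throws IOException {
    long pos = position;
    while (buf.hasRemaining()) {
      int n = ch.read(buf, pos); // positional read, does not move ch.position()
      if (n < 0) {
        throw new EOFException("Reached end of file at offset " + pos);
      }
      pos += n;
    }
  }

  /** Reads each [offset, offset + length) range on its own thread, without any lock. */
  static List<ByteBuffer> readRanges(Path file, long[] offsets, int length) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, offsets.length));
    try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ)) {
      List<Future<ByteBuffer>> futures = new ArrayList<>();
      for (long offset : offsets) {
        futures.add(pool.submit(() -> {
          ByteBuffer buf = ByteBuffer.allocate(length);
          preadFully(ch, offset, buf);
          buf.flip(); // make the filled bytes readable by the caller
          return buf;
        }));
      }
      List<ByteBuffer> results = new ArrayList<>();
      for (Future<ByteBuffer> f : futures) {
        results.add(f.get());
      }
      if (ch.position() != 0) {
        throw new IllegalStateException("positional reads moved the channel position");
      }
      return results;
    } finally {
      pool.shutdown();
    }
  }
}
```

The design point is that no shared cursor is touched, so there is nothing to save, restore or guard; a read that runs past the end surfaces as an EOFException wrapped in the task's ExecutionException.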
##
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/MultipartInputStream.java:
##
@@ -261,6 +264,95 @@ public synchronized long skip(long n) throws IOException {
return toSkip;
}
+ /**
+ * Implements vectored read for multipart input stream.
+ * This method reads multiple byte ranges asynchronously, potentially
+ * from different underlying part streams.
+ *
+ * @param ranges list of file ranges to read
+ * @param allocate function to allocate ByteBuffer for each range
+ * @throws IOException if there is an error performing the reads
+ * @apiNote This method is synchronized to prevent race conditions from
+ * concurrent readVectored() calls on the same stream instance.
+ */
+ public synchronized void readVectored(
Review Comment:
does it need to be synchronized?
##
hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java:
##
@@ -165,7 +169,7 @@ public void unbuffer() {
* @throws IOException if there is some error performing the read
*/
@Override
- public int read(long position, ByteBuffer buf) throws IOException {
+ public synchronized int read(long position, ByteBuffer buf) throws IOException {
Review Comment:
what is the reason for adding synchronized here?
##
hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java:
##
@@ -197,7 +201,7 @@ public int read(long position, ByteBuffer buf) throws IOException {
* @throws EOFException if end of file reached before reading fully
*/
@Override
- public void readFully(long position, ByteBuffer buf) throws IOException {
+ public synchronized void readFully(long position, ByteBuffer buf) throws IOException {
Review Comment:
what is the reason for adding synchronized here?
##
hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java:
##
@@ -207,4 +211,44 @@ public void readFully(long position, ByteBuffer buf) throws IOException {
}
}
}
+
+ /**
+ * Implements vectored read by reading each range asynchronously.
+ * This allows clients to read multiple byte ranges from the same file
+ * in a single call, potentially improving performance by enabling
+ * parallel reads and reducing round-trip overhead.
+ *
+ * @param ranges list of file ranges to read
+ * @param allocate function to allocate ByteBuffer for each range
+ * @throws IOException if there is an error performing the reads
+ * @apiNote This method is synchronized to prevent race conditions from
+ * concurrent readVectored() calls on the same stream instance.
+ */
+ @Override
+ public synchronized void readVectored(List<? extends FileRange> ranges,
Review Comment:
does it need to be synchronized?
Re: [PR] HDDS-13660. Ozone client support readVectored. [ozone]
adoroszlai commented on PR #9402: URL: https://github.com/apache/ozone/pull/9402#issuecomment-3871300305 @ashishkumar50 please take a look at @chungen0126's comments.
Re: [PR] HDDS-13660. Ozone client support readVectored. [ozone]
github-actions[bot] commented on PR #9402: URL: https://github.com/apache/ozone/pull/9402#issuecomment-3857069332 This PR has been marked as stale due to 21 days of inactivity. Please comment or remove the stale label to keep it open. Otherwise, it will be automatically closed in 7 days.
Re: [PR] HDDS-13660. Ozone client support readVectored. [ozone]
chungen0126 commented on code in PR #9402:
URL: https://github.com/apache/ozone/pull/9402#discussion_r2694657039
##
hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java:
##
@@ -207,4 +211,44 @@ public void readFully(long position, ByteBuffer buf) throws IOException {
}
}
}
+
+ /**
+ * Implements vectored read by reading each range asynchronously.
+ * This allows clients to read multiple byte ranges from the same file
+ * in a single call, potentially improving performance by enabling
+ * parallel reads and reducing round-trip overhead.
+ *
+ * @param ranges list of file ranges to read
+ * @param allocate function to allocate ByteBuffer for each range
+ * @throws IOException if there is an error performing the reads
+ * @apiNote This method is synchronized to prevent race conditions from
+ * concurrent readVectored() calls on the same stream instance.
+ */
+ @Override
+ public synchronized void readVectored(List<? extends FileRange> ranges,
+ IntFunction<ByteBuffer> allocate) throws IOException {
+TracingUtil.executeInNewSpan("OzoneFSInputStream.readVectored", () -> {
+ // Save the initial position
+ final long initialPosition = getPos();
+
+ // Use common vectored read implementation
+ VectoredReadUtils.performVectoredRead(
+ ranges,
+ allocate,
+ (offset, buffer) -> {
+// readFully is synchronized and uses positioned reads
+// which automatically preserve stream position
+readFully(offset, buffer);
+if (statistics != null) {
+ statistics.incrementBytesRead(buffer.remaining());
+}
+ }
+ );
+
+ // Restore position before returning from method
+ seek(initialPosition);
Review Comment:
Same comment as above.
##
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/MultipartInputStream.java:
##
@@ -261,6 +264,95 @@ public synchronized long skip(long n) throws IOException {
return toSkip;
}
+ /**
+ * Implements vectored read for multipart input stream.
+ * This method reads multiple byte ranges asynchronously, potentially
+ * from different underlying part streams.
+ *
+ * @param ranges list of file ranges to read
+ * @param allocate function to allocate ByteBuffer for each range
+ * @throws IOException if there is an error performing the reads
+ * @apiNote This method is synchronized to prevent race conditions from
+ * concurrent readVectored() calls on the same stream instance.
+ */
+ public synchronized void readVectored(
+ List<? extends FileRange> ranges,
+ IntFunction<ByteBuffer> allocate
+ ) throws IOException {
+checkOpen();
+if (!initialized) {
+ initialize();
+}
+
+// Save the initial position
+final long initialPosition = getPos();
+
+// Use common vectored read implementation
+VectoredReadUtils.performVectoredRead(
+ranges,
+allocate,
+(offset, buffer) -> readRangeData(offset, buffer, initialPosition)
+);
+
+// Restore position
+seek(initialPosition);
Review Comment:
I'm wondering whether the seek to initialPosition is necessary here. Since the
offset changes asynchronously, restoring it at this point might not work
correctly. It seems like readRangeData already handles the position restoration
correctly.
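To make the point about readRangeData concrete, a minimal sketch over an in-memory stream (SeekableBytes and readRangeAtomically are hypothetical names, not Ozone code): when each range read saves the position, seeks, reads fully and restores the position inside one synchronized block, the caller's position is already intact after every range, so a second restore after scheduling the reads is redundant for position-keeping purposes.

```java
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;

/** Hypothetical in-memory stream with a mutable position, standing in for a seekable input stream. */
public final class SeekableBytes {
  private final byte[] data;
  private long pos;

  public SeekableBytes(byte[] data) {
    this.data = data;
  }

  public synchronized long getPos() {
    return pos;
  }

  public synchronized void seek(long newPos) throws IOException {
    if (newPos < 0 || newPos > data.length) {
      throw new EOFException("Cannot seek to " + newPos);
    }
    pos = newPos;
  }

  /** Sequential read that advances the shared position; returns -1 at end of data. */
  public synchronized int read(byte[] b, int off, int len) {
    if (pos >= data.length) {
      return -1;
    }
    int n = (int) Math.min(len, data.length - pos);
    System.arraycopy(data, (int) pos, b, off, n);
    pos += n;
    return n;
  }

  /**
   * Reads buf.remaining() bytes at offset. Save, seek, read and restore all happen
   * while holding this object's monitor, so other threads never observe the temporary position.
   */
  public void readRangeAtomically(long offset, ByteBuffer buf) throws IOException {
    synchronized (this) {
      long saved = pos;
      try {
        seek(offset);
        byte[] chunk = new byte[buf.remaining()];
        int done = 0;
        while (done < chunk.length) {
          int n = read(chunk, done, chunk.length - done);
          if (n < 0) {
            throw new EOFException("Read " + done + " of " + chunk.length + " bytes");
          }
          done += n;
        }
        buf.put(chunk);
      } finally {
        pos = saved; // restore even if the read failed
      }
    }
  }
}
```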
Re: [PR] HDDS-13660. Ozone client support readVectored. [ozone]
adoroszlai commented on PR #9402: URL: https://github.com/apache/ozone/pull/9402#issuecomment-3665213689 @chungen0126 @jojochuang @yandrey321 please take a look
Re: [PR] HDDS-13660. Ozone client support readVectored. [ozone]
ashishkumar50 commented on code in PR #9402:
URL: https://github.com/apache/ozone/pull/9402#discussion_r2601628368
##
hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java:
##
@@ -215,42 +215,29 @@ public synchronized void readFully(long position, ByteBuffer buf) throws IOExcep
* @throws IOException if there is an error performing the reads
*/
@Override
- public void readVectored(List<? extends FileRange> ranges,
- IntFunction<ByteBuffer> allocate) throws IOException {
+ public void readVectored(
+ List<? extends FileRange> ranges,
+ IntFunction<ByteBuffer> allocate
+ ) throws IOException {
TracingUtil.executeInNewSpan("OzoneFSInputStream.readVectored", () -> {
- // Perform vectored read using positioned read operations
- for (FileRange range : ranges) {
-CompletableFuture<ByteBuffer> result = range.getData();
-if (result == null) {
- result = new CompletableFuture<>();
- range.setData(result);
-}
-
-final CompletableFuture finalResult = result;
-final long offset = range.getOffset();
-final int length = range.getLength();
+ // Save the initial position
+ final long initialPosition = getPos();
-// Submit async read task for this range
-CompletableFuture.runAsync(() -> {
- try {
-ByteBuffer buffer = allocate.apply(length);
-int bytesRead = read(offset, buffer);
+ try {
+VectoredReadUtils.performVectoredRead(ranges, allocate, (offset, buffer) -> {
+ int length = buffer.remaining();
+ readFully(offset, buffer);
-if (bytesRead < length) {
- finalResult.completeExceptionally(
- new EOFException("Requested " + length +
- " bytes but only read " + bytesRead));
-} else {
- buffer.flip();
- if (statistics != null) {
-statistics.incrementBytesRead(bytesRead);
- }
- finalResult.complete(buffer);
-}
- } catch (Exception e) {
-finalResult.completeExceptionally(e);
+ // Update statistics
+ if (statistics != null) {
+statistics.incrementBytesRead(length);
}
});
+ } finally {
+// Restore position
+synchronized (this) {
Review Comment:
Synchronized now to avoid race conditions between concurrent reads on the same stream.
Re: [PR] HDDS-13660. Ozone client support readVectored. [ozone]
ashishkumar50 commented on code in PR #9402:
URL: https://github.com/apache/ozone/pull/9402#discussion_r2601627538
##
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/MultipartInputStream.java:
##
@@ -245,36 +245,26 @@ public synchronized long skip(long n) throws IOException {
* @param allocate function to allocate ByteBuffer for each range
* @throws IOException if there is an error performing the reads
*/
- public void readVectored(List<? extends FileRange> ranges,
- IntFunction<ByteBuffer> allocate) throws IOException {
+ public void readVectored(
+ List<? extends FileRange> ranges,
+ IntFunction<ByteBuffer> allocate
+ ) throws IOException {
checkOpen();
if (!initialized) {
initialize();
}
-// Perform vectored read using positioned read operations
-for (FileRange range : ranges) {
- CompletableFuture<ByteBuffer> result = range.getData();
- if (result == null) {
-result = new CompletableFuture<>();
-range.setData(result);
- }
+// Save the initial position
+final long initialPosition = getPos();
- final CompletableFuture<ByteBuffer> finalResult = result;
- final long offset = range.getOffset();
- final int rangeLength = range.getLength();
-
- // Submit async read task for this range
- CompletableFuture.runAsync(() -> {
-try {
- ByteBuffer buffer = allocate.apply(rangeLength);
- readRangeData(offset, buffer);
- buffer.flip();
- finalResult.complete(buffer);
-} catch (Exception e) {
- finalResult.completeExceptionally(e);
-}
- });
+try {
+ VectoredReadUtils.performVectoredRead(ranges, allocate,
+ (offset, buffer) -> readRangeData(offset, buffer, initialPosition));
+} finally {
+ // Restore position
+ synchronized (this) {
Review Comment:
Synchronized now to avoid race conditions between concurrent reads on the same stream.
Re: [PR] HDDS-13660. Ozone client support readVectored. [ozone]
ashishkumar50 commented on code in PR #9402:
URL: https://github.com/apache/ozone/pull/9402#discussion_r2601623939
##
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/MultipartInputStream.java:
##
@@ -286,21 +276,28 @@ public void readVectored(List<? extends FileRange> ranges,
* @param buffer the buffer to read data into
* @throws IOException if there is an error reading data
*/
- private void readRangeData(long offset, ByteBuffer buffer) throws IOException {
+ private void readRangeData(long offset, ByteBuffer buffer, long initialPosition) throws IOException {
synchronized (this) {
- long savedPos = getPos();
try {
seek(offset);
-int remaining = buffer.remaining();
-byte[] temp = new byte[remaining];
-int bytesRead = read(temp, 0, remaining);
-if (bytesRead < remaining) {
- throw new EOFException("Could not read all requested bytes. " +
- "Requested: " + remaining + ", Read: " + bytesRead);
+int totalBytesToRead = buffer.remaining();
+byte[] temp = new byte[totalBytesToRead];
+int totalBytesRead = 0;
+
+// Read in a loop to handle partial reads
+while (totalBytesRead < totalBytesToRead) {
+ int bytesRead = read(temp, totalBytesRead, totalBytesToRead - totalBytesRead);
+ if (bytesRead < 0) {
+throw new EOFException("End of file reached before reading fully. " +
+"Requested: " + totalBytesToRead + ", Read: " + totalBytesRead);
+ }
+ totalBytesRead += bytesRead;
}
-buffer.put(temp, 0, bytesRead);
+
+buffer.put(temp, 0, totalBytesRead);
Review Comment:
Avoided using temp now.
Re: [PR] HDDS-13660. Ozone client support readVectored. [ozone]
ashishkumar50 commented on code in PR #9402:
URL: https://github.com/apache/ozone/pull/9402#discussion_r2601623220
##
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/VectoredReadUtils.java:
##
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.utils;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.IntFunction;
+import org.apache.hadoop.fs.FileRange;
+import org.apache.hadoop.hdds.annotation.InterfaceAudience;
+
+/**
+ * Utility class for vectored read operations.
+ */
[email protected]
+public final class VectoredReadUtils {
+  private VectoredReadUtils() {
+    // Utility class, no instances
+  }
+
+  /**
+   * Functional interface for reading data into a buffer from a specific offset.
+   */
+  @FunctionalInterface
+  public interface RangeReader {
+    /**
+     * Read data from the specified offset into the buffer.
+     *
+     * @param offset the offset to read from
+     * @param buffer the buffer to read into
+     * @throws IOException if an error occurs during reading
+     */
+    void readRange(long offset, ByteBuffer buffer) throws IOException;
+  }
+
+  /**
+   * Validates the ranges for vectored read operations.
+   * Checks for null ranges, negative offsets, negative lengths, and overlapping ranges.
+   *
+   * @param ranges list of file ranges to validate
+   * @throws NullPointerException if ranges list is null or contains null elements
+   * @throws EOFException if any range has a negative offset
+   * @throws IllegalArgumentException if any range has negative length or ranges overlap
+   */
+  public static void validateRanges(List<? extends FileRange> ranges) throws IOException {
+    if (ranges == null) {
+      throw new NullPointerException("Ranges list cannot be null");
+    }
+    for (int i = 0; i < ranges.size(); i++) {
+      FileRange range = ranges.get(i);
+      if (range == null) {
+        throw new NullPointerException("Range at index " + i + " is null");
+      }
+      // Check for negative offset
+      if (range.getOffset() < 0) {
+        throw new EOFException("Range " + i + " has negative offset: " + range.getOffset());
+      }
+      // Check for negative length
+      if (range.getLength() < 0) {
+        throw new IllegalArgumentException("Range " + i + " has negative length: " + range.getLength());
+      }
+    }
+    // Check for overlapping ranges
+    for (int i = 0; i < ranges.size(); i++) {
+      FileRange current = ranges.get(i);
+      long currentEnd = current.getOffset() + current.getLength();
+      for (int j = i + 1; j < ranges.size(); j++) {
+        FileRange other = ranges.get(j);
+        long otherEnd = other.getOffset() + other.getLength();
+        // Check if ranges overlap
+        boolean overlaps = (current.getOffset() < otherEnd && currentEnd > other.getOffset());
+        if (overlaps) {
+          throw new IllegalArgumentException(
+              "Range[" + i + "] (" + current.getOffset() + ", " + current.getLength() +
+              ") overlaps with Range[" + j + "] (" + other.getOffset() + ", " + other.getLength() + ")");
+        }
+      }
+    }
+  }
+
+  /**
+   * Performs vectored read by reading each range asynchronously.
+   * This method handles the common logic of setting up CompletableFutures
+   * and submitting async read tasks for each range.
+   *
+   * @param ranges list of file ranges to read
+   * @param allocate function to allocate ByteBuffer for each range
+   * @param reader the function that performs the actual read operation
+   * @throws IOException if there is an error during validation
+   */
+  public static void performVectoredRead(
+      List<? extends FileRange> ranges,
+      IntFunction<ByteBuffer> allocate,
+      RangeReader reader) throws IOException {
+
+    // Validate ranges before processing
+    validateRanges(ranges);
Review Comment:
Done the sorting
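The Javadoc of performVectoredRead describes setting up one CompletableFuture per range and submitting an async read task for each. A minimal sketch of that pattern, assuming a hypothetical Range class in place of Hadoop's FileRange and an explicit Executor; the body of the PR's method is not quoted here, so this is an illustration of the pattern, not its implementation.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.IntFunction;

/** Hypothetical sketch of the "one future per range" vectored-read pattern. */
public final class AsyncRangeReads {
  private AsyncRangeReads() {
  }

  /** Stand-in for org.apache.hadoop.fs.FileRange: offset, length and a result future. */
  static final class Range {
    final long offset;
    final int length;
    final CompletableFuture<ByteBuffer> data = new CompletableFuture<>();

    Range(long offset, int length) {
      this.offset = offset;
      this.length = length;
    }
  }

  /** Same shape as the RangeReader interface quoted in the diff. */
  @FunctionalInterface
  interface RangeReader {
    void readRange(long offset, ByteBuffer buffer) throws IOException;
  }

  /** Schedules every range and returns immediately; callers wait on each range's future. */
  static void readAsync(List<Range> ranges, IntFunction<ByteBuffer> allocate,
      RangeReader reader, Executor executor) {
    for (Range r : ranges) {
      CompletableFuture.runAsync(() -> {
        try {
          ByteBuffer buf = allocate.apply(r.length);
          reader.readRange(r.offset, buf);
          buf.flip(); // expose the bytes just written
          r.data.complete(buf);
        } catch (Throwable t) {
          r.data.completeExceptionally(t); // surface I/O errors through the future
        }
      }, executor);
    }
  }
}
```

Because readAsync returns before any read finishes, whatever the caller does to the stream afterwards (such as restoring its position) can run while the range tasks are still pending.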
Re: [PR] HDDS-13660. Ozone client support readVectored. [ozone]
yandrey321 commented on code in PR #9402:
URL: https://github.com/apache/ozone/pull/9402#discussion_r2589598245
##
hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java:
##
@@ -215,42 +215,29 @@ public synchronized void readFully(long position, ByteBuffer buf) throws IOExcep
* @throws IOException if there is an error performing the reads
*/
@Override
- public void readVectored(List<? extends FileRange> ranges,
- IntFunction<ByteBuffer> allocate) throws IOException {
+ public void readVectored(
+ List<? extends FileRange> ranges,
+ IntFunction<ByteBuffer> allocate
+ ) throws IOException {
TracingUtil.executeInNewSpan("OzoneFSInputStream.readVectored", () -> {
- // Perform vectored read using positioned read operations
- for (FileRange range : ranges) {
-CompletableFuture<ByteBuffer> result = range.getData();
-if (result == null) {
- result = new CompletableFuture<>();
- range.setData(result);
-}
-
-final CompletableFuture<ByteBuffer> finalResult = result;
-final long offset = range.getOffset();
-final int length = range.getLength();
+ // Save the initial position
+ final long initialPosition = getPos();
-// Submit async read task for this range
-CompletableFuture.runAsync(() -> {
- try {
-ByteBuffer buffer = allocate.apply(length);
-int bytesRead = read(offset, buffer);
+ try {
+VectoredReadUtils.performVectoredRead(ranges, allocate, (offset, buffer) -> {
+ int length = buffer.remaining();
+ readFully(offset, buffer);
-if (bytesRead < length) {
- finalResult.completeExceptionally(
- new EOFException("Requested " + length +
- " bytes but only read " + bytesRead));
-} else {
- buffer.flip();
- if (statistics != null) {
-statistics.incrementBytesRead(bytesRead);
- }
- finalResult.complete(buffer);
-}
- } catch (Exception e) {
-finalResult.completeExceptionally(e);
+ // Update statistics
+ if (statistics != null) {
+statistics.incrementBytesRead(length);
}
});
+ } finally {
+// Restore position
+synchronized (this) {
Review Comment:
what would happen in case of concurrent readVectored() calls?
Re: [PR] HDDS-13660. Ozone client support readVectored. [ozone]
yandrey321 commented on code in PR #9402:
URL: https://github.com/apache/ozone/pull/9402#discussion_r2589585760
##
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/VectoredReadUtils.java:
##
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.utils;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.IntFunction;
+import org.apache.hadoop.fs.FileRange;
+import org.apache.hadoop.hdds.annotation.InterfaceAudience;
+
+/**
+ * Utility class for vectored read operations.
+ */
[email protected]
+public final class VectoredReadUtils {
+  private VectoredReadUtils() {
+    // Utility class, no instances
+  }
+
+  /**
+   * Functional interface for reading data into a buffer from a specific offset.
+   */
+  @FunctionalInterface
+  public interface RangeReader {
+    /**
+     * Read data from the specified offset into the buffer.
+     *
+     * @param offset the offset to read from
+     * @param buffer the buffer to read into
+     * @throws IOException if an error occurs during reading
+     */
+    void readRange(long offset, ByteBuffer buffer) throws IOException;
+  }
+
+  /**
+   * Validates the ranges for vectored read operations.
+   * Checks for null ranges, negative offsets, negative lengths, and overlapping ranges.
+   *
+   * @param ranges list of file ranges to validate
+   * @throws NullPointerException if ranges list is null or contains null elements
+   * @throws EOFException if any range has a negative offset
+   * @throws IllegalArgumentException if any range has negative length or ranges overlap
+   */
+  public static void validateRanges(List<? extends FileRange> ranges) throws IOException {
+    if (ranges == null) {
+      throw new NullPointerException("Ranges list cannot be null");
+    }
+    for (int i = 0; i < ranges.size(); i++) {
+      FileRange range = ranges.get(i);
+      if (range == null) {
+        throw new NullPointerException("Range at index " + i + " is null");
+      }
+      // Check for negative offset
+      if (range.getOffset() < 0) {
+        throw new EOFException("Range " + i + " has negative offset: " + range.getOffset());
+      }
+      // Check for negative length
+      if (range.getLength() < 0) {
+        throw new IllegalArgumentException("Range " + i + " has negative length: " + range.getLength());
+      }
+    }
+    // Check for overlapping ranges
+    for (int i = 0; i < ranges.size(); i++) {
+      FileRange current = ranges.get(i);
+      long currentEnd = current.getOffset() + current.getLength();
+      for (int j = i + 1; j < ranges.size(); j++) {
+        FileRange other = ranges.get(j);
+        long otherEnd = other.getOffset() + other.getLength();
+        // Check if ranges overlap
+        boolean overlaps = (current.getOffset() < otherEnd && currentEnd > other.getOffset());
+        if (overlaps) {
+          throw new IllegalArgumentException(
+              "Range[" + i + "] (" + current.getOffset() + ", " + current.getLength() +
+              ") overlaps with Range[" + j + "] (" + other.getOffset() + ", " + other.getLength() + ")");
+        }
+      }
+    }
+  }
+
+  /**
+   * Performs vectored read by reading each range asynchronously.
+   * This method handles the common logic of setting up CompletableFutures
+   * and submitting async read tasks for each range.
+   *
+   * @param ranges list of file ranges to read
+   * @param allocate function to allocate ByteBuffer for each range
+   * @param reader the function that performs the actual read operation
+   * @throws IOException if there is an error during validation
+   */
+  public static void performVectoredRead(
+      List<? extends FileRange> ranges,
+      IntFunction<ByteBuffer> allocate,
+      RangeReader reader) throws IOException {
+
+    // Validate ranges before processing
+    validateRanges(ranges);
Review Comment:
should we sort Ranges to avoid seek() back?
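On sorting: a minimal sketch, assuming a hypothetical Range holder rather than Hadoop's FileRange and assuming every length is positive (for zero-length ranges the result can differ from the pairwise check in validateRanges). Sorting a copy of the ranges by offset lets them be visited front to back, and it also reduces the pairwise O(n^2) overlap check to an O(n log n) sort plus one linear pass, because after sorting any overlap shows up between neighbours.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical sketch: sort ranges by offset, then detect overlaps in one pass. */
public final class SortedRanges {
  private SortedRanges() {
  }

  /** Minimal stand-in for org.apache.hadoop.fs.FileRange. */
  static final class Range {
    final long offset;
    final int length;

    Range(long offset, int length) {
      this.offset = offset;
      this.length = length;
    }
  }

  /**
   * Returns a new list ordered by offset, leaving the caller's list untouched.
   * Ranges that merely touch (end == next offset) are allowed.
   */
  static List<Range> sortAndCheck(List<Range> ranges) {
    List<Range> sorted = new ArrayList<>(ranges);
    sorted.sort(Comparator.comparingLong((Range r) -> r.offset));
    for (int i = 1; i < sorted.size(); i++) {
      Range prev = sorted.get(i - 1);
      Range cur = sorted.get(i);
      // With positive lengths, an overlap anywhere implies an overlap between some neighbours.
      if (prev.offset + prev.length > cur.offset) {
        throw new IllegalArgumentException("Range at offset " + prev.offset
            + " (length " + prev.length + ") overlaps range at offset " + cur.offset);
      }
    }
    return sorted;
  }
}
```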
Re: [PR] HDDS-13660. Ozone client support readVectored. [ozone]
yandrey321 commented on code in PR #9402:
URL: https://github.com/apache/ozone/pull/9402#discussion_r2589554429
##
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/MultipartInputStream.java:
##
@@ -286,21 +276,28 @@ public void readVectored(List<? extends FileRange> ranges,
* @param buffer the buffer to read data into
* @throws IOException if there is an error reading data
*/
- private void readRangeData(long offset, ByteBuffer buffer) throws IOException {
+ private void readRangeData(long offset, ByteBuffer buffer, long initialPosition) throws IOException {
synchronized (this) {
- long savedPos = getPos();
try {
seek(offset);
-int remaining = buffer.remaining();
-byte[] temp = new byte[remaining];
-int bytesRead = read(temp, 0, remaining);
-if (bytesRead < remaining) {
- throw new EOFException("Could not read all requested bytes. " +
- "Requested: " + remaining + ", Read: " + bytesRead);
+int totalBytesToRead = buffer.remaining();
+byte[] temp = new byte[totalBytesToRead];
+int totalBytesRead = 0;
+
+// Read in a loop to handle partial reads
+while (totalBytesRead < totalBytesToRead) {
+ int bytesRead = read(temp, totalBytesRead, totalBytesToRead - totalBytesRead);
+ if (bytesRead < 0) {
+throw new EOFException("End of file reached before reading fully. " +
+"Requested: " + totalBytesToRead + ", Read: " + totalBytesRead);
+ }
+ totalBytesRead += bytesRead;
}
-buffer.put(temp, 0, bytesRead);
+
+buffer.put(temp, 0, totalBytesRead);
Review Comment:
Can we read directly into `buffer` and avoid using `temp`?
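A minimal sketch of reading straight into the destination buffer, assuming the underlying stream can fill a `ByteBuffer` directly. Here `java.nio.channels.ReadableByteChannel` stands in for that capability; `DirectBufferReadSketch` and its `readFully` helper are hypothetical names. The loop plays the same role as the partial-read loop in the diff, but it writes into the buffer and tracks progress with `buffer.remaining()` instead of a `temp` array and a counter.

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;

public class DirectBufferReadSketch {

  /**
   * Reads until the buffer has no space left, writing straight into it rather
   * than through a temporary byte[]. Loops because one read may return fewer
   * bytes than requested; throws EOFException if the source ends first.
   * Assumes a blocking channel.
   */
  public static void readFully(ReadableByteChannel channel, ByteBuffer buffer) throws IOException {
    final int requested = buffer.remaining();
    while (buffer.hasRemaining()) {
      int n = channel.read(buffer);
      if (n < 0) {
        throw new EOFException("End of file reached before reading fully. Requested: "
            + requested + ", Read: " + (requested - buffer.remaining()));
      }
    }
  }

  public static void main(String[] args) throws IOException {
    byte[] data = "hello vectored read".getBytes(StandardCharsets.US_ASCII);
    ReadableByteChannel channel = Channels.newChannel(new ByteArrayInputStream(data));
    ByteBuffer buffer = ByteBuffer.allocate(5);
    readFully(channel, buffer);
    buffer.flip(); // make the bytes just written readable by the consumer
    System.out.println(StandardCharsets.US_ASCII.decode(buffer));
  }
}
```

`main` prints `hello`. The helper leaves flipping to the caller, as in the earlier version of the code, where `buffer.flip()` followed the range read.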
Re: [PR] HDDS-13660. Ozone client support readVectored. [ozone]
yandrey321 commented on code in PR #9402:
URL: https://github.com/apache/ozone/pull/9402#discussion_r2589544557
##
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/MultipartInputStream.java:
##
@@ -245,36 +245,26 @@ public synchronized long skip(long n) throws IOException {
* @param allocate function to allocate ByteBuffer for each range
* @throws IOException if there is an error performing the reads
*/
- public void readVectored(List<? extends FileRange> ranges,
- IntFunction<ByteBuffer> allocate) throws IOException {
+ public void readVectored(
+ List<? extends FileRange> ranges,
+ IntFunction<ByteBuffer> allocate
+ ) throws IOException {
checkOpen();
if (!initialized) {
initialize();
}
-// Perform vectored read using positioned read operations
-for (FileRange range : ranges) {
- CompletableFuture<ByteBuffer> result = range.getData();
- if (result == null) {
-result = new CompletableFuture<>();
-range.setData(result);
- }
+// Save the initial position
+final long initialPosition = getPos();
- final CompletableFuture<ByteBuffer> finalResult = result;
- final long offset = range.getOffset();
- final int rangeLength = range.getLength();
-
- // Submit async read task for this range
- CompletableFuture.runAsync(() -> {
-try {
- ByteBuffer buffer = allocate.apply(rangeLength);
- readRangeData(offset, buffer);
- buffer.flip();
- finalResult.complete(buffer);
-} catch (Exception e) {
- finalResult.completeExceptionally(e);
-}
- });
+try {
+ VectoredReadUtils.performVectoredRead(ranges, allocate,
+ (offset, buffer) -> readRangeData(offset, buffer, initialPosition));
+} finally {
+ // Restore position
+ synchronized (this) {
Review Comment:
What would happen in the case of concurrent readVectored() calls?
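A hedged sketch of one way to keep concurrent callers safe when range reads share a single seek cursor: do the save, seek, read and restore for each range inside one critical section. `ToyStream` and `readRange` are hypothetical, and the in-memory stream only models a seekable input. The contrast is with saving the position once per `readVectored()` call and restoring it in a `finally` block: if the helper returns once the async tasks are submitted, as the earlier `runAsync` loop did, that restore can run while range reads are still in flight.

```java
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class LockedRangeReadSketch {

  /** Hypothetical seekable stream over a byte[]; pos is the shared cursor that seek() moves. */
  public static final class ToyStream {
    private final byte[] data;
    private long pos;

    public ToyStream(byte[] data) {
      this.data = data;
    }

    public synchronized long getPos() {
      return pos;
    }

    public synchronized void seek(long newPos) {
      pos = newPos;
    }

    /** Copies up to dst.remaining() bytes from the cursor and advances it; returns -1 at EOF. */
    private synchronized int read(ByteBuffer dst) {
      if (pos >= data.length) {
        return -1;
      }
      int n = (int) Math.min(dst.remaining(), data.length - pos);
      dst.put(data, (int) pos, n);
      pos += n;
      return n;
    }

    /**
     * Reads one range. Save, seek, read and restore all happen while holding the
     * stream's monitor, so no other caller can see or move the cursor mid-range.
     */
    public synchronized void readRange(long offset, ByteBuffer dst) throws IOException {
      long saved = pos;
      try {
        seek(offset);
        while (dst.hasRemaining()) {
          if (read(dst) < 0) {
            throw new EOFException("End of file at position " + pos);
          }
        }
      } finally {
        seek(saved); // restore the caller-visible position before releasing the lock
      }
    }
  }

  public static void main(String[] args) {
    byte[] data = new byte[256];
    for (int i = 0; i < data.length; i++) {
      data[i] = (byte) i;
    }
    ToyStream stream = new ToyStream(data);
    stream.seek(7); // the caller's own position, which the range reads must not disturb

    // Sixteen 16-byte ranges read concurrently on the common pool.
    List<CompletableFuture<ByteBuffer>> futures = new ArrayList<>();
    for (int off = 0; off < data.length; off += 16) {
      final long offset = off;
      futures.add(CompletableFuture.supplyAsync(() -> {
        ByteBuffer buf = ByteBuffer.allocate(16);
        try {
          stream.readRange(offset, buf);
        } catch (IOException e) {
          throw new CompletionException(e);
        }
        buf.flip();
        return buf;
      }));
    }
    CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();
    System.out.println(futures.get(3).join().get(0));
    System.out.println(stream.getPos());
  }
}
```

`main` prints 48 (the first byte of the range at offset 48) and then 7, the position set before the reads. The trade-off is that every range read on one stream is serialized by the lock, so this buys safety rather than parallelism.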
Re: [PR] HDDS-13660. Ozone client support readVectored. [ozone]
ashishkumar50 commented on PR #9402:
URL: https://github.com/apache/ozone/pull/9402#issuecomment-3612811343
@adoroszlai Thanks for the review, handled the comments.
Re: [PR] HDDS-13660. Ozone client support readVectored. [ozone]
adoroszlai commented on code in PR #9402:
URL: https://github.com/apache/ozone/pull/9402#discussion_r2587898276
##
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneInputStream.java:
##
@@ -121,4 +125,24 @@ public boolean seekToNewSource(long targetPos) throws IOException {
"underlying inputStream");
}
}
+
+ /**
+ * Read data from multiple byte ranges asynchronously.
+ * This allows reading multiple discontiguous ranges from the same file
+ * efficiently with a single API call.
+ *
+ * @param ranges list of file ranges to read
+ * @param allocate function to allocate ByteBuffer for each range
+ * @throws IOException if there is an error performing the reads
+ */
+ public void readVectored(List<? extends FileRange> ranges,
+ IntFunction<ByteBuffer> allocate) throws IOException {
Review Comment:
nit:
```suggestion
  public void readVectored(
      List<? extends FileRange> ranges,
      IntFunction<ByteBuffer> allocate
  ) throws IOException {
```
##
hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java:
##
@@ -199,4 +203,56 @@ public void readFully(long position, ByteBuffer buf) throws IOException {
}
}
}
+
+ /**
+ * Implements vectored read by reading each range asynchronously.
+ * This allows clients to read multiple byte ranges from the same file
+ * in a single call, potentially improving performance by enabling
+ * parallel reads and reducing round-trip overhead.
+ *
+ * @param ranges list of file ranges to read
+ * @param allocate function to allocate ByteBuffer for each range
+ * @throws IOException if there is an error performing the reads
+ */
+ @Override
+ public void readVectored(List<? extends FileRange> ranges,
+ IntFunction<ByteBuffer> allocate) throws IOException {
Review Comment:
nit:
```suggestion
  public void readVectored(
      List<? extends FileRange> ranges,
      IntFunction<ByteBuffer> allocate
  ) throws IOException {
```
##
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/MultipartInputStream.java:
##
@@ -232,6 +236,75 @@ public synchronized long skip(long n) throws IOException {
return toSkip;
}
+ /**
+ * Implements vectored read for multipart input stream.
+ * This method reads multiple byte ranges asynchronously, potentially
+ * from different underlying part streams.
+ *
+ * @param ranges list of file ranges to read
+ * @param allocate function to allocate ByteBuffer for each range
+ * @throws IOException if there is an error performing the reads
+ */
+ public void readVectored(List<? extends FileRange> ranges,
+ IntFunction<ByteBuffer> allocate) throws IOException {
Review Comment:
nit: Please do not format method signatures like this. Whenever the visibility,
return type, method name, or other modifiers change, we would have to
reindent all parameters.
```suggestion
  public void readVectored(
      List<? extends FileRange> ranges,
      IntFunction<ByteBuffer> allocate
  ) throws IOException {
```
##
hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java:
##
@@ -199,4 +203,56 @@ public void readFully(long position, ByteBuffer buf) throws IOException {
}
}
}
+
+ /**
+ * Implements vectored read by reading each range asynchronously.
+ * This allows clients to read multiple byte ranges from the same file
+ * in a single call, potentially improving performance by enabling
+ * parallel reads and reducing round-trip overhead.
+ *
+ * @param ranges list of file ranges to read
+ * @param allocate function to allocate ByteBuffer for each range
+ * @throws IOException if there is an error performing the reads
+ */
+ @Override
+ public void readVectored(List<? extends FileRange> ranges,
+ IntFunction<ByteBuffer> allocate) throws IOException {
+TracingUtil.executeInNewSpan("OzoneFSInputStream.readVectored", () -> {
+ // Perform vectored read using positioned read operations
+ for (FileRange range : ranges) {
+CompletableFuture<ByteBuffer> result = range.getData();
+if (result == null) {
+ result = new CompletableFuture<>();
+ range.setData(result);
+}
+
+final CompletableFuture<ByteBuffer> finalResult = result;
+final long offset = range.getOffset();
+final int length = range.getLength();
+
+// Submit async read task for this range
+CompletableFuture.runAsync(() -> {
+ try {
+ByteBuffer buffer = allocate.apply(length);
+int bytesRead = read(offset, buffer);
+
+if (bytesRead < length) {
+ finalResult.completeExceptionally(
+ new EOFException("Requested " + length +
+ " bytes but only read " + bytesRead));
Review Comment:
`read()` is not guaranteed to read all data to fill the buffer. For that,
use `readFully()`.
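To illustrate the reviewer's point, here is a minimal sketch of a per-range task that loops until the buffer is full instead of trusting a single `read()`. `PositionalSource`, `Range` and `shortReads` are hypothetical: `PositionalSource` models a positioned `read(long, ByteBuffer)` that may return fewer bytes than asked, and `shortReads` caps every call at 3 bytes to make that visible. In the PR itself, the existing `readFully(long position, ByteBuffer buf)` on `OzoneFSInputStream` (named in the hunk header) would presumably take the place of the helper here.

```java
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.IntFunction;

public class PositionalReadFullySketch {

  /** Hypothetical positioned read: may return fewer bytes than dst.remaining(), or -1 at EOF. */
  public interface PositionalSource {
    int read(long position, ByteBuffer dst) throws IOException;
  }

  /** Hypothetical stand-in for Hadoop's FileRange. */
  public static final class Range {
    public final long offset;
    public final int length;
    public final CompletableFuture<ByteBuffer> data = new CompletableFuture<>();

    public Range(long offset, int length) {
      this.offset = offset;
      this.length = length;
    }
  }

  /** Keeps calling read() until dst is full, advancing the position by what each call returned. */
  public static void readFully(PositionalSource src, long position, ByteBuffer dst) throws IOException {
    while (dst.hasRemaining()) {
      int n = src.read(position, dst);
      if (n < 0) {
        throw new EOFException("End of file at position " + position
            + " with " + dst.remaining() + " bytes still to read");
      }
      position += n;
    }
  }

  /** Submits one async task per range; each task completes its range's future. */
  public static void readVectored(PositionalSource src, List<Range> ranges, IntFunction<ByteBuffer> allocate) {
    for (Range range : ranges) {
      CompletableFuture.runAsync(() -> {
        try {
          ByteBuffer buffer = allocate.apply(range.length);
          readFully(src, range.offset, buffer);
          buffer.flip();
          range.data.complete(buffer);
        } catch (Exception e) {
          range.data.completeExceptionally(e);
        }
      });
    }
  }

  /** Test source that returns at most 3 bytes per call, forcing short reads. */
  public static PositionalSource shortReads(byte[] bytes) {
    return (position, dst) -> {
      if (position >= bytes.length) {
        return -1;
      }
      int n = (int) Math.min(Math.min(3, dst.remaining()), bytes.length - position);
      dst.put(bytes, (int) position, n);
      return n;
    };
  }

  public static void main(String[] args) throws Exception {
    byte[] bytes = new byte[32];
    for (int i = 0; i < bytes.length; i++) {
      bytes[i] = (byte) i;
    }
    List<Range> ranges = Arrays.asList(new Range(4, 10), new Range(20, 8));
    readVectored(shortReads(bytes), ranges, ByteBuffer::allocate);
    ByteBuffer first = ranges.get(0).data.get();
    System.out.println(first.remaining() + " " + first.get(0));
  }
}
```

`main` prints `10 4`: all ten bytes of the range starting at offset 4, even though each underlying call returns at most three. A range that runs past the end of the data completes its future exceptionally with an `EOFException`.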
##
h
Re: [PR] HDDS-13660. Ozone client support readVectored. [ozone]
adoroszlai commented on PR #9402:
URL: https://github.com/apache/ozone/pull/9402#issuecomment-3608447742
Thanks @ashishkumar50 for the patch. I have ported the corresponding contract test from Hadoop, which shows that some input validation is missing (negative length and offset, overlapping ranges, same ranges). Please feel free to pick it from https://github.com/adoroszlai/ozone/commit/ee11cf36ae4ded41c0627906af79023fef21ae75 and include it in this PR.
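The missing checks listed above can be sketched as a single validation pass: reject negative offsets and lengths, sort by offset, then reject any range that starts before the previous one ends (identical non-empty ranges are a special case of that). `ValidateRangesSketch` and `validateAndSort` are hypothetical names, and throwing `IllegalArgumentException` is an assumption; the ported contract test is what defines the exceptions Hadoop callers actually expect.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class ValidateRangesSketch {

  /** Hypothetical stand-in for Hadoop's FileRange. */
  public static final class Range {
    public final long offset;
    public final int length;

    public Range(long offset, int length) {
      this.offset = offset;
      this.length = length;
    }

    @Override
    public String toString() {
      return "Range(offset=" + offset + ", length=" + length + ")";
    }
  }

  /**
   * Validates the ranges and returns them sorted by offset (the input list is not modified).
   * Rejects null entries, negative offsets or lengths, and overlapping ranges;
   * two identical non-empty ranges count as overlapping.
   */
  public static List<Range> validateAndSort(List<Range> ranges) {
    if (ranges == null) {
      throw new NullPointerException("ranges");
    }
    List<Range> sorted = new ArrayList<>(ranges.size());
    for (Range range : ranges) {
      if (range == null) {
        throw new NullPointerException("null range in " + ranges);
      }
      if (range.offset < 0) {
        throw new IllegalArgumentException("Negative offset: " + range);
      }
      if (range.length < 0) {
        throw new IllegalArgumentException("Negative length: " + range);
      }
      sorted.add(range);
    }
    sorted.sort(Comparator.comparingLong(x -> x.offset));
    // After sorting, a range overlaps its predecessor if it starts before the predecessor ends.
    for (int i = 1; i < sorted.size(); i++) {
      Range prev = sorted.get(i - 1);
      Range cur = sorted.get(i);
      if (prev.offset + prev.length > cur.offset) {
        throw new IllegalArgumentException("Overlapping ranges: " + prev + " and " + cur);
      }
    }
    return sorted;
  }

  public static void main(String[] args) {
    System.out.println(validateAndSort(Arrays.asList(new Range(50, 10), new Range(0, 50))));
  }
}
```

`main` prints `[Range(offset=0, length=50), Range(offset=50, length=10)]`: adjacent ranges are accepted because only a true overlap is rejected. Zero-length ranges never overlap under this check, so a duplicated empty range would pass.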
Re: [PR] HDDS-13660. Ozone client support readVectored. [ozone]
jojochuang commented on PR #9402:
URL: https://github.com/apache/ozone/pull/9402#issuecomment-3597842505
@yandrey321
