FMX commented on code in PR #3485:
URL: https://github.com/apache/celeborn/pull/3485#discussion_r2371274275


##########
client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java:
##########
@@ -307,6 +267,93 @@ public ByteBuf next() throws Exception {
     return chunk.getRight();
   }
 
+  private void startAsyncFetch() {
+    CompletableFuture.runAsync(this::fetchChunks, fetchExecutor)
+        .exceptionally(
+            throwable -> {
+              logger.warn("Async fetch failed", throwable);
+              exception.set(new IOException(throwable));
+              return null;
+            });
+  }
+
+  private void fetchChunks() {
+    try {
+      while (!closed && currentChunkIndex.get() <= endChunkIndex) {
+        if (partitionReaderCheckpointMetadata.isPresent()
+            && 
partitionReaderCheckpointMetadata.get().isCheckpointed(currentChunkIndex.get()))
 {
+          logger.info(
+              "Skipping chunk {} as it has already been returned,"
+                  + " likely by a previous reader for the same partition.",
+              currentChunkIndex.get());
+          currentChunkIndex.incrementAndGet();
+          continue;
+        }
+
+        while (results.size() >= fetchMaxReqsInFlight) {
+          Thread.sleep(50);
+        }
+
+        final int chunkIndex = currentChunkIndex.getAndIncrement();
+
+        CompletableFuture.runAsync(
+            () -> {
+              try {
+                long offset = chunkOffsets.get(chunkIndex);
+                long length = chunkOffsets.get(chunkIndex + 1) - offset;
+                logger.debug(
+                    "Reading chunk {} at offset {} with length {}", 
chunkIndex, offset, length);
+
+                byte[] buffer = new byte[(int) length];
+                readChunkFromHdfs(offset, buffer);
+
+                results.put(Pair.of(chunkIndex, 
Unpooled.wrappedBuffer(buffer)));
+                logger.debug("Added chunk {} to results", chunkIndex);
+              } catch (Exception e) {
+                logger.error("Error reading chunk {}", chunkIndex, e);
+                exception.set(new IOException(e));
+              }
+            },
+            fetchExecutor);
+      }
+    } catch (InterruptedException e) {
+      logger.warn("Fetch thread interrupted", e);
+      Thread.currentThread().interrupt();
+    } catch (Exception e) {
+      logger.error("Error in fetchChunks", e);
+      exception.set(new IOException(e));
+    }
+
+    logger.debug("Fetch {} is done.", location.getStorageInfo().getFilePath());
+  }
+
+  private void readChunkFromHdfs(long offset, byte[] buffer) throws 
IOException {
+    try {

Review Comment:
   DfsInputStream is generally not thread-safe; you'll need to create a
separate input stream for every thread.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to