This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 4cb8c6982 [#291] feat(client): Record background fetch time for 
prefetch (#2366)
4cb8c6982 is described below

commit 4cb8c6982f566fdfedd27f5e459c3aa5f4bbe269
Author: Junfan Zhang <[email protected]>
AuthorDate: Tue Feb 11 13:31:20 2025 +0800

    [#291] feat(client): Record background fetch time for prefetch (#2366)
    
    ### What changes were proposed in this pull request?
    
    Followup #2365 . Record the background fetch time to compare with the real 
read time to get the overlapping time to know more about the prefetch 
performance
    
    ### Why are the changes needed?
    
    To know about the performance for online spark jobs
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Needn't
    
    ---------
    
    Co-authored-by: Junfan Zhang <[email protected]>
---
 .../handler/impl/PrefetchableClientReadHandler.java       | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PrefetchableClientReadHandler.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PrefetchableClientReadHandler.java
index 0bfafde74..a8386ea4f 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PrefetchableClientReadHandler.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PrefetchableClientReadHandler.java
@@ -24,6 +24,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,6 +43,7 @@ public abstract class PrefetchableClientReadHandler extends 
AbstractClientReadHa
   private AtomicBoolean abnormalFetchTag;
   private AtomicBoolean finishedTag;
   private AtomicInteger queueingNumber;
+  private AtomicLong fetchTime;
 
   public PrefetchableClientReadHandler(Optional<PrefetchOption> 
prefetchOptional) {
     if (prefetchOptional.isPresent()) {
@@ -59,6 +61,7 @@ public abstract class PrefetchableClientReadHandler extends 
AbstractClientReadHa
       this.abnormalFetchTag = new AtomicBoolean(false);
       this.finishedTag = new AtomicBoolean(false);
       this.queueingNumber = new AtomicInteger(0);
+      this.fetchTime = new AtomicLong(0);
     } else {
       this.prefetchEnabled = false;
     }
@@ -87,6 +90,7 @@ public abstract class PrefetchableClientReadHandler extends 
AbstractClientReadHa
       queueingNumber.incrementAndGet();
       prefetchExecutors.submit(
           () -> {
+            long start = System.currentTimeMillis();
             try {
               if (abnormalFetchTag.get() || finishedTag.get()) {
                 return;
@@ -101,6 +105,7 @@ public abstract class PrefetchableClientReadHandler extends 
AbstractClientReadHa
               LOG.error("Errors on doing readShuffleData", e);
             } finally {
               queueingNumber.decrementAndGet();
+              fetchTime.addAndGet(System.currentTimeMillis() - start);
             }
           });
     }
@@ -138,4 +143,14 @@ public abstract class PrefetchableClientReadHandler 
extends AbstractClientReadHa
       prefetchExecutors.shutdown();
     }
   }
+
+  @Override
+  public void logConsumedBlockInfo() {
+    LOG.info(
+        "Metrics for shuffleId[{}], partitionId[{}], background fetch cost {} 
ms",
+        shuffleId,
+        partitionId,
+        fetchTime);
+    super.logConsumedBlockInfo();
+  }
 }

Reply via email to