This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 4cb8c6982 [#291] feat(client): Record background fetch time for
prefetch (#2366)
4cb8c6982 is described below
commit 4cb8c6982f566fdfedd27f5e459c3aa5f4bbe269
Author: Junfan Zhang <[email protected]>
AuthorDate: Tue Feb 11 13:31:20 2025 +0800
[#291] feat(client): Record background fetch time for prefetch (#2366)
### What changes were proposed in this pull request?
Follow-up to #2365. Record the background fetch time so that it can be
compared with the real read time; the overlap between the two tells us more
about the prefetch performance.
### Why are the changes needed?
To understand the prefetch performance of online Spark jobs.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
No tests are needed.
---------
Co-authored-by: Junfan Zhang <[email protected]>
---
.../handler/impl/PrefetchableClientReadHandler.java | 15 +++++++++++++++
1 file changed, 15 insertions(+)
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PrefetchableClientReadHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PrefetchableClientReadHandler.java
index 0bfafde74..a8386ea4f 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PrefetchableClientReadHandler.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PrefetchableClientReadHandler.java
@@ -24,6 +24,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,6 +43,7 @@ public abstract class PrefetchableClientReadHandler extends
AbstractClientReadHa
private AtomicBoolean abnormalFetchTag;
private AtomicBoolean finishedTag;
private AtomicInteger queueingNumber;
+ private AtomicLong fetchTime;
public PrefetchableClientReadHandler(Optional<PrefetchOption>
prefetchOptional) {
if (prefetchOptional.isPresent()) {
@@ -59,6 +61,7 @@ public abstract class PrefetchableClientReadHandler extends
AbstractClientReadHa
this.abnormalFetchTag = new AtomicBoolean(false);
this.finishedTag = new AtomicBoolean(false);
this.queueingNumber = new AtomicInteger(0);
+ this.fetchTime = new AtomicLong(0);
} else {
this.prefetchEnabled = false;
}
@@ -87,6 +90,7 @@ public abstract class PrefetchableClientReadHandler extends
AbstractClientReadHa
queueingNumber.incrementAndGet();
prefetchExecutors.submit(
() -> {
+ long start = System.currentTimeMillis();
try {
if (abnormalFetchTag.get() || finishedTag.get()) {
return;
@@ -101,6 +105,7 @@ public abstract class PrefetchableClientReadHandler extends
AbstractClientReadHa
LOG.error("Errors on doing readShuffleData", e);
} finally {
queueingNumber.decrementAndGet();
+ fetchTime.addAndGet(System.currentTimeMillis() - start);
}
});
}
@@ -138,4 +143,14 @@ public abstract class PrefetchableClientReadHandler
extends AbstractClientReadHa
prefetchExecutors.shutdown();
}
}
+
+ @Override
+ public void logConsumedBlockInfo() {
+ LOG.info(
+ "Metrics for shuffleId[{}], partitionId[{}], background fetch cost {}
ms",
+ shuffleId,
+ partitionId,
+ fetchTime);
+ super.logConsumedBlockInfo();
+ }
}