This is an automated email from the ASF dual-hosted git repository.
arnabp20 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new d5c77ed [MINOR] Extend Fed byte transfer stat to include UDFs
d5c77ed is described below
commit d5c77ed8c881554d13068b19acd8adbf296be562
Author: ywcb00 <[email protected]>
AuthorDate: Mon Mar 21 21:04:53 2022 +0100
[MINOR] Extend Fed byte transfer stat to include UDFs
Closes #1569
---
.../federated/FederatedStatistics.java | 101 ++++++++++++---------
1 file changed, 57 insertions(+), 44 deletions(-)
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedStatistics.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedStatistics.java
index 9a9cfa7..68057e1 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedStatistics.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedStatistics.java
@@ -61,16 +61,17 @@ public class FederatedStatistics {
// stats of the federated worker on the coordinator site
private static Set<Pair<String, Integer>> _fedWorkerAddresses = new
HashSet<>();
private static final LongAdder readCount = new LongAdder();
- private static final LongAdder putScalarCount = new LongAdder();
- private static final LongAdder putListCount = new LongAdder();
- private static final LongAdder putMatrixCount = new LongAdder();
- private static final LongAdder putFrameCount = new LongAdder();
- private static final LongAdder putMatCharCount = new LongAdder();
- private static final LongAdder putMatrixBytes = new LongAdder();
- private static final LongAdder putFrameBytes = new LongAdder();
+ private static final LongAdder putCount = new LongAdder();
private static final LongAdder getCount = new LongAdder();
private static final LongAdder executeInstructionCount = new
LongAdder();
private static final LongAdder executeUDFCount = new LongAdder();
+ private static final LongAdder transferredScalarCount = new LongAdder();
+ private static final LongAdder transferredListCount = new LongAdder();
+ private static final LongAdder transferredMatrixCount = new LongAdder();
+ private static final LongAdder transferredFrameCount = new LongAdder();
+ private static final LongAdder transferredMatCharCount = new
LongAdder();
+ private static final LongAdder transferredMatrixBytes = new LongAdder();
+ private static final LongAdder transferredFrameBytes = new LongAdder();
private static final LongAdder asyncPrefetchCount = new LongAdder();
// stats on the federated worker itself
@@ -88,20 +89,8 @@ public class FederatedStatistics {
readCount.increment();
break;
case PUT_VAR:
- if(data.get(0) instanceof MatrixBlock) {
- putMatrixCount.increment();
-
putMatrixBytes.add(((MatrixBlock)data.get(0)).getInMemorySize());
- }
- else if(data.get(0) instanceof FrameBlock) {
- putFrameCount.increment();
-
putFrameBytes.add(((FrameBlock)data.get(0)).getInMemorySize());
- }
- else if(data.get(0) instanceof ScalarObject)
- putScalarCount.increment();
- else if(data.get(0) instanceof ListObject)
- putListCount.increment();
- else if(data.get(0) instanceof
MatrixCharacteristics)
- putMatCharCount.increment();
+ putCount.increment();
+ incFedTransfer(data.get(0));
break;
case GET_VAR:
getCount.increment();
@@ -111,34 +100,58 @@ public class FederatedStatistics {
break;
case EXEC_UDF:
executeUDFCount.increment();
+ incFedTransfer(data);
break;
default:
break;
}
}
+ private static void incFedTransfer(List<Object> data) {
+ for(Object dataObj : data)
+ incFedTransfer(dataObj);
+ }
+
+ private static void incFedTransfer(Object dataObj) {
+ if(dataObj instanceof MatrixBlock) {
+ transferredMatrixCount.increment();
+
transferredMatrixBytes.add(((MatrixBlock)dataObj).getInMemorySize());
+ }
+ else if(dataObj instanceof FrameBlock) {
+ transferredFrameCount.increment();
+
transferredFrameBytes.add(((FrameBlock)dataObj).getInMemorySize());
+ }
+ else if(dataObj instanceof ScalarObject)
+ transferredScalarCount.increment();
+ else if(dataObj instanceof ListObject)
+ transferredListCount.increment();
+ else if(dataObj instanceof MatrixCharacteristics)
+ transferredMatCharCount.increment();
+ }
+
public static void incAsyncPrefetchCount(long c) {
asyncPrefetchCount.add(c);
}
- public static long getTotalPutCount() {
- return putScalarCount.longValue() + putListCount.longValue()
- + putMatrixCount.longValue() + putFrameCount.longValue()
- + putMatCharCount.longValue();
+ public static long getTotalFedTransferCount() {
+ return transferredScalarCount.longValue() +
transferredListCount.longValue()
+ + transferredMatrixCount.longValue() +
transferredFrameCount.longValue()
+ + transferredMatCharCount.longValue();
}
public static void reset() {
readCount.reset();
- putScalarCount.reset();
- putListCount.reset();
- putMatrixCount.reset();
- putFrameCount.reset();
- putMatCharCount.reset();
- putMatrixBytes.reset();
- putFrameBytes.reset();
+ putCount.reset();
getCount.reset();
executeInstructionCount.reset();
executeUDFCount.reset();
+ transferredScalarCount.reset();
+ transferredListCount.reset();
+ transferredMatrixCount.reset();
+ transferredFrameCount.reset();
+ transferredMatCharCount.reset();
+ transferredMatrixBytes.reset();
+ transferredFrameBytes.reset();
asyncPrefetchCount.reset();
fedLookupTableGetCount.reset();
fedLookupTableGetTime.reset();
@@ -154,22 +167,22 @@ public class FederatedStatistics {
StringBuilder sb = new StringBuilder();
sb.append("Federated I/O (Read, Put, Get):\t" +
readCount.longValue() + "/" +
- getTotalPutCount() + "/" +
+ putCount.longValue() + "/" +
getCount.longValue() + ".\n");
- if(getTotalPutCount() > 0)
- sb.append("Fed Put (Sca/Lis/Mat/Fra/MC):\t" +
- putScalarCount.longValue() + "/" +
- putListCount.longValue() + "/" +
- putMatrixCount.longValue() + "/" +
- putFrameCount.longValue() + "/" +
- putMatCharCount.longValue() + ".\n");
- if(putMatrixBytes.longValue() > 0 ||
putFrameBytes.longValue() > 0)
- sb.append("Fed Put Bytes (Mat/Frame):\t" +
- putMatrixBytes.longValue() + "/" +
- putFrameBytes.longValue() + "
Bytes.\n");
sb.append("Federated Execute (Inst, UDF):\t" +
executeInstructionCount.longValue() + "/" +
executeUDFCount.longValue() + ".\n");
+ if(getTotalFedTransferCount() > 0)
+ sb.append("Fed Put Count (Sc/Li/Ma/Fr/MC):\t" +
+ transferredScalarCount.longValue() +
"/" +
+ transferredListCount.longValue() + "/" +
+ transferredMatrixCount.longValue() +
"/" +
+ transferredFrameCount.longValue() + "/"
+
+ transferredMatCharCount.longValue() +
".\n");
+ if(transferredMatrixBytes.longValue() > 0 ||
transferredFrameBytes.longValue() > 0)
+ sb.append("Fed Put Bytes (Mat/Frame):\t" +
+ transferredMatrixBytes.longValue() +
"/" +
+ transferredFrameBytes.longValue() + "
Bytes.\n");
sb.append("Federated prefetch count:\t" +
asyncPrefetchCount.longValue() + ".\n");
return sb.toString();