This is an automated email from the ASF dual-hosted git repository.

arnabp20 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new d5c77ed  [MINOR] Extend Fed byte transfer stat to include UDFs
d5c77ed is described below

commit d5c77ed8c881554d13068b19acd8adbf296be562
Author: ywcb00 <[email protected]>
AuthorDate: Mon Mar 21 21:04:53 2022 +0100

    [MINOR] Extend Fed byte transfer stat to include UDFs
    
    Closes #1569
---
 .../federated/FederatedStatistics.java             | 101 ++++++++++++---------
 1 file changed, 57 insertions(+), 44 deletions(-)

diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedStatistics.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedStatistics.java
index 9a9cfa7..68057e1 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedStatistics.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedStatistics.java
@@ -61,16 +61,17 @@ public class FederatedStatistics {
        // stats of the federated worker on the coordinator site
        private static Set<Pair<String, Integer>> _fedWorkerAddresses = new 
HashSet<>();
        private static final LongAdder readCount = new LongAdder();
-       private static final LongAdder putScalarCount = new LongAdder();
-       private static final LongAdder putListCount = new LongAdder();
-       private static final LongAdder putMatrixCount = new LongAdder();
-       private static final LongAdder putFrameCount = new LongAdder();
-       private static final LongAdder putMatCharCount = new LongAdder();
-       private static final LongAdder putMatrixBytes = new LongAdder();
-       private static final LongAdder putFrameBytes = new LongAdder();
+       private static final LongAdder putCount = new LongAdder();
        private static final LongAdder getCount = new LongAdder();
        private static final LongAdder executeInstructionCount = new 
LongAdder();
        private static final LongAdder executeUDFCount = new LongAdder();
+       private static final LongAdder transferredScalarCount = new LongAdder();
+       private static final LongAdder transferredListCount = new LongAdder();
+       private static final LongAdder transferredMatrixCount = new LongAdder();
+       private static final LongAdder transferredFrameCount = new LongAdder();
+       private static final LongAdder transferredMatCharCount = new 
LongAdder();
+       private static final LongAdder transferredMatrixBytes = new LongAdder();
+       private static final LongAdder transferredFrameBytes = new LongAdder();
        private static final LongAdder asyncPrefetchCount = new LongAdder();
 
        // stats on the federated worker itself
@@ -88,20 +89,8 @@ public class FederatedStatistics {
                                readCount.increment();
                                break;
                        case PUT_VAR:
-                               if(data.get(0) instanceof MatrixBlock) {
-                                       putMatrixCount.increment();
-                                       
putMatrixBytes.add(((MatrixBlock)data.get(0)).getInMemorySize());
-                               }
-                               else if(data.get(0) instanceof FrameBlock) {
-                                       putFrameCount.increment();
-                                       
putFrameBytes.add(((FrameBlock)data.get(0)).getInMemorySize());
-                               }
-                               else if(data.get(0) instanceof ScalarObject)
-                                       putScalarCount.increment();
-                               else if(data.get(0) instanceof ListObject)
-                                       putListCount.increment();
-                               else if(data.get(0) instanceof 
MatrixCharacteristics)
-                                       putMatCharCount.increment();
+                               putCount.increment();
+                               incFedTransfer(data.get(0));
                                break;
                        case GET_VAR:
                                getCount.increment();
@@ -111,34 +100,58 @@ public class FederatedStatistics {
                                break;
                        case EXEC_UDF:
                                executeUDFCount.increment();
+                               incFedTransfer(data);
                                break;
                        default:
                                break;
                }
        }
 
+       private static void incFedTransfer(List<Object> data) {
+               for(Object dataObj : data)
+                       incFedTransfer(dataObj);
+       }
+
+       private static void incFedTransfer(Object dataObj) {
+               if(dataObj instanceof MatrixBlock) {
+                       transferredMatrixCount.increment();
+                       
transferredMatrixBytes.add(((MatrixBlock)dataObj).getInMemorySize());
+               }
+               else if(dataObj instanceof FrameBlock) {
+                       transferredFrameCount.increment();
+                       
transferredFrameBytes.add(((FrameBlock)dataObj).getInMemorySize());
+               }
+               else if(dataObj instanceof ScalarObject)
+                       transferredScalarCount.increment();
+               else if(dataObj instanceof ListObject)
+                       transferredListCount.increment();
+               else if(dataObj instanceof MatrixCharacteristics)
+                       transferredMatCharCount.increment();
+       }
+
        public static void incAsyncPrefetchCount(long c) {
                asyncPrefetchCount.add(c);
        }
 
-       public static long getTotalPutCount() {
-               return putScalarCount.longValue() + putListCount.longValue()
-                       + putMatrixCount.longValue() + putFrameCount.longValue()
-                       + putMatCharCount.longValue();
+       public static long getTotalFedTransferCount() {
+               return transferredScalarCount.longValue() + 
transferredListCount.longValue()
+                       + transferredMatrixCount.longValue() + 
transferredFrameCount.longValue()
+                       + transferredMatCharCount.longValue();
        }
 
        public static void reset() {
                readCount.reset();
-               putScalarCount.reset();
-               putListCount.reset();
-               putMatrixCount.reset();
-               putFrameCount.reset();
-               putMatCharCount.reset();
-               putMatrixBytes.reset();
-               putFrameBytes.reset();
+               putCount.reset();
                getCount.reset();
                executeInstructionCount.reset();
                executeUDFCount.reset();
+               transferredScalarCount.reset();
+               transferredListCount.reset();
+               transferredMatrixCount.reset();
+               transferredFrameCount.reset();
+               transferredMatCharCount.reset();
+               transferredMatrixBytes.reset();
+               transferredFrameBytes.reset();
                asyncPrefetchCount.reset();
                fedLookupTableGetCount.reset();
                fedLookupTableGetTime.reset();
@@ -154,22 +167,22 @@ public class FederatedStatistics {
                        StringBuilder sb = new StringBuilder();
                        sb.append("Federated I/O (Read, Put, Get):\t" +
                                readCount.longValue() + "/" +
-                               getTotalPutCount() + "/" +
+                               putCount.longValue() + "/" +
                                getCount.longValue() + ".\n");
-                       if(getTotalPutCount() > 0)
-                               sb.append("Fed Put (Sca/Lis/Mat/Fra/MC):\t" +
-                                       putScalarCount.longValue() + "/" +
-                                       putListCount.longValue() + "/" +
-                                       putMatrixCount.longValue() + "/" +
-                                       putFrameCount.longValue() + "/" +
-                                       putMatCharCount.longValue() + ".\n");
-                       if(putMatrixBytes.longValue() > 0 || 
putFrameBytes.longValue() > 0)
-                               sb.append("Fed Put Bytes (Mat/Frame):\t" +
-                                       putMatrixBytes.longValue() + "/" +
-                                       putFrameBytes.longValue() + " 
Bytes.\n");
                        sb.append("Federated Execute (Inst, UDF):\t" +
                                executeInstructionCount.longValue() + "/" +
                                executeUDFCount.longValue() + ".\n");
+                       if(getTotalFedTransferCount() > 0)
+                               sb.append("Fed Put Count (Sc/Li/Ma/Fr/MC):\t" +
+                                       transferredScalarCount.longValue() + 
"/" +
+                                       transferredListCount.longValue() + "/" +
+                                       transferredMatrixCount.longValue() + 
"/" +
+                                       transferredFrameCount.longValue() + "/" 
+
+                                       transferredMatCharCount.longValue() + 
".\n");
+                       if(transferredMatrixBytes.longValue() > 0 || 
transferredFrameBytes.longValue() > 0)
+                               sb.append("Fed Put Bytes (Mat/Frame):\t" +
+                                       transferredMatrixBytes.longValue() + 
"/" +
+                                       transferredFrameBytes.longValue() + " 
Bytes.\n");
                        sb.append("Federated prefetch count:\t" +
                                asyncPrefetchCount.longValue() + ".\n");
                        return sb.toString();

Reply via email to