This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new acd33e1  [SYSTEMDS-2630] Multi-threaded slicing in sliced federated 
broadcasts
acd33e1 is described below

commit acd33e166b756cdf5884c89b735089f17ccfb2c6
Author: Matthias Boehm <[email protected]>
AuthorDate: Sat Nov 14 22:26:08 2020 +0100

    [SYSTEMDS-2630] Multi-threaded slicing in sliced federated broadcasts
    
    When broadcasting a large, potentially sparse matrix in a sliced
    manner (where every federated partition receives only the data it
    needs), we have so far sliced the blocks sequentially. With this patch,
    we perform this independent block slicing in a multi-threaded manner
    in order to avoid unnecessary overhead for large broadcasts.
---
 .../controlprogram/federated/FederationMap.java    | 28 ++++++++++++++++++----
 .../instructions/fed/TsmmFEDInstruction.java       |  1 -
 2 files changed, 23 insertions(+), 6 deletions(-)

diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java
index b647476..f670c17 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java
@@ -104,20 +104,38 @@ public class FederationMap
                return new FederatedRequest(RequestType.PUT_VAR, id, scalar);
        }
        
+       /**
+        * Creates separate slices of an input data object according
+        * to the index ranges of federated data. These slices are then
+        * wrapped in separate federated requests for broadcasting.
+        * 
+        * @param data input data object (matrix, tensor, frame)
+        * @param transposed false: slice according to federated data,
+        *                   true: slice according to transposed federated data
+        * @return array of federated requests corresponding to federated data
+        */
        public FederatedRequest[] broadcastSliced(CacheableData<?> data, 
boolean transposed) {
-               //prepare separate requests for different slices
+               //prepare broadcast id and pin input
                long id = FederationUtils.getNextFedDataID();
                CacheBlock cb = data.acquireReadAndRelease();
-               List<FederatedRequest> ret = new ArrayList<>();
+               
+               //prepare indexing ranges
+               int[][] ix = new int[_fedMap.size()][];
+               int pos = 0;
                for(Entry<FederatedRange, FederatedData> e : 
_fedMap.entrySet()) {
                        int rl = transposed ? 0 : 
e.getKey().getBeginDimsInt()[0];
                        int ru = transposed ? cb.getNumRows()-1 : 
e.getKey().getEndDimsInt()[0]-1;
                        int cl = transposed ? e.getKey().getBeginDimsInt()[0] : 
0;
                        int cu = transposed ? e.getKey().getEndDimsInt()[0]-1 : 
cb.getNumColumns()-1;
-                       CacheBlock tmp = cb.slice(rl, ru, cl, cu, new 
MatrixBlock());
-                       ret.add(new FederatedRequest(RequestType.PUT_VAR, id, 
tmp));
+                       ix[pos++] = new int[] {rl, ru, cl, cu};
                }
-               return ret.toArray(new FederatedRequest[0]);
+               
+               //multi-threaded block slicing and federation request creation
+               FederatedRequest[] ret = new FederatedRequest[ix.length];
+               Arrays.parallelSetAll(ret, i ->
+                       new FederatedRequest(RequestType.PUT_VAR, id,
+                       cb.slice(ix[i][0], ix[i][1], ix[i][2], ix[i][3], new 
MatrixBlock())));
+               return ret;
        }
        
        public boolean isAligned(FederationMap that, boolean transposed) {
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/fed/TsmmFEDInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/fed/TsmmFEDInstruction.java
index ed9615f..62438c0 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/fed/TsmmFEDInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/fed/TsmmFEDInstruction.java
@@ -32,7 +32,6 @@ import org.apache.sysds.runtime.instructions.InstructionUtils;
 import org.apache.sysds.runtime.instructions.cp.CPOperand;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 
-import java.util.Arrays;
 import java.util.concurrent.Future;
 
 public class TsmmFEDInstruction extends BinaryFEDInstruction {

Reply via email to