This is an automated email from the ASF dual-hosted git repository.

arnabp20 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new d4839b4  [SYSTEMDS-2784] Calculate checksums for Fed PUT requests
d4839b4 is described below

commit d4839b42c5c071df34ba25201d0708595494af06
Author: arnabp <[email protected]>
AuthorDate: Fri Jan 8 16:07:46 2021 +0100

    [SYSTEMDS-2784] Calculate checksums for Fed PUT requests
    
    For lineage-based reuse, it is necessary to uniquely
    identify each data item sent via PUT requests. Inspired
    by Spark, this patch introduces Adler32 checksum calculation
    for every CacheBlock and scalar, and materializes the
    checksums in the federated request.
---
 .../controlprogram/federated/FederatedRequest.java | 67 ++++++++++++++++++----
 .../federated/FederatedWorkerHandler.java          |  2 +-
 .../fed/AggregateBinaryFEDInstruction.java         |  6 --
 3 files changed, 57 insertions(+), 18 deletions(-)

diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedRequest.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedRequest.java
index 33dad44..05d7e57 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedRequest.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedRequest.java
@@ -19,14 +19,21 @@
 
 package org.apache.sysds.runtime.controlprogram.federated;
 
+import java.io.DataOutput;
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.stream.Collectors;
+import java.util.zip.Adler32;
+import java.util.zip.Checksum;
 
+import org.apache.sysds.api.DMLException;
 import org.apache.sysds.api.DMLScript;
-import org.apache.sysds.runtime.lineage.LineageItem;
+import org.apache.sysds.runtime.controlprogram.caching.CacheBlock;
+import org.apache.sysds.runtime.controlprogram.caching.CacheDataOutput;
+import org.apache.sysds.runtime.controlprogram.caching.LazyWriteBuffer;
+import org.apache.sysds.runtime.instructions.cp.ScalarObject;
 import org.apache.sysds.utils.Statistics;
 
 public class FederatedRequest implements Serializable {
@@ -47,7 +54,7 @@ public class FederatedRequest implements Serializable {
        private long _tid;
        private List<Object> _data;
        private boolean _checkPrivacy;
-       private List<Integer> _lineageHash;
+       private List<Long> _checksums;
        
        
        public FederatedRequest(RequestType method) {
@@ -68,6 +75,8 @@ public class FederatedRequest implements Serializable {
                _id = id;
                _data = data;
                setCheckPrivacy();
+               if (DMLScript.LINEAGE)
+                       setChecksum();
        }
        
        public RequestType getType() {
@@ -120,14 +129,50 @@ public class FederatedRequest implements Serializable {
                return _checkPrivacy;
        }
        
-       public void setLineageHash(LineageItem[] liItems) {
-               // copy the hash of the corresponding lineage DAG
-               // TODO: copy both Adler32 checksum (on data) and hash (on 
lineage DAG)
-               _lineageHash = Arrays.stream(liItems).map(li -> 
li.hashCode()).collect(Collectors.toList());
-       }
-       
-       public int getLineageHash(int i) {
-               return _lineageHash.get(i);
+       public void setChecksum() {
+               // Calculate Adler32 checksum. This is used as a leaf node of 
Lineage DAGs
+               // in the workers, and helps to uniquely identify a node 
(tracing PUT)
+               // TODO: append lineageitem hash if checksum is not enough
+               _checksums = new ArrayList<>();
+               try {
+                       calcChecksum();
+               }
+               catch (IOException e) {
+                       throw new DMLException(e);
+               }
+       }
+       
+       public long getChecksum(int i) {
+               return _checksums.get(i);
+       }
+       
+       private void calcChecksum() throws IOException {
+               for (Object ob : _data) {
+                       if (!(ob instanceof CacheBlock) && !(ob instanceof 
ScalarObject))
+                               continue;
+                       
+                       Checksum checksum = new Adler32();
+                       if (ob instanceof ScalarObject) {
+                               byte bytes[] = 
((ScalarObject)ob).getStringValue().getBytes();
+                               checksum.update(bytes, 0, bytes.length);
+                               _checksums.add(checksum.getValue());
+                       }
+                       
+                       if (ob instanceof CacheBlock) {
+                               try {
+                                       CacheBlock cb = (CacheBlock)ob;
+                                       long cbsize = 
LazyWriteBuffer.getCacheBlockSize((CacheBlock)ob);
+                                       DataOutput dout = new 
CacheDataOutput(new byte[(int)cbsize]);
+                                       cb.write(dout);
+                                       byte bytes[] = ((CacheDataOutput) 
dout).getBytes();
+                                       checksum.update(bytes, 0, bytes.length);
+                                       _checksums.add(checksum.getValue());
+                               }
+                               catch(Exception ex) {
+                                       throw new IOException("Failed to 
serialize cache block.", ex);
+                               }
+                       }
+               }
        }
        
        @Override
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
index 5c0a0bc..4a69a10 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
@@ -272,7 +272,7 @@ public class FederatedWorkerHandler extends 
ChannelInboundHandlerAdapter {
                ec.setVariable(varname, data);
                if (DMLScript.LINEAGE)
                        // TODO: Identify MO uniquely. Use Adler32 checksum.
-                       ec.getLineage().set(varname, new 
LineageItem(String.valueOf(request.getLineageHash(0))));
+                       ec.getLineage().set(varname, new 
LineageItem(String.valueOf(request.getChecksum(0))));
 
                return new FederatedResponse(ResponseType.SUCCESS_EMPTY);
        }
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/fed/AggregateBinaryFEDInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/fed/AggregateBinaryFEDInstruction.java
index 4a8194b..6ed642e 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/fed/AggregateBinaryFEDInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/fed/AggregateBinaryFEDInstruction.java
@@ -21,7 +21,6 @@ package org.apache.sysds.runtime.instructions.fed;
 
 import java.util.concurrent.Future;
 
-import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
@@ -32,7 +31,6 @@ import 
org.apache.sysds.runtime.controlprogram.federated.FederationMap.FType;
 import org.apache.sysds.runtime.controlprogram.federated.FederationUtils;
 import org.apache.sysds.runtime.instructions.InstructionUtils;
 import org.apache.sysds.runtime.instructions.cp.CPOperand;
-import org.apache.sysds.runtime.lineage.LineageItemUtils;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.operators.Operator;
 
@@ -80,10 +78,6 @@ public class AggregateBinaryFEDInstruction extends 
BinaryFEDInstruction {
                else if(mo1.isFederated(FType.ROW)) { // MV + MM
                        //construct commands: broadcast rhs, fed mv, retrieve 
results
                        FederatedRequest fr1 = 
mo1.getFedMapping().broadcast(mo2);
-                       if (DMLScript.LINEAGE)
-                               //also copy the hash of the lineage DAG
-                               
fr1.setLineageHash(LineageItemUtils.getLineage(ec, input1));
-                               //TODO: calculate Adler32 checksum on data, and 
move this code inside FederationMap.
                        FederatedRequest fr2 = 
FederationUtils.callInstruction(instString, output,
                                new CPOperand[]{input1, input2}, new 
long[]{mo1.getFedMapping().getID(), fr1.getID()});
                        if( mo2.getNumColumns() == 1 ) { //MV

Reply via email to