This is an automated email from the ASF dual-hosted git repository.
arnabp20 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/master by this push:
new d4839b4 [SYSTEMDS-2784] Calculate checksums for Fed PUT requests
d4839b4 is described below
commit d4839b42c5c071df34ba25201d0708595494af06
Author: arnabp <[email protected]>
AuthorDate: Fri Jan 8 16:07:46 2021 +0100
[SYSTEMDS-2784] Calculate checksums for Fed PUT requests
For lineage-based reuse, it is necessary to uniquely
identify each data item sent via PUT requests. Inspired
by Spark, this patch introduces Adler32 checksum calculation
for every CacheBlock and materializes it in the federated request.
---
.../controlprogram/federated/FederatedRequest.java | 67 ++++++++++++++++++----
.../federated/FederatedWorkerHandler.java | 2 +-
.../fed/AggregateBinaryFEDInstruction.java | 6 --
3 files changed, 57 insertions(+), 18 deletions(-)
diff --git
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedRequest.java
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedRequest.java
index 33dad44..05d7e57 100644
---
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedRequest.java
+++
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedRequest.java
@@ -19,14 +19,21 @@
package org.apache.sysds.runtime.controlprogram.federated;
+import java.io.DataOutput;
+import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import java.util.stream.Collectors;
+import java.util.zip.Adler32;
+import java.util.zip.Checksum;
+import org.apache.sysds.api.DMLException;
import org.apache.sysds.api.DMLScript;
-import org.apache.sysds.runtime.lineage.LineageItem;
+import org.apache.sysds.runtime.controlprogram.caching.CacheBlock;
+import org.apache.sysds.runtime.controlprogram.caching.CacheDataOutput;
+import org.apache.sysds.runtime.controlprogram.caching.LazyWriteBuffer;
+import org.apache.sysds.runtime.instructions.cp.ScalarObject;
import org.apache.sysds.utils.Statistics;
public class FederatedRequest implements Serializable {
@@ -47,7 +54,7 @@ public class FederatedRequest implements Serializable {
private long _tid;
private List<Object> _data;
private boolean _checkPrivacy;
- private List<Integer> _lineageHash;
+ private List<Long> _checksums;
public FederatedRequest(RequestType method) {
@@ -68,6 +75,8 @@ public class FederatedRequest implements Serializable {
_id = id;
_data = data;
setCheckPrivacy();
+ if (DMLScript.LINEAGE)
+ setChecksum();
}
public RequestType getType() {
@@ -120,14 +129,50 @@ public class FederatedRequest implements Serializable {
return _checkPrivacy;
}
- public void setLineageHash(LineageItem[] liItems) {
- // copy the hash of the corresponding lineage DAG
- // TODO: copy both Adler32 checksum (on data) and hash (on
lineage DAG)
- _lineageHash = Arrays.stream(liItems).map(li ->
li.hashCode()).collect(Collectors.toList());
- }
-
- public int getLineageHash(int i) {
- return _lineageHash.get(i);
+ public void setChecksum() {
+ // Calculate Adler32 checksum. This is used as a leaf node of
Lineage DAGs
+ // in the workers, and helps to uniquely identify a node
(tracing PUT)
+ // TODO: append lineageitem hash if checksum is not enough
+ _checksums = new ArrayList<>();
+ try {
+ calcChecksum();
+ }
+ catch (IOException e) {
+ throw new DMLException(e);
+ }
+ }
+
+ public long getChecksum(int i) {
+ return _checksums.get(i);
+ }
+
+ private void calcChecksum() throws IOException {
+ for (Object ob : _data) {
+ if (!(ob instanceof CacheBlock) && !(ob instanceof
ScalarObject))
+ continue;
+
+ Checksum checksum = new Adler32();
+ if (ob instanceof ScalarObject) {
+ byte bytes[] =
((ScalarObject)ob).getStringValue().getBytes();
+ checksum.update(bytes, 0, bytes.length);
+ _checksums.add(checksum.getValue());
+ }
+
+ if (ob instanceof CacheBlock) {
+ try {
+ CacheBlock cb = (CacheBlock)ob;
+ long cbsize =
LazyWriteBuffer.getCacheBlockSize((CacheBlock)ob);
+ DataOutput dout = new
CacheDataOutput(new byte[(int)cbsize]);
+ cb.write(dout);
+ byte bytes[] = ((CacheDataOutput)
dout).getBytes();
+ checksum.update(bytes, 0, bytes.length);
+ _checksums.add(checksum.getValue());
+ }
+ catch(Exception ex) {
+ throw new IOException("Failed to
serialize cache block.", ex);
+ }
+ }
+ }
}
@Override
diff --git
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
index 5c0a0bc..4a69a10 100644
---
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
+++
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
@@ -272,7 +272,7 @@ public class FederatedWorkerHandler extends
ChannelInboundHandlerAdapter {
ec.setVariable(varname, data);
if (DMLScript.LINEAGE)
// TODO: Identify MO uniquely. Use Adler32 checksum.
- ec.getLineage().set(varname, new
LineageItem(String.valueOf(request.getLineageHash(0))));
+ ec.getLineage().set(varname, new
LineageItem(String.valueOf(request.getChecksum(0))));
return new FederatedResponse(ResponseType.SUCCESS_EMPTY);
}
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/fed/AggregateBinaryFEDInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/fed/AggregateBinaryFEDInstruction.java
index 4a8194b..6ed642e 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/fed/AggregateBinaryFEDInstruction.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/fed/AggregateBinaryFEDInstruction.java
@@ -21,7 +21,6 @@ package org.apache.sysds.runtime.instructions.fed;
import java.util.concurrent.Future;
-import org.apache.sysds.api.DMLScript;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
@@ -32,7 +31,6 @@ import
org.apache.sysds.runtime.controlprogram.federated.FederationMap.FType;
import org.apache.sysds.runtime.controlprogram.federated.FederationUtils;
import org.apache.sysds.runtime.instructions.InstructionUtils;
import org.apache.sysds.runtime.instructions.cp.CPOperand;
-import org.apache.sysds.runtime.lineage.LineageItemUtils;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.operators.Operator;
@@ -80,10 +78,6 @@ public class AggregateBinaryFEDInstruction extends
BinaryFEDInstruction {
else if(mo1.isFederated(FType.ROW)) { // MV + MM
//construct commands: broadcast rhs, fed mv, retrieve
results
FederatedRequest fr1 =
mo1.getFedMapping().broadcast(mo2);
- if (DMLScript.LINEAGE)
- //also copy the hash of the lineage DAG
-
fr1.setLineageHash(LineageItemUtils.getLineage(ec, input1));
- //TODO: calculate Adler32 checksum on data, and
move this code inside FederationMap.
FederatedRequest fr2 =
FederationUtils.callInstruction(instString, output,
new CPOperand[]{input1, input2}, new
long[]{mo1.getFedMapping().getID(), fr1.getID()});
if( mo2.getNumColumns() == 1 ) { //MV