This is an automated email from the ASF dual-hosted git repository.

arnabp20 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new 34444e8  [SYSTEMDS-3185] Lineage-based reuse of federated reads
34444e8 is described below

commit 34444e88ac3163c1eb72a2012d1426378fd67817
Author: ywcb00 <[email protected]>
AuthorDate: Mon Feb 7 11:36:47 2022 +0100

    [SYSTEMDS-3185] Lineage-based reuse of federated reads
    
    This patch adds lineage-based reuse of federated reads on the workers.
    We fall back to the read cache if lineage-based reuse is globally disabled.
    
    Closes #1522
    Closes #1540
---
 .../federated/FederatedReadCache.java              |   4 +-
 .../controlprogram/federated/FederatedRequest.java |   3 +
 .../federated/FederatedStatistics.java             |  59 ++++----
 .../controlprogram/federated/FederatedWorker.java  |   6 +
 .../federated/FederatedWorkerHandler.java          | 148 +++++++++++++--------
 .../apache/sysds/runtime/lineage/LineageCache.java |  77 ++++++++++-
 .../java/org/apache/sysds/utils/Statistics.java    |   2 +-
 .../org/apache/sysds/test/AutomatedTestBase.java   |  13 +-
 .../multitenant/FederatedMultiTenantTest.java      |   1 -
 ...dCacheTest.java => FederatedReuseReadTest.java} |  47 +++++--
 .../federated/multitenant/MultiTenantTestBase.java |   8 +-
 ...eadCacheTest.dml => FederatedReuseReadTest.dml} |   0
 12 files changed, 255 insertions(+), 113 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedReadCache.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedReadCache.java
index a7180d7..b145b34 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedReadCache.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedReadCache.java
@@ -98,8 +98,8 @@ public class FederatedReadCache {
                        }
 
                        if(DMLScript.STATISTICS) {
-                               FederatedStatistics.incFedReadCacheHitCount();
-                               FederatedStatistics.incFedReadCacheBytesCount(_data);
+                               FederatedStatistics.incFedReuseReadHitCount();
+                               FederatedStatistics.incFedReuseReadBytesCount(_data);
                        }
 
                        //comes here if data is placed or the entry is removed by the running thread
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedRequest.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedRequest.java
index 9e38527..6e9b388 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedRequest.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedRequest.java
@@ -150,6 +150,9 @@ public class FederatedRequest implements Serializable {
        }
 
        public long getChecksum(int i) {
+               if(_checksums == null)
+                       setChecksum();
+
                return _checksums.get(i);
        }
 
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedStatistics.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedStatistics.java
index 9ef0518..5597d18 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedStatistics.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedStatistics.java
@@ -37,8 +37,9 @@ import javax.net.ssl.SSLException;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.sysds.api.DMLScript;
-import org.apache.sysds.runtime.controlprogram.caching.CacheStatistics;
 import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
+import org.apache.sysds.runtime.controlprogram.caching.CacheBlock;
+import org.apache.sysds.runtime.controlprogram.caching.CacheStatistics;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest.RequestType;
 import org.apache.sysds.runtime.controlprogram.federated.FederatedStatistics.FedStatsCollection.CacheStatsCollection;
@@ -74,8 +75,8 @@ public class FederatedStatistics {
        private static final LongAdder fedLookupTableGetCount = new LongAdder();
        private static final LongAdder fedLookupTableGetTime = new LongAdder(); // in milli sec
        private static final LongAdder fedLookupTableEntryCount = new LongAdder();
-       private static final LongAdder fedReadCacheHitCount = new LongAdder();
-       private static final LongAdder fedReadCacheBytesCount = new LongAdder();
+       private static final LongAdder fedReuseReadHitCount = new LongAdder();
+       private static final LongAdder fedReuseReadBytesCount = new LongAdder();
 
        public static synchronized void incFederated(RequestType rqt, List<Object> data){
                switch (rqt) {
@@ -138,8 +139,8 @@ public class FederatedStatistics {
                fedLookupTableGetCount.reset();
                fedLookupTableGetTime.reset();
                fedLookupTableEntryCount.reset();
-               fedReadCacheHitCount.reset();
-               fedReadCacheBytesCount.reset();
+               fedReuseReadHitCount.reset();
+               fedReuseReadBytesCount.reset();
        }
 
        public static String displayFedIOExecStatistics() {
@@ -218,7 +219,7 @@ public class FederatedStatistics {
        private static String 
displayMultiTenantStats(MultiTenantStatsCollection mtsc) {
                StringBuilder sb = new StringBuilder();
                sb.append(displayFedLookupTableStats(mtsc.fLTGetCount, 
mtsc.fLTEntryCount, mtsc.fLTGetTime));
-               sb.append(displayFedReadCacheStats(mtsc.readCacheHits, 
mtsc.readCacheBytes));
+               sb.append(displayFedReuseReadStats(mtsc.reuseReadHits, 
mtsc.reuseReadBytes));
                return sb.toString();
        }
 
@@ -340,12 +341,12 @@ public class FederatedStatistics {
                return fedLookupTableEntryCount.longValue();
        }
 
-       public static long getFedReadCacheHitCount() {
-               return fedReadCacheHitCount.longValue();
+       public static long getFedReuseReadHitCount() {
+               return fedReuseReadHitCount.longValue();
        }
 
-       public static long getFedReadCacheBytesCount() {
-               return fedReadCacheBytesCount.longValue();
+       public static long getFedReuseReadBytesCount() {
+               return fedReuseReadBytesCount.longValue();
        }
 
        public static void incFedLookupTableGetCount() {
@@ -360,12 +361,16 @@ public class FederatedStatistics {
                fedLookupTableEntryCount.increment();
        }
 
-       public static void incFedReadCacheHitCount() {
-               fedReadCacheHitCount.increment();
+       public static void incFedReuseReadHitCount() {
+               fedReuseReadHitCount.increment();
+       }
+
+       public static void incFedReuseReadBytesCount(CacheableData<?> data) {
+               fedReuseReadBytesCount.add(data.getDataSize());
        }
 
-       public static void incFedReadCacheBytesCount(CacheableData<?> data) {
-               fedReadCacheBytesCount.add(data.getDataSize());
+       public static void incFedReuseReadBytesCount(CacheBlock cb) {
+               fedReuseReadBytesCount.add(cb.getInMemorySize());
        }
 
        public static String displayFedLookupTableStats() {
@@ -383,16 +388,16 @@ public class FederatedStatistics {
                return "";
        }
 
-       public static String displayFedReadCacheStats() {
-               return displayFedReadCacheStats(fedReadCacheHitCount.longValue(),
-                       fedReadCacheBytesCount.longValue());
+       public static String displayFedReuseReadStats() {
+               return displayFedReuseReadStats(fedReuseReadHitCount.longValue(),
+                       fedReuseReadBytesCount.longValue());
        }
 
-       public static String displayFedReadCacheStats(long rcHits, long rcBytes) {
-               if(rcHits > 0) {
+       public static String displayFedReuseReadStats(long rrHits, long rrBytes) {
+               if(rrHits > 0) {
                        StringBuilder sb = new StringBuilder();
-                       sb.append("Fed ReadCache (Hits, Bytes):\t" +
-                               rcHits + "/" + rcBytes + ".\n");
+                       sb.append("Fed ReuseRead (Hits, Bytes):\t" +
+                               rrHits + "/" + rrBytes + ".\n");
                        return sb.toString();
                }
                return "";
@@ -515,23 +520,23 @@ public class FederatedStatistics {
                                fLTGetCount = getFedLookupTableGetCount();
                                fLTGetTime = ((double)getFedLookupTableGetTime()) / 1000000000; // in sec
                                fLTEntryCount = getFedLookupTableEntryCount();
-                               readCacheHits = getFedReadCacheHitCount();
-                               readCacheBytes = getFedReadCacheBytesCount();
+                               reuseReadHits = getFedReuseReadHitCount();
+                               reuseReadBytes = getFedReuseReadBytesCount();
                        }
 
                        private void aggregate(MultiTenantStatsCollection that) {
                                fLTGetCount += that.fLTGetCount;
                                fLTGetTime += that.fLTGetTime;
                                fLTEntryCount += that.fLTEntryCount;
-                               readCacheHits += that.readCacheHits;
-                               readCacheBytes += that.readCacheBytes;
+                               reuseReadHits += that.reuseReadHits;
+                               reuseReadBytes += that.reuseReadBytes;
                        }
 
                        private long fLTGetCount = 0;
                        private double fLTGetTime = 0;
                        private long fLTEntryCount = 0;
-                       private long readCacheHits = 0;
-                       private long readCacheBytes = 0;
+                       private long reuseReadHits = 0;
+                       private long reuseReadBytes = 0;
                }
 
                private CacheStatsCollection cacheStats = new CacheStatsCollection();
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java
index ef94dfe..bef4032 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java
@@ -40,9 +40,11 @@ import io.netty.handler.codec.serialization.ObjectEncoder;
 import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.SslContextBuilder;
 import io.netty.handler.ssl.util.SelfSignedCertificate;
+import org.apache.sysds.api.DMLScript;
 import org.apache.log4j.Logger;
 import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.conf.DMLConfig;
+import org.apache.sysds.runtime.lineage.LineageCacheConfig;
 
 public class FederatedWorker {
        protected static Logger log = Logger.getLogger(FederatedWorker.class);
@@ -57,6 +59,10 @@ public class FederatedWorker {
                _frc = new FederatedReadCache();
                _port = (port == -1) ? DMLConfig.DEFAULT_FEDERATED_PORT : port;
                _debug = debug;
+
+               LineageCacheConfig.setConfig(DMLScript.LINEAGE_REUSE);
+               LineageCacheConfig.setCachePolicy(DMLScript.LINEAGE_POLICY);
+               LineageCacheConfig.setEstimator(DMLScript.LINEAGE_ESTIMATE);
        }
 
        public void run() throws CertificateException, SSLException {
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
index ca2b055..d798c2d 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
@@ -246,74 +246,46 @@ public class FederatedWorkerHandler extends ChannelInboundHandlerAdapter {
                        // early throwing of exception to avoid infinitely waiting threads for data
                        throw new FederatedWorkerHandlerException("Could not recognize datatype");
 
-               CacheableData<?> cd = _frc.get(filename);
-               if(cd == null) {
+               ExecutionContext ec = ecm.get(tid);
+               LineageItem linItem = new LineageItem(filename);
+               CacheableData<?> cd = null;
+
+               if(!LineageCache.reuseFedRead(Long.toString(id), dataType, linItem, ec)) {
+                       // Lookup read cache if reuse is disabled and we skipped storing in the
+                       // lineage cache due to other constraints
+                       // FIXME: It is possible that lineage reuse is enabled later. In that case
+                       //  read cache may not be empty. Hence, it may be necessary to lookup both
+                       //  the caches.
+                       if (ReuseCacheType.isNone() || dataType != DataType.MATRIX)
+                               cd = _frc.get(filename);
                        try {
-                               switch(dataType) {
-                                       case MATRIX:
-                                               cd = new MatrixObject(Types.ValueType.FP64, filename);
-                                               break;
-                                       case FRAME:
-                                               cd = new FrameObject(filename);
-                                               break;
-                                       default:
-                                               throw new FederatedWorkerHandlerException("Could not recognize datatype");
+                               if(cd == null) { // data is neither in lineage cache nor in read cache
+                                       long t0 = !ReuseCacheType.isNone() ? System.nanoTime() : 0;
+                                       cd = readDataNoReuse(filename, dataType, mc); // actual read of the data
+                                       long t1 = !ReuseCacheType.isNone() ? System.nanoTime() : 0;
+                                       if(!ReuseCacheType.isNone() && dataType == DataType.MATRIX)
+                                               // put the object into the lineage cache
+                                               // FIXME: As we lazily read the actual data, this computetime
+                                               //  only records the metadata read. A small computetime wrongly
+                                               //  dictates the cache eviction logic to remove this entry early.
+                                               LineageCache.putFedReadObject(cd, linItem, ec, t1 - t0);
+                                       else
+                                               _frc.setData(filename, cd); // set the data into the read cache entry
                                }
+                               ec.setVariable(String.valueOf(id), cd);
 
-                               FileFormat fmt = null;
-                               boolean header = false;
-                               String delim = null;
-                               FileSystem fs = null;
-                               MetaDataAll mtd;
-
-                               try {
-                                       final String mtdName = DataExpression.getMTDFileName(filename);
-                                       Path path = new Path(mtdName);
-                                       fs = IOUtilFunctions.getFileSystem(mtdName);
-                                       try(BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(path)))) {
-                                               mtd = new MetaDataAll(br);
-                                               if(!mtd.mtdExists())
-                                                       throw new FederatedWorkerHandlerException("Could not parse metadata file");
-                                               mc.setRows(mtd.getDim1());
-                                               mc.setCols(mtd.getDim2());
-                                               mc.setNonZeros(mtd.getNnz());
-                                               header = mtd.getHasHeader();
-                                               cd = mtd.parseAndSetPrivacyConstraint(cd);
-                                               fmt = mtd.getFileFormat();
-                                               delim = mtd.getDelim();
-                                       }
-                               }
-                               catch(DMLPrivacyException | FederatedWorkerHandlerException ex) {
-                                       throw ex;
-                               }
-                               catch(Exception ex) {
-                                       String msg = "Exception of type " + ex.getClass() + " thrown when processing READ request";
-                                       LOG.error(msg, ex);
-                                       throw new DMLRuntimeException(msg);
-                               }
-                               finally {
-                                       IOUtilFunctions.closeSilently(fs);
-                               }
-
-                               // put meta data object in symbol table, read on first operation
-                               cd.setMetaData(new MetaDataFormat(mc, fmt));
-                               if(fmt == FileFormat.CSV)
-                                       cd.setFileFormatProperties(new FileFormatPropertiesCSV(header, delim,
-                                               DataExpression.DEFAULT_DELIM_SPARSE));
-                               cd.enableCleanup(false); // guard against deletion
-
-                               _frc.setData(filename, cd);
                        } catch(Exception ex) {
-                               _frc.setInvalid(filename);
+                               if(!ReuseCacheType.isNone() && dataType == DataType.MATRIX)
+                                       LineageCache.putFedReadObject(null, linItem, ec, 0); // removing the placeholder
+                               else
+                                       _frc.setInvalid(filename);
                                throw ex;
                        }
                }
 
-               ecm.get(tid).setVariable(String.valueOf(id), cd);
-
                if(DMLScript.LINEAGE)
                        // create a literal type lineage item with the file name
-                       ecm.get(tid).getLineage().set(String.valueOf(id), new LineageItem(filename));
+                       ec.getLineage().set(String.valueOf(id), linItem);
 
                if(dataType == Types.DataType.FRAME) {
                        FrameObject frameObject = (FrameObject) cd;
@@ -325,6 +297,66 @@ public class FederatedWorkerHandler extends ChannelInboundHandlerAdapter {
                return new FederatedResponse(ResponseType.SUCCESS, new Object[] {id, mc});
        }
 
+       private CacheableData<?> readDataNoReuse(String filename, DataType dataType,
+               MatrixCharacteristics mc) {
+               CacheableData<?> cd = null;
+
+               switch(dataType) {
+                       case MATRIX:
+                               cd = new MatrixObject(Types.ValueType.FP64, filename);
+                               break;
+                       case FRAME:
+                               cd = new FrameObject(filename);
+                               break;
+                       default:
+                               throw new FederatedWorkerHandlerException("Could not recognize datatype");
+               }
+
+               FileFormat fmt = null;
+               boolean header = false;
+               String delim = null;
+               FileSystem fs = null;
+               MetaDataAll mtd;
+
+               try {
+                       final String mtdName = DataExpression.getMTDFileName(filename);
+                       Path path = new Path(mtdName);
+                       fs = IOUtilFunctions.getFileSystem(mtdName);
+                       try(BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(path)))) {
+                               mtd = new MetaDataAll(br);
+                               if(!mtd.mtdExists())
+                                       throw new FederatedWorkerHandlerException("Could not parse metadata file");
+                               mc.setRows(mtd.getDim1());
+                               mc.setCols(mtd.getDim2());
+                               mc.setNonZeros(mtd.getNnz());
+                               header = mtd.getHasHeader();
+                               cd = mtd.parseAndSetPrivacyConstraint(cd);
+                               fmt = mtd.getFileFormat();
+                               delim = mtd.getDelim();
+                       }
+               }
+               catch(DMLPrivacyException | FederatedWorkerHandlerException ex) {
+                       throw ex;
+               }
+               catch(Exception ex) {
+                       String msg = "Exception of type " + ex.getClass() + " thrown when processing READ request";
+                       LOG.error(msg, ex);
+                       throw new DMLRuntimeException(msg);
+               }
+               finally {
+                       IOUtilFunctions.closeSilently(fs);
+               }
+
+               // put meta data object in symbol table, read on first operation
+               cd.setMetaData(new MetaDataFormat(mc, fmt));
+               if(fmt == FileFormat.CSV)
+                       cd.setFileFormatProperties(new FileFormatPropertiesCSV(header, delim,
+                               DataExpression.DEFAULT_DELIM_SPARSE));
+               cd.enableCleanup(false); // guard against deletion
+
+               return cd;
+       }
+
        private FederatedResponse putVariable(FederatedRequest request, ExecutionContextMap ecm) {
                checkNumParams(request.getNumParams(), 1, 2);
                final String varName = String.valueOf(request.getID());
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
index 8450fb5..472c6ff 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
@@ -33,6 +33,7 @@ import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedStatistics;
 import org.apache.sysds.runtime.controlprogram.federated.FederatedUDF;
 import org.apache.sysds.runtime.instructions.CPInstructionParser;
 import org.apache.sysds.runtime.instructions.Instruction;
@@ -162,14 +163,14 @@ public class LineageCache
                                                if (mb == null && e.getCacheStatus() == LineageCacheStatus.NOTCACHED)
                                                        return false;  //the executing thread removed this entry from cache
                                                else
-                                                       ec.setMatrixOutput(outName, e.getMBValue());
+                                                       ec.setMatrixOutput(outName, mb);
                                        }
                                        else if (e.isScalarValue()) {
                                                ScalarObject so = e.getSOValue(); //wait if another thread is executing the same inst.
                                                if (so == null && e.getCacheStatus() == LineageCacheStatus.NOTCACHED)
                                                        return false;  //the executing thread removed this entry from cache
                                                else
-                                                       ec.setScalarOutput(outName, e.getSOValue());
+                                                       ec.setScalarOutput(outName, so);
                                        }
                                        else { //TODO handle locks on gpu objects
                                                //shallow copy the cached GPUObj to the output MatrixObject
@@ -364,7 +365,39 @@ public class LineageCache
                }
                return new 
FederatedResponse(FederatedResponse.ResponseType.ERROR);
        }
-       
+
+       public static boolean reuseFedRead(String outName, DataType dataType, LineageItem li, ExecutionContext ec) {
+               if (ReuseCacheType.isNone() || dataType != DataType.MATRIX)
+                       return false;
+
+               LineageCacheEntry e = null;
+               synchronized(_cache) {
+                       if(LineageCache.probe(li)) {
+                               e = LineageCache.getIntern(li);
+                       }
+                       else {
+                               putIntern(li, dataType, null, null, 0);
+                               return false; // direct return after placing the placeholder
+                       }
+               }
+
+               if(e != null && e.isMatrixValue()) {
+                       MatrixBlock mb = e.getMBValue(); // waiting if the value is not set yet
+                       if (mb == null || e.getCacheStatus() == LineageCacheStatus.NOTCACHED)
+                               return false;  // the executing thread removed this entry from cache
+                       ec.setMatrixOutput(outName, e.getMBValue());
+
+                       if (DMLScript.STATISTICS) { //increment saved time
+                               FederatedStatistics.incFedReuseReadHitCount();
+                               FederatedStatistics.incFedReuseReadBytesCount(mb);
+                               LineageCacheStatistics.incrementSavedComputeTime(e._computeTime);
+                       }
+
+                       return true;
+               }
+               return false;
+       }
+
        public static boolean probe(LineageItem key) {
                //TODO problematic as after probe the matrix might be kicked out of cache
                boolean p = _cache.containsKey(key);  // in cache or in disk
@@ -542,7 +575,7 @@ public class LineageCache
                        LineageGPUCacheEviction.addEntry(centry);
                }
        }
-       
+
        public static void putValue(List<DataIdentifier> outputs,
                LineageItem[] liInputs, String name, ExecutionContext ec, long computetime)
        {
@@ -629,7 +662,37 @@ public class LineageCache
                        LineageCacheEviction.addEntry(entry);
                }
        }
-       
+
+       public static void putFedReadObject(Data data, LineageItem li, ExecutionContext ec, long computetime) {
+               if(ReuseCacheType.isNone())
+                       return;
+
+               LineageCacheEntry entry = _cache.get(li);
+               if(entry != null && data instanceof MatrixObject) {
+                       MatrixBlock mb = ((MatrixObject)data).acquireRead();
+                       synchronized(_cache) {
+                               long size = mb != null ? mb.getInMemorySize() : 0;
+
+                               //remove the placeholder if the entry is bigger than the cache.
+                               if (size > LineageCacheEviction.getCacheLimit()) {
+                                       removePlaceholder(li);
+                               }
+
+                               //make space for the data
+                               if (!LineageCacheEviction.isBelowThreshold(size))
+                                       LineageCacheEviction.makeSpace(_cache, size);
+                               LineageCacheEviction.updateSize(size, true);
+
+                               entry.setValue(mb, computetime);
+                       }
+               }
+               else {
+                       synchronized(_cache) {
+                               removePlaceholder(li);
+                       }
+               }
+       }
+
        public static void resetCache() {
                synchronized (_cache) {
                        _cache.clear();
@@ -658,7 +721,7 @@ public class LineageCache
                        long size = newItem.getSize();
                        if( size > LineageCacheEviction.getCacheLimit())
                                return; //not applicable
-                       if( !LineageCacheEviction.isBelowThreshold(size) ) 
+                       if( !LineageCacheEviction.isBelowThreshold(size) )
                                LineageCacheEviction.makeSpace(_cache, size);
                        LineageCacheEviction.updateSize(size, true);
                }
@@ -697,7 +760,7 @@ public class LineageCache
                        LineageCacheEntry e = _cache.get(item);
                        boolean exists = !e.isNullVal();
                        if (oe.isMatrixValue())
-                               e.setValue(oe.getMBValue(), computetime); 
+                               e.setValue(oe.getMBValue(), computetime);
                        else
                                e.setValue(oe.getSOValue(), computetime);
                        e._origItem = probeItem; 
diff --git a/src/main/java/org/apache/sysds/utils/Statistics.java b/src/main/java/org/apache/sysds/utils/Statistics.java
index 4382138..b105ffa 100644
--- a/src/main/java/org/apache/sysds/utils/Statistics.java
+++ b/src/main/java/org/apache/sysds/utils/Statistics.java
@@ -655,7 +655,7 @@ public class Statistics
 
                        sb.append(FederatedStatistics.displayFedIOExecStatistics());
                        sb.append(FederatedStatistics.displayFedLookupTableStats());
-                       sb.append(FederatedStatistics.displayFedReadCacheStats());
+                       sb.append(FederatedStatistics.displayFedReuseReadStats());
 
                        sb.append(TransformStatistics.displayStatistics());
 
diff --git a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
index af00ec3..5944947 100644
--- a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
+++ b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
@@ -41,6 +41,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -1540,6 +1541,11 @@ public abstract class AutomatedTestBase {
                }
        }
 
+       @Deprecated
+       protected Process startLocalFedWorker(int port) {
+               return startLocalFedWorker(port, null);
+       }
+
        /**
         * Start new JVM for a federated worker at the port.
         * 
@@ -1548,13 +1554,14 @@ public abstract class AutomatedTestBase {
         * @return the process associated with the worker.
         */
        @Deprecated
-       protected Process startLocalFedWorker(int port) {
+       protected Process startLocalFedWorker(int port, String[] addArgs) {
                Process process = null;
                String separator = System.getProperty("file.separator");
                String classpath = System.getProperty("java.class.path");
                String path = System.getProperty("java.home") + separator + 
"bin" + separator + "java";
-               ProcessBuilder processBuilder = new ProcessBuilder(path, "-cp", 
classpath, DMLScript.class.getName(), "-w",
-                       Integer.toString(port), "-stats");
+               String[] args = ArrayUtils.addAll(new String[]{path, "-cp", 
classpath, DMLScript.class.getName(),
+                       "-w", Integer.toString(port), "-stats"}, addArgs);
+               ProcessBuilder processBuilder = new ProcessBuilder(args);
 
                try {
                        process = processBuilder.start();
diff --git 
a/src/test/java/org/apache/sysds/test/functions/federated/multitenant/FederatedMultiTenantTest.java
 
b/src/test/java/org/apache/sysds/test/functions/federated/multitenant/FederatedMultiTenantTest.java
index ca0d1ce..d03b9fd 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/federated/multitenant/FederatedMultiTenantTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/federated/multitenant/FederatedMultiTenantTest.java
@@ -282,7 +282,6 @@ public class FederatedMultiTenantTest extends 
MultiTenantTestBase {
 
                // wait for the coordinator processes to end and verify the 
results
                String coordinatorOutput = waitForCoordinators();
-               System.out.println(coordinatorOutput);
                verifyResults(opType, coordinatorOutput, execMode);
 
                // check that federated input files are still existing
diff --git 
a/src/test/java/org/apache/sysds/test/functions/federated/multitenant/FederatedReadCacheTest.java
 
b/src/test/java/org/apache/sysds/test/functions/federated/multitenant/FederatedReuseReadTest.java
similarity index 82%
rename from 
src/test/java/org/apache/sysds/test/functions/federated/multitenant/FederatedReadCacheTest.java
rename to 
src/test/java/org/apache/sysds/test/functions/federated/multitenant/FederatedReuseReadTest.java
index 67e4c1a..047a98f 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/federated/multitenant/FederatedReadCacheTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/federated/multitenant/FederatedReuseReadTest.java
@@ -40,11 +40,11 @@ import org.junit.runners.Parameterized;
 
 @RunWith(value = Parameterized.class)
 @net.jcip.annotations.NotThreadSafe
-public class FederatedReadCacheTest extends MultiTenantTestBase {
-       private final static String TEST_NAME = "FederatedReadCacheTest";
+public class FederatedReuseReadTest extends MultiTenantTestBase {
+       private final static String TEST_NAME = "FederatedReuseReadTest";
 
        private final static String TEST_DIR = 
"functions/federated/multitenant/";
-       private static final String TEST_CLASS_DIR = TEST_DIR + 
FederatedReadCacheTest.class.getSimpleName() + "/";
+       private static final String TEST_CLASS_DIR = TEST_DIR + 
FederatedReuseReadTest.class.getSimpleName() + "/";
 
        private final static double TOLERANCE = 0;
 
@@ -82,28 +82,51 @@ public class FederatedReadCacheTest extends 
MultiTenantTestBase {
 
        @Test
        public void testPlusScalarCP() {
-               runReadCacheTest(OpType.PLUS_SCALAR, 3, ExecMode.SINGLE_NODE);
+               runReuseReadTest(OpType.PLUS_SCALAR, 3, ExecMode.SINGLE_NODE, 
false);
        }
 
        @Test
        @Ignore
        public void testPlusScalarSP() {
-               runReadCacheTest(OpType.PLUS_SCALAR, 3, ExecMode.SPARK);
+               runReuseReadTest(OpType.PLUS_SCALAR, 3, ExecMode.SPARK, false);
+       }
+
+       @Test
+       @Ignore
+       public void testPlusScalarLineageCP() {
+               runReuseReadTest(OpType.PLUS_SCALAR, 3, ExecMode.SINGLE_NODE, 
true);
+       }
+
+       @Test
+       public void testPlusScalarLineageSP() {
+               runReuseReadTest(OpType.PLUS_SCALAR, 3, ExecMode.SPARK, true);
        }
 
        @Test
        public void testModifiedValCP() {
                //TODO with 4 runs sporadically into non-terminating state
-               runReadCacheTest(OpType.MODIFIED_VAL, 3, ExecMode.SINGLE_NODE);
+               runReuseReadTest(OpType.MODIFIED_VAL, 3, ExecMode.SINGLE_NODE, 
false);
        }
 
        @Test
        @Ignore
        public void testModifiedValSP() {
-               runReadCacheTest(OpType.MODIFIED_VAL, 4, ExecMode.SPARK);
+               runReuseReadTest(OpType.MODIFIED_VAL, 4, ExecMode.SPARK, false);
+       }
+
+       @Test
+       @Ignore
+       public void testModifiedValLineageCP() {
+               //TODO with 4 runs sporadically into non-terminating state
+               runReuseReadTest(OpType.MODIFIED_VAL, 3, ExecMode.SINGLE_NODE, 
true);
+       }
+
+       @Test
+       public void testModifiedValLineageSP() {
+               runReuseReadTest(OpType.MODIFIED_VAL, 4, ExecMode.SPARK, true);
        }
 
-       private void runReadCacheTest(OpType opType, int numCoordinators, 
ExecMode execMode) {
+       private void runReuseReadTest(OpType opType, int numCoordinators, 
ExecMode execMode, boolean lineage) {
                boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
                ExecMode platformOld = rtplatform;
 
@@ -135,7 +158,7 @@ public class FederatedReadCacheTest extends 
MultiTenantTestBase {
                // empty script name because we don't execute any script, just 
start the worker
                fullDMLScriptName = "";
 
-               int[] workerPorts = startFedWorkers(4);
+               int[] workerPorts = startFedWorkers(4, lineage ? new 
String[]{"-lineage", "reuse"} : null);
 
                rtplatform = execMode;
                if(rtplatform == ExecMode.SPARK) {
@@ -146,7 +169,8 @@ public class FederatedReadCacheTest extends 
MultiTenantTestBase {
 
                // start the coordinator processes
                String scriptName = HOME + TEST_NAME + ".dml";
-               programArgs = new String[] {"-stats", "100", "-fedStats", 
"100", "-nvargs",
+               programArgs = new String[] {"-config", CONFIG_DIR + 
"SystemDS-MultiTenant-config.xml",
+                       "-stats", "100", "-fedStats", "100", "-nvargs",
                        "in_X1=" + TestUtils.federatedAddress(workerPorts[0], 
""),
                        "in_X2=" + TestUtils.federatedAddress(workerPorts[1], 
""),
                        "in_X3=" + TestUtils.federatedAddress(workerPorts[2], 
""),
@@ -160,7 +184,6 @@ public class FederatedReadCacheTest extends 
MultiTenantTestBase {
 
                // wait for the coordinator processes to end and verify the 
results
                String coordinatorOutput = waitForCoordinators();
-               System.out.println(coordinatorOutput);
                verifyResults(opType, coordinatorOutput, execMode);
 
                // check that federated input files are still existing
@@ -178,7 +201,7 @@ public class FederatedReadCacheTest extends 
MultiTenantTestBase {
        private void verifyResults(OpType opType, String outputLog, ExecMode 
execMode) {
                Assert.assertTrue(checkForHeavyHitter(opType, outputLog, 
execMode));
                // verify that the matrix object has been taken from cache
-               Assert.assertTrue(outputLog.contains("Fed ReadCache (Hits, 
Bytes):\t"
+               Assert.assertTrue(outputLog.contains("Fed ReuseRead (Hits, 
Bytes):\t"
                        + Integer.toString((coordinatorProcesses.size()-1) * 
workerProcesses.size()) + "/"));
 
                // compare the results via files
diff --git 
a/src/test/java/org/apache/sysds/test/functions/federated/multitenant/MultiTenantTestBase.java
 
b/src/test/java/org/apache/sysds/test/functions/federated/multitenant/MultiTenantTestBase.java
index 90f50d4..167e330 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/federated/multitenant/MultiTenantTestBase.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/federated/multitenant/MultiTenantTestBase.java
@@ -49,6 +49,10 @@ public abstract class MultiTenantTestBase extends 
AutomatedTestBase {
                        p.destroyForcibly();
        }
 
+       protected int[] startFedWorkers(int numFedWorkers) {
+               return startFedWorkers(numFedWorkers, null);
+       }
+
        /**
         * Start numFedWorkers federated worker processes on available ports 
and add
         * them to the workerProcesses
@@ -56,12 +60,12 @@ public abstract class MultiTenantTestBase extends 
AutomatedTestBase {
         * @param numFedWorkers the number of federated workers to start
         * @return int[] the ports of the created federated workers
         */
-       protected int[] startFedWorkers(int numFedWorkers) {
+       protected int[] startFedWorkers(int numFedWorkers, String[] addArgs) {
                int[] ports = new int[numFedWorkers];
                for(int counter = 0; counter < numFedWorkers; counter++) {
                        ports[counter] = getRandomAvailablePort();
                        @SuppressWarnings("deprecation")
-                       Process tmpProcess = 
startLocalFedWorker(ports[counter]);
+                       Process tmpProcess = 
startLocalFedWorker(ports[counter], addArgs);
                        workerProcesses.add(tmpProcess);
                }
                return ports;
diff --git 
a/src/test/scripts/functions/federated/multitenant/FederatedReadCacheTest.dml 
b/src/test/scripts/functions/federated/multitenant/FederatedReuseReadTest.dml
similarity index 100%
rename from 
src/test/scripts/functions/federated/multitenant/FederatedReadCacheTest.dml
rename to 
src/test/scripts/functions/federated/multitenant/FederatedReuseReadTest.dml

Reply via email to