This is an automated email from the ASF dual-hosted git repository.

arnabp20 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e341dd  [SYSTEMDS-2799] Add synchronization to reuse UDFs in Fed workers
2e341dd is described below

commit 2e341dd346aa4efd9b8e4fbf44cfa12f63719815
Author: arnabp <[email protected]>
AuthorDate: Wed Feb 10 15:27:32 2021 +0100

    [SYSTEMDS-2799] Add synchronization to reuse UDFs in Fed workers
    
    This patch adds synchronization to facilitate sharing of the
    lineage cache among multiple worker threads/tenants. Thanks
    to Matthias for helping in reproducing the problem. Due to
    the sporadic nature of the issue, rigorous tests (w/ many
    threads, memory constraints) will be needed to confirm robustness.
---
 .../apache/sysds/runtime/lineage/LineageCache.java | 66 +++++++++++-----------
 .../TransformFederatedEncodeApplyTest.java         |  5 +-
 2 files changed, 37 insertions(+), 34 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java 
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
index 0b37a85..d36a1a2 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
@@ -466,42 +466,44 @@ public class LineageCache
                if (udf.getLineageItem(ec) == null)
                        //TODO: trace all UDFs
                        return;
-               LineageItem item = udf.getLineageItem(ec).getValue();
-               LineageCacheEntry entry = _cache.get(item);
-               Data data = ec.getVariable(String.valueOf(outIds.get(0)));
-               if (!(data instanceof MatrixObject) && !(data instanceof 
ScalarObject)) {
-                       // Don't cache if the udf outputs frames
-                       _cache.remove(item);
-                       return;
-               }
-               
-               MatrixBlock mb = (data instanceof MatrixObject) ? 
-                               ((MatrixObject)data).acquireReadAndRelease() : 
null;
-               long size = mb != null ? mb.getInMemorySize() : 
((ScalarObject)data).getSize();
-
-               //remove the placeholder if the entry is bigger than the cache.
-               //FIXME: the resumed threads will enter into infinite wait as 
the entry
-               //is removed. Need to add support for graceful remove 
(placeholder) and resume.
-               if (size > LineageCacheEviction.getCacheLimit()) {
-                       _cache.remove(item);
-                       return;
-               }
+               synchronized (_cache) {
+                       LineageItem item = udf.getLineageItem(ec).getValue();
+                       LineageCacheEntry entry = _cache.get(item);
+                       Data data = 
ec.getVariable(String.valueOf(outIds.get(0)));
+                       if (!(data instanceof MatrixObject) && !(data 
instanceof ScalarObject)) {
+                               // Don't cache if the udf outputs frames
+                               _cache.remove(item);
+                               return;
+                       }
+                       
+                       MatrixBlock mb = (data instanceof MatrixObject) ? 
+                                       
((MatrixObject)data).acquireReadAndRelease() : null;
+                       long size = mb != null ? mb.getInMemorySize() : 
((ScalarObject)data).getSize();
+
+                       //remove the placeholder if the entry is bigger than 
the cache.
+                       //FIXME: the resumed threads will enter into infinite 
wait as the entry
+                       //is removed. Need to add support for graceful remove 
(placeholder) and resume.
+                       if (size > LineageCacheEviction.getCacheLimit()) {
+                               _cache.remove(item);
+                               return;
+                       }
 
-               //make space for the data
-               if (!LineageCacheEviction.isBelowThreshold(size))
-                       LineageCacheEviction.makeSpace(_cache, size);
-               LineageCacheEviction.updateSize(size, true);
+                       //make space for the data
+                       if (!LineageCacheEviction.isBelowThreshold(size))
+                               LineageCacheEviction.makeSpace(_cache, size);
+                       LineageCacheEviction.updateSize(size, true);
 
-               //place the data
-               if (data instanceof MatrixObject)
-                       entry.setValue(mb, computetime);
-               else if (data instanceof ScalarObject)
-                       entry.setValue((ScalarObject)data, computetime);
+                       //place the data
+                       if (data instanceof MatrixObject)
+                               entry.setValue(mb, computetime);
+                       else if (data instanceof ScalarObject)
+                               entry.setValue((ScalarObject)data, computetime);
 
-               //TODO: maintain statistics, lineage estimate
+                       //TODO: maintain statistics, lineage estimate
 
-               //maintain order for eviction
-               LineageCacheEviction.addEntry(entry);
+                       //maintain order for eviction
+                       LineageCacheEviction.addEntry(entry);
+               }
        }
        
        public static void resetCache() {
diff --git 
a/src/test/java/org/apache/sysds/test/functions/federated/transform/TransformFederatedEncodeApplyTest.java
 
b/src/test/java/org/apache/sysds/test/functions/federated/transform/TransformFederatedEncodeApplyTest.java
index 73e0532..dba5e93 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/federated/transform/TransformFederatedEncodeApplyTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/federated/transform/TransformFederatedEncodeApplyTest.java
@@ -214,7 +214,6 @@ public class TransformFederatedEncodeApplyTest extends 
AutomatedTestBase {
                        int port3 = getRandomAvailablePort();
                        int port4 = getRandomAvailablePort();
                        String[] otherargs = lineage ? new String[] 
{"-lineage", "reuse_full"} : null;
-                       Lineage.resetInternalState();
                        t1 = startLocalFedWorkerThread(port1, otherargs, 
FED_WORKER_WAIT_S);
                        t2 = startLocalFedWorkerThread(port2, otherargs, 
FED_WORKER_WAIT_S);
                        t3 = startLocalFedWorkerThread(port3, otherargs, 
FED_WORKER_WAIT_S);
@@ -265,8 +264,10 @@ public class TransformFederatedEncodeApplyTest extends 
AutomatedTestBase {
                                "cols=" + dataset.getNumColumns(), "TFSPEC=" + 
HOME + "input/" + SPEC, "TFDATA1=" + output("tfout1"),
                                "TFDATA2=" + output("tfout2"), "OFMT=csv"};
                        
-                       if (lineage)
+                       if (lineage) {
+                               Lineage.resetInternalState();
                                programArgs = (String[]) 
ArrayUtils.addAll(lineageArgs, programArgs);
+                       }
 
                        runTest(true, false, null, -1);
 

Reply via email to