This is an automated email from the ASF dual-hosted git repository.
arnabp20 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/master by this push:
new 2e341dd [SYSTEMDS-2799] Add synchronization to reuse UDFs in Fed
workers
2e341dd is described below
commit 2e341dd346aa4efd9b8e4fbf44cfa12f63719815
Author: arnabp <[email protected]>
AuthorDate: Wed Feb 10 15:27:32 2021 +0100
[SYSTEMDS-2799] Add synchronization to reuse UDFs in Fed workers
This patch adds synchronization to facilitate sharing of the
lineage cache among multiple worker threads/tenants. Thanks
to Matthias for helping in reproducing the problem. Due to
the sporadic nature of the issue, rigorous tests (w/ many
threads, memory constraints) will be needed to confirm robustness.
---
.../apache/sysds/runtime/lineage/LineageCache.java | 66 +++++++++++-----------
.../TransformFederatedEncodeApplyTest.java | 5 +-
2 files changed, 37 insertions(+), 34 deletions(-)
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
index 0b37a85..d36a1a2 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
@@ -466,42 +466,44 @@ public class LineageCache
if (udf.getLineageItem(ec) == null)
//TODO: trace all UDFs
return;
- LineageItem item = udf.getLineageItem(ec).getValue();
- LineageCacheEntry entry = _cache.get(item);
- Data data = ec.getVariable(String.valueOf(outIds.get(0)));
- if (!(data instanceof MatrixObject) && !(data instanceof
ScalarObject)) {
- // Don't cache if the udf outputs frames
- _cache.remove(item);
- return;
- }
-
- MatrixBlock mb = (data instanceof MatrixObject) ?
- ((MatrixObject)data).acquireReadAndRelease() :
null;
- long size = mb != null ? mb.getInMemorySize() :
((ScalarObject)data).getSize();
-
- //remove the placeholder if the entry is bigger than the cache.
- //FIXME: the resumed threads will enter into infinite wait as
the entry
- //is removed. Need to add support for graceful remove
(placeholder) and resume.
- if (size > LineageCacheEviction.getCacheLimit()) {
- _cache.remove(item);
- return;
- }
+ synchronized (_cache) {
+ LineageItem item = udf.getLineageItem(ec).getValue();
+ LineageCacheEntry entry = _cache.get(item);
+ Data data =
ec.getVariable(String.valueOf(outIds.get(0)));
+ if (!(data instanceof MatrixObject) && !(data
instanceof ScalarObject)) {
+ // Don't cache if the udf outputs frames
+ _cache.remove(item);
+ return;
+ }
+
+ MatrixBlock mb = (data instanceof MatrixObject) ?
+
((MatrixObject)data).acquireReadAndRelease() : null;
+ long size = mb != null ? mb.getInMemorySize() :
((ScalarObject)data).getSize();
+
+ //remove the placeholder if the entry is bigger than
the cache.
+ //FIXME: the resumed threads will enter into infinite
wait as the entry
+ //is removed. Need to add support for graceful remove
(placeholder) and resume.
+ if (size > LineageCacheEviction.getCacheLimit()) {
+ _cache.remove(item);
+ return;
+ }
- //make space for the data
- if (!LineageCacheEviction.isBelowThreshold(size))
- LineageCacheEviction.makeSpace(_cache, size);
- LineageCacheEviction.updateSize(size, true);
+ //make space for the data
+ if (!LineageCacheEviction.isBelowThreshold(size))
+ LineageCacheEviction.makeSpace(_cache, size);
+ LineageCacheEviction.updateSize(size, true);
- //place the data
- if (data instanceof MatrixObject)
- entry.setValue(mb, computetime);
- else if (data instanceof ScalarObject)
- entry.setValue((ScalarObject)data, computetime);
+ //place the data
+ if (data instanceof MatrixObject)
+ entry.setValue(mb, computetime);
+ else if (data instanceof ScalarObject)
+ entry.setValue((ScalarObject)data, computetime);
- //TODO: maintain statistics, lineage estimate
+ //TODO: maintain statistics, lineage estimate
- //maintain order for eviction
- LineageCacheEviction.addEntry(entry);
+ //maintain order for eviction
+ LineageCacheEviction.addEntry(entry);
+ }
}
public static void resetCache() {
diff --git
a/src/test/java/org/apache/sysds/test/functions/federated/transform/TransformFederatedEncodeApplyTest.java
b/src/test/java/org/apache/sysds/test/functions/federated/transform/TransformFederatedEncodeApplyTest.java
index 73e0532..dba5e93 100644
---
a/src/test/java/org/apache/sysds/test/functions/federated/transform/TransformFederatedEncodeApplyTest.java
+++
b/src/test/java/org/apache/sysds/test/functions/federated/transform/TransformFederatedEncodeApplyTest.java
@@ -214,7 +214,6 @@ public class TransformFederatedEncodeApplyTest extends
AutomatedTestBase {
int port3 = getRandomAvailablePort();
int port4 = getRandomAvailablePort();
String[] otherargs = lineage ? new String[]
{"-lineage", "reuse_full"} : null;
- Lineage.resetInternalState();
t1 = startLocalFedWorkerThread(port1, otherargs,
FED_WORKER_WAIT_S);
t2 = startLocalFedWorkerThread(port2, otherargs,
FED_WORKER_WAIT_S);
t3 = startLocalFedWorkerThread(port3, otherargs,
FED_WORKER_WAIT_S);
@@ -265,8 +264,10 @@ public class TransformFederatedEncodeApplyTest extends
AutomatedTestBase {
"cols=" + dataset.getNumColumns(), "TFSPEC=" +
HOME + "input/" + SPEC, "TFDATA1=" + output("tfout1"),
"TFDATA2=" + output("tfout2"), "OFMT=csv"};
- if (lineage)
+ if (lineage) {
+ Lineage.resetInternalState();
programArgs = (String[])
ArrayUtils.addAll(lineageArgs, programArgs);
+ }
runTest(true, false, null, -1);