This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new 5cc5239  [SYSTEMDS-3185] Multi-tenant federated workers (variable isolation)
5cc5239 is described below

commit 5cc523971854cdf4f22e6199987a86e213fae4e2
Author: ywcb00 <[email protected]>
AuthorDate: Thu Dec 16 23:32:22 2021 +0100

    [SYSTEMDS-3185] Multi-tenant federated workers (variable isolation)
    
    Closes #1421.
---
 .../federated/ExecutionContextMap.java             |  11 +
 .../federated/FederatedLocalData.java              |   9 +-
 .../federated/FederatedLookupTable.java            | 132 +++++++
 .../controlprogram/federated/FederatedRequest.java |   7 +
 .../controlprogram/federated/FederatedWorker.java  |   6 +-
 .../federated/FederatedWorkerHandler.java          |  91 +++--
 .../controlprogram/parfor/util/IDHandler.java      |  69 ++--
 src/test/config/SystemDS-MultiTenant-config.xml    |  23 ++
 .../org/apache/sysds/test/AutomatedTestBase.java   |   2 +-
 .../multitenant/FederatedMultiTenantTest.java      | 404 +++++++++++++++++++++
 .../test/functions/lineage/LineageFedReuseAlg.java |  30 +-
 .../multitenant/FederatedMultiTenantTest.dml       |  63 ++++
 12 files changed, 758 insertions(+), 89 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/ExecutionContextMap.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/ExecutionContextMap.java
index ef4f6d6..21b6541 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/ExecutionContextMap.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/ExecutionContextMap.java
@@ -82,4 +82,15 @@ public class ExecutionContextMap {
                ec2.setAutoCreateVars(true); //w/o createvar inst
                return ec2;
        }
+
+       @Override
+       public String toString() {
+               StringBuilder sb = new StringBuilder();
+               sb.append(super.toString());
+               sb.append("\nMain EC: ");
+               sb.append(_main.toString());
+               sb.append("ParFor ECs: ");
+               sb.append(_parEc.toString());
+               return sb.toString();
+       }
 }
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedLocalData.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedLocalData.java
index 1589dc3..b1a4c6d 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedLocalData.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedLocalData.java
@@ -24,18 +24,21 @@ import java.util.concurrent.Future;
 
 import org.apache.log4j.Logger;
 import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
+import org.apache.sysds.runtime.controlprogram.parfor.util.IDHandler;
 
 public class FederatedLocalData extends FederatedData {
        protected final static Logger log = 
Logger.getLogger(FederatedWorkerHandler.class);
 
-       private static final ExecutionContextMap ecm = new 
ExecutionContextMap();
-       private static final FederatedWorkerHandler fwh = new 
FederatedWorkerHandler(ecm);
+       private static final FederatedLookupTable _flt = new 
FederatedLookupTable();
+       private static final FederatedWorkerHandler _fwh = new 
FederatedWorkerHandler(_flt);
 
        private final CacheableData<?> _data;
 
        public FederatedLocalData(long id, CacheableData<?> data) {
                super(data.getDataType(), null, data.getFileName());
                _data = data;
+               long pid = Long.valueOf(IDHandler.obtainProcessID());
+               ExecutionContextMap ecm = 
_flt.getECM(FederatedLookupTable.NOHOST, pid);
                synchronized(ecm) {
                        ecm.get(-1).setVariable(Long.toString(id), _data);
                }
@@ -54,6 +57,6 @@ public class FederatedLocalData extends FederatedData {
 
        @Override
        public synchronized Future<FederatedResponse> 
executeFederatedOperation(FederatedRequest... request) {
-               return 
CompletableFuture.completedFuture(fwh.createResponse(request));
+               return 
CompletableFuture.completedFuture(_fwh.createResponse(request));
        }
 }
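
For this in-process FederatedLocalData path there is no remote coordinator, so the lookup key falls back to FederatedLookupTable.NOHOST combined with the worker's own process id. A minimal sketch of that key construction (hypothetical class name; the pid parsing mirrors IDHandler.obtainProcessID()):

    import java.lang.management.ManagementFactory;

    class LocalCoordinatorKeySketch {
        public static void main(String[] args) {
            // no network peer, so the host part is the NOHOST placeholder
            String host = "nohost"; // FederatedLookupTable.NOHOST
            // the pid is the local JVM's, parsed from the "pid@hostname" runtime name
            long pid = Long.parseLong(ManagementFactory.getRuntimeMXBean().getName().split("@")[0]);
            System.out.println(pid + "@" + host); // the "pid@host" identity used by FedUniqueCoordID
        }
    }
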
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedLookupTable.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedLookupTable.java
new file mode 100644
index 0000000..905ab57
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedLookupTable.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.controlprogram.federated;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Lookup table mapping from a FedUniqueCoordID (funCID) to an
+ * ExecutionContextMap (ECM) so that every coordinator can address federated
+ * variables with its own local sequential variable IDs. Therefore, the IDs
+ * among different coordinators do not have to be distinct, as every
+ * coordinator works with a separate ECM at the FederatedWorker.
+ */
+public class FederatedLookupTable {
+       // the NOHOST constant is needed for creating FederatedLocalData where 
there
+       // is no actual network connection (and hence no host either)
+       public static final String NOHOST = "nohost";
+
+       protected static Logger log = 
Logger.getLogger(FederatedLookupTable.class);
+
+       // stores the mapping between the funCID and the corresponding 
ExecutionContextMap
+       private final Map<FedUniqueCoordID, ExecutionContextMap> _lookup_table;
+
+       public FederatedLookupTable() {
+               _lookup_table = new ConcurrentHashMap<>();
+       }
+
+       /**
+        * Get the ExecutionContextMap corresponding to the given host and pid 
of the
+        * requesting coordinator from the lookup table. Create a new
+        * ExecutionContextMap if there is no corresponding entry in the lookup 
table.
+        *
+        * @param host the host string of the requesting coordinator (usually 
IP address)
+        * @param pid the process id of the requesting coordinator
+        * @return ExecutionContextMap the ECM corresponding to the requesting 
coordinator
+        */
+       public ExecutionContextMap getECM(String host, long pid) {
+               log.trace("Getting the ExecutionContextMap for coordinator " + 
pid + "@" + host);
+               FedUniqueCoordID funCID = new FedUniqueCoordID(host, pid);
+               ExecutionContextMap ecm = _lookup_table.computeIfAbsent(funCID,
+                       k -> new ExecutionContextMap());
+               if(ecm == null) {
+                       log.error("Computing federated execution context map 
failed. "
+                               + "No valid resolution for " + 
funCID.toString() + " found.");
+                       throw new FederatedWorkerHandlerException("Computing 
federated execution context map failed. "
+                               + "No valid resolution for " + 
funCID.toString() + " found.");
+               }
+               return ecm;
+       }
+
+       /**
+        * Check if there is a mapped ExecutionContextMap for the coordinator
+        * with the given host and pid.
+        *
+        * @param host the host string of the requesting coordinator (usually 
IP address)
+        * @param pid the process id of the requesting coordinator
+        * @return boolean true if there is a lookup table entry, otherwise 
false
+        */
+       public boolean containsFunCID(String host, long pid) {
+               FedUniqueCoordID funCID = new FedUniqueCoordID(host, pid);
+               return _lookup_table.containsKey(funCID);
+       }
+
+       @Override
+       public String toString() {
+               return _lookup_table.toString();
+       }
+
+
+       /**
+        * Class to collect the information needed to identify a specific 
coordinator.
+        */
+       private static class FedUniqueCoordID {
+               private final String _host;
+               private final long _pid;
+
+               public FedUniqueCoordID(String host, long pid) {
+                       _host = host;
+                       _pid = pid;
+               }
+
+               @Override
+               public final boolean equals(Object obj) {
+                       if(this == obj)
+                               return true;
+                       if(obj == null)
+                               return false;
+                       if(!(obj instanceof FedUniqueCoordID))
+                               return false;
+
+                       FedUniqueCoordID funCID = (FedUniqueCoordID) obj;
+
+                       return Objects.equals(_host, funCID._host)
+                               && (_pid == funCID._pid);
+               }
+
+               @Override
+               public int hashCode() {
+                       return Objects.hash(_host, _pid);
+               }
+
+               @Override
+               public String toString() {
+                       StringBuilder sb = new StringBuilder();
+                       sb.append(_pid);
+                       sb.append("@");
+                       sb.append(_host);
+                       return sb.toString();
+               }
+       }
+}
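
The effect of the new lookup table can be seen in a minimal standalone sketch (stand-in names, not the SystemDS classes themselves): two coordinators that happen to use the same local variable ID still resolve to different objects, because each (host, pid) pair maps to its own context.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    class IsolationSketch {
        // stand-in for ExecutionContextMap: one variable table per coordinator
        static class Ctx { final Map<String, Object> vars = new ConcurrentHashMap<>(); }

        // keyed by the "pid@host" identity, analogous to FedUniqueCoordID
        static final Map<String, Ctx> lookup = new ConcurrentHashMap<>();

        static Ctx getCtx(String host, long pid) {
            return lookup.computeIfAbsent(pid + "@" + host, k -> new Ctx());
        }

        public static void main(String[] args) {
            // both coordinators register a variable under the same local ID "1" ...
            getCtx("10.0.0.1", 4711).vars.put("1", "matrix of coordinator A");
            getCtx("10.0.0.2", 4711).vars.put("1", "matrix of coordinator B");
            // ... and each still sees only its own value
            System.out.println(getCtx("10.0.0.1", 4711).vars.get("1")); // matrix of coordinator A
            System.out.println(getCtx("10.0.0.2", 4711).vars.get("1")); // matrix of coordinator B
        }
    }
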
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedRequest.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedRequest.java
index fd98456..ad9a711 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedRequest.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedRequest.java
@@ -33,6 +33,7 @@ import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.runtime.controlprogram.caching.CacheBlock;
 import org.apache.sysds.runtime.controlprogram.caching.CacheDataOutput;
 import org.apache.sysds.runtime.controlprogram.caching.LazyWriteBuffer;
+import org.apache.sysds.runtime.controlprogram.parfor.util.IDHandler;
 import org.apache.sysds.runtime.instructions.cp.ScalarObject;
 import org.apache.sysds.utils.Statistics;
 
@@ -56,6 +57,7 @@ public class FederatedRequest implements Serializable {
        private List<Object> _data;
        private boolean _checkPrivacy;
        private List<Long> _checksums;
+       private long _pid;
 
        public FederatedRequest(RequestType method) {
                this(method, FederationUtils.getNextFedDataID(), new 
ArrayList<>());
@@ -74,6 +76,7 @@ public class FederatedRequest implements Serializable {
                _method = method;
                _id = id;
                _data = data;
+               _pid = Long.valueOf(IDHandler.obtainProcessID());
                setCheckPrivacy();
                if (DMLScript.LINEAGE && method == RequestType.PUT_VAR)
                        setChecksum();
@@ -95,6 +98,10 @@ public class FederatedRequest implements Serializable {
                _tid = tid;
        }
 
+       public long getPID() {
+               return _pid;
+       }
+
        public Object getParam(int i) {
                return _data.get(i);
        }
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java
index 05414b4..d324ee6 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java
@@ -46,10 +46,10 @@ public class FederatedWorker {
        protected static Logger log = Logger.getLogger(FederatedWorker.class);
 
        private int _port;
-       private final ExecutionContextMap _ecm;
+       private final FederatedLookupTable _flt;
 
        public FederatedWorker(int port) {
-               _ecm = new ExecutionContextMap();
+               _flt = new FederatedLookupTable();
                _port = (port == -1) ? DMLConfig.DEFAULT_FEDERATED_PORT : port;
        }
 
@@ -77,7 +77,7 @@ public class FederatedWorker {
                                                        new 
ObjectDecoder(Integer.MAX_VALUE,
                                                                
ClassResolvers.weakCachingResolver(ClassLoader.getSystemClassLoader())));
                                                cp.addLast("ObjectEncoder", new 
ObjectEncoder());
-                                               
cp.addLast("FederatedWorkerHandler", new FederatedWorkerHandler(_ecm));
+                                               
cp.addLast("FederatedWorkerHandler", new FederatedWorkerHandler(_flt));
                                        }
                                }).option(ChannelOption.SO_BACKLOG, 
128).childOption(ChannelOption.SO_KEEPALIVE, true);
                        log.info("Starting Federated Worker server at port: " + 
_port);
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
index 1d0eb79..28f2932 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
@@ -21,6 +21,8 @@ package org.apache.sysds.runtime.controlprogram.federated;
 
 import java.io.BufferedReader;
 import java.io.InputStreamReader;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
 import java.util.Arrays;
 
 import io.netty.channel.ChannelFuture;
@@ -73,7 +75,7 @@ import org.apache.sysds.utils.Statistics;
 public class FederatedWorkerHandler extends ChannelInboundHandlerAdapter {
        private static final Logger LOG = 
Logger.getLogger(FederatedWorkerHandler.class);
 
-       private final ExecutionContextMap _ecm;
+       private final FederatedLookupTable _flt;
 
        /**
         * Create a Federated Worker Handler.
@@ -81,24 +83,46 @@ public class FederatedWorkerHandler extends 
ChannelInboundHandlerAdapter {
         * Note: federated worker handler created for every command; and 
concurrent parfor threads at coordinator need
         * separate execution contexts at the federated sites too
         * 
-        * @param ecm A execution context, used to map variables and execution.
+        * @param flt The Federated Lookup Table of the current Federated 
Worker.
         */
-       public FederatedWorkerHandler(ExecutionContextMap ecm) {
-               _ecm = ecm;
+       public FederatedWorkerHandler(FederatedLookupTable flt) {
+               _flt = flt;
        }
 
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
-               ctx.writeAndFlush(createResponse(msg)).addListener(new 
CloseListener());
+               ctx.writeAndFlush(createResponse(msg, 
ctx.channel().remoteAddress()))
+                       .addListener(new CloseListener());
        }
 
        protected FederatedResponse createResponse(Object msg) {
+               return createResponse(msg, FederatedLookupTable.NOHOST);
+       }
+
+       private FederatedResponse createResponse(Object msg, SocketAddress 
remoteAddress) {
+               String host;
+               if(remoteAddress instanceof InetSocketAddress) {
+                       host = ((InetSocketAddress) 
remoteAddress).getHostString();
+               }
+               else if(remoteAddress instanceof SocketAddress) {
+                       host = 
remoteAddress.toString().split(":")[0].split("/")[1];
+               }
+               else {
+                       LOG.warn("Given remote address of coordinator is null. 
Continuing with "
+                               + FederatedLookupTable.NOHOST + " as host 
identifier.");
+                       host = FederatedLookupTable.NOHOST;
+               }
+
+               return createResponse(msg, host);
+       }
+
+       private FederatedResponse createResponse(Object msg, String remoteHost) 
{
                if(!(msg instanceof FederatedRequest[]))
                        return new FederatedResponse(ResponseType.ERROR,
                                new FederatedWorkerHandlerException("Received 
object of wrong instance 'FederatedRequest[]'."));
                final FederatedRequest[] requests = (FederatedRequest[]) msg;
                try {
-                       return createResponse(requests);
+                       return createResponse(requests, remoteHost);
                }
                catch(DMLPrivacyException | FederatedWorkerHandlerException ex) 
{
                        // Here we control the error message, therefore it is 
allowed to send the stack trace with the response
@@ -112,20 +136,21 @@ public class FederatedWorkerHandler extends 
ChannelInboundHandlerAdapter {
                }
        }
 
-       private FederatedResponse createResponse(FederatedRequest[] requests)
+       private FederatedResponse createResponse(FederatedRequest[] requests, 
String remoteHost)
                throws DMLPrivacyException, FederatedWorkerHandlerException, 
Exception {
                FederatedResponse response = null; // last response
                boolean containsCLEAR = false;
                for(int i = 0; i < requests.length; i++) {
                        final FederatedRequest request = requests[i];
                        final RequestType t = request.getType();
+                       ExecutionContextMap ecm = _flt.getECM(remoteHost, 
request.getPID());
                        logRequests(request, i, requests.length);
 
                        PrivacyMonitor.setCheckPrivacy(request.checkPrivacy());
                        PrivacyMonitor.clearCheckedConstraints();
 
                        // execute command and handle privacy constraints
-                       final FederatedResponse tmp = executeCommand(request);
+                       final FederatedResponse tmp = executeCommand(request, 
ecm);
                        conditionalAddCheckedConstraints(request, tmp);
 
                        // select the response
@@ -176,22 +201,22 @@ public class FederatedWorkerHandler extends 
ChannelInboundHandlerAdapter {
                        
response.setCheckedConstraints(PrivacyMonitor.getCheckedConstraints());
        }
 
-       private FederatedResponse executeCommand(FederatedRequest request)
+       private FederatedResponse executeCommand(FederatedRequest request, 
ExecutionContextMap ecm)
                throws DMLPrivacyException, FederatedWorkerHandlerException, 
Exception {
                final RequestType method = request.getType();
                switch(method) {
                        case READ_VAR:
-                               return readData(request); // matrix/frame
+                               return readData(request, ecm); // matrix/frame
                        case PUT_VAR:
-                               return putVariable(request);
+                               return putVariable(request, ecm);
                        case GET_VAR:
-                               return getVariable(request);
+                               return getVariable(request, ecm);
                        case EXEC_INST:
-                               return execInstruction(request);
+                               return execInstruction(request, ecm);
                        case EXEC_UDF:
-                               return execUDF(request);
+                               return execUDF(request, ecm);
                        case CLEAR:
-                               return execClear();
+                               return execClear(ecm);
                        case NOOP:
                                return execNoop();
                        default:
@@ -200,14 +225,15 @@ public class FederatedWorkerHandler extends 
ChannelInboundHandlerAdapter {
                }
        }
 
-       private FederatedResponse readData(FederatedRequest request) {
+       private FederatedResponse readData(FederatedRequest request, 
ExecutionContextMap ecm) {
                checkNumParams(request.getNumParams(), 2);
                String filename = (String) request.getParam(0);
                DataType dt = DataType.valueOf((String) request.getParam(1));
-               return readData(filename, dt, request.getID(), 
request.getTID());
+               return readData(filename, dt, request.getID(), 
request.getTID(), ecm);
        }
 
-       private FederatedResponse readData(String filename, Types.DataType 
dataType, long id, long tid) {
+       private FederatedResponse readData(String filename, Types.DataType 
dataType,
+               long id, long tid, ExecutionContextMap ecm) {
                MatrixCharacteristics mc = new MatrixCharacteristics();
                mc.setBlocksize(ConfigurationManager.getBlocksize());
                CacheableData<?> cd;
@@ -260,11 +286,11 @@ public class FederatedWorkerHandler extends 
ChannelInboundHandlerAdapter {
                if(fmt == FileFormat.CSV)
                        cd.setFileFormatProperties(new 
FileFormatPropertiesCSV(header, delim, DataExpression.DEFAULT_DELIM_SPARSE));
                cd.enableCleanup(false); // guard against deletion
-               _ecm.get(tid).setVariable(String.valueOf(id), cd);
+               ecm.get(tid).setVariable(String.valueOf(id), cd);
 
                if(DMLScript.LINEAGE)
                        // create a literal type lineage item with the file name
-                       _ecm.get(tid).getLineage().set(String.valueOf(id), new 
LineageItem(filename));
+                       ecm.get(tid).getLineage().set(String.valueOf(id), new 
LineageItem(filename));
 
                if(dataType == Types.DataType.FRAME) {
                        FrameObject frameObject = (FrameObject) cd;
@@ -276,10 +302,10 @@ public class FederatedWorkerHandler extends 
ChannelInboundHandlerAdapter {
                return new FederatedResponse(ResponseType.SUCCESS, new Object[] 
{id, mc});
        }
 
-       private FederatedResponse putVariable(FederatedRequest request) {
+       private FederatedResponse putVariable(FederatedRequest request, 
ExecutionContextMap ecm) {
                checkNumParams(request.getNumParams(), 1, 2);
                final String varName = String.valueOf(request.getID());
-               ExecutionContext ec = _ecm.get(request.getTID());
+               ExecutionContext ec = ecm.get(request.getTID());
 
                if(ec.containsVariable(varName)) {
                        Data tgtData = ec.removeVariable(varName);
@@ -312,9 +338,9 @@ public class FederatedWorkerHandler extends 
ChannelInboundHandlerAdapter {
                return new FederatedResponse(ResponseType.SUCCESS_EMPTY);
        }
 
-       private FederatedResponse getVariable(FederatedRequest request) {
+       private FederatedResponse getVariable(FederatedRequest request, 
ExecutionContextMap ecm) {
                checkNumParams(request.getNumParams(), 0);
-               ExecutionContext ec = _ecm.get(request.getTID());
+               ExecutionContext ec = ecm.get(request.getTID());
                if(!ec.containsVariable(String.valueOf(request.getID())))
                        throw new FederatedWorkerHandlerException(
                                "Variable " + request.getID() + " does not 
exist at federated worker.");
@@ -336,17 +362,18 @@ public class FederatedWorkerHandler extends 
ChannelInboundHandlerAdapter {
                }
        }
 
-       private FederatedResponse execInstruction(FederatedRequest request) 
throws Exception {
-               ExecutionContext ec = _ecm.get(request.getTID());
+       private FederatedResponse execInstruction(FederatedRequest request, 
ExecutionContextMap ecm) throws Exception {
+               ExecutionContext ec = ecm.get(request.getTID());
                
                //handle missing spark execution context
                //TODO handling of spark instructions should be under control 
of federated site not coordinator
                Instruction receivedInstruction = 
InstructionParser.parseSingleInstruction((String) request.getParam(0));
                if(receivedInstruction.getType() == IType.SPARK
                        && !(ec instanceof SparkExecutionContext) ) {
-                       _ecm.convertToSparkCtx();
-                       ec = _ecm.get(request.getTID());
+                       ecm.convertToSparkCtx();
+                       ec = ecm.get(request.getTID());
                }
+
                BasicProgramBlock pb = new BasicProgramBlock(null);
                pb.getInstructions().clear();
                pb.getInstructions().add(receivedInstruction);
@@ -363,9 +390,9 @@ public class FederatedWorkerHandler extends 
ChannelInboundHandlerAdapter {
                return new FederatedResponse(ResponseType.SUCCESS_EMPTY);
        }
 
-       private FederatedResponse execUDF(FederatedRequest request) {
+       private FederatedResponse execUDF(FederatedRequest request, 
ExecutionContextMap ecm) {
                checkNumParams(request.getNumParams(), 1);
-               ExecutionContext ec = _ecm.get(request.getTID());
+               ExecutionContext ec = ecm.get(request.getTID());
 
                // get function and input parameters
                FederatedUDF udf = (FederatedUDF) request.getParam(0);
@@ -401,9 +428,9 @@ public class FederatedWorkerHandler extends 
ChannelInboundHandlerAdapter {
                }
        }
 
-       private FederatedResponse execClear() {
+       private FederatedResponse execClear(ExecutionContextMap ecm) {
                try {
-                       _ecm.clear();
+                       ecm.clear();
                }
                catch(DMLPrivacyException | FederatedWorkerHandlerException ex) 
{
                        throw ex;
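
The handler now derives the coordinator's host from the Netty channel's remote address and combines it with the pid carried in the FederatedRequest to select the per-coordinator ExecutionContextMap. A minimal sketch of the host-resolution part only (assumed helper that mirrors the createResponse logic above):

    import java.net.InetSocketAddress;
    import java.net.SocketAddress;

    class HostResolutionSketch {
        static final String NOHOST = "nohost"; // FederatedLookupTable.NOHOST

        static String resolveHost(SocketAddress remote) {
            if (remote instanceof InetSocketAddress)
                return ((InetSocketAddress) remote).getHostString(); // e.g. "10.0.0.1"
            if (remote != null)
                // other address types stringify like "/10.0.0.1:55312" -> keep the host part
                return remote.toString().split(":")[0].split("/")[1];
            return NOHOST; // no connection, e.g. the FederatedLocalData path
        }

        public static void main(String[] args) {
            System.out.println(resolveHost(new InetSocketAddress("10.0.0.1", 55312))); // 10.0.0.1
            System.out.println(resolveHost(null)); // nohost
        }
    }
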
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/util/IDHandler.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/util/IDHandler.java
index 0f9b7ef..f863562 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/util/IDHandler.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/util/IDHandler.java
@@ -33,24 +33,21 @@ import java.net.InetAddress;
  */
 public class IDHandler 
 {
-       public static int extractIntID( String taskID )
-       {
+       public static int extractIntID( String taskID ) {
                int maxlen = (int)(Math.log10(Integer.MAX_VALUE));
                int intVal = (int)extractID( taskID, maxlen );
-               return intVal;          
-               
+               return intVal;
        }
 
-       public static long concatIntIDsToLong( int part1, int part2 )
-       {
+       public static long concatIntIDsToLong( int part1, int part2 ) {
                //big-endian version (in java uses only big endian)
                long value = ((long)part1) << 32; //unsigned shift of part1 to 
first 4bytes
                value = value | part2;            //bitwise OR with part2 
(second 4bytes)
-               
+
                //*-endian version 
                //long value = ((long)part1)*(long)Math.pow(2, 32);
                //value += part2;
-               
+
                return value;
        }
 
@@ -61,56 +58,59 @@ public class IDHandler
         * @param part if part is 1, use first 4 bytes. if part is 2, use 
second 4 bytes!
         * @return return int id, or -1 if part is not 1 or 2!
         */
-       public static int extractIntIDFromLong( long val, int part )
-       {
+       public static int extractIntIDFromLong( long val, int part ) {
                int ret = -1;
                if( part == 1 )
                        ret = (int)(val >>> 32);
                else if( part == 2 )
                        ret = (int)val; 
-                               
+
                return ret;
        }
-       
+
        /**
         * Creates a unique identifier with the pattern 
&lt;process_id&gt;_&lt;host_ip&gt;.
         * 
         * @return distributed unique id
         */
-       public static String createDistributedUniqueID() 
-       {
+       public static String createDistributedUniqueID() {
                String uuid = null;
-               
-               try
-               {
-                       //get process id                 
-                   String pname = 
ManagementFactory.getRuntimeMXBean().getName(); //pid@hostname
-                   String pid = pname.split("@")[0];
-                   
-                   //get ip address
-                   InetAddress addr = InetAddress.getLocalHost();
-                   String host = addr.getHostAddress();
-                       
-                   uuid = pid + "_" + host;
+
+               try {
+                       String pid = obtainProcessID();
+
+                       //get ip address
+                       InetAddress addr = InetAddress.getLocalHost();
+                       String host = addr.getHostAddress();
+
+                       uuid = pid + "_" + host;
                }
-               catch(Exception ex)
-               {
+               catch(Exception ex) {
                        uuid = "0_0.0.0.0";
                }
-               
+
                return uuid;
        }
 
-       private static long extractID( String taskID, int maxlen )
-       {
+       public static String obtainProcessID() {
+               //get process id
+               String pname = ManagementFactory.getRuntimeMXBean().getName(); 
//pid@hostname
+               String pid = pname.split("@")[0];
+               // TODO: change this as soon as we switch to a java version >= 9
+               // import java.lang.ProcessHandle;
+               // pid = ProcessHandle.current().pid();
+               return pid;
+       }
+
+       private static long extractID( String taskID, int maxlen ) {
                //in: e.g., task_local_0002_m_000009 or 
task_201203111647_0898_m_000001
                //out: e.g., 2000009
-               
+
                //generic parsing for flexible taskID formats
                char[] c = taskID.toCharArray(); //all chars
                long value = 0; //1 catch leading zeros as well         
                int count = 0;
-               
+
                for( int i=c.length-1; i >= 0 && count<maxlen; i-- ) //start at 
end
                {
                        if( c[i] >= 48 && c[i]<=57 )  //'0'-'9'
@@ -122,8 +122,7 @@ public class IDHandler
                                count++;
                        }
                }
-               
+
                return value;
        }
-       
 }
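
The new obtainProcessID() parses the pid out of the RuntimeMXBean name ("pid@hostname"); the TODO above notes that ProcessHandle can replace this once the baseline is Java 9 or newer. A small sketch of both variants (the ProcessHandle call requires Java 9+):

    import java.lang.management.ManagementFactory;

    class PidSketch {
        public static void main(String[] args) {
            // pre-Java-9 style, as in IDHandler.obtainProcessID()
            String legacyPid = ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
            // Java 9+ alternative mentioned in the TODO
            long modernPid = ProcessHandle.current().pid();
            System.out.println(legacyPid + " vs " + modernPid); // both print this JVM's pid
        }
    }
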
diff --git a/src/test/config/SystemDS-MultiTenant-config.xml b/src/test/config/SystemDS-MultiTenant-config.xml
new file mode 100644
index 0000000..3e250cc
--- /dev/null
+++ b/src/test/config/SystemDS-MultiTenant-config.xml
@@ -0,0 +1,23 @@
+<!--
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+-->
+
+<root>
+   <!-- The timeout for the federated tests to initialize the federated matrices -->
+   <sysds.federated.initialization.timeout>30</sysds.federated.initialization.timeout>
+</root>
diff --git a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
index 41ba0f5..0243f2a 100644
--- a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
+++ b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
@@ -140,7 +140,7 @@ public abstract class AutomatedTestBase {
        private static final String DEBUG_TEMP_DIR = "./tmp/";
 
        /** Directory under which config files shared across tests are located. 
*/
-       private static final String CONFIG_DIR = "./src/test/config/";
+       protected static final String CONFIG_DIR = "./src/test/config/";
 
        /**
         * Location of the SystemDS config file that we use as a template when 
generating the configs for each test case.
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/multitenant/FederatedMultiTenantTest.java b/src/test/java/org/apache/sysds/test/functions/federated/multitenant/FederatedMultiTenantTest.java
new file mode 100644
index 0000000..eade66c
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/federated/multitenant/FederatedMultiTenantTest.java
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.federated.multitenant;
+
+import java.io.IOException;
+import java.lang.Math;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+
+import static org.junit.Assert.fail;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.sysds.api.DMLScript;
+import org.apache.sysds.common.Types.ExecMode;
+import org.apache.sysds.runtime.matrix.data.MatrixValue.CellIndex;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
+import org.apache.sysds.runtime.util.HDFSTool;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(value = Parameterized.class)
[email protected]
+public class FederatedMultiTenantTest extends AutomatedTestBase {
+       private final static String TEST_NAME = "FederatedMultiTenantTest";
+
+       private final static String TEST_DIR = 
"functions/federated/multitenant/";
+       private static final String TEST_CLASS_DIR = TEST_DIR + 
FederatedMultiTenantTest.class.getSimpleName() + "/";
+
+       private final static double TOLERANCE = 0;
+
+       private final static int blocksize = 1024;
+       @Parameterized.Parameter()
+       public int rows;
+       @Parameterized.Parameter(1)
+       public int cols;
+       @Parameterized.Parameter(2)
+       public boolean rowPartitioned;
+
+       @Parameterized.Parameters
+       public static Collection<Object[]> data() {
+               return Arrays.asList(
+                       new Object[][] {
+                               {100, 1000, false},
+                               // {1000, 100, true},
+               });
+       }
+
+       private ArrayList<Process> workerProcesses = new ArrayList<>();
+       private ArrayList<Process> coordinatorProcesses = new ArrayList<>();
+
+       private enum OpType {
+               SUM,
+               PARFOR_SUM,
+               WSIGMOID,
+       }
+
+       @Override
+       public void setUp() {
+               TestUtils.clearAssertionInformation();
+               addTestConfiguration(TEST_NAME, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"S"}));
+       }
+
+       @Test
+       public void testSumSameWorkersCP() {
+               runMultiTenantSameWorkerTest(OpType.SUM, 4, 
ExecMode.SINGLE_NODE);
+       }
+
+       @Test
+       @Ignore
+       public void testSumSharedWorkersCP() {
+               runMultiTenantSharedWorkerTest(OpType.SUM, 3, 9, 
ExecMode.SINGLE_NODE);
+       }
+
+       @Test
+       @Ignore
+       public void testSumSameWorkersSP() {
+               runMultiTenantSameWorkerTest(OpType.SUM, 4, ExecMode.SPARK);
+       }
+
+//FIXME still runs into blocking
+//     @Test
+//     public void testSumSharedWorkersSP() {
+//             runMultiTenantSharedWorkerTest(OpType.SUM, 3, 9, 
ExecMode.SPARK);
+//     }
+
+       @Test
+       @Ignore
+       public void testParforSumSameWorkersCP() {
+               runMultiTenantSameWorkerTest(OpType.PARFOR_SUM, 4, 
ExecMode.SINGLE_NODE);
+       }
+
+       @Test
+       public void testParforSumSharedWorkersCP() {
+               runMultiTenantSharedWorkerTest(OpType.PARFOR_SUM, 3, 9, 
ExecMode.SINGLE_NODE);
+       }
+
+       @Test
+       public void testParforSumSameWorkersSP() {
+               runMultiTenantSameWorkerTest(OpType.PARFOR_SUM, 4, 
ExecMode.SPARK);
+       }
+
+       @Test
+       @Ignore
+       public void testParforSumSharedWorkersSP() {
+               runMultiTenantSharedWorkerTest(OpType.PARFOR_SUM, 3, 9, 
ExecMode.SPARK);
+       }
+
+       @Test
+       public void testWSigmoidSameWorkersCP() {
+               runMultiTenantSameWorkerTest(OpType.WSIGMOID, 4, 
ExecMode.SINGLE_NODE);
+       }
+
+       @Test
+       @Ignore
+       public void testWSigmoidSharedWorkersCP() {
+               runMultiTenantSharedWorkerTest(OpType.WSIGMOID, 3, 9, 
ExecMode.SINGLE_NODE);
+       }
+
+       @Test
+       @Ignore
+       public void testWSigmoidSameWorkersSP() {
+               runMultiTenantSameWorkerTest(OpType.WSIGMOID, 4, 
ExecMode.SPARK);
+       }
+
+       @Test
+       public void testWSigmoidSharedWorkersSP() {
+               runMultiTenantSharedWorkerTest(OpType.WSIGMOID, 3, 9, 
ExecMode.SPARK);
+       }
+
+       // ensure that the processes are killed - even if the test throws an 
exception
+       @After
+       public void stopAllProcesses() {
+               for(Process p : coordinatorProcesses)
+                       p.destroyForcibly();
+               for(Process p : workerProcesses)
+                       p.destroyForcibly();
+       }
+
+       private void runMultiTenantSameWorkerTest(OpType opType, int 
numCoordinators, ExecMode execMode) {
+               boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+               ExecMode platformOld = rtplatform;
+
+               if(rtplatform == ExecMode.SPARK)
+                       DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+
+               getAndLoadTestConfiguration(TEST_NAME);
+               String HOME = SCRIPT_DIR + TEST_DIR;
+
+               // write input matrices
+               int r = rows;
+               int c = cols / 4;
+               if(rowPartitioned) {
+                       r = rows / 4;
+                       c = cols;
+               }
+
+               double[][] X1 = getRandomMatrix(r, c, 0, 3, 1, 3);
+               double[][] X2 = getRandomMatrix(r, c, 0, 3, 1, 7);
+               double[][] X3 = getRandomMatrix(r, c, 0, 3, 1, 8);
+               double[][] X4 = getRandomMatrix(r, c, 0, 3, 1, 9);
+
+               MatrixCharacteristics mc = new MatrixCharacteristics(r, c, 
blocksize, r * c);
+               writeInputMatrixWithMTD("X1", X1, false, mc);
+               writeInputMatrixWithMTD("X2", X2, false, mc);
+               writeInputMatrixWithMTD("X3", X3, false, mc);
+               writeInputMatrixWithMTD("X4", X4, false, mc);
+
+               // empty script name because we don't execute any script, just 
start the worker
+               fullDMLScriptName = "";
+
+               int[] workerPorts = startFedWorkers(4);
+
+               rtplatform = execMode;
+               if(rtplatform == ExecMode.SPARK) {
+                       DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+               }
+               TestConfiguration config = 
availableTestConfigurations.get(TEST_NAME);
+               loadTestConfiguration(config);
+
+               // start the coordinator processes
+               String scriptName = HOME + TEST_NAME + ".dml";
+               programArgs = new String[] {"-stats", "100", "-fedStats", 
"100", "-nvargs",
+                       "in_X1=" + TestUtils.federatedAddress(workerPorts[0], 
input("X1")),
+                       "in_X2=" + TestUtils.federatedAddress(workerPorts[1], 
input("X2")),
+                       "in_X3=" + TestUtils.federatedAddress(workerPorts[2], 
input("X3")),
+                       "in_X4=" + TestUtils.federatedAddress(workerPorts[3], 
input("X4")),
+                       "rows=" + rows, "cols=" + cols, "testnum=" + 
Integer.toString(opType.ordinal()),
+                       "rP=" + Boolean.toString(rowPartitioned).toUpperCase()};
+               for(int counter = 0; counter < numCoordinators; counter++)
+                       coordinatorProcesses.add(startCoordinator(execMode, 
scriptName,
+                               ArrayUtils.addAll(programArgs, "out_S=" + 
output("S" + counter))));
+
+               joinCoordinatorsAndVerify(opType, execMode);
+
+               // check that the federated input files still exist
+               Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X1")));
+               Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X2")));
+               Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X3")));
+               Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X4")));
+
+               TestUtils.shutdownThreads(workerProcesses.toArray(new 
Process[0]));
+
+               rtplatform = platformOld;
+               DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
+       }
+
+       private void runMultiTenantSharedWorkerTest(OpType opType, int 
numCoordinators, int maxNumWorkers, ExecMode execMode) {
+               boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+               ExecMode platformOld = rtplatform;
+
+               if(rtplatform == ExecMode.SPARK)
+                       DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+
+               getAndLoadTestConfiguration(TEST_NAME);
+               String HOME = SCRIPT_DIR + TEST_DIR;
+
+               final int numPartitions = 4;
+               final int numSharedWorkers = numPartitions - 
(int)Math.floor(maxNumWorkers / numCoordinators);
+               final int numFedWorkers = (numCoordinators * (numPartitions - 
numSharedWorkers)) + numSharedWorkers;
+
+               // write input matrices
+               int r = rows;
+               int c = cols / 4;
+               if(rowPartitioned) {
+                       r = rows / 4;
+                       c = cols;
+               }
+
+               double[][] X1 = getRandomMatrix(r, c, 0, 3, 1, 3);
+               double[][] X2 = getRandomMatrix(r, c, 0, 3, 1, 7);
+               double[][] X3 = getRandomMatrix(r, c, 0, 3, 1, 8);
+               double[][] X4 = getRandomMatrix(r, c, 0, 3, 1, 9);
+
+               MatrixCharacteristics mc = new MatrixCharacteristics(r, c, 
blocksize, r * c);
+               writeInputMatrixWithMTD("X1", X1, false, mc);
+               writeInputMatrixWithMTD("X2", X2, false, mc);
+               writeInputMatrixWithMTD("X3", X3, false, mc);
+               writeInputMatrixWithMTD("X4", X4, false, mc);
+
+               // empty script name because we don't execute any script, just 
start the worker
+               fullDMLScriptName = "";
+
+               int[] workerPorts = startFedWorkers(numFedWorkers);
+
+               rtplatform = execMode;
+               if(rtplatform == ExecMode.SPARK) {
+                       DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+               }
+               TestConfiguration config = 
availableTestConfigurations.get(TEST_NAME);
+               loadTestConfiguration(config);
+
+               // start the coordinator processes
+               final String scriptName = HOME + TEST_NAME + ".dml";
+               for(int counter = 0; counter < numCoordinators; counter++) {
+                       int workerIndexOffset = (numPartitions - 
numSharedWorkers) * counter;
+                       programArgs = new String[] {"-config", CONFIG_DIR + 
"SystemDS-MultiTenant-config.xml",
+                               "-stats", "100", "-fedStats", "100", "-nvargs",
+                               "in_X1=" + 
TestUtils.federatedAddress(workerPorts[workerIndexOffset], input("X1")),
+                               "in_X2=" + 
TestUtils.federatedAddress(workerPorts[workerIndexOffset + 1], input("X2")),
+                               "in_X3=" + 
TestUtils.federatedAddress(workerPorts[workerIndexOffset + 2], input("X3")),
+                               "in_X4=" + 
TestUtils.federatedAddress(workerPorts[workerIndexOffset + 3], input("X4")),
+                               "rows=" + rows, "cols=" + cols, "testnum=" + 
Integer.toString(opType.ordinal()),
+                               "rP=" + 
Boolean.toString(rowPartitioned).toUpperCase(), "out_S=" + output("S" + 
counter)};
+                       coordinatorProcesses.add(startCoordinator(execMode, 
scriptName, programArgs));
+               }
+
+               joinCoordinatorsAndVerify(opType, execMode);
+
+               // check that the federated input files still exist
+               Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X1")));
+               Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X2")));
+               Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X3")));
+               Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X4")));
+
+               TestUtils.shutdownThreads(workerProcesses.toArray(new 
Process[0]));
+
+               rtplatform = platformOld;
+               DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
+       }
+
+       private int[] startFedWorkers(int numFedWorkers) {
+               int[] ports = new int[numFedWorkers];
+               for(int counter = 0; counter < numFedWorkers; counter++) {
+                       ports[counter] = getRandomAvailablePort();
+                       @SuppressWarnings("deprecation")
+                       Process tmpProcess = 
startLocalFedWorker(ports[counter]);
+                       workerProcesses.add(tmpProcess);
+               }
+               return ports;
+       }
+
+       private Process startCoordinator(ExecMode execMode, String scriptPath, 
String[] args) {
+               String separator = System.getProperty("file.separator");
+               String classpath = System.getProperty("java.class.path");
+               String path = System.getProperty("java.home") + separator + 
"bin" + separator + "java";
+
+               String em = null;
+               switch(execMode) {
+                       case SINGLE_NODE:
+                       em = "singlenode";
+                       break;
+                       case HYBRID:
+                       em = "hybrid";
+                       break;
+                       case SPARK:
+                       em = "spark";
+                       break;
+               }
+
+               ArrayList<String> argsList = new ArrayList<>();
+               argsList.add("-f");
+               argsList.add(scriptPath);
+               argsList.add("-exec");
+               argsList.add(em);
+               argsList.addAll(Arrays.asList(args));
+
+               ProcessBuilder processBuilder = new 
ProcessBuilder(ArrayUtils.addAll(new String[]{
+                       path, "-cp", classpath, DMLScript.class.getName()}, 
argsList.toArray(new String[0])))
+                       .redirectErrorStream(true);
+               
+               Process process = null;
+               try {
+                       process = processBuilder.start();
+               } catch(IOException ioe) {
+                       ioe.printStackTrace();
+               }
+               
+               return process;
+       }
+
+       private void joinCoordinatorsAndVerify(OpType opType, ExecMode 
execMode) {
+               // join the coordinator processes
+               for(int counter = 0; counter < coordinatorProcesses.size(); 
counter++) {
+                       Process coord = coordinatorProcesses.get(counter);
+                       
+                       //wait for process, but obtain logs before to avoid 
blocking
+                       String outputLog = null, errorLog = null;
+                       try {
+                               outputLog = 
IOUtils.toString(coord.getInputStream());
+                               errorLog = 
IOUtils.toString(coord.getErrorStream());
+                               
+                               coord.waitFor();
+                       }
+                       catch(Exception ex) {
+                               ex.printStackTrace();
+                       }
+                       
+                       // get and print the output
+                       System.out.println("Output of coordinator #" + 
Integer.toString(counter + 1) + ":\n");
+                       System.out.println(outputLog);
+                       System.out.println(errorLog);
+                       Assert.assertTrue(checkForHeavyHitter(opType, 
outputLog, execMode));
+               }
+
+               // compare the results via files
+               HashMap<CellIndex, Double> refResults = 
readDMLMatrixFromOutputDir("S" + 0);
+               if(refResults.isEmpty())
+                       fail("The result of the first coordinator, which is 
taken as reference, is empty.");
+               for(int counter = 1; counter < coordinatorProcesses.size(); 
counter++) {
+                       HashMap<CellIndex, Double> fedResults = 
readDMLMatrixFromOutputDir("S" + counter);
+                       TestUtils.compareMatrices(fedResults, refResults, 
TOLERANCE, "Fed" + counter, "FedRef");
+               }
+       }
+
+       private static boolean checkForHeavyHitter(OpType opType, String 
outputLog, ExecMode execMode) {
+               switch(opType) {
+                       case SUM:
+                               return outputLog.contains("fed_uak+");
+                       case PARFOR_SUM:
+                               return outputLog.contains(execMode == 
ExecMode.SPARK ? "fed_rblk" : "fed_uak+");
+                       case WSIGMOID:
+                               return outputLog.contains("fed_wsigmoid");
+                       default:
+                               return false;
+               }
+       }
+}
diff --git a/src/test/java/org/apache/sysds/test/functions/lineage/LineageFedReuseAlg.java b/src/test/java/org/apache/sysds/test/functions/lineage/LineageFedReuseAlg.java
index 257848d..e38a4fd 100644
--- a/src/test/java/org/apache/sysds/test/functions/lineage/LineageFedReuseAlg.java
+++ b/src/test/java/org/apache/sysds/test/functions/lineage/LineageFedReuseAlg.java
@@ -98,23 +98,9 @@ public class LineageFedReuseAlg extends AutomatedTestBase {
                        TestConfiguration config = 
availableTestConfigurations.get(TEST_NAME);
                        loadTestConfiguration(config);
 
-                       // Run with federated matrix and without reuse
-                       fullDMLScriptName = HOME + TEST_NAME + ".dml";
-                       programArgs = new String[] {"-stats", "20", 
-                               "-nvargs", "in_X1=" + 
TestUtils.federatedAddress(port1, input("X1")),
-                               "in_X2=" + TestUtils.federatedAddress(port2, 
input("X2")),
-                               "in_X3=" + TestUtils.federatedAddress(port3, 
input("X3")),
-                               "in_X4=" + TestUtils.federatedAddress(port4, 
input("X4")), "rows=" + rows, "cols=" + (cols + 1),
-                               "in_Y=" + input("Y"), "cont=" + 
String.valueOf(contSplits).toUpperCase(), "out=" + expected("Z")};
-                       runTest(true, false, null, -1);
-                       long tsmmCount = 
Statistics.getCPHeavyHitterCount("tsmm");
-                       long fed_tsmmCount = 
Statistics.getCPHeavyHitterCount("fed_tsmm");
-                       long mmCount = Statistics.getCPHeavyHitterCount("ba+*");
-                       long fed_mmCount = 
Statistics.getCPHeavyHitterCount("fed_ba+*");
-
                        // Run with federated matrix and with reuse
                        fullDMLScriptName = HOME + TEST_NAME + ".dml";
-                       programArgs = new String[] {"-stats", "20", "-lineage", 
"reuse_full", 
+                       programArgs = new String[] {"-stats", "20", "-lineage", 
"reuse_full",
                                "-nvargs", "in_X1=" + 
TestUtils.federatedAddress(port1, input("X1")),
                                "in_X2=" + TestUtils.federatedAddress(port2, 
input("X2")),
                                "in_X3=" + TestUtils.federatedAddress(port3, 
input("X3")),
@@ -127,6 +113,20 @@ public class LineageFedReuseAlg extends AutomatedTestBase {
                        long mmCount_reuse = 
Statistics.getCPHeavyHitterCount("ba+*");
                        long fed_mmCount_reuse = 
Statistics.getCPHeavyHitterCount("fed_ba+*");
 
+                       // Run with federated matrix and without reuse
+                       fullDMLScriptName = HOME + TEST_NAME + ".dml";
+                       programArgs = new String[] {"-stats", "20",
+                               "-nvargs", "in_X1=" + 
TestUtils.federatedAddress(port1, input("X1")),
+                               "in_X2=" + TestUtils.federatedAddress(port2, 
input("X2")),
+                               "in_X3=" + TestUtils.federatedAddress(port3, 
input("X3")),
+                               "in_X4=" + TestUtils.federatedAddress(port4, 
input("X4")), "rows=" + rows, "cols=" + (cols + 1),
+                               "in_Y=" + input("Y"), "cont=" + 
String.valueOf(contSplits).toUpperCase(), "out=" + expected("Z")};
+                       runTest(true, false, null, -1);
+                       long tsmmCount = 
Statistics.getCPHeavyHitterCount("tsmm");
+                       long fed_tsmmCount = 
Statistics.getCPHeavyHitterCount("fed_tsmm");
+                       long mmCount = Statistics.getCPHeavyHitterCount("ba+*");
+                       long fed_mmCount = 
Statistics.getCPHeavyHitterCount("fed_ba+*");
+
                        // compare results 
                        compareResults(1e-2);
                        // compare potentially reused instruction counts
diff --git a/src/test/scripts/functions/federated/multitenant/FederatedMultiTenantTest.dml b/src/test/scripts/functions/federated/multitenant/FederatedMultiTenantTest.dml
new file mode 100644
index 0000000..e0bef2e
--- /dev/null
+++ b/src/test/scripts/functions/federated/multitenant/FederatedMultiTenantTest.dml
@@ -0,0 +1,63 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if ($rP) {
+    X = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+        ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), 
list(2*$rows/4, $cols),
+               list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), 
list($rows, $cols)));
+} else {
+    X = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+            ranges=list(list(0, 0), list($rows, $cols/4), list(0,$cols/4), 
list($rows, $cols/2),
+               list(0,$cols/2), list($rows, 3*($cols/4)), list(0, 
3*($cols/4)), list($rows, $cols)));
+}
+
+testnum = $testnum;
+
+if(testnum == 0) { # SUM
+  S = as.matrix(sum(X));
+}
+else if(testnum == 1) { # PARFOR_SUM
+  numiter = 5;
+  Z = matrix(0, rows=numiter, cols=1);
+  parfor( i in 1:numiter ) {
+    while(FALSE) { }
+    Y = X + i;
+    while(FALSE) { }
+    Z[i, 1] = sum(Y);
+  }
+  S = as.matrix(0);
+  for( i in 1:numiter ) {
+    while(FALSE) { }
+    S = S + Z[i, 1];
+  }
+}
+else if(testnum == 2) { # WSIGMOID
+  N = nrow(X);
+  M = ncol(X);
+
+  U = rand(rows=N, cols=15, seed=123);
+  V = rand(rows=M, cols=15, seed=456);
+
+  UV = U %*% t(V);
+  S = X * log(1 / (1 + exp(-UV)));
+}
+
+write(S, $out_S);
