This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new f33b516d10 [SYSTEMDS-3355] Federated monitoring backend worker 
communication
f33b516d10 is described below

commit f33b516d102115433ad101d0f76136cab92d01ae
Author: Mito <[email protected]>
AuthorDate: Sun May 15 18:41:51 2022 +0200

    [SYSTEMDS-3355] Federated monitoring backend worker communication
    
    Closes #1608.
---
 bin/systemds                                       |  38 ++++-
 src/main/java/org/apache/sysds/api/DMLOptions.java |  31 +++-
 src/main/java/org/apache/sysds/api/DMLScript.java  |   6 +
 .../federated/FederatedStatistics.java             |  12 +-
 .../monitoring/FederatedMonitoringServer.java      |  70 +++++---
 .../FederatedMonitoringServerHandler.java          | 180 +++++++++++----------
 .../controllers/CoordinatorController.java         |  46 +++---
 .../{BaseController.java => IController.java}      |  14 +-
 .../monitoring/controllers/WorkerController.java   |  81 ++++++++++
 .../monitoring/models/BaseEntityModel.java         |  78 +++++++++
 .../federated/monitoring/{ => models}/Request.java |  30 ++--
 .../monitoring/{ => models}/Response.java          |  38 ++---
 .../monitoring/repositories/DerbyRepository.java   | 171 ++++++++++++++++++++
 .../{Request.java => repositories/EntityEnum.java} |  26 +--
 .../IRepository.java}                              |  17 +-
 .../monitoring/services/WorkerService.java         |  89 ++++++++++
 .../org/apache/sysds/test/AutomatedTestBase.java   |  25 +++
 .../monitoring/FederatedMonitoringTestBase.java    | 101 ++++++++++++
 .../FederatedWorkerIntegrationCRUDTest.java        |  65 ++++++++
 .../monitoring/FederatedWorkerStatisticsTest.java  |  56 +++++++
 20 files changed, 952 insertions(+), 222 deletions(-)

diff --git a/bin/systemds b/bin/systemds
index 707f7e58ab..ffff838991 100755
--- a/bin/systemds
+++ b/bin/systemds
@@ -170,6 +170,10 @@ Worker Usage: $0 [-r] WORKER [SystemDS.jar] <portnumber> 
[arguments] [-help]
 
     port         : The port to open for the federated worker.
 
+Federated Monitoring Usage: $0 [-r] FEDMONITOR [SystemDS.jar] <portnumber> 
[arguments] [-help]
+
+    port         : The port to open for the federated monitoring tool.
+
 Set custom launch configuration by setting/editing SYSTEMDS_STANDALONE_OPTS
 and/or SYSTEMDS_DISTRIBUTED_OPTS.
 
@@ -256,6 +260,20 @@ elif echo "$1" | grep -q "WORKER"; then
     printUsage
   fi
   shift
+elif echo "$1" | grep -q "FEDMONITOR"; then
+  FEDMONITOR=1
+  shift
+  if echo "$1" | grep -q "jar"; then
+    SYSTEMDS_JAR_FILE=$1
+    shift
+  fi
+  PORT=$1
+  re='^[0-9]+$'
+  if ! [[ $PORT =~ $re ]] ; then
+    echo "error: Port is not a number"
+    printUsage
+  fi
+  shift
 else
   # handle optional '-f' before DML file (for consistency)
   if  echo "$1" | grep -q "\-f"; then
@@ -272,6 +290,9 @@ if [ -z "$WORKER" ] ; then
   WORKER=0
 fi
 
+if [ -z "$FEDMONITOR" ] ; then
+  FEDMONITOR=0
+fi
 
 # find me a SystemDS jar file to run
 if [ -z "$SYSTEMDS_JAR_FILE" ];then
@@ -409,7 +430,7 @@ print_out "#  HADOOP_HOME= $HADOOP_HOME"
 #build the command to run
 if [ $WORKER == 1 ]; then
   print_out "#"
-  print_out "#  starting Fedederated worker on port $PORT"
+  print_out "#  starting Federated worker on port $PORT"
   print_out 
"###############################################################################"
   CMD=" \
   java $SYSTEMDS_STANDALONE_OPTS \
@@ -422,6 +443,21 @@ if [ $WORKER == 1 ]; then
   print_out "Executing command: $CMD"
   print_out  ""
 
+if [ $FEDMONITORING == 1 ]; then
+  print_out "#"
+  print_out "#  starting Federated backend monitoring on port $PORT"
+  print_out 
"###############################################################################"
+  CMD=" \
+  java $SYSTEMDS_STANDALONE_OPTS \
+  -cp $CLASSPATH \
+  $LOG4JPROP \
+  org.apache.sysds.api.DMLScript \
+  -fedMonitor $PORT \
+  $CONFIG_FILE \
+  $*"
+  print_out "Executing command: $CMD"
+  print_out  ""
+
 elif [ $SYSDS_DISTRIBUTED == 0 ]; then
   print_out "#"
   print_out "#  Running script $SCRIPT_FILE locally with opts: $*"
diff --git a/src/main/java/org/apache/sysds/api/DMLOptions.java 
b/src/main/java/org/apache/sysds/api/DMLOptions.java
index f2c3f5664c..11bcea1604 100644
--- a/src/main/java/org/apache/sysds/api/DMLOptions.java
+++ b/src/main/java/org/apache/sysds/api/DMLOptions.java
@@ -73,7 +73,9 @@ public class DMLOptions {
        public boolean              lineage_debugger = false;         // 
whether enable lineage debugger
        public boolean              fedWorker     = false;
        public int                  fedWorkerPort = -1;
-       public int                  pythonPort    = -1; 
+       public boolean              fedMonitoring = false;
+       public int                  fedMonitoringPort = -1;
+       public int                  pythonPort    = -1;
        public boolean              checkPrivacy  = false;            // Check 
which privacy constraints are loaded and checked during federated execution 
        public boolean              federatedCompilation = false;     // 
Compile federated instructions based on input federation state and privacy 
constraints.
        public boolean              noFedRuntimeConversion = false;   // If 
activated, no runtime conversion of CP instructions to FED instructions will be 
performed.
@@ -95,6 +97,7 @@ public class DMLOptions {
                        ", statsCount=" + statsCount +
                        ", fedStats=" + fedStats +
                        ", fedStatsCount=" + fedStatsCount +
+                       ", fedMonitor=" + fedMonitoring +
                        ", memStats=" + memStats +
                        ", explainType=" + explainType +
                        ", execMode=" + execMode +
@@ -217,6 +220,7 @@ public class DMLOptions {
                                }
                        }
                }
+
                dmlOptions.memStats = line.hasOption("mem");
 
                dmlOptions.clean = line.hasOption("clean");
@@ -230,6 +234,11 @@ public class DMLOptions {
                        dmlOptions.fedWorkerPort = 
Integer.parseInt(line.getOptionValue("w"));
                }
 
+               if (line.hasOption("fedMonitor")) {
+                       dmlOptions.fedMonitoring= true;
+                       dmlOptions.fedMonitoringPort = 
Integer.parseInt(line.getOptionValue("fedMonitor"));
+               }
+
                if (line.hasOption("f")){
                        dmlOptions.filePath = line.getOptionValue("f");
                }
@@ -314,7 +323,8 @@ public class DMLOptions {
                Option configOpt = OptionBuilder.withArgName("filename")
                        .withDescription("uses a given configuration file (can 
be on local/hdfs/gpfs; default values in SystemDS-config.xml")
                        .hasArg().create("config");
-               Option cleanOpt = OptionBuilder.withDescription("cleans up all 
SystemDS working directories (FS, DFS); all other flags are ignored in this 
mode.")
+               Option cleanOpt = OptionBuilder
+                       .withDescription("cleans up all SystemDS working 
directories (FS, DFS); all other flags are ignored in this mode.")
                        .create("clean");
                Option statsOpt = OptionBuilder.withArgName("count")
                        .withDescription("monitors and reports summary 
execution statistics; heavy hitter <count> is 10 unless overridden; default 
off")
@@ -335,7 +345,8 @@ public class DMLOptions {
                        .hasOptionalArg().create("gpu");
                Option debugOpt = OptionBuilder.withDescription("runs in debug 
mode; default off")
                        .create("debug");
-               Option pythonOpt = OptionBuilder.withDescription("Python 
Context start with port argument for communication to python")
+               Option pythonOpt = OptionBuilder
+                       .withDescription("Python Context start with port 
argument for communication to python")
                        .isRequired().hasArg().create("python");
                Option fileOpt = OptionBuilder.withArgName("filename")
                        .withDescription("specifies dml/pydml file to execute; 
path can be local/hdfs/gpfs (prefixed with appropriate URI)")
@@ -343,12 +354,18 @@ public class DMLOptions {
                Option scriptOpt = OptionBuilder.withArgName("script_contents")
                        .withDescription("specified script string to execute 
directly")
                        .isRequired().hasArg().create("s");
-               Option helpOpt = OptionBuilder.withDescription("shows usage 
message")
+               Option helpOpt = OptionBuilder
+                       .withDescription("shows usage message")
                        .create("help");
-               Option lineageOpt = OptionBuilder.withDescription("computes 
lineage traces")
+               Option lineageOpt = OptionBuilder
+                       .withDescription("computes lineage traces")
                        .hasOptionalArgs().create("lineage");
-               Option fedOpt = OptionBuilder.withDescription("starts a 
federated worker with the given argument as the port.")
+               Option fedOpt = OptionBuilder
+                       .withDescription("starts a federated worker with the 
given argument as the port.")
                        .hasOptionalArg().create("w");
+               Option monitorOpt = OptionBuilder
+                       .withDescription("Starts a federated monitoring backend 
with the given argument as the port.")
+                       .hasOptionalArg().create("fedMonitor");
                Option checkPrivacy = OptionBuilder
                        .withDescription("Check which privacy constraints are 
loaded and checked during federated execution")
                        .create("checkPrivacy");
@@ -375,6 +392,7 @@ public class DMLOptions {
                options.addOption(debugOpt);
                options.addOption(lineageOpt);
                options.addOption(fedOpt);
+               options.addOption(monitorOpt);
                options.addOption(checkPrivacy);
                options.addOption(federatedCompilation);
                options.addOption(noFedRuntimeConversion);
@@ -387,6 +405,7 @@ public class DMLOptions {
                        .addOption(cleanOpt)
                        .addOption(helpOpt)
                        .addOption(fedOpt)
+                       .addOption(monitorOpt)
                        .addOption(pythonOpt);
                fileOrScriptOpt.setRequired(true);
                options.addOptionGroup(fileOrScriptOpt);
diff --git a/src/main/java/org/apache/sysds/api/DMLScript.java 
b/src/main/java/org/apache/sysds/api/DMLScript.java
index d74cf59bf7..9f6eb656e0 100644
--- a/src/main/java/org/apache/sysds/api/DMLScript.java
+++ b/src/main/java/org/apache/sysds/api/DMLScript.java
@@ -64,6 +64,7 @@ import 
org.apache.sysds.runtime.controlprogram.context.ExecutionContextFactory;
 import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysds.runtime.controlprogram.federated.FederatedData;
 import org.apache.sysds.runtime.controlprogram.federated.FederatedWorker;
+import 
org.apache.sysds.runtime.controlprogram.federated.monitoring.FederatedMonitoringServer;
 import 
org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysds.runtime.controlprogram.parfor.util.IDHandler;
 import org.apache.sysds.runtime.instructions.gpu.context.GPUContextPool;
@@ -284,6 +285,11 @@ public class DMLScript
                                return true;
                        }
 
+                       if(dmlOptions.fedMonitoring) {
+                               new 
FederatedMonitoringServer(dmlOptions.fedMonitoringPort, dmlOptions.debug);
+                               return true;
+                       }
+
                        LineageCacheConfig.setConfig(LINEAGE_REUSE);
                        LineageCacheConfig.setCachePolicy(LINEAGE_POLICY);
                        LineageCacheConfig.setEstimator(LINEAGE_ESTIMATE);
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedStatistics.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedStatistics.java
index 58a9480266..d95b02afd2 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedStatistics.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedStatistics.java
@@ -219,8 +219,12 @@ public class FederatedStatistics {
        }
 
        public static String displayStatistics(int numHeavyHitters) {
-               StringBuilder sb = new StringBuilder();
                FedStatsCollection fedStats = collectFedStats();
+               return displayStatistics(fedStats, numHeavyHitters);
+       }
+
+       public static String displayStatistics(FedStatsCollection fedStats, int 
numHeavyHitters) {
+               StringBuilder sb = new StringBuilder();
                sb.append("SystemDS Federated Statistics:\n");
                sb.append(displayCacheStats(fedStats.cacheStats));
                sb.append(String.format("Total JIT compile time:\t\t%.3f 
sec.\n", fedStats.jitCompileTime));
@@ -499,7 +503,7 @@ public class FederatedStatistics {
                return "";
        }
 
-       private static class FedStatsCollectFunction extends FederatedUDF {
+       public static class FedStatsCollectFunction extends FederatedUDF {
                private static final long serialVersionUID = 1L;
 
                public FedStatsCollectFunction() {
@@ -519,7 +523,7 @@ public class FederatedStatistics {
                }
        }
 
-       protected static class FedStatsCollection implements Serializable {
+       public static class FedStatsCollection implements Serializable {
                private static final long serialVersionUID = 1L;
 
                private void collectStats() {
@@ -531,7 +535,7 @@ public class FederatedStatistics {
                        heavyHitters = Statistics.getHeavyHittersHashMap();
                }
                
-               private void aggregate(FedStatsCollection that) {
+               public void aggregate(FedStatsCollection that) {
                        cacheStats.aggregate(that.cacheStats);
                        jitCompileTime += that.jitCompileTime;
                        gcStats.aggregate(that.gcStats);
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/FederatedMonitoringServer.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/FederatedMonitoringServer.java
index 61bc6e5dc3..8976d65194 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/FederatedMonitoringServer.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/FederatedMonitoringServer.java
@@ -28,37 +28,55 @@ import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.codec.http.HttpServerCodec;
+import org.apache.log4j.Logger;
 
 public class FederatedMonitoringServer {
-    private final int _port;
+       protected static Logger log = 
Logger.getLogger(FederatedMonitoringServer.class);
+       private final int _port;
 
-    public FederatedMonitoringServer(int port) {
-        _port = (port == -1) ? 4201 : port;
-    }
+       private final boolean _debug;
 
-    public void run() throws Exception {
-        EventLoopGroup bossGroup = new NioEventLoopGroup();
-        EventLoopGroup workerGroup = new NioEventLoopGroup();
+       public FederatedMonitoringServer(int port, boolean debug) {
+               _port = (port == -1) ? 4201 : port;
 
-        try {
-            ServerBootstrap server = new ServerBootstrap();
-            server.group(bossGroup, workerGroup)
-                    .channel(NioServerSocketChannel.class)
-                    .childHandler(new ChannelInitializer<>() {
-                        @Override
-                        protected void initChannel(Channel ch) {
-                            ChannelPipeline pipeline = ch.pipeline();
+               _debug = debug;
 
-                            pipeline.addLast(new HttpServerCodec());
-                            pipeline.addLast(new 
FederatedMonitoringServerHandler());
-                        }
-                    });
+               run();
+       }
 
-            ChannelFuture f = server.bind(_port).sync();
-            f.channel().closeFuture().sync();
-        } finally {
-            workerGroup.shutdownGracefully();
-            bossGroup.shutdownGracefully();
-        }
-    }
+       public void run() {
+               log.info("Setting up Federated Monitoring Backend on port " + 
_port);
+               EventLoopGroup bossGroup = new NioEventLoopGroup();
+               EventLoopGroup workerGroup = new NioEventLoopGroup();
+
+               try {
+                       ServerBootstrap server = new ServerBootstrap();
+                       server.group(bossGroup, workerGroup)
+                               .channel(NioServerSocketChannel.class)
+                               .childHandler(new ChannelInitializer<>() {
+                                       @Override
+                                       protected void initChannel(Channel ch) {
+                                       ChannelPipeline pipeline = 
ch.pipeline();
+
+                                       pipeline.addLast(new HttpServerCodec());
+                                       pipeline.addLast(new 
FederatedMonitoringServerHandler());
+                                       }
+                               });
+
+                       log.info("Starting Federated Monitoring Backend server 
at port: " + _port);
+                       ChannelFuture f = server.bind(_port).sync();
+                       log.info("Started Federated Monitoring Backend at port: 
" + _port);
+                       f.channel().closeFuture().sync();
+               } catch(Exception e) {
+                       log.info("Federated Monitoring Backend Interrupted");
+                       if (_debug) {
+                               log.error(e.getMessage());
+                               e.printStackTrace();
+                       }
+               } finally{
+                       log.info("Federated Monitoring Backend Shutting down.");
+                       workerGroup.shutdownGracefully();
+                       bossGroup.shutdownGracefully();
+               }
+       }
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/FederatedMonitoringServerHandler.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/FederatedMonitoringServerHandler.java
index ac392b5000..2e7006055b 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/FederatedMonitoringServerHandler.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/FederatedMonitoringServerHandler.java
@@ -27,8 +27,10 @@ import io.netty.handler.codec.http.HttpObject;
 import io.netty.handler.codec.http.HttpRequest;
 import io.netty.handler.codec.http.LastHttpContent;
 import io.netty.util.CharsetUtil;
-import 
org.apache.sysds.runtime.controlprogram.federated.monitoring.controllers.BaseController;
+import 
org.apache.sysds.runtime.controlprogram.federated.monitoring.controllers.IController;
 import 
org.apache.sysds.runtime.controlprogram.federated.monitoring.controllers.CoordinatorController;
+import 
org.apache.sysds.runtime.controlprogram.federated.monitoring.controllers.WorkerController;
+import 
org.apache.sysds.runtime.controlprogram.federated.monitoring.models.Request;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -38,92 +40,92 @@ import java.util.regex.Pattern;
 
 public class FederatedMonitoringServerHandler extends 
SimpleChannelInboundHandler<HttpObject> {
 
-    private final Map<String, BaseController> _allControllers = new 
HashMap<>();
-    {
-        _allControllers.put("/coordinators", new CoordinatorController());
-    }
-
-    private final static ThreadLocal<Request> _currentRequest = new 
ThreadLocal<>();
-
-    @Override
-    protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {
-
-        if (msg instanceof LastHttpContent) {
-            final ByteBuf jsonBuf = ((LastHttpContent) msg).content();
-            final Request request = _currentRequest.get();
-            request.setBody(jsonBuf.toString(CharsetUtil.UTF_8));
-
-            _currentRequest.remove();
-
-            final FullHttpResponse response = processRequest(request);
-            ctx.write(response);
-
-        } else if (msg instanceof HttpRequest) {
-            final HttpRequest httpRequest = (HttpRequest) msg;
-            final Request request = new Request();
-            request.setContext(httpRequest);
-
-            _currentRequest.set(request);
-        }
-
-    }
-
-    @Override
-    public void channelReadComplete(ChannelHandlerContext ctx) {
-        ctx.flush();
-    }
-
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
-        cause.printStackTrace();
-        ctx.close();
-    }
-
-    private FullHttpResponse processRequest(final Request request) {
-        try {
-            final BaseController controller = 
parseController(request.getContext().uri());
-            final String method = request.getContext().method().name();
-
-            switch (method) {
-                case "GET":
-                    final Long id = parseId(request.getContext().uri());
-
-                    if (id != null) {
-                        return controller.get(request, id);
-                    }
-
-                    return controller.getAll(request);
-                case "PUT":
-                    return controller.create(request);
-                case "POST":
-                    return controller.update(request, 
parseId(request.getContext().uri()));
-                case "DELETE":
-                    return controller.delete(request, 
parseId(request.getContext().uri()));
-                default:
-                    throw new IllegalArgumentException("Method is not 
supported!");
-            }
-        } catch (RuntimeException ex) {
-            ex.printStackTrace();
-            return null;
-        }
-    }
-
-    private BaseController parseController(final String currentPath) {
-        final Optional<String> controller = _allControllers.keySet().stream()
-                .filter(currentPath::startsWith)
-                .findFirst();
-
-        return controller.map(_allControllers::get).orElseThrow(() ->
-                new IllegalArgumentException("Such controller does not 
exist!"));
-    }
-
-    private Long parseId(final String uri) {
-        final Pattern pattern = Pattern.compile("^[/][a-z]+[/]");
-        final Matcher matcher = pattern.matcher(uri);
-
-        if (matcher.find()) {
-            return Long.valueOf(uri.substring(matcher.end()));
-        }
-        return null;
-    }
+       private final Map<String, IController> _allControllers = new 
HashMap<>();
+       {
+               _allControllers.put("/coordinators", new 
CoordinatorController());
+               _allControllers.put("/workers", new WorkerController());
+       }
+
+       private final static ThreadLocal<Request> _currentRequest = new 
ThreadLocal<>();
+
+       @Override
+       protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {
+
+               if (msg instanceof LastHttpContent) {
+                       ByteBuf jsonBuf = ((LastHttpContent) msg).content();
+                       Request request = _currentRequest.get();
+                       request.setBody(jsonBuf.toString(CharsetUtil.UTF_8));
+
+                       _currentRequest.remove();
+
+                       final FullHttpResponse response = 
processRequest(request);
+                       ctx.write(response);
+
+               } else if (msg instanceof HttpRequest) {
+                       HttpRequest httpRequest = (HttpRequest) msg;
+                       Request request = new Request();
+                       request.setContext(httpRequest);
+
+                       _currentRequest.set(request);
+               }
+
+       }
+
+       @Override
+       public void channelReadComplete(ChannelHandlerContext ctx) {
+               ctx.flush();
+       }
+
+       @Override
+       public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
{
+               cause.printStackTrace();
+               ctx.close();
+       }
+
+       private FullHttpResponse processRequest(final Request request) {
+               try {
+                       final IController controller = 
parseController(request.getContext().uri());
+                       final String method = 
request.getContext().method().name();
+
+                       switch (method) {
+                               case "GET":
+                                       final Long id = 
parseId(request.getContext().uri());
+
+                                       if (id != null) {
+                                               return controller.get(request, 
id);
+                                       }
+
+                                       return controller.getAll(request);
+                               case "PUT":
+                               return controller.update(request, 
parseId(request.getContext().uri()));
+                               case "POST":
+                                       return controller.create(request);
+                               case "DELETE":
+                                       return controller.delete(request, 
parseId(request.getContext().uri()));
+                               default:
+                                       throw new 
IllegalArgumentException("Method is not supported!");
+                       }
+               } catch (RuntimeException ex) {
+                       throw ex;
+               }
+       }
+
+       private IController parseController(final String currentPath) {
+               final Optional<String> controller = 
_allControllers.keySet().stream()
+                               .filter(currentPath::startsWith)
+                               .findFirst();
+
+               return controller.map(_allControllers::get).orElseThrow(() ->
+                               new IllegalArgumentException("Such controller 
does not exist!"));
+       }
+
+       private Long parseId(final String uri) {
+               final Pattern pattern = Pattern.compile("^[/][a-z]+[/]");
+               final Matcher matcher = pattern.matcher(uri);
+
+               if (matcher.find()) {
+                       return Long.valueOf(uri.substring(matcher.end()));
+               }
+               return null;
+       }
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/controllers/CoordinatorController.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/controllers/CoordinatorController.java
index be807721b9..8c81ffd24d 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/controllers/CoordinatorController.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/controllers/CoordinatorController.java
@@ -20,32 +20,32 @@
 package 
org.apache.sysds.runtime.controlprogram.federated.monitoring.controllers;
 
 import io.netty.handler.codec.http.FullHttpResponse;
-import org.apache.sysds.runtime.controlprogram.federated.monitoring.Request;
-import org.apache.sysds.runtime.controlprogram.federated.monitoring.Response;
+import 
org.apache.sysds.runtime.controlprogram.federated.monitoring.models.Request;
+import 
org.apache.sysds.runtime.controlprogram.federated.monitoring.models.Response;
 
-public class CoordinatorController implements BaseController {
-    @Override
-    public FullHttpResponse create(Request request) {
-        return null;
-    }
+public class CoordinatorController implements IController {
+       @Override
+       public FullHttpResponse create(Request request) {
+               return null;
+       }
 
-    @Override
-    public FullHttpResponse update(Request request, Long objectId) {
-        return null;
-    }
+       @Override
+       public FullHttpResponse update(Request request, Long objectId) {
+               return null;
+       }
 
-    @Override
-    public FullHttpResponse delete(Request request, Long objectId) {
-        return null;
-    }
+       @Override
+       public FullHttpResponse delete(Request request, Long objectId) {
+               return null;
+       }
 
-    @Override
-    public FullHttpResponse get(Request request, Long objectId) {
-        return Response.ok("Success");
-    }
+       @Override
+       public FullHttpResponse get(Request request, Long objectId) {
+               return Response.ok("Success");
+       }
 
-    @Override
-    public FullHttpResponse getAll(Request request) {
-        return Response.ok("Success");
-    }
+       @Override
+       public FullHttpResponse getAll(Request request) {
+               return Response.ok("Success");
+       }
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/controllers/BaseController.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/controllers/IController.java
similarity index 73%
copy from 
src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/controllers/BaseController.java
copy to 
src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/controllers/IController.java
index 34a415b6e3..6016748bc8 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/controllers/BaseController.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/controllers/IController.java
@@ -20,17 +20,17 @@
 package 
org.apache.sysds.runtime.controlprogram.federated.monitoring.controllers;
 
 import io.netty.handler.codec.http.FullHttpResponse;
-import org.apache.sysds.runtime.controlprogram.federated.monitoring.Request;
+import 
org.apache.sysds.runtime.controlprogram.federated.monitoring.models.Request;
 
-public interface BaseController {
+public interface IController {
 
-    FullHttpResponse create(final Request request);
+       FullHttpResponse create(final Request request);
 
-    FullHttpResponse update(final Request request, final Long objectId);
+       FullHttpResponse update(final Request request, final Long objectId);
 
-    FullHttpResponse delete(final Request request, final Long objectId);
+       FullHttpResponse delete(final Request request, final Long objectId);
 
-    FullHttpResponse get(final Request request, final Long objectId);
+       FullHttpResponse get(final Request request, final Long objectId);
 
-    FullHttpResponse getAll(final Request request);
+       FullHttpResponse getAll(final Request request);
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/controllers/WorkerController.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/controllers/WorkerController.java
new file mode 100644
index 0000000000..bdc46304f6
--- /dev/null
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/controllers/WorkerController.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package 
org.apache.sysds.runtime.controlprogram.federated.monitoring.controllers;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.netty.handler.codec.http.FullHttpResponse;
+import 
org.apache.sysds.runtime.controlprogram.federated.monitoring.models.Request;
+import 
org.apache.sysds.runtime.controlprogram.federated.monitoring.models.Response;
+import 
org.apache.sysds.runtime.controlprogram.federated.monitoring.models.BaseEntityModel;
+import 
org.apache.sysds.runtime.controlprogram.federated.monitoring.services.WorkerService;
+
+import java.io.IOException;
+
+public class WorkerController implements IController {
+
+       private final WorkerService _workerService = new WorkerService();
+
+       @Override
+       public FullHttpResponse create(Request request) {
+
+               ObjectMapper mapper = new ObjectMapper();
+
+               try {
+                       BaseEntityModel model = 
mapper.readValue(request.getBody(), BaseEntityModel.class);
+                       _workerService.create(model);
+                       return Response.ok("Success");
+               }
+               catch (IOException e) {
+                       throw new RuntimeException(e);
+               }
+       }
+
+       @Override
+       public FullHttpResponse update(Request request, Long objectId) {
+               return null;
+       }
+
+       @Override
+       public FullHttpResponse delete(Request request, Long objectId) {
+               return null;
+       }
+
+       @Override
+       public FullHttpResponse get(Request request, Long objectId) {
+               var result = _workerService.get(objectId);
+
+               if (result == null) {
+                       return Response.notFound("No such worker can be found");
+               }
+
+               return Response.ok(result.toString());
+       }
+
+       @Override
+       public FullHttpResponse getAll(Request request) {
+               var workers = _workerService.getAll();
+
+               if (workers.isEmpty()) {
+                       return Response.notFound("No workers can be found");
+               }
+
+               return Response.ok(workers.toString());
+       }
+}
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/models/BaseEntityModel.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/models/BaseEntityModel.java
new file mode 100644
index 0000000000..d42e76556f
--- /dev/null
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/models/BaseEntityModel.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.controlprogram.federated.monitoring.models;
+
+public class BaseEntityModel {
+       private Long _id;
+       private String _name;
+       private String _address;
+
+       private String _data;
+
+       public BaseEntityModel() { }
+
+       public BaseEntityModel(final Long id, final String name, final String 
address) {
+               _id = id;
+               _name = name;
+               _address = address;
+       }
+
+       public Long getId() {
+               return _id;
+       }
+
+       public void setId(final Long id) {
+               _id = id;
+       }
+
+       public String getName() {
+               return _name;
+       }
+
+       public void setName(final String name) {
+               _name = name;
+       }
+
+       public String getAddress() {
+               return _address;
+       }
+
+       public void setAddress(final String address) {
+               _address = address;
+       }
+
+       public String getData() {
+               return _data;
+       }
+
+       public void setData(final String data) {
+               _data = data;
+       }
+
+       @Override
+       public String toString() {
+               return String.format("{" +
+                       "\"id\": %d," +
+                       "\"name\": \"%s\"," +
+                       "\"address\": \"%s\"," +
+                       "\"data\": \"%s\"" +
+                       "}", _id, _name, _address, _data);
+       }
+}
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/Request.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/models/Request.java
similarity index 71%
copy from 
src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/Request.java
copy to 
src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/models/Request.java
index b9d71fe428..21d6812644 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/Request.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/models/Request.java
@@ -17,27 +17,27 @@
  * under the License.
  */
 
-package org.apache.sysds.runtime.controlprogram.federated.monitoring;
+package org.apache.sysds.runtime.controlprogram.federated.monitoring.models;
 
 import io.netty.handler.codec.http.HttpRequest;
 
 public class Request {
-    private HttpRequest _context;
-    private String _body;
+       private HttpRequest _context;
+       private String _body;
 
-    public HttpRequest getContext() {
-        return _context;
-    }
+       public HttpRequest getContext() {
+               return _context;
+       }
 
-    public void setContext(final HttpRequest requestContext) {
-        this._context = requestContext;
-    }
+       public void setContext(final HttpRequest requestContext) {
+               this._context = requestContext;
+       }
 
-    public String getBody() {
-        return _body;
-    }
+       public String getBody() {
+               return _body;
+       }
 
-    public void setBody(final String content) {
-        this._body = content;
-    }
+       public void setBody(final String content) {
+               this._body = content;
+       }
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/Response.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/models/Response.java
similarity index 55%
rename from 
src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/Response.java
rename to 
src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/models/Response.java
index 7a3814835b..9693af6060 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/Response.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/models/Response.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.sysds.runtime.controlprogram.federated.monitoring;
+package org.apache.sysds.runtime.controlprogram.federated.monitoring.models;
 
 import io.netty.buffer.Unpooled;
 import io.netty.handler.codec.http.DefaultFullHttpResponse;
@@ -27,27 +27,27 @@ import io.netty.handler.codec.http.HttpResponseStatus;
 import io.netty.handler.codec.http.HttpVersion;
 
 public class Response {
-    public static FullHttpResponse ok(final String result) {
-        FullHttpResponse response = new DefaultFullHttpResponse(
-                HttpVersion.HTTP_1_1,
-                HttpResponseStatus.OK,
-                Unpooled.wrappedBuffer(result.getBytes()));
+       public static FullHttpResponse ok(final String result) {
+               FullHttpResponse response = new DefaultFullHttpResponse(
+                               HttpVersion.HTTP_1_1,
+                               HttpResponseStatus.OK,
+                               Unpooled.wrappedBuffer(result.getBytes()));
 
-        response.headers().set(HttpHeaderNames.CONTENT_TYPE, 
"application/json");
-        response.headers().set(HttpHeaderNames.CONTENT_LENGTH, 
response.content().readableBytes());
+               response.headers().set(HttpHeaderNames.CONTENT_TYPE, 
"application/json");
+               response.headers().set(HttpHeaderNames.CONTENT_LENGTH, 
response.content().readableBytes());
 
-        return response;
-    }
+               return response;
+       }
 
-    public static FullHttpResponse notFound(final String exception) {
-        FullHttpResponse response = new DefaultFullHttpResponse(
-                HttpVersion.HTTP_1_1,
-                HttpResponseStatus.NOT_FOUND,
-                Unpooled.wrappedBuffer(exception.getBytes()));
+       public static FullHttpResponse notFound(final String exception) {
+               FullHttpResponse response = new DefaultFullHttpResponse(
+                               HttpVersion.HTTP_1_1,
+                               HttpResponseStatus.NOT_FOUND,
+                               Unpooled.wrappedBuffer(exception.getBytes()));
 
-        response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
-        response.headers().set(HttpHeaderNames.CONTENT_LENGTH, 
response.content().readableBytes());
+               response.headers().set(HttpHeaderNames.CONTENT_TYPE, 
"text/plain");
+               response.headers().set(HttpHeaderNames.CONTENT_LENGTH, 
response.content().readableBytes());
 
-        return response;
-    }
+               return response;
+       }
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/repositories/DerbyRepository.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/repositories/DerbyRepository.java
new file mode 100644
index 0000000000..9e94a41d61
--- /dev/null
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/repositories/DerbyRepository.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package 
org.apache.sysds.runtime.controlprogram.federated.monitoring.repositories;
+
+import 
org.apache.sysds.runtime.controlprogram.federated.monitoring.models.BaseEntityModel;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.List;
+
+public class DerbyRepository implements IRepository {
+       private final static String DB_CONNECTION = "jdbc:derby:memory:derbyDB";
+       private final Connection _db;
+
+       private static final String WORKERS_TABLE_NAME= "workers";
+       private static final String ENTITY_NAME_COL = "name";
+       private static final String ENTITY_ADDR_COL = "address";
+
+       private static final String ENTITY_SCHEMA_CREATE_STMT = "CREATE TABLE 
%s " +
+                       "(id INTEGER PRIMARY KEY GENERATED ALWAYS AS IDENTITY 
(START WITH 1, INCREMENT BY 1), " +
+                       "%s VARCHAR(60), " +
+                       "%s VARCHAR(120))";
+       private static final String ENTITY_INSERT_STMT = "INSERT INTO %s (%s, 
%s) VALUES (?, ?)";
+
+       private static final String GET_ENTITY_WITH_ID_STMT = "SELECT * FROM %s 
WHERE id = ?";
+       private static final String GET_ALL_ENTITIES_STMT = "SELECT * FROM %s";
+
+       public DerbyRepository() {
+               _db = createMonitoringDatabase();
+       }
+
+       private Connection createMonitoringDatabase() {
+               Connection db = null;
+               try {
+                       // Creates only if DB doesn't exist
+                       db = DriverManager.getConnection(DB_CONNECTION + 
";create=true");
+                       createMonitoringEntitiesInDB(db);
+
+                       return db;
+               }
+               catch (SQLException e) {
+                       throw new RuntimeException(e);
+               }
+       }
+
+       private void createMonitoringEntitiesInDB(Connection db) {
+               try {
+                       var dbMetaData = db.getMetaData();
+                       var workersExist = dbMetaData.getTables(null, null, 
WORKERS_TABLE_NAME.toUpperCase(),null);
+
+                       // Check if table already exists and create if not
+                       if(!workersExist.next())
+                       {
+                               PreparedStatement st = db.prepareStatement(
+                                               
String.format(ENTITY_SCHEMA_CREATE_STMT, WORKERS_TABLE_NAME, ENTITY_NAME_COL, 
ENTITY_ADDR_COL));
+                               st.executeUpdate();
+
+                       }
+               }
+               catch (SQLException e) {
+                       throw new RuntimeException(e);
+               }
+       }
+
+       public void createEntity(EntityEnum type, BaseEntityModel model) {
+
+               try {
+                       PreparedStatement st = _db.prepareStatement(
+                                       String.format(ENTITY_INSERT_STMT, 
WORKERS_TABLE_NAME, ENTITY_NAME_COL, ENTITY_ADDR_COL));
+
+                       if (type == EntityEnum.COORDINATOR) {
+                               // Change statement
+                       }
+
+                       st.setString(1, model.getName());
+                       st.setString(2, model.getAddress());
+                       st.executeUpdate();
+
+               } catch (SQLException e) {
+                       throw new RuntimeException(e);
+               }
+       }
+
+       public BaseEntityModel getEntity(EntityEnum type, Long id) {
+               BaseEntityModel resultModel = null;
+
+               try {
+                       PreparedStatement st = _db.prepareStatement(
+                                       String.format(GET_ENTITY_WITH_ID_STMT, 
WORKERS_TABLE_NAME));
+
+                       if (type == EntityEnum.COORDINATOR) {
+                               // Change statement
+                       }
+
+                       st.setLong(1, id);
+                       var resultSet = st.executeQuery();
+
+                       if (resultSet.next()){
+                               resultModel = mapEntityToModel(resultSet);
+                       }
+               } catch (SQLException e) {
+                       throw new RuntimeException(e);
+               }
+
+               return resultModel;
+       }
+
+       public List<BaseEntityModel> getAllEntities(EntityEnum type) {
+               List<BaseEntityModel> resultModels = new ArrayList<>();
+
+               try {
+                       PreparedStatement st = _db.prepareStatement(
+                                       String.format(GET_ALL_ENTITIES_STMT, 
WORKERS_TABLE_NAME));
+
+                       if (type == EntityEnum.COORDINATOR) {
+                               // Change statement
+                       }
+
+                       var resultSet = st.executeQuery();
+
+                       while (resultSet.next()){
+                               resultModels.add(mapEntityToModel(resultSet));
+                       }
+               } catch (SQLException e) {
+                       throw new RuntimeException(e);
+               }
+
+               return resultModels;
+       }
+
+       private BaseEntityModel mapEntityToModel(ResultSet resultSet) throws 
SQLException {
+               BaseEntityModel tmpModel = new BaseEntityModel();
+
+               for (int column = 1; column <= 
resultSet.getMetaData().getColumnCount(); column++) {
+                       if (resultSet.getMetaData().getColumnType(column) == 
Types.INTEGER) {
+                               tmpModel.setId(resultSet.getLong(column));
+                       }
+
+                       if (resultSet.getMetaData().getColumnType(column) == 
Types.VARCHAR) {
+                               if 
(resultSet.getMetaData().getColumnName(column).equalsIgnoreCase(ENTITY_NAME_COL))
 {
+                                       
tmpModel.setName(resultSet.getString(column));
+                               } else if 
(resultSet.getMetaData().getColumnName(column).equalsIgnoreCase(ENTITY_ADDR_COL))
 {
+                                       
tmpModel.setAddress(resultSet.getString(column));
+                               }
+                       }
+               }
+               return tmpModel;
+       }
+}
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/Request.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/repositories/EntityEnum.java
similarity index 65%
rename from 
src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/Request.java
rename to 
src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/repositories/EntityEnum.java
index b9d71fe428..7384257bf3 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/Request.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/repositories/EntityEnum.java
@@ -17,27 +17,9 @@
  * under the License.
  */
 
-package org.apache.sysds.runtime.controlprogram.federated.monitoring;
+package 
org.apache.sysds.runtime.controlprogram.federated.monitoring.repositories;
 
-import io.netty.handler.codec.http.HttpRequest;
-
-public class Request {
-    private HttpRequest _context;
-    private String _body;
-
-    public HttpRequest getContext() {
-        return _context;
-    }
-
-    public void setContext(final HttpRequest requestContext) {
-        this._context = requestContext;
-    }
-
-    public String getBody() {
-        return _body;
-    }
-
-    public void setBody(final String content) {
-        this._body = content;
-    }
+public enum EntityEnum {
+       WORKER,
+       COORDINATOR
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/controllers/BaseController.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/repositories/IRepository.java
similarity index 68%
rename from 
src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/controllers/BaseController.java
rename to 
src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/repositories/IRepository.java
index 34a415b6e3..d441693e59 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/controllers/BaseController.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/repositories/IRepository.java
@@ -17,20 +17,17 @@
  * under the License.
  */
 
-package 
org.apache.sysds.runtime.controlprogram.federated.monitoring.controllers;
 
-import io.netty.handler.codec.http.FullHttpResponse;
-import org.apache.sysds.runtime.controlprogram.federated.monitoring.Request;
+package 
org.apache.sysds.runtime.controlprogram.federated.monitoring.repositories;
 
-public interface BaseController {
+import 
org.apache.sysds.runtime.controlprogram.federated.monitoring.models.BaseEntityModel;
 
-    FullHttpResponse create(final Request request);
+import java.util.List;
 
-    FullHttpResponse update(final Request request, final Long objectId);
+public interface IRepository {
+       void createEntity(EntityEnum type, BaseEntityModel model);
 
-    FullHttpResponse delete(final Request request, final Long objectId);
+       BaseEntityModel getEntity(EntityEnum type, Long id);
 
-    FullHttpResponse get(final Request request, final Long objectId);
-
-    FullHttpResponse getAll(final Request request);
+       List<BaseEntityModel> getAllEntities(EntityEnum type);
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/services/WorkerService.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/services/WorkerService.java
new file mode 100644
index 0000000000..98fdfed672
--- /dev/null
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/services/WorkerService.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.controlprogram.federated.monitoring.services;
+
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedData;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedStatistics;
+import 
org.apache.sysds.runtime.controlprogram.federated.monitoring.models.BaseEntityModel;
+import 
org.apache.sysds.runtime.controlprogram.federated.monitoring.repositories.DerbyRepository;
+import 
org.apache.sysds.runtime.controlprogram.federated.monitoring.repositories.EntityEnum;
+import 
org.apache.sysds.runtime.controlprogram.federated.monitoring.repositories.IRepository;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.concurrent.Future;
+
+public class WorkerService {
+       private static final IRepository _entityRepository = new 
DerbyRepository();
+
+       public void create(BaseEntityModel model) {
+               _entityRepository.createEntity(EntityEnum.WORKER, model);
+       }
+
+       public BaseEntityModel get(Long id) {
+               var model = _entityRepository.getEntity(EntityEnum.WORKER, id);
+
+               try {
+                       var statisticsResponse = 
getWorkerStatistics(model.getAddress()).get();
+
+                       if (statisticsResponse.isSuccessful()) {
+                               FederatedStatistics.FedStatsCollection 
aggFedStats = new FederatedStatistics.FedStatsCollection();
+
+                               Object[] tmp = statisticsResponse.getData();
+                               if(tmp[0] instanceof 
FederatedStatistics.FedStatsCollection)
+                                       
aggFedStats.aggregate((FederatedStatistics.FedStatsCollection)tmp[0]);
+
+                               var statsStr = 
FederatedStatistics.displayStatistics(aggFedStats, 5);
+                               model.setData(statsStr);
+                       }
+               } catch (Exception e) {
+                       throw new RuntimeException(e);
+               }
+
+               return model;
+       }
+
+       public List<BaseEntityModel> getAll() {
+               return _entityRepository.getAllEntities(EntityEnum.WORKER);
+       }
+
+       private Future<FederatedResponse> getWorkerStatistics(String address) {
+               Future<FederatedResponse> result = null;
+
+               String host = address.split(":")[0];
+               int port = Integer.parseInt(address.split(":")[1]);
+
+               InetSocketAddress isa = new InetSocketAddress(host, port);
+               FederatedRequest frUDF = new 
FederatedRequest(FederatedRequest.RequestType.EXEC_UDF, -1,
+                               new 
FederatedStatistics.FedStatsCollectFunction());
+               try {
+                       result = FederatedData.executeFederatedOperation(isa, 
frUDF);
+               } catch(DMLRuntimeException dre) {
+                       // silently ignore this exception --> caused by offline 
federated workers
+               } catch (Exception e) {
+                       throw new RuntimeException(e);
+               }
+
+               return result;
+       }
+}
diff --git a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java 
b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
index d960dac327..6ebff8eacd 100644
--- a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
+++ b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
@@ -1602,6 +1602,31 @@ public abstract class AutomatedTestBase {
                return process;
        }
 
+       /**
+        * Start new JVM for a federated monitoring backend at the port.
+        *
+        * @param port Port to use for the JVM
+        * @return the process associated with the worker.
+        */
+       protected Process startLocalFedMonitoring(int port, String[] addArgs) {
+               Process process = null;
+               String separator = System.getProperty("file.separator");
+               String classpath = System.getProperty("java.class.path");
+               String path = System.getProperty("java.home") + separator + 
"bin" + separator + "java";
+               String[] args = ArrayUtils.addAll(new String[]{path, "-cp", 
classpath, DMLScript.class.getName(),
+                               "-fedMonitor", Integer.toString(port)}, 
addArgs);
+               ProcessBuilder processBuilder = new ProcessBuilder(args);
+
+               try {
+                       process = processBuilder.start();
+                       sleep(1000);
+               }
+               catch(IOException | InterruptedException e) {
+                       throw new RuntimeException(e);
+               }
+               return process;
+       }
+
        /**
         * Start a thread for a worker. This will share the same JVM, so all 
static variables will be shared.!
         * 
diff --git 
a/src/test/java/org/apache/sysds/test/functions/federated/monitoring/FederatedMonitoringTestBase.java
 
b/src/test/java/org/apache/sysds/test/functions/federated/monitoring/FederatedMonitoringTestBase.java
new file mode 100644
index 0000000000..483e3eee9e
--- /dev/null
+++ 
b/src/test/java/org/apache/sysds/test/functions/federated/monitoring/FederatedMonitoringTestBase.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.federated.monitoring;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.sysds.runtime.controlprogram.federated.monitoring.models.BaseEntityModel;
+import 
org.apache.sysds.test.functions.federated.multitenant.MultiTenantTestBase;
+import org.junit.After;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.util.ArrayList;
+import java.util.List;
+
+public abstract class FederatedMonitoringTestBase extends MultiTenantTestBase {
+       protected Process monitoringProcess;
+       private int monitoringPort;
+
+       private static final String WORKER_MAIN_PATH = "/workers";
+
+       @Override
+       public abstract void setUp();
+
+       // ensure that the processes are killed - even if the test throws an 
exception
+       @After
+       public void stopMonitoringProcesses() {
+               if (monitoringProcess != null) {
+                       monitoringProcess.destroyForcibly();
+               }
+       }
+
+       /**
+        * Start federated backend monitoring processes on available port
+        *
+        * @return
+        */
+       protected void startFedMonitoring(String[] addArgs) {
+               monitoringPort = getRandomAvailablePort();
+               monitoringProcess = startLocalFedMonitoring(monitoringPort, 
addArgs);
+       }
+
+       protected List<HttpResponse<?>> addWorkers(int numWorkers) {
+               String uriStr = String.format("http://localhost:%d%s";, 
monitoringPort, WORKER_MAIN_PATH);
+
+               List<HttpResponse<?>> responses = new ArrayList<>();
+               try {
+                       ObjectMapper objectMapper = new ObjectMapper();
+                       for (int i = 0; i < numWorkers; i++) {
+                               String requestBody = objectMapper
+                                       .writerWithDefaultPrettyPrinter()
+                                       .writeValueAsString(new 
BaseEntityModel((i + 1L), "Worker", "localhost"));
+                               var client = HttpClient.newHttpClient();
+                               var request = 
HttpRequest.newBuilder(URI.create(uriStr))
+                                       .header("accept", "application/json")
+                                       
.POST(HttpRequest.BodyPublishers.ofString(requestBody))
+                                       .build();
+                               responses.add(client.send(request, 
HttpResponse.BodyHandlers.ofString()));
+                       }
+
+                       return responses;
+               }
+               catch (IOException | InterruptedException e) {
+                       throw new RuntimeException(e);
+               }
+       }
+
+       protected HttpResponse<?> getWorkers() {
+               String uriStr = String.format("http://localhost:%d%s";, 
monitoringPort, WORKER_MAIN_PATH);
+
+               try {
+                       var client = HttpClient.newHttpClient();
+                       var request = HttpRequest.newBuilder(URI.create(uriStr))
+                               .header("accept", "application/json")
+                               .GET().build();
+                       return client.send(request, 
HttpResponse.BodyHandlers.ofString());
+               }
+               catch (IOException | InterruptedException e) {
+                       throw new RuntimeException(e);
+               }
+       }
+}
diff --git 
a/src/test/java/org/apache/sysds/test/functions/federated/monitoring/FederatedWorkerIntegrationCRUDTest.java
 
b/src/test/java/org/apache/sysds/test/functions/federated/monitoring/FederatedWorkerIntegrationCRUDTest.java
new file mode 100644
index 0000000000..d9fd9d5d8e
--- /dev/null
+++ 
b/src/test/java/org/apache/sysds/test/functions/federated/monitoring/FederatedWorkerIntegrationCRUDTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.federated.monitoring;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.http.HttpStatus;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class FederatedWorkerIntegrationCRUDTest extends 
FederatedMonitoringTestBase {
+       private final static String TEST_NAME = 
"FederatedWorkerIntegrationCRUDTest";
+
+       private final static String TEST_DIR = 
"functions/federated/monitoring/";
+       private static final String TEST_CLASS_DIR = TEST_DIR + 
FederatedWorkerIntegrationCRUDTest.class.getSimpleName() + "/";
+
+       @Override
+       public void setUp() {
+               TestUtils.clearAssertionInformation();
+               addTestConfiguration(TEST_NAME, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"S"}));
+               startFedMonitoring(null);
+       }
+
+       @Test
+       public void testWorkerAddedForMonitoring() {
+               var addedWorkers = addWorkers(1);
+               var firstWorkerStatus = addedWorkers.get(0).statusCode();
+
+               Assert.assertEquals("Added worker status code", 
HttpStatus.SC_OK, firstWorkerStatus);
+       }
+
+       @Test
+       public void testCorrectAmountAddedWorkersForMonitoring() {
+               int numWorkers = 3;
+               var addedWorkers = addWorkers(numWorkers);
+
+               for (int i = 0; i < numWorkers; i++) {
+                       var workerStatus = addedWorkers.get(i).statusCode();
+                       Assert.assertEquals("Added worker status code", 
HttpStatus.SC_OK, workerStatus);
+               }
+
+               var getAllWorkersResponse = getWorkers();
+               var numReturnedWorkers = 
StringUtils.countMatches(getAllWorkersResponse.body().toString(), "id");
+
+               Assert.assertEquals("Amount of workers to get", numWorkers, 
numReturnedWorkers);
+       }
+}
diff --git 
a/src/test/java/org/apache/sysds/test/functions/federated/monitoring/FederatedWorkerStatisticsTest.java
 
b/src/test/java/org/apache/sysds/test/functions/federated/monitoring/FederatedWorkerStatisticsTest.java
new file mode 100644
index 0000000000..dc8fca39b6
--- /dev/null
+++ 
b/src/test/java/org/apache/sysds/test/functions/federated/monitoring/FederatedWorkerStatisticsTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.federated.monitoring;
+
+import 
org.apache.sysds.runtime.controlprogram.federated.monitoring.models.BaseEntityModel;
+import 
org.apache.sysds.runtime.controlprogram.federated.monitoring.services.WorkerService;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class FederatedWorkerStatisticsTest extends FederatedMonitoringTestBase 
{
+       private final static String TEST_NAME = "FederatedWorkerStatisticsTest";
+
+       private final static String TEST_DIR = 
"functions/federated/monitoring/";
+       private static final String TEST_CLASS_DIR = TEST_DIR + 
FederatedWorkerStatisticsTest.class.getSimpleName() + "/";
+
+       private static int[] workerPorts;
+       private final WorkerService workerMonitoringService = new 
WorkerService();
+
+       @Override
+       public void setUp() {
+               TestUtils.clearAssertionInformation();
+               addTestConfiguration(TEST_NAME, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"S"}));
+               workerPorts = startFedWorkers(3);
+       }
+
+       @Test
+       public void testWorkerStatisticsReturnedForMonitoring() {
+               workerMonitoringService.create(new BaseEntityModel(1L, 
"Worker", "localhost:" + workerPorts[0]));
+
+               var model = workerMonitoringService.get(1L);
+               var modelData = model.getData();
+
+               Assert.assertNotNull("Data field of model contains worker 
statistics", model.getData());
+               Assert.assertNotEquals("Data field of model contains worker 
statistics",0, modelData.length());
+               Assert.assertTrue("Data field of model contains worker 
statistics", modelData.contains("JVM"));
+       }
+}

Reply via email to