This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new 276d153  [SYSTEMDS-3097] Fix CSV metadata parsing in federated 
execution
276d153 is described below

commit 276d153b8ab18a10015c2230900b448e085f6610
Author: baunsgaard <[email protected]>
AuthorDate: Mon Aug 23 09:44:18 2021 +0200

    [SYSTEMDS-3097] Fix CSV metadata parsing in federated execution
    
    This commit fixes the metadata handling when parsing a federated csv
    file.
    The issue was that the header was always set to true when parsing CSV.
    The commit also contains both Python and Java tests to prevent future
    errors.
    
    Closes #1370
---
 .../federated/FederatedWorkerHandler.java          |   5 +-
 .../org/apache/sysds/runtime/meta/MetaDataAll.java |  20 +-
 src/main/python/tests/federated/runFedTest.sh      |  11 +-
 .../test_federated_aggregations_noHeader.py        | 204 +++++++++++++++++++++
 .../org/apache/sysds/test/AutomatedTestBase.java   |  33 +++-
 src/test/java/org/apache/sysds/test/TestUtils.java |  26 +++
 .../functions/federated/io/FederatedReaderCSV.java | 115 ++++++++++++
 .../federated/io/FederatedReaderTest.java          |   4 +-
 8 files changed, 399 insertions(+), 19 deletions(-)

diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
index 7bcae7e..062cfa0 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
@@ -197,6 +197,7 @@ public class FederatedWorkerHandler extends 
ChannelInboundHandlerAdapter {
                String delim = null;
                FileSystem fs = null;
                MetaDataAll mtd;
+               
                try {
                        String mtdname = 
DataExpression.getMTDFileName(filename);
                        Path path = new Path(mtdname);
@@ -219,9 +220,7 @@ public class FederatedWorkerHandler extends 
ChannelInboundHandlerAdapter {
                        throw ex;
                }
                catch (Exception ex) {
-                       String msg = "Exception in reading metadata of: " + 
filename;
-                       log.error(msg, ex);
-                       throw new DMLRuntimeException(msg);
+                       throw new DMLRuntimeException(ex);
                }
                finally {
                        IOUtilFunctions.closeSilently(fs);
diff --git a/src/main/java/org/apache/sysds/runtime/meta/MetaDataAll.java 
b/src/main/java/org/apache/sysds/runtime/meta/MetaDataAll.java
index ae6cc99..9132bd4 100644
--- a/src/main/java/org/apache/sysds/runtime/meta/MetaDataAll.java
+++ b/src/main/java/org/apache/sysds/runtime/meta/MetaDataAll.java
@@ -39,6 +39,7 @@ import org.apache.sysds.parser.Expression;
 import org.apache.sysds.parser.LanguageException;
 import org.apache.sysds.parser.ParseException;
 import org.apache.sysds.parser.StringIdentifier;
+import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
 import org.apache.sysds.runtime.io.IOUtilFunctions;
 import org.apache.sysds.runtime.privacy.PrivacyConstraint;
@@ -50,6 +51,7 @@ import org.apache.wink.json4j.JSONException;
 import org.apache.wink.json4j.JSONObject;
 
 public class MetaDataAll extends DataIdentifier {
+       // private static final Log LOG = 
LogFactory.getLog(MetaDataAll.class.getName());
 
        private JSONObject _metaObj;
 
@@ -79,8 +81,8 @@ public class MetaDataAll extends DataIdentifier {
                try {
                        _metaObj = JSONHelper.parse(br);
                }
-               catch(IOException e) {
-                       e.printStackTrace();
+               catch(Exception e) {
+                       throw new DMLRuntimeException(e);
                }
                setPrivacy(PrivacyConstraint.PrivacyLevel.None);
                parseMetaDataParams();
@@ -174,7 +176,14 @@ public class MetaDataAll extends DataIdentifier {
                        case DataExpression.FINE_GRAINED_PRIVACY:  
setFineGrainedPrivacy(val.toString()); break;
                        case DataExpression.DELIM_DELIMITER: 
setDelim(val.toString()); break;
                        case DataExpression.SCHEMAPARAM: 
setSchema(val.toString()); break;
-                       case DataExpression.DELIM_HAS_HEADER_ROW: 
setHasHeader(true);
+                       case DataExpression.DELIM_HAS_HEADER_ROW:
+                               if(val instanceof Boolean){
+                                       boolean valB = (Boolean) val;
+                                       setHasHeader(valB);
+                                       break;
+                               }
+                               else
+                                       setHasHeader(false);
                        case DataExpression.DELIM_SPARSE: 
setSparseDelim((boolean) val);
                }
        }
@@ -402,4 +411,9 @@ public class MetaDataAll extends DataIdentifier {
                }
                return false;
        }
+
+       @Override
+       public String toString() {
+               return "MetaDataAll\n" + _metaObj + "\n" + super.toString();
+       }
 }
diff --git a/src/main/python/tests/federated/runFedTest.sh 
b/src/main/python/tests/federated/runFedTest.sh
index 0d6b4f4..b34ca99 100755
--- a/src/main/python/tests/federated/runFedTest.sh
+++ b/src/main/python/tests/federated/runFedTest.sh
@@ -30,8 +30,8 @@
 workerdir="tests/federated/worker/"
 outputdir="tests/federated/output/"
 tmpfiledir="tests/federated/tmp/"
-mkdir $workerdir
-mkdir $outputdir
+mkdir -p $workerdir
+mkdir -p $outputdir
 w1_Output="$workerdir/w1"
 w2_Output="$workerdir/w2"
 log="$outputdir/out.log"
@@ -55,13 +55,16 @@ echo -e "\nWorker 1:"
 cat $w1_Output
 echo -e "\nWorker 2:"
 cat $w2_Output
-rm -r $workerdir
 echo -e "\n------------\nTest output:\n------------"
 cat $log
 grepvals="$(tail -n 10 $log | grep OK)"
+echo -e "------------\n"
+
+# Cleanup
+rm -r $workerdir
 rm -r $outputdir
 rm -r $tmpfiledir
-echo -e "------------\n"
+
 if [[ $grepvals == *"OK"* ]]; then
        exit 0
 else
diff --git 
a/src/main/python/tests/federated/test_federated_aggregations_noHeader.py 
b/src/main/python/tests/federated/test_federated_aggregations_noHeader.py
new file mode 100644
index 0000000..25832b6
--- /dev/null
+++ b/src/main/python/tests/federated/test_federated_aggregations_noHeader.py
@@ -0,0 +1,204 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
+
+import io
+import json
+import os
+import shutil
+import sys
+import unittest
+
+import numpy as np
+from systemds.context import SystemDSContext
+
+os.environ['SYSDS_QUIET'] = "1"
+
+dim = 3
+
+m1 = np.asarray([[1, 2, 3], [4, 5, 6], [7, 8, 9]], dtype=np.int16)
+m2 = np.asarray([[2, 2, 2], [3, 3, 3], [4, 4, 4]], dtype=np.int16)
+
+tempdir = "./tests/federated/tmp/test_federated_aggregations_noHeader/"
+mtd = {"format": "csv", "header": False, "rows": dim,
+       "cols": dim, "data_type": "matrix", "value_type": "double"}
+
+# Create the testing directory if it does not exist.
+if not os.path.exists(tempdir):
+    os.makedirs(tempdir)
+
+# Save data files for the Federated workers.
+np.savetxt(tempdir + "m1.csv", m1, delimiter=",",fmt='%d')
+with io.open(tempdir + "m1.csv.mtd", "w", encoding="utf-8") as f:
+    f.write(json.dumps(mtd, ensure_ascii=False))
+
+np.savetxt(tempdir + "m2.csv", m2, delimiter=",",fmt='%d')
+with io.open(tempdir + "m2.csv.mtd", "w", encoding="utf-8") as f:
+    f.write(json.dumps(mtd, ensure_ascii=False))
+
+# Federated workers + file locations
+fed1 = "localhost:8001/" + tempdir + "m1.csv"
+fed2 = "localhost:8002/" + tempdir + "m2.csv"
+
+
+class TestFederatedAggFn(unittest.TestCase):
+
+    sds: SystemDSContext = None
+
+    @classmethod
+    def setUpClass(cls):
+        cls.sds = SystemDSContext()
+
+    @classmethod
+    def tearDownClass(cls):
+        cls.sds.close()
+
+    def test_equals(self):
+        f_m = (
+            self.sds.federated(
+                [fed1],
+                [([0, 0], [dim, dim])])
+            .compute()
+        )
+        self.assertTrue(np.allclose(f_m, m1))
+
+    def test_sum3(self):
+        #   [[m1,m1,m1,m1,m1,m2,m2,m2,m2,m2]
+        #    [m1,m1,m1,m1,m1,m2,m2,m2,m2,m2]
+        #    [m1,m1,m1,m1,m1,m2,m2,m2,m2,m2]
+        #    [m1,m1,m1,m1,m1,m2,m2,m2,m2,m2]
+        #    [m1,m1,m1,m1,m1,m2,m2,m2,m2,m2]]
+        f_m_a = (
+            self.sds.federated(
+                [fed1, fed2],
+                [([0, 0], [dim, dim]), ([0, dim], [dim, dim * 2])])
+            .sum()
+            .compute()
+        )
+        m1_m2 = m1.sum() + m2.sum()
+        self.assertAlmostEqual(f_m_a, m1_m2)
+
+    def test_sum1(self):
+        f_m1 = (
+            self.sds.federated(
+                [fed1],
+                [([0, 0], [dim, dim])])
+            .sum()
+            .compute()
+        )
+        m1_r = m1.sum()
+        self.assertAlmostEqual(f_m1, m1_r)
+
+    def test_sum2(self):
+        f_m2 = (
+            self.sds.federated(
+                [fed2],
+                [([0, 0], [dim, dim])])
+            .sum()
+            .compute()
+        )
+        m2_r = m2.sum()
+        self.assertAlmostEqual(f_m2, m2_r)
+
+    def test_sum3(self):
+        #   [[m1,m1,m1,m1,m1,m2,m2,m2,m2,m2]
+        #    [m1,m1,m1,m1,m1,m2,m2,m2,m2,m2]
+        #    [m1,m1,m1,m1,m1,m2,m2,m2,m2,m2]
+        #    [m1,m1,m1,m1,m1,m2,m2,m2,m2,m2]
+        #    [m1,m1,m1,m1,m1,m2,m2,m2,m2,m2]]
+        f_m1_m2 = (
+            self.sds.federated(
+                [fed1, fed2],
+                [([0, 0], [dim, dim]), ([0, dim], [dim, dim * 2])])
+            .sum()
+            .compute()
+        )
+
+        m1_m2 = np.concatenate((m1, m2), axis=1).sum()
+
+        self.assertAlmostEqual(f_m1_m2, m1_m2)
+
+    def test_sum4(self):
+        #   [[m1,m1,m1,m1,m1]
+        #    [m1,m1,m1,m1,m1]
+        #    [m1,m1,m1,m1,m1]
+        #    [m1,m1,m1,m1,m1]
+        #    [m1,m1,m1,m1,m1]
+        #    [m2,m2,m2,m2,m2]
+        #    [m2,m2,m2,m2,m2]
+        #    [m2,m2,m2,m2,m2]
+        #    [m2,m2,m2,m2,m2]
+        #    [m2,m2,m2,m2,m2]]
+        f_m1_m2 = (
+            self.sds.federated(
+                [fed1, fed2],
+                [([0, 0], [dim, dim]), ([dim, 0], [dim * 2, dim])])
+            .sum()
+            .compute()
+        )
+        m1_m2 = np.concatenate((m1, m2)).sum()
+        self.assertAlmostEqual(f_m1_m2, m1_m2)
+
+    # -----------------------------------
+    # The rest of the tests are
+    # Extended functionality not working Yet
+    # -----------------------------------
+
+    def test_sum5(self):
+        #   [[m1,m1,m1,m1,m1, 0, 0, 0, 0, 0]
+        #    [m1,m1,m1,m1,m1, 0, 0, 0, 0, 0]
+        #    [m1,m1,m1,m1,m1,m2,m2,m2,m2,m2]
+        #    [m1,m1,m1,m1,m1,m2,m2,m2,m2,m2]
+        #    [m1,m1,m1,m1,m1,m2,m2,m2,m2,m2]
+        #    [ 0, 0, 0, 0, 0,m2,m2,m2,m2,m2]
+        #    [ 0, 0, 0, 0, 0,m2,m2,m2,m2,m2]]
+        f_m_a = (
+            self.sds.federated(
+                [fed1, fed2],
+                [([0, 0], [dim, dim]), ([2, dim], [dim + 2, dim * 2])])
+            .sum()
+            .compute()
+        )
+        m1_m2 = m1.sum() + m2.sum()
+        self.assertAlmostEqual(f_m_a, m1_m2)
+
+    def test_sum8(self):
+        #   [[ 0, 0, 0, 0, 0, 0, 0, 0]
+        #    [ 0, 0, 0, 0, 0, 0, 0, 0]
+        #    [ 0, 0, 0,m1,m1,m1,m1,m1]
+        #    [ 0, 0, 0,m1,m1,m1,m1,m1]
+        #    [ 0, 0, 0,m1,m1,m1,m1,m1]
+        #    [ 0, 0, 0,m1,m1,m1,m1,m1]
+        #    [ 0, 0, 0,m1,m1,m1,m1,m1]]
+        f_m_a = (
+            self.sds.federated(
+                [fed1],
+                [([2, 3], [dim + 2, dim + 3])])
+            .sum()
+            .compute()
+        )
+
+        m = m1.sum()
+
+        self.assertAlmostEqual(f_m_a, m)
+
+
+if __name__ == "__main__":
+    unittest.main(exit=False)
diff --git a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java 
b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
index a4da6e3..3d96cd9 100644
--- a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
+++ b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
@@ -69,6 +69,7 @@ import 
org.apache.sysds.runtime.controlprogram.federated.FederatedData;
 import org.apache.sysds.runtime.controlprogram.federated.FederatedRange;
 import org.apache.sysds.runtime.controlprogram.federated.FederationMap;
 import org.apache.sysds.runtime.controlprogram.federated.FederationUtils;
+import org.apache.sysds.runtime.io.FileFormatProperties;
 import org.apache.sysds.runtime.io.FileFormatPropertiesCSV;
 import org.apache.sysds.runtime.io.FrameReader;
 import org.apache.sysds.runtime.io.FrameReaderFactory;
@@ -515,13 +516,7 @@ public abstract class AutomatedTestBase {
                String completePath = baseDirectory + INPUT_DIR + name + "/in";
                String completeRPath = baseDirectory + INPUT_DIR + name + 
".mtx";
 
-               try {
-                       cleanupExistingData(baseDirectory + INPUT_DIR + name, 
bIncludeR);
-               }
-               catch(IOException e) {
-                       e.printStackTrace();
-                       throw new RuntimeException(e);
-               }
+               cleanupDir(baseDirectory + INPUT_DIR + name, bIncludeR);
 
                TestUtils.writeTestMatrix(completePath, matrix);
                if(bIncludeR) {
@@ -535,6 +530,30 @@ public abstract class AutomatedTestBase {
                return matrix;
        }
 
+       protected void writeCSVMatrix(String name, double[][] matrix, boolean 
header, MatrixCharacteristics mc) {
+               try {
+                       final String completePath = baseDirectory + INPUT_DIR + 
name;
+                       final String completeMTDPath = baseDirectory + 
INPUT_DIR + name + ".mtd";
+                       cleanupDir(completePath, false);
+                       TestUtils.writeCSV(completePath, matrix, header);
+                       final FileFormatProperties ffp = header ? new 
FileFormatPropertiesCSV(true, ",", false, 0.0, "") : new 
FileFormatPropertiesCSV();
+                       HDFSTool.writeMetaDataFile(completeMTDPath, 
ValueType.FP64, mc, FileFormat.CSV, ffp);
+               }
+               catch(Exception e) {
+                       throw new RuntimeException(e);
+               }
+       }
+
+       protected void cleanupDir(String fullPath, boolean bIncludeR){
+               try {
+                       cleanupExistingData(fullPath, bIncludeR);
+               }
+               catch(IOException e) {
+                       e.printStackTrace();
+                       throw new RuntimeException(e);
+               }
+       }
+
        protected double[][] writeInputMatrixWithMTD(String name, MatrixBlock 
matrix, boolean bIncludeR) {
                double[][] data = DataConverter.convertToDoubleMatrix(matrix);
                return writeInputMatrixWithMTD(name, data, bIncludeR);
diff --git a/src/test/java/org/apache/sysds/test/TestUtils.java 
b/src/test/java/org/apache/sysds/test/TestUtils.java
index 9de6ae8..9282d93 100644
--- a/src/test/java/org/apache/sysds/test/TestUtils.java
+++ b/src/test/java/org/apache/sysds/test/TestUtils.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.Writer;
 import org.apache.sysds.common.Types.FileFormat;
 import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.data.TensorBlock;
 import org.apache.sysds.runtime.io.FrameWriter;
 import org.apache.sysds.runtime.io.FrameWriterFactory;
@@ -2040,6 +2041,31 @@ public class TestUtils
                }
        }
 
+
+       protected static void writeCSV(String completePath, double[][] matrix, 
boolean header) throws IOException{
+               Path path = new Path(completePath);
+               FileSystem fs = IOUtilFunctions.getFileSystem(path, conf);
+               DataOutputStream out = fs.create(path, true);
+               try(BufferedWriter pw = new BufferedWriter(new 
OutputStreamWriter(out))) {
+
+                       if(header) {
+                               pw.append("d0");
+                               for(int i = 1; i < matrix[0].length; i++) {
+                                       pw.append(",d" + i);
+                               }
+                               pw.append("\n");
+                       }
+                       for(int j = 0; j < matrix.length; j++) {
+                               pw.append("" + matrix[j][0]);
+                               for(int i = 1; i < matrix[j].length; i++) {
+                                       pw.append("," + matrix[j][i]);
+                               }
+                               pw.append("\n");
+                       }
+               }
+       }
+
+
        /**
         * <p>
         * Writes a matrix to a file using the text format.
diff --git 
a/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedReaderCSV.java
 
b/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedReaderCSV.java
new file mode 100644
index 0000000..e8e1b31
--- /dev/null
+++ 
b/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedReaderCSV.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sysds.test.functions.federated.io;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.common.Types.ExecType;
+import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import 
org.apache.sysds.test.functions.federated.FederatedTestObjectConstructor;
+import org.junit.Assert;
+import org.junit.Test;
+
[email protected]
+public class FederatedReaderCSV extends AutomatedTestBase {
+
+    private static final Log LOG = 
LogFactory.getLog(FederatedReaderCSV.class.getName());
+    private final static String TEST_DIR = "functions/federated/ioR/";
+    private final static String TEST_NAME = "FederatedReaderTest";
+    private final static String TEST_CLASS_DIR = TEST_DIR + 
FederatedReaderCSV.class.getSimpleName() + "/";
+    private final static int blocksize = 1024;
+
+    private final static int dim = 3;
+    long[][] begins = new long[][] {new long[] {0, 0}};
+    long[][] ends = new long[][] {new long[] {dim, dim}};
+
+    @Override
+    public void setUp() {
+        TestUtils.clearAssertionInformation();
+        addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, 
TEST_NAME,new String[] {"X1"}));
+    }
+
+    @Test
+    public void testWithHeader() {
+        federatedRead(true);
+    }
+
+    @Test
+    public void testWithoutHeader() {
+        federatedRead(false);
+    }
+
+    public void federatedRead( boolean header) {
+        Types.ExecMode oldPlatform = setExecMode(ExecType.CP);
+        getAndLoadTestConfiguration(TEST_NAME);
+        setOutputBuffering(true);
+
+        
+        // empty script name because we don't execute any script, just start 
the worker
+        
+        fullDMLScriptName = "";
+        int port1 = getRandomAvailablePort();
+        Thread t1 = startLocalFedWorkerThread(port1);
+        String host = "localhost";
+        
+        try {
+            double[][] X1 = new double[][] {new double[] {1, 2, 3}, new 
double[] {4, 5, 6}, new double[] {7, 8, 9}};
+            MatrixCharacteristics mc = new MatrixCharacteristics(dim, dim, 
blocksize, dim * dim);
+            writeCSVMatrix("X1", X1, header, mc);
+
+            // Thread.sleep(10000);
+            MatrixObject fed = 
FederatedTestObjectConstructor.constructFederatedInput(dim, dim, blocksize, 
host, begins,
+                ends, new int[] {port1}, new String[] {input("X1")}, 
input("X.json"));
+            writeInputFederatedWithMTD("X.json", fed, null);
+
+            // Run reference dml script with normal matrix
+
+            fullDMLScriptName = SCRIPT_DIR + "functions/federated/io/" + 
TEST_NAME + "1Reference.dml";
+            programArgs = new String[] {"-stats", "-args", input("X1")};
+
+            String refOut = runTest(null).toString();
+
+            LOG.debug(refOut);
+
+            // Run federated
+            fullDMLScriptName = SCRIPT_DIR + "functions/federated/io/" + 
TEST_NAME + ".dml";
+            programArgs = new String[] {"-stats", "-args", input("X.json")};
+            String out = runTest(null).toString();
+
+            Assert.assertTrue(heavyHittersContainsString("fed_uak+"));
+            // Verify output
+            Assert.assertEquals(Double.parseDouble(refOut.split("\n")[0]), 
Double.parseDouble(out.split("\n")[0]),
+                0.00001);
+        }
+        catch(Exception e) {
+            e.printStackTrace();
+            Assert.assertTrue(false);
+        }
+        finally {
+            resetExecMode(oldPlatform);
+        }
+
+        TestUtils.shutdownThreads(t1);
+    }
+}
diff --git 
a/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedReaderTest.java
 
b/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedReaderTest.java
index 62cfd32..ff68c83 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedReaderTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedReaderTest.java
@@ -67,13 +67,11 @@ public class FederatedReaderTest extends AutomatedTestBase {
 
        @Test
        public void federatedSingleNodeReadOneWorker() {
-               LOG.debug("1Federated");
                federatedRead(Types.ExecMode.SINGLE_NODE, 1);
        }
 
        @Test
        public void federatedSingleNodeReadTwoWorker() {
-               LOG.debug("2Federated");
                federatedRead(Types.ExecMode.SINGLE_NODE, 2);
        }
 
@@ -124,6 +122,8 @@ public class FederatedReaderTest extends AutomatedTestBase {
 
                        String refOut = runTest(null).toString();
 
+                       LOG.debug(refOut);
+                       
                        // Run federated
                        fullDMLScriptName = SCRIPT_DIR + 
"functions/federated/io/" + TEST_NAME + ".dml";
                        programArgs = new String[] {"-stats", "-args", 
input("X.json")};

Reply via email to