This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new dcdb21e  [SYSTEMDS-2705] Federated Writer
dcdb21e is described below

commit dcdb21eed45b3ad6655ebee475009b1888d81345
Author: baunsgaard <[email protected]>
AuthorDate: Mon Oct 26 14:13:46 2020 +0100

    [SYSTEMDS-2705] Federated Writer
    
    This commit adds the ability to save a federated matrix, but only one that
    has UID 0. If the UID is 0 on the remote, then we know that it is a matrix
    that has just been read from a file. Therefore we can, without modifying
    the remote workers, save our JSON directly and read this file normally.
    
    Example:
    
    X = federated(addresses=...,ranges=...)
    write(X, "X_fed.json", format="federated")
    
    After this you can read the federated matrix back, and it will be allocated
    as federated using the workers.
    
    read("X_fed.json")
---
 .github/workflows/functionsTests.yml               |  1 +
 .../org/apache/sysds/parser/DMLTranslator.java     |  4 +-
 .../controlprogram/caching/CacheableData.java      | 23 +++++---
 .../controlprogram/caching/MatrixObject.java       |  5 +-
 .../instructions/fed/FEDInstructionUtils.java      | 16 +++++-
 .../instructions/fed/InitFEDInstruction.java       |  6 +--
 .../sysds/runtime/io/ReaderWriterFederated.java    | 15 +++---
 .../federated/io/FederatedReaderTest.java          | 18 ++++---
 ...tedReaderTest.java => FederatedWriterTest.java} | 63 +++++++++++-----------
 .../federated/io/FederatedReaderTestCreate.dml     | 26 +++++++++
 .../functions/federated/io/FederatedReference.dml  | 28 ++++++++++
 11 files changed, 146 insertions(+), 59 deletions(-)

diff --git a/.github/workflows/functionsTests.yml 
b/.github/workflows/functionsTests.yml
index 8da90ee..ba231f0 100644
--- a/.github/workflows/functionsTests.yml
+++ b/.github/workflows/functionsTests.yml
@@ -58,6 +58,7 @@ jobs:
           dnn,
           federated.algorithms,
           federated.primitives,
+          federated.io,
           federated.transform,
           frame,
           indexing,
diff --git a/src/main/java/org/apache/sysds/parser/DMLTranslator.java 
b/src/main/java/org/apache/sysds/parser/DMLTranslator.java
index c2e0f95..ff41df6 100644
--- a/src/main/java/org/apache/sysds/parser/DMLTranslator.java
+++ b/src/main/java/org/apache/sysds/parser/DMLTranslator.java
@@ -1055,7 +1055,9 @@ public class DMLTranslator
                                                // write output in binary block 
format
                                                
ae.setOutputParams(ae.getDim1(), ae.getDim2(), ae.getNnz(), ae.getUpdateType(), 
ConfigurationManager.getBlocksize());
                                                break;
-                                               
+                                       case FEDERATED:
+                                               
ae.setOutputParams(ae.getDim1(), ae.getDim2(), -1, ae.getUpdateType(), -1);
+                                               break;
                                                default:
                                                        throw new 
LanguageException("Unrecognized file format: " + ae.getInputFormatType());
                                        }
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
index 6828c7a..b69596c 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
@@ -19,6 +19,13 @@
 
 package org.apache.sysds.runtime.controlprogram.caching;
 
+import java.io.File;
+import java.io.IOException;
+import java.lang.ref.SoftReference;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.commons.lang.mutable.MutableBoolean;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -37,12 +44,14 @@ import 
org.apache.sysds.runtime.controlprogram.federated.FederationMap.FType;
 import 
org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysds.runtime.controlprogram.parfor.util.IDSequence;
 import org.apache.sysds.runtime.instructions.cp.Data;
+import org.apache.sysds.runtime.instructions.fed.InitFEDInstruction;
 import org.apache.sysds.runtime.instructions.gpu.context.GPUContext;
 import org.apache.sysds.runtime.instructions.gpu.context.GPUObject;
 import org.apache.sysds.runtime.instructions.spark.data.BroadcastObject;
 import org.apache.sysds.runtime.instructions.spark.data.RDDObject;
 import org.apache.sysds.runtime.io.FileFormatProperties;
 import org.apache.sysds.runtime.io.IOUtilFunctions;
+import org.apache.sysds.runtime.io.ReaderWriterFederated;
 import org.apache.sysds.runtime.meta.DataCharacteristics;
 import org.apache.sysds.runtime.meta.MatrixCharacteristics;
 import org.apache.sysds.runtime.meta.MetaData;
@@ -51,13 +60,6 @@ import org.apache.sysds.runtime.util.HDFSTool;
 import org.apache.sysds.runtime.util.LocalFileUtils;
 import org.apache.sysds.utils.Statistics;
 
-import java.io.File;
-import java.io.IOException;
-import java.lang.ref.SoftReference;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
 
 /**
  * Each object of this class is a cache envelope for some large piece of data
@@ -333,6 +335,13 @@ public abstract class CacheableData<T extends CacheBlock> 
extends Data
         * @return true if federated else false
         */
        public boolean isFederated() {
+               if(_fedMapping == null && _metaData instanceof MetaDataFormat){
+                       MetaDataFormat mdf = (MetaDataFormat) _metaData;
+                       if(mdf.getFileFormat() == FileFormat.FEDERATED){
+                               InitFEDInstruction.federateMatrix(this, 
ReaderWriterFederated.read(_hdfsFileName, mdf.getDataCharacteristics()));
+                               return true;
+                       }
+               }
                return _fedMapping != null;
        }
        
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
index da98df3..85d8a8f 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
@@ -560,7 +560,10 @@ public class MatrixObject extends 
CacheableData<MatrixBlock>
                
                MetaDataFormat iimd = (MetaDataFormat) _metaData;
 
-               if (_data != null)
+               if(this.isFederated() &&  FileFormat.safeValueOf(ofmt) == 
FileFormat.FEDERATED){
+                       ReaderWriterFederated.write(fname,this._fedMapping);
+               }
+               else if (_data != null)
                {
                        // Get the dimension information from the metadata 
stored within MatrixObject
                        DataCharacteristics mc = iimd.getDataCharacteristics();
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
index 0101954..8ec3432 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
@@ -19,6 +19,8 @@
 
 package org.apache.sysds.runtime.instructions.fed;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
@@ -41,9 +43,19 @@ import 
org.apache.sysds.runtime.instructions.spark.MapmmSPInstruction;
 import org.apache.sysds.runtime.instructions.spark.WriteSPInstruction;
 
 public class FEDInstructionUtils {
+       private static final Log LOG = 
LogFactory.getLog(FEDInstructionUtils.class.getName());
+
        // This is currently a rather simplistic to our solution of replacing 
instructions with their correct federated
        // counterpart, since we do not propagate the information that a matrix 
is federated, therefore we can not decide
        // to choose a federated instruction earlier.
+
+       /**
+        * Check and replace CP instructions with federated instructions if the 
instruction match criteria.
+        * 
+        * @param inst The instruction to analyse
+        * @param ec The Execution Context 
+        * @return The potentially modified instruction
+        */
        public static Instruction checkAndReplaceCP(Instruction inst, 
ExecutionContext ec) {
                FEDInstruction fedinst = null;
                if (inst instanceof AggregateBinaryCPInstruction) {
@@ -72,8 +84,10 @@ public class FEDInstructionUtils {
                        AggregateUnaryCPInstruction instruction = 
(AggregateUnaryCPInstruction) inst;
                        if( instruction.input1.isMatrix() && 
ec.containsVariable(instruction.input1) ) {
                                MatrixObject mo1 = 
ec.getMatrixObject(instruction.input1);
-                               if (mo1.isFederated() && 
instruction.getAUType() == AggregateUnaryCPInstruction.AUType.DEFAULT)
+                               if (mo1.isFederated() && 
instruction.getAUType() == AggregateUnaryCPInstruction.AUType.DEFAULT){
+                                       LOG.debug("Federated UnaryAggregate");
                                        fedinst = 
AggregateUnaryFEDInstruction.parseInstruction(inst.getInstructionString());
+                               }
                        }
                }
                else if (inst instanceof BinaryCPInstruction) {
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/fed/InitFEDInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/fed/InitFEDInstruction.java
index e42f192..c3789d6 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/fed/InitFEDInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/fed/InitFEDInstruction.java
@@ -37,8 +37,8 @@ import org.apache.sysds.common.Types;
 import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.conf.DMLConfig;
 import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
 import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
-import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysds.runtime.controlprogram.federated.FederatedData;
 import org.apache.sysds.runtime.controlprogram.federated.FederatedRange;
@@ -147,7 +147,7 @@ public class InitFEDInstruction extends FEDInstruction {
                        }
                }
                if (type.equalsIgnoreCase(FED_MATRIX_IDENTIFIER)) {
-                       MatrixObject output = ec.getMatrixObject(_output);
+                       CacheableData<?> output = ec.getCacheableData(_output);
                        
output.getDataCharacteristics().setRows(usedDims[0]).setCols(usedDims[1]);
                        federateMatrix(output, feds);
                }
@@ -204,7 +204,7 @@ public class InitFEDInstruction extends FEDInstruction {
                }
        }
 
-       public static void federateMatrix(MatrixObject output, 
List<Pair<FederatedRange, FederatedData>> workers) {
+       public static void federateMatrix(CacheableData<?> output, 
List<Pair<FederatedRange, FederatedData>> workers) {
 
                Map<FederatedRange, FederatedData> fedMapping = new TreeMap<>();
                for (Pair<FederatedRange, FederatedData> t : workers) {
diff --git 
a/src/main/java/org/apache/sysds/runtime/io/ReaderWriterFederated.java 
b/src/main/java/org/apache/sysds/runtime/io/ReaderWriterFederated.java
index 41ae5fd..96435b7 100644
--- a/src/main/java/org/apache/sysds/runtime/io/ReaderWriterFederated.java
+++ b/src/main/java/org/apache/sysds/runtime/io/ReaderWriterFederated.java
@@ -52,12 +52,16 @@ import org.apache.sysds.runtime.meta.DataCharacteristics;
 
 /**
  * This class serves as the reader for federated objects. To read the files a 
mdt file is required. The reader is
- * different from the other readers in the since that it does not return a 
MatrixBlock. but a Matrix Object wrapper,
+ * different from the other readers in the since that it does not return a 
MatrixBlock but a Matrix Object wrapper,
  * containing the federated Mapping.
  * 
+ * On the Matrix Object the function isFederated() will if called read in the 
federated locations and instantiate the
+ * map. The reading is done through this code.
+ * 
  * This means in practice that it circumvent the other reading code. See more 
in:
  * 
  * 
org.apache.sysds.runtime.controlprogram.caching.MatrixObject.readBlobFromHDFS()
+ * org.apache.sysds.runtime.controlprogram.caching.CacheableData.isFederated()
  * 
  */
 public class ReaderWriterFederated {
@@ -98,11 +102,6 @@ public class ReaderWriterFederated {
      * @param fedMap The federated map to save.
      */
     public static void write(String file, FederationMap fedMap) {
-        if(fedMap.getID() != 0) {
-            // TODO add writing to remote to allow this anyway.
-            throw new DMLRuntimeException(
-                "Invalid to save federated maps with ID's higher than 0, since 
they are modified.");
-        }
         LOG.debug("Writing federated map to " + file);
         try {
             JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
@@ -110,12 +109,12 @@ public class ReaderWriterFederated {
             FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
             DataOutputStream out = fs.create(path, true);
             ObjectMapper mapper = new ObjectMapper();
-            // FileOutputStream fileOutputStream = new 
FileOutputStream("post.json");
-            // String postJson = mapper.writeValueAsString(fedMap);
             FederatedDataAddress[] outObjects = 
parseMap(fedMap.getFedMapping());
             try(BufferedWriter pw = new BufferedWriter(new 
OutputStreamWriter(out))) {
                 mapper.writeValue(pw, outObjects);
             }
+
+            IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, path);
         }
         catch(IOException e) {
             fail("Unable to write test federated matrix to (" + file + "): " + 
e.getMessage());
diff --git 
a/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedReaderTest.java
 
b/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedReaderTest.java
index 0f2b383..a8e4407 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedReaderTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedReaderTest.java
@@ -39,7 +39,7 @@ import org.junit.runners.Parameterized;
 public class FederatedReaderTest extends AutomatedTestBase {
 
     // private static final Log LOG = 
LogFactory.getLog(FederatedReaderTest.class.getName());
-    private final static String TEST_DIR = "functions/federated/io/";
+    private final static String TEST_DIR = "functions/federated/ioR/";
     private final static String TEST_NAME = "FederatedReaderTest";
     private final static String TEST_CLASS_DIR = TEST_DIR + 
FederatedReaderTest.class.getSimpleName() + "/";
     private final static int blocksize = 1024;
@@ -77,7 +77,6 @@ public class FederatedReaderTest extends AutomatedTestBase {
             DMLScript.USE_LOCAL_SPARK_CONFIG = true;
         }
         getAndLoadTestConfiguration(TEST_NAME);
-        String HOME = SCRIPT_DIR + TEST_DIR;
 
         // write input matrices
         int halfRows = rows / 2;
@@ -95,7 +94,7 @@ public class FederatedReaderTest extends AutomatedTestBase {
         Thread t1 = startLocalFedWorkerThread(port1);
         Thread t2 = startLocalFedWorkerThread(port2);
         String host = "localhost";
-        
+
         MatrixObject fed = 
FederatedTestObjectConstructor.constructFederatedInput(rows,
             cols,
             blocksize,
@@ -109,15 +108,20 @@ public class FederatedReaderTest extends 
AutomatedTestBase {
 
         try {
             // Run reference dml script with normal matrix
-            fullDMLScriptName = HOME + TEST_NAME + (rowPartitioned ? "Row" : 
"Col") + "Reference.dml";
+            fullDMLScriptName = SCRIPT_DIR + "functions/federated/io/" + 
TEST_NAME + (rowPartitioned ? "Row" : "Col")
+                + "Reference.dml";
             programArgs = new String[] {"-args", input("X1"), input("X2")};
             String refOut = runTest(null).toString();
             // Run federated
-            fullDMLScriptName = HOME + TEST_NAME + ".dml";
-            programArgs = new String[] {"-args", input("X.json")};
+            fullDMLScriptName = SCRIPT_DIR + "functions/federated/io/" + 
TEST_NAME + ".dml";
+            programArgs = new String[] {"-stats", "-args", input("X.json")};
             String out = runTest(null).toString();
+            // LOG.error(out);
+            Assert.assertTrue(heavyHittersContainsString("fed_uak+"));
             // Verify output
-            Assert.assertEquals(refOut.split("\n")[0], out.split("\n")[0]);
+            Assert.assertEquals(Double.parseDouble(refOut.split("\n")[0]),
+                Double.parseDouble(out.split("\n")[0]),
+                0.00001);
         }
         catch(Exception e) {
             e.printStackTrace();
diff --git 
a/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedReaderTest.java
 
b/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedWriterTest.java
similarity index 71%
copy from 
src/test/java/org/apache/sysds/test/functions/federated/io/FederatedReaderTest.java
copy to 
src/test/java/org/apache/sysds/test/functions/federated/io/FederatedWriterTest.java
index 0f2b383..14dd466 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedReaderTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedWriterTest.java
@@ -23,12 +23,10 @@ import java.util.Collection;
 
 import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.common.Types;
-import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.meta.MatrixCharacteristics;
 import org.apache.sysds.test.AutomatedTestBase;
 import org.apache.sysds.test.TestConfiguration;
 import org.apache.sysds.test.TestUtils;
-import 
org.apache.sysds.test.functions.federated.FederatedTestObjectConstructor;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -36,13 +34,14 @@ import org.junit.runners.Parameterized;
 
 @RunWith(value = Parameterized.class)
 @net.jcip.annotations.NotThreadSafe
-public class FederatedReaderTest extends AutomatedTestBase {
+public class FederatedWriterTest extends AutomatedTestBase {
 
-    // private static final Log LOG = 
LogFactory.getLog(FederatedReaderTest.class.getName());
-    private final static String TEST_DIR = "functions/federated/io/";
-    private final static String TEST_NAME = "FederatedReaderTest";
-    private final static String TEST_CLASS_DIR = TEST_DIR + 
FederatedReaderTest.class.getSimpleName() + "/";
+    // private static final Log LOG = 
LogFactory.getLog(FederatedWriterTest.class.getName());
+    private final static String TEST_DIR = "functions/federated/";
+    private final static String TEST_NAME = "FederatedWriterTest";
+    private final static String TEST_CLASS_DIR = TEST_DIR + 
FederatedWriterTest.class.getSimpleName() + "/";
     private final static int blocksize = 1024;
+
     @Parameterized.Parameter()
     public int rows;
     @Parameterized.Parameter(1)
@@ -65,11 +64,11 @@ public class FederatedReaderTest extends AutomatedTestBase {
     }
 
     @Test
-    public void federatedSinglenodeRead() {
-        federatedRead(Types.ExecMode.SINGLE_NODE);
+    public void federatedSinglenodeWrite() {
+        federatedWrite(Types.ExecMode.SINGLE_NODE);
     }
 
-    public void federatedRead(Types.ExecMode execMode) {
+    public void federatedWrite(Types.ExecMode execMode) {
         boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
         Types.ExecMode platformOld = rtplatform;
         rtplatform = execMode;
@@ -77,12 +76,9 @@ public class FederatedReaderTest extends AutomatedTestBase {
             DMLScript.USE_LOCAL_SPARK_CONFIG = true;
         }
         getAndLoadTestConfiguration(TEST_NAME);
-        String HOME = SCRIPT_DIR + TEST_DIR;
 
         // write input matrices
         int halfRows = rows / 2;
-        long[][] begins = new long[][] {new long[] {0, 0}, new long[] 
{halfRows, 0}};
-        long[][] ends = new long[][] {new long[] {halfRows, cols}, new long[] 
{rows, cols}};
         // We have two matrices handled by a single federated worker
         double[][] X1 = getRandomMatrix(halfRows, cols, 0, 1, 1, 42);
         double[][] X2 = getRandomMatrix(halfRows, cols, 0, 1, 1, 1340);
@@ -94,30 +90,35 @@ public class FederatedReaderTest extends AutomatedTestBase {
         int port2 = getRandomAvailablePort();
         Thread t1 = startLocalFedWorkerThread(port1);
         Thread t2 = startLocalFedWorkerThread(port2);
-        String host = "localhost";
-        
-        MatrixObject fed = 
FederatedTestObjectConstructor.constructFederatedInput(rows,
-            cols,
-            blocksize,
-            host,
-            begins,
-            ends,
-            new int[] {port1, port2},
-            new String[] {input("X1"), input("X2")},
-            input("X.json"));
-        writeInputFederatedWithMTD("X.json", fed, null);
 
         try {
+
+            // Run reader and write a federated json to enable the rest of the 
test
+            fullDMLScriptName = SCRIPT_DIR + 
"functions/federated/io/FederatedReaderTestCreate.dml";
+            programArgs = new String[] {"-stats", "-explain","-args", 
input("X1"), input("X2"), port1 + "", port2 + "", input("X.json")};
+            // String writer = runTest(null).toString();
+            runTest(null);
+
             // Run reference dml script with normal matrix
-            fullDMLScriptName = HOME + TEST_NAME + (rowPartitioned ? "Row" : 
"Col") + "Reference.dml";
-            programArgs = new String[] {"-args", input("X1"), input("X2")};
+            fullDMLScriptName = SCRIPT_DIR + 
"functions/federated/io/FederatedReaderTest.dml";
+            programArgs = new String[] {"-stats", "-args", input("X.json")};
+            String out = runTest(null).toString();
+
+            Assert.assertTrue(heavyHittersContainsString("fed_uak+"));
+
+            fullDMLScriptName = SCRIPT_DIR + 
"functions/federated/io/FederatedReference.dml";
+            // programArgs = new String[] {"-args", input("X1"), input("X2")};
+            programArgs = new String[] {"-stats", "100", "-nvargs",
+                "in_X1=" + TestUtils.federatedAddress(port1, input("X1")),
+                "in_X2=" + TestUtils.federatedAddress(port2, input("X2")), 
"rows=" + rows, "cols=" + cols};
             String refOut = runTest(null).toString();
+
             // Run federated
-            fullDMLScriptName = HOME + TEST_NAME + ".dml";
-            programArgs = new String[] {"-args", input("X.json")};
-            String out = runTest(null).toString();
+
             // Verify output
-            Assert.assertEquals(refOut.split("\n")[0], out.split("\n")[0]);
+            Assert.assertEquals(Double.parseDouble(refOut.split("\n")[0]),
+                Double.parseDouble(out.split("\n")[0]),
+                0.00001);
         }
         catch(Exception e) {
             e.printStackTrace();
diff --git 
a/src/test/scripts/functions/federated/io/FederatedReaderTestCreate.dml 
b/src/test/scripts/functions/federated/io/FederatedReaderTestCreate.dml
new file mode 100644
index 0000000..d2e8a47
--- /dev/null
+++ b/src/test/scripts/functions/federated/io/FederatedReaderTestCreate.dml
@@ -0,0 +1,26 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+X1 = read($1)
+X2 = read($2)
+X = federated(addresses=list("LocalHost:" +$3 + "/" +$1, "LocalHost:" +$4+ "/" 
+$2),
+    ranges=list(list(0, 0), list(nrow(X1), ncol(X1)), list(nrow(X1), 0), 
list(nrow(X1) + nrow(X2), ncol(X1))))
+write(X, $5, format="federated")
diff --git a/src/test/scripts/functions/federated/io/FederatedReference.dml 
b/src/test/scripts/functions/federated/io/FederatedReference.dml
new file mode 100644
index 0000000..e34db96
--- /dev/null
+++ b/src/test/scripts/functions/federated/io/FederatedReference.dml
@@ -0,0 +1,28 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+A = federated(
+    addresses=list($in_X1, $in_X2), 
+    ranges=list(list(0, 0), list($rows / 2, $cols), 
+    list($rows / 2, 0), list($rows, $cols)))
+s = sum(A)
+
+print(s)
\ No newline at end of file

Reply via email to