This is an automated email from the ASF dual-hosted git repository.
baunsgaard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/master by this push:
new dcdb21e [SYSTEMDS-2705] Federated Writer
dcdb21e is described below
commit dcdb21eed45b3ad6655ebee475009b1888d81345
Author: baunsgaard <[email protected]>
AuthorDate: Mon Oct 26 14:13:46 2020 +0100
[SYSTEMDS-2705] Federated Writer
This commit adds the ability to save a federated matrix, but only one that
has UID 0. If the UID is 0 on the remote, then we know that it is a matrix
that has just been read from a file. Therefore we can, without modifying
the remote workers, save our JSON directly and read this file normally.
Example:
X = federated(addresses=...,ranges=...)
write(X, "X_fed.json", format="federated")
After this you can read the federated matrix back, and it will be allocated
as a federated matrix using the workers.
read("X_fed.json")
---
.github/workflows/functionsTests.yml | 1 +
.../org/apache/sysds/parser/DMLTranslator.java | 4 +-
.../controlprogram/caching/CacheableData.java | 23 +++++---
.../controlprogram/caching/MatrixObject.java | 5 +-
.../instructions/fed/FEDInstructionUtils.java | 16 +++++-
.../instructions/fed/InitFEDInstruction.java | 6 +--
.../sysds/runtime/io/ReaderWriterFederated.java | 15 +++---
.../federated/io/FederatedReaderTest.java | 18 ++++---
...tedReaderTest.java => FederatedWriterTest.java} | 63 +++++++++++-----------
.../federated/io/FederatedReaderTestCreate.dml | 26 +++++++++
.../functions/federated/io/FederatedReference.dml | 28 ++++++++++
11 files changed, 146 insertions(+), 59 deletions(-)
diff --git a/.github/workflows/functionsTests.yml
b/.github/workflows/functionsTests.yml
index 8da90ee..ba231f0 100644
--- a/.github/workflows/functionsTests.yml
+++ b/.github/workflows/functionsTests.yml
@@ -58,6 +58,7 @@ jobs:
dnn,
federated.algorithms,
federated.primitives,
+ federated.io,
federated.transform,
frame,
indexing,
diff --git a/src/main/java/org/apache/sysds/parser/DMLTranslator.java
b/src/main/java/org/apache/sysds/parser/DMLTranslator.java
index c2e0f95..ff41df6 100644
--- a/src/main/java/org/apache/sysds/parser/DMLTranslator.java
+++ b/src/main/java/org/apache/sysds/parser/DMLTranslator.java
@@ -1055,7 +1055,9 @@ public class DMLTranslator
// write output in binary block
format
ae.setOutputParams(ae.getDim1(), ae.getDim2(), ae.getNnz(), ae.getUpdateType(),
ConfigurationManager.getBlocksize());
break;
-
+ case FEDERATED:
+
ae.setOutputParams(ae.getDim1(), ae.getDim2(), -1, ae.getUpdateType(), -1);
+ break;
default:
throw new
LanguageException("Unrecognized file format: " + ae.getInputFormatType());
}
diff --git
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
index 6828c7a..b69596c 100644
---
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
+++
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
@@ -19,6 +19,13 @@
package org.apache.sysds.runtime.controlprogram.caching;
+import java.io.File;
+import java.io.IOException;
+import java.lang.ref.SoftReference;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -37,12 +44,14 @@ import
org.apache.sysds.runtime.controlprogram.federated.FederationMap.FType;
import
org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
import org.apache.sysds.runtime.controlprogram.parfor.util.IDSequence;
import org.apache.sysds.runtime.instructions.cp.Data;
+import org.apache.sysds.runtime.instructions.fed.InitFEDInstruction;
import org.apache.sysds.runtime.instructions.gpu.context.GPUContext;
import org.apache.sysds.runtime.instructions.gpu.context.GPUObject;
import org.apache.sysds.runtime.instructions.spark.data.BroadcastObject;
import org.apache.sysds.runtime.instructions.spark.data.RDDObject;
import org.apache.sysds.runtime.io.FileFormatProperties;
import org.apache.sysds.runtime.io.IOUtilFunctions;
+import org.apache.sysds.runtime.io.ReaderWriterFederated;
import org.apache.sysds.runtime.meta.DataCharacteristics;
import org.apache.sysds.runtime.meta.MatrixCharacteristics;
import org.apache.sysds.runtime.meta.MetaData;
@@ -51,13 +60,6 @@ import org.apache.sysds.runtime.util.HDFSTool;
import org.apache.sysds.runtime.util.LocalFileUtils;
import org.apache.sysds.utils.Statistics;
-import java.io.File;
-import java.io.IOException;
-import java.lang.ref.SoftReference;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
/**
* Each object of this class is a cache envelope for some large piece of data
@@ -333,6 +335,13 @@ public abstract class CacheableData<T extends CacheBlock>
extends Data
* @return true if federated else false
*/
public boolean isFederated() {
+ if(_fedMapping == null && _metaData instanceof MetaDataFormat){
+ MetaDataFormat mdf = (MetaDataFormat) _metaData;
+ if(mdf.getFileFormat() == FileFormat.FEDERATED){
+ InitFEDInstruction.federateMatrix(this,
ReaderWriterFederated.read(_hdfsFileName, mdf.getDataCharacteristics()));
+ return true;
+ }
+ }
return _fedMapping != null;
}
diff --git
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
index da98df3..85d8a8f 100644
---
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
+++
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
@@ -560,7 +560,10 @@ public class MatrixObject extends
CacheableData<MatrixBlock>
MetaDataFormat iimd = (MetaDataFormat) _metaData;
- if (_data != null)
+ if(this.isFederated() && FileFormat.safeValueOf(ofmt) ==
FileFormat.FEDERATED){
+ ReaderWriterFederated.write(fname,this._fedMapping);
+ }
+ else if (_data != null)
{
// Get the dimension information from the metadata
stored within MatrixObject
DataCharacteristics mc = iimd.getDataCharacteristics();
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
b/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
index 0101954..8ec3432 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
@@ -19,6 +19,8 @@
package org.apache.sysds.runtime.instructions.fed;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
@@ -41,9 +43,19 @@ import
org.apache.sysds.runtime.instructions.spark.MapmmSPInstruction;
import org.apache.sysds.runtime.instructions.spark.WriteSPInstruction;
public class FEDInstructionUtils {
+ private static final Log LOG =
LogFactory.getLog(FEDInstructionUtils.class.getName());
+
// This is currently a rather simplistic to our solution of replacing
instructions with their correct federated
// counterpart, since we do not propagate the information that a matrix
is federated, therefore we can not decide
// to choose a federated instruction earlier.
+
+ /**
+ * Check and replace CP instructions with federated instructions if the
instruction match criteria.
+ *
+ * @param inst The instruction to analyse
+ * @param ec The Execution Context
+ * @return The potentially modified instruction
+ */
public static Instruction checkAndReplaceCP(Instruction inst,
ExecutionContext ec) {
FEDInstruction fedinst = null;
if (inst instanceof AggregateBinaryCPInstruction) {
@@ -72,8 +84,10 @@ public class FEDInstructionUtils {
AggregateUnaryCPInstruction instruction =
(AggregateUnaryCPInstruction) inst;
if( instruction.input1.isMatrix() &&
ec.containsVariable(instruction.input1) ) {
MatrixObject mo1 =
ec.getMatrixObject(instruction.input1);
- if (mo1.isFederated() &&
instruction.getAUType() == AggregateUnaryCPInstruction.AUType.DEFAULT)
+ if (mo1.isFederated() &&
instruction.getAUType() == AggregateUnaryCPInstruction.AUType.DEFAULT){
+ LOG.debug("Federated UnaryAggregate");
fedinst =
AggregateUnaryFEDInstruction.parseInstruction(inst.getInstructionString());
+ }
}
}
else if (inst instanceof BinaryCPInstruction) {
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/fed/InitFEDInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/fed/InitFEDInstruction.java
index e42f192..c3789d6 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/fed/InitFEDInstruction.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/fed/InitFEDInstruction.java
@@ -37,8 +37,8 @@ import org.apache.sysds.common.Types;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.conf.DMLConfig;
import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
-import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysds.runtime.controlprogram.federated.FederatedData;
import org.apache.sysds.runtime.controlprogram.federated.FederatedRange;
@@ -147,7 +147,7 @@ public class InitFEDInstruction extends FEDInstruction {
}
}
if (type.equalsIgnoreCase(FED_MATRIX_IDENTIFIER)) {
- MatrixObject output = ec.getMatrixObject(_output);
+ CacheableData<?> output = ec.getCacheableData(_output);
output.getDataCharacteristics().setRows(usedDims[0]).setCols(usedDims[1]);
federateMatrix(output, feds);
}
@@ -204,7 +204,7 @@ public class InitFEDInstruction extends FEDInstruction {
}
}
- public static void federateMatrix(MatrixObject output,
List<Pair<FederatedRange, FederatedData>> workers) {
+ public static void federateMatrix(CacheableData<?> output,
List<Pair<FederatedRange, FederatedData>> workers) {
Map<FederatedRange, FederatedData> fedMapping = new TreeMap<>();
for (Pair<FederatedRange, FederatedData> t : workers) {
diff --git
a/src/main/java/org/apache/sysds/runtime/io/ReaderWriterFederated.java
b/src/main/java/org/apache/sysds/runtime/io/ReaderWriterFederated.java
index 41ae5fd..96435b7 100644
--- a/src/main/java/org/apache/sysds/runtime/io/ReaderWriterFederated.java
+++ b/src/main/java/org/apache/sysds/runtime/io/ReaderWriterFederated.java
@@ -52,12 +52,16 @@ import org.apache.sysds.runtime.meta.DataCharacteristics;
/**
* This class serves as the reader for federated objects. To read the files a
mdt file is required. The reader is
- * different from the other readers in the since that it does not return a
MatrixBlock. but a Matrix Object wrapper,
+ * different from the other readers in the since that it does not return a
MatrixBlock but a Matrix Object wrapper,
* containing the federated Mapping.
*
+ * On the Matrix Object the function isFederated() will if called read in the
federated locations and instantiate the
+ * map. The reading is done through this code.
+ *
* This means in practice that it circumvent the other reading code. See more
in:
*
*
org.apache.sysds.runtime.controlprogram.caching.MatrixObject.readBlobFromHDFS()
+ * org.apache.sysds.runtime.controlprogram.caching.CacheableData.isFederated()
*
*/
public class ReaderWriterFederated {
@@ -98,11 +102,6 @@ public class ReaderWriterFederated {
* @param fedMap The federated map to save.
*/
public static void write(String file, FederationMap fedMap) {
- if(fedMap.getID() != 0) {
- // TODO add writing to remote to allow this anyway.
- throw new DMLRuntimeException(
- "Invalid to save federated maps with ID's higher than 0, since
they are modified.");
- }
LOG.debug("Writing federated map to " + file);
try {
JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
@@ -110,12 +109,12 @@ public class ReaderWriterFederated {
FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
DataOutputStream out = fs.create(path, true);
ObjectMapper mapper = new ObjectMapper();
- // FileOutputStream fileOutputStream = new
FileOutputStream("post.json");
- // String postJson = mapper.writeValueAsString(fedMap);
FederatedDataAddress[] outObjects =
parseMap(fedMap.getFedMapping());
try(BufferedWriter pw = new BufferedWriter(new
OutputStreamWriter(out))) {
mapper.writeValue(pw, outObjects);
}
+
+ IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, path);
}
catch(IOException e) {
fail("Unable to write test federated matrix to (" + file + "): " +
e.getMessage());
diff --git
a/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedReaderTest.java
b/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedReaderTest.java
index 0f2b383..a8e4407 100644
---
a/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedReaderTest.java
+++
b/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedReaderTest.java
@@ -39,7 +39,7 @@ import org.junit.runners.Parameterized;
public class FederatedReaderTest extends AutomatedTestBase {
// private static final Log LOG =
LogFactory.getLog(FederatedReaderTest.class.getName());
- private final static String TEST_DIR = "functions/federated/io/";
+ private final static String TEST_DIR = "functions/federated/ioR/";
private final static String TEST_NAME = "FederatedReaderTest";
private final static String TEST_CLASS_DIR = TEST_DIR +
FederatedReaderTest.class.getSimpleName() + "/";
private final static int blocksize = 1024;
@@ -77,7 +77,6 @@ public class FederatedReaderTest extends AutomatedTestBase {
DMLScript.USE_LOCAL_SPARK_CONFIG = true;
}
getAndLoadTestConfiguration(TEST_NAME);
- String HOME = SCRIPT_DIR + TEST_DIR;
// write input matrices
int halfRows = rows / 2;
@@ -95,7 +94,7 @@ public class FederatedReaderTest extends AutomatedTestBase {
Thread t1 = startLocalFedWorkerThread(port1);
Thread t2 = startLocalFedWorkerThread(port2);
String host = "localhost";
-
+
MatrixObject fed =
FederatedTestObjectConstructor.constructFederatedInput(rows,
cols,
blocksize,
@@ -109,15 +108,20 @@ public class FederatedReaderTest extends
AutomatedTestBase {
try {
// Run reference dml script with normal matrix
- fullDMLScriptName = HOME + TEST_NAME + (rowPartitioned ? "Row" :
"Col") + "Reference.dml";
+ fullDMLScriptName = SCRIPT_DIR + "functions/federated/io/" +
TEST_NAME + (rowPartitioned ? "Row" : "Col")
+ + "Reference.dml";
programArgs = new String[] {"-args", input("X1"), input("X2")};
String refOut = runTest(null).toString();
// Run federated
- fullDMLScriptName = HOME + TEST_NAME + ".dml";
- programArgs = new String[] {"-args", input("X.json")};
+ fullDMLScriptName = SCRIPT_DIR + "functions/federated/io/" +
TEST_NAME + ".dml";
+ programArgs = new String[] {"-stats", "-args", input("X.json")};
String out = runTest(null).toString();
+ // LOG.error(out);
+ Assert.assertTrue(heavyHittersContainsString("fed_uak+"));
// Verify output
- Assert.assertEquals(refOut.split("\n")[0], out.split("\n")[0]);
+ Assert.assertEquals(Double.parseDouble(refOut.split("\n")[0]),
+ Double.parseDouble(out.split("\n")[0]),
+ 0.00001);
}
catch(Exception e) {
e.printStackTrace();
diff --git
a/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedReaderTest.java
b/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedWriterTest.java
similarity index 71%
copy from
src/test/java/org/apache/sysds/test/functions/federated/io/FederatedReaderTest.java
copy to
src/test/java/org/apache/sysds/test/functions/federated/io/FederatedWriterTest.java
index 0f2b383..14dd466 100644
---
a/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedReaderTest.java
+++
b/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedWriterTest.java
@@ -23,12 +23,10 @@ import java.util.Collection;
import org.apache.sysds.api.DMLScript;
import org.apache.sysds.common.Types;
-import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
import org.apache.sysds.runtime.meta.MatrixCharacteristics;
import org.apache.sysds.test.AutomatedTestBase;
import org.apache.sysds.test.TestConfiguration;
import org.apache.sysds.test.TestUtils;
-import
org.apache.sysds.test.functions.federated.FederatedTestObjectConstructor;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -36,13 +34,14 @@ import org.junit.runners.Parameterized;
@RunWith(value = Parameterized.class)
@net.jcip.annotations.NotThreadSafe
-public class FederatedReaderTest extends AutomatedTestBase {
+public class FederatedWriterTest extends AutomatedTestBase {
- // private static final Log LOG =
LogFactory.getLog(FederatedReaderTest.class.getName());
- private final static String TEST_DIR = "functions/federated/io/";
- private final static String TEST_NAME = "FederatedReaderTest";
- private final static String TEST_CLASS_DIR = TEST_DIR +
FederatedReaderTest.class.getSimpleName() + "/";
+ // private static final Log LOG =
LogFactory.getLog(FederatedWriterTest.class.getName());
+ private final static String TEST_DIR = "functions/federated/";
+ private final static String TEST_NAME = "FederatedWriterTest";
+ private final static String TEST_CLASS_DIR = TEST_DIR +
FederatedWriterTest.class.getSimpleName() + "/";
private final static int blocksize = 1024;
+
@Parameterized.Parameter()
public int rows;
@Parameterized.Parameter(1)
@@ -65,11 +64,11 @@ public class FederatedReaderTest extends AutomatedTestBase {
}
@Test
- public void federatedSinglenodeRead() {
- federatedRead(Types.ExecMode.SINGLE_NODE);
+ public void federatedSinglenodeWrite() {
+ federatedWrite(Types.ExecMode.SINGLE_NODE);
}
- public void federatedRead(Types.ExecMode execMode) {
+ public void federatedWrite(Types.ExecMode execMode) {
boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
Types.ExecMode platformOld = rtplatform;
rtplatform = execMode;
@@ -77,12 +76,9 @@ public class FederatedReaderTest extends AutomatedTestBase {
DMLScript.USE_LOCAL_SPARK_CONFIG = true;
}
getAndLoadTestConfiguration(TEST_NAME);
- String HOME = SCRIPT_DIR + TEST_DIR;
// write input matrices
int halfRows = rows / 2;
- long[][] begins = new long[][] {new long[] {0, 0}, new long[]
{halfRows, 0}};
- long[][] ends = new long[][] {new long[] {halfRows, cols}, new long[]
{rows, cols}};
// We have two matrices handled by a single federated worker
double[][] X1 = getRandomMatrix(halfRows, cols, 0, 1, 1, 42);
double[][] X2 = getRandomMatrix(halfRows, cols, 0, 1, 1, 1340);
@@ -94,30 +90,35 @@ public class FederatedReaderTest extends AutomatedTestBase {
int port2 = getRandomAvailablePort();
Thread t1 = startLocalFedWorkerThread(port1);
Thread t2 = startLocalFedWorkerThread(port2);
- String host = "localhost";
-
- MatrixObject fed =
FederatedTestObjectConstructor.constructFederatedInput(rows,
- cols,
- blocksize,
- host,
- begins,
- ends,
- new int[] {port1, port2},
- new String[] {input("X1"), input("X2")},
- input("X.json"));
- writeInputFederatedWithMTD("X.json", fed, null);
try {
+
+ // Run reader and write a federated json to enable the rest of the
test
+ fullDMLScriptName = SCRIPT_DIR +
"functions/federated/io/FederatedReaderTestCreate.dml";
+ programArgs = new String[] {"-stats", "-explain","-args",
input("X1"), input("X2"), port1 + "", port2 + "", input("X.json")};
+ // String writer = runTest(null).toString();
+ runTest(null);
+
// Run reference dml script with normal matrix
- fullDMLScriptName = HOME + TEST_NAME + (rowPartitioned ? "Row" :
"Col") + "Reference.dml";
- programArgs = new String[] {"-args", input("X1"), input("X2")};
+ fullDMLScriptName = SCRIPT_DIR +
"functions/federated/io/FederatedReaderTest.dml";
+ programArgs = new String[] {"-stats", "-args", input("X.json")};
+ String out = runTest(null).toString();
+
+ Assert.assertTrue(heavyHittersContainsString("fed_uak+"));
+
+ fullDMLScriptName = SCRIPT_DIR +
"functions/federated/io/FederatedReference.dml";
+ // programArgs = new String[] {"-args", input("X1"), input("X2")};
+ programArgs = new String[] {"-stats", "100", "-nvargs",
+ "in_X1=" + TestUtils.federatedAddress(port1, input("X1")),
+ "in_X2=" + TestUtils.federatedAddress(port2, input("X2")),
"rows=" + rows, "cols=" + cols};
String refOut = runTest(null).toString();
+
// Run federated
- fullDMLScriptName = HOME + TEST_NAME + ".dml";
- programArgs = new String[] {"-args", input("X.json")};
- String out = runTest(null).toString();
+
// Verify output
- Assert.assertEquals(refOut.split("\n")[0], out.split("\n")[0]);
+ Assert.assertEquals(Double.parseDouble(refOut.split("\n")[0]),
+ Double.parseDouble(out.split("\n")[0]),
+ 0.00001);
}
catch(Exception e) {
e.printStackTrace();
diff --git
a/src/test/scripts/functions/federated/io/FederatedReaderTestCreate.dml
b/src/test/scripts/functions/federated/io/FederatedReaderTestCreate.dml
new file mode 100644
index 0000000..d2e8a47
--- /dev/null
+++ b/src/test/scripts/functions/federated/io/FederatedReaderTestCreate.dml
@@ -0,0 +1,26 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+X1 = read($1)
+X2 = read($2)
+X = federated(addresses=list("LocalHost:" +$3 + "/" +$1, "LocalHost:" +$4+ "/"
+$2),
+ ranges=list(list(0, 0), list(nrow(X1), ncol(X1)), list(nrow(X1), 0),
list(nrow(X1) + nrow(X2), ncol(X1))))
+write(X, $5, format="federated")
diff --git a/src/test/scripts/functions/federated/io/FederatedReference.dml
b/src/test/scripts/functions/federated/io/FederatedReference.dml
new file mode 100644
index 0000000..e34db96
--- /dev/null
+++ b/src/test/scripts/functions/federated/io/FederatedReference.dml
@@ -0,0 +1,28 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+A = federated(
+ addresses=list($in_X1, $in_X2),
+ ranges=list(list(0, 0), list($rows / 2, $cols),
+ list($rows / 2, 0), list($rows, $cols)))
+s = sum(A)
+
+print(s)
\ No newline at end of file