This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git

commit dffd69b40d12377a1d1024af4f425e20facadc0b
Author: baunsgaard <[email protected]>
AuthorDate: Thu Nov 4 10:27:58 2021 +0100

    [SYSTEMDS-3198] Python Federated Matrix Multiplication Tests
---
 .../tests/federated/test_federated_matrix_mult.py  | 298 +++++++++++++++++++++
 .../python/tests/federated/test_federated_read.py  | 103 +++++++
 2 files changed, 401 insertions(+)

diff --git a/src/main/python/tests/federated/test_federated_matrix_mult.py b/src/main/python/tests/federated/test_federated_matrix_mult.py
new file mode 100644
index 0000000..6551e11
--- /dev/null
+++ b/src/main/python/tests/federated/test_federated_matrix_mult.py
@@ -0,0 +1,298 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
+
+import io
+import json
+import os
+import time
+import unittest
+
+import numpy as np
+from systemds.context import SystemDSContext
+
+os.environ['SYSDS_QUIET'] = "1"
+
+dim = 3
+
+m = np.reshape(np.arange(1, dim * dim + 1, 1), (dim, dim))
+m_c2 = np.column_stack((m, m))
+m_c3 = np.column_stack((m, m_c2))
+m_r2 = np.row_stack((m, m))
+m_r3 = np.row_stack((m, m_r2))
+
+tempdir = "./tests/federated/tmp/test_federated_matrixmult/"
+mtd = {"format": "csv", "header": False, "rows": dim,
+       "cols": dim, "data_type": "matrix", "value_type": "double"}
+
+# Create the testing directory if it does not exist.
+if not os.path.exists(tempdir):
+    os.makedirs(tempdir)
+
+# Save data files for the Federated workers.
+np.savetxt(tempdir + "m.csv", m, delimiter=",")
+with io.open(tempdir + "m.csv.mtd", "w", encoding="utf-8") as f:
+    f.write(json.dumps(mtd, ensure_ascii=False))
+
+# Federated workers + file locations
+fed1 = "localhost:8001/" + tempdir + "m.csv"
+fed2 = "localhost:8002/" + tempdir + "m.csv"
+fed3 = "localhost:8003/" + tempdir + "m.csv"
+
+fed1_file = tempdir+"m1.fed"
+fed_c2_file = tempdir+"m_c2.fed"
+fed_c3_file = tempdir+"m_c3.fed"
+fed_r2_file = tempdir+"m_r2.fed"
+fed_r3_file = tempdir+"m_r3.fed"
+
+
+class TestFederatedAggFn(unittest.TestCase):
+
+    sds: SystemDSContext = None
+
+    @classmethod
+    def setUpClass(cls):
+        cls.sds = SystemDSContext()
+        cls.sds.federated([fed1], [([0, 0], [dim, dim])]
+                          ).write(fed1_file, format="federated").compute()
+        cls.sds.federated([fed1, fed2], [
+            ([0, 0], [dim, dim]),
+            ([0, dim], [dim, dim*2])]).write(fed_c2_file, format="federated").compute()
+        cls.sds.federated([fed1, fed2, fed3],  [
+            ([0, 0], [dim, dim]),
+            ([0, dim], [dim, dim*2]),
+            ([0, dim*2], [dim, dim*3])]).write(fed_c3_file, format="federated").compute()
+        cls.sds.federated([fed1, fed2], [
+            ([0, 0], [dim, dim]),
+            ([dim, 0], [dim*2, dim])]).write(fed_r2_file, format="federated").compute()
+        cls.sds.federated([fed1, fed2, fed3],  [
+            ([0, 0], [dim, dim]),
+            ([dim, 0], [dim*2, dim]),
+            ([dim*2, 0], [dim*3, dim])]).write(fed_r3_file, format="federated").compute()
+
+    @classmethod
+    def tearDownClass(cls):
+        cls.sds.close()
+
+    #####################
+    # Single site tests #
+    #####################
+
+    def test_single_fed_site_same_matrix(self):
+        f_m = self.sds.read(fed1_file)
+        self.exec_test(m, m, f_m, f_m)
+
+    def test_single_fed_left_same_size(self):
+        f_m = self.sds.read(fed1_file)
+        m_s = self.sds.from_numpy(m)
+        self.exec_test(m, m, m_s, f_m)
+
+    def test_single_fed_left_plus_one_row(self):
+        f_m = self.sds.read(fed1_file)
+        m_row_plus1 = np.reshape(
+            np.arange(1, dim*(dim+1) + 1, 1), (dim+1, dim))
+        m_s = self.sds.from_numpy(m_row_plus1)
+        self.exec_test(m_row_plus1, m, m_s, f_m)
+
+    def test_single_fed_left_minus_one_row(self):
+        f_m = self.sds.read(fed1_file)
+        m_row_minus1 = np.reshape(
+            np.arange(1, dim*(dim-1) + 1, 1), (dim-1, dim))
+        m_s = self.sds.from_numpy(m_row_minus1)
+        self.exec_test(m_row_minus1, m, m_s, f_m)
+
+    def test_single_fed_left_vector_row(self):
+        f_m = self.sds.read(fed1_file)
+        v_row = np.arange(1, dim + 1, 1)
+        v_s = self.sds.from_numpy(v_row).t()
+        self.exec_test(v_row, m, v_s, f_m)
+
+    def test_single_fed_right_same_size(self):
+        f_m = self.sds.read(fed1_file)
+        m_s = self.sds.from_numpy(m)
+        self.exec_test(m, m, f_m, m_s)
+
+    def test_single_fed_right_plus_one_row(self):
+        f_m = self.sds.read(fed1_file)
+        m_col_plus1 = np.reshape(
+            np.arange(1, dim*(dim+1) + 1, 1), (dim, dim+1))
+        m_s = self.sds.from_numpy(m_col_plus1)
+        self.exec_test(m, m_col_plus1, f_m, m_s)
+
+    def test_single_fed_right_minus_one_row(self):
+        f_m = self.sds.read(fed1_file)
+        m_col_minus1 = np.reshape(
+            np.arange(1, dim*(dim-1) + 1, 1), (dim, dim-1))
+        m_s = self.sds.from_numpy(m_col_minus1)
+        self.exec_test(m, m_col_minus1, f_m, m_s)
+
+    def test_single_fed_right_vector(self):
+        f_m = self.sds.read(fed1_file)
+        v_col = np.reshape(np.arange(1, dim + 1, 1), (1, dim))
+        v_col_sds = self.sds.from_numpy(v_col).t()
+        self.exec_test(m, np.transpose(v_col), f_m, v_col_sds)
+
+    ##################################
+    # start two federated site tests #
+    ##################################
+
+    def test_two_fed_standard(self):
+        f_m2 = self.sds.read(fed_c2_file)
+        m = np.reshape(np.arange(1, dim*(dim + dim) + 1, 1), (dim*2, dim))
+        m_s = self.sds.from_numpy(m)
+        self.exec_test(m, m_c2, m_s, f_m2)
+
+    def test_two_fed_left_minus_one_row(self):
+        f_m2 = self.sds.read(fed_c2_file)
+        m = np.reshape(np.arange(1, dim*(dim + dim-1)+1, 1), (dim*2 - 1, dim))
+        m_s = self.sds.from_numpy(m)
+        self.exec_test(m, m_c2, m_s, f_m2)
+
+    def test_two_fed_left_plus_one_row(self):
+        f_m2 = self.sds.read(fed_c2_file)
+        m = np.reshape(np.arange(1, dim*(dim + dim+1)+1, 1), (dim*2 + 1, dim))
+        m_s = self.sds.from_numpy(m)
+        self.exec_test(m, m_c2, m_s, f_m2)
+
+    def test_two_fed_left_vector_row(self):
+        f_m2 = self.sds.read(fed_c2_file)
+        m = np.arange(1, dim+1, 1)
+        m_s = self.sds.from_numpy(m).t()
+        self.exec_test(m, m_c2, m_s, f_m2)
+
+    def test_two_fed_right_standard(self):
+        f_m2 = self.sds.read(fed_c2_file)
+        m_s = self.sds.from_numpy(m_r2)
+        self.exec_test(m_c2, m_r2, f_m2, m_s)
+
+    def test_two_fed_right_col_minus_1(self):
+        f_m2 = self.sds.read(fed_c2_file)
+        m = np.reshape(np.arange(1, (dim-1)*(dim + dim)+1, 1),
+                       (dim * 2, dim-1))
+        m_s = self.sds.from_numpy(m)
+        self.exec_test(m_c2, m, f_m2, m_s)
+
+    def test_two_fed_right_col_plus_1(self):
+        f_m2 = self.sds.read(fed_c2_file)
+        m = np.reshape(np.arange(1, (dim+1)*(dim + dim)+1, 1),
+                       (dim * 2, dim+1))
+        m_s = self.sds.from_numpy(m)
+        self.exec_test(m_c2, m, f_m2, m_s)
+
+    def test_two_fed_right_vector(self):
+        f_m2 = self.sds.read(fed_c2_file)
+        m = np.reshape(np.arange(1, (dim + dim)+1, 1), (dim * 2, 1))
+        m_s = self.sds.from_numpy(m)
+        self.exec_test(m_c2, m, f_m2, m_s)
+
+    ####################################
+    # Start three federated site tests #
+    ####################################
+
+    def test_three_fed_standard(self):
+        f_m3 = self.sds.read(fed_c3_file)
+        m = np.reshape(np.arange(1, dim*(dim * 3) + 1, 1), (dim*3, dim))
+        m_s = self.sds.from_numpy(m)
+        self.exec_test(m, m_c3, m_s, f_m3)
+
+    def test_three_fed_left_minus_one_row(self):
+        f_m3 = self.sds.read(fed_c3_file)
+        m = np.reshape(np.arange(1, dim*(dim * 3-1)+1, 1), (dim*3 - 1, dim))
+        m_s = self.sds.from_numpy(m)
+        self.exec_test(m, m_c3, m_s, f_m3)
+
+    def test_three_fed_left_plus_one_row(self):
+        f_m3 = self.sds.read(fed_c3_file)
+        m = np.reshape(np.arange(1, dim*(dim *3+1)+1, 1), (dim*3 + 1, dim))
+        m_s = self.sds.from_numpy(m)
+        self.exec_test(m, m_c3, m_s, f_m3)
+
+    def test_three_fed_left_vector_row(self):
+        f_m3 = self.sds.read(fed_c3_file)
+        m = np.arange(1, dim+1, 1)
+        m_s = self.sds.from_numpy(m).t()
+        self.exec_test(m, m_c3, m_s, f_m3)
+
+    def test_three_fed_right_standard(self):
+        f_m3 = self.sds.read(fed_c3_file)
+        m_s = self.sds.from_numpy(m_r3)
+        self.exec_test(m_c3, m_r3, f_m3, m_s)
+
+    def test_three_fed_right_col_minus_1(self):
+        f_m3 = self.sds.read(fed_c3_file)
+        m = np.reshape(np.arange(1, (dim-1)*(dim*3)+1, 1), (dim * 3, dim-1))
+        m_s = self.sds.from_numpy(m)
+        self.exec_test(m_c3, m, f_m3, m_s)
+
+    def test_three_fed_right_col_plus_1(self):
+        f_m3 = self.sds.read(fed_c3_file)
+        m = np.reshape(np.arange(1, (dim+1)*(dim *3)+1, 1), (dim * 3, dim+1))
+        m_s = self.sds.from_numpy(m)
+        self.exec_test(m_c3, m, f_m3, m_s)
+
+    def test_three_fed_right_vector(self):
+        f_m3 = self.sds.read(fed_c3_file)
+        m = np.reshape(np.arange(1, (dim *3)+1, 1), (dim * 3, 1))
+        m_s = self.sds.from_numpy(m)
+        self.exec_test(m_c3, m, f_m3, m_s)
+
+    ###################
+    # row bind matrix #
+    ###################
+
+    def test_federated_row2_binded(self):
+        fed = self.sds.read(fed_r2_file)
+        s_m = self.sds.from_numpy(m_c2)
+        self.exec_test(m_c2, m_r2, s_m, fed)
+
+    def test_federated_row3_binded(self):
+        fed = self.sds.read(fed_r3_file)
+        s_m = self.sds.from_numpy(m_c3)
+        self.exec_test(m_c3, m_r3, s_m, fed)
+
+
+
+
+    def test_previously_failing(self):
+        # local matrix to multiply with
+        loc = np.array([
+            [1, 2, 3, 4, 5, 6, 7, 8, 9],
+            [1, 2, 3, 4, 5, 6, 7, 8, 9],
+            [1, 2, 3, 4, 5, 6, 7, 8, 9]])
+        # Multiply local and federated
+        ret_loc = loc @ m_r3
+
+        for i in range(1, 100):
+            loc_systemds = self.sds.from_numpy(loc)
+            fed = self.sds.read(fed_r3_file)
+            ret_fed = (loc_systemds @ fed).compute()
+            if not np.allclose(ret_fed, ret_loc):
+                self.fail(
+                    "not equal outputs of federated matrix multiplications")
+
+    def exec_test(self, left, right, f_left, f_right):
+        fed = f_left @ f_right
+        loc = left @ right
+        fed_res = fed.compute()
+        self.assertTrue(np.allclose(fed_res, loc))
+
+
+if __name__ == "__main__":
+    unittest.main(exit=False)
diff --git a/src/main/python/tests/federated/test_federated_read.py b/src/main/python/tests/federated/test_federated_read.py
new file mode 100644
index 0000000..6a3c28c
--- /dev/null
+++ b/src/main/python/tests/federated/test_federated_read.py
@@ -0,0 +1,103 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
+
+import io
+import json
+import os
+import shutil
+import sys
+import unittest
+
+import numpy as np
+from systemds.context import SystemDSContext
+
+os.environ['SYSDS_QUIET'] = "1"
+
+dim = 3
+
+m = np.reshape(np.arange(1, dim * dim + 1, 1), (dim, dim))
+
+tempdir = "./tests/federated/tmp/test_federated_matrixmult/"
+mtd = {"format": "csv", "header": False, "rows": dim,
+       "cols": dim, "data_type": "matrix", "value_type": "double"}
+
+# Create the testing directory if it does not exist.
+if not os.path.exists(tempdir):
+    os.makedirs(tempdir)
+
+# Save data files for the Federated workers.
+np.savetxt(tempdir + "m.csv", m, delimiter=",")
+with io.open(tempdir + "m.csv.mtd", "w", encoding="utf-8") as f:
+    f.write(json.dumps(mtd, ensure_ascii=False))
+
+# Federated workers + file locations
+fed1 = "localhost:8001/" + tempdir + "m.csv"
+fed2 = "localhost:8002/" + tempdir + "m.csv"
+fed3 = "localhost:8003/" + tempdir + "m.csv"
+
+fed1_file = tempdir+"m1.fed"
+fed2_file = tempdir+"m2.fed"
+fed3_file = tempdir+"m3.fed"
+
+
+class TestFederatedAggFn(unittest.TestCase):
+
+    sds: SystemDSContext = None
+
+    @classmethod
+    def setUpClass(cls):
+        cls.sds = SystemDSContext()
+        cls.sds.federated([fed1], [
+            ([0, 0], [dim, dim])]).write(fed1_file, format="federated").compute()
+        cls.sds.federated([fed1, fed2], [
+            ([0, 0], [dim, dim]),
+            ([0, dim], [dim, dim*2])]).write(fed2_file, format="federated").compute()
+        cls.sds.federated([fed1, fed2, fed3],  [
+            ([0, 0], [dim, dim]),
+            ([0, dim], [dim, dim*2]),
+            ([0, dim*2], [dim, dim*3])]).write(fed3_file, format="federated").compute()
+
+    @classmethod
+    def tearDownClass(cls):
+        cls.sds.close()
+
+    def test_verify_same_input(self):
+        f_m = self.sds.federated([fed1], [([0, 0], [dim, dim])]).compute()
+        self.assertTrue(np.allclose(f_m, m))
+
+    def test_verify_same_input_if_reading_fed(self):
+        f_m = self.sds.read(fed1_file).compute()
+        self.assertTrue(np.allclose(f_m, m))
+
+    def test_verify_same_input_if_reading_fed2(self):
+        f_m = self.sds.read(fed2_file).compute()
+        m2 = np.column_stack((m,m))
+        self.assertTrue(np.allclose(f_m, m2))
+
+    def test_verify_same_input_if_reading_fed3(self):
+        f_m = self.sds.read(fed3_file).compute()
+        m2 = np.column_stack((m,m))
+        m3 = np.column_stack((m,m2))
+        self.assertTrue(np.allclose(f_m, m3))
+
+
+if __name__ == "__main__":
+    unittest.main(exit=False)

Reply via email to