This is an automated email from the ASF dual-hosted git repository. baunsgaard pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/systemds.git
commit 9fe84b53060c99c8298b0fe8e5262e606b5cdfe3 Author: Kevin Innerebner <[email protected]> AuthorDate: Wed Jul 20 15:12:34 2022 +0200 [SYSTEMDS-2835] Add a federated version of adult_neural and mnist tests - Fixes a bug where instructions were not replaced by FED equivalent instructions, because the correct `CompilerConfig` option was not set. - Remove unnecessary CompilerConfigs for Python API - Add a federated version of adult_neural and mnist tests - Minor fix for the adult example testcase. We adapt to the changed label data (from series to dataframe). Closes #1668 --- .../java/org/apache/sysds/api/jmlc/Connection.java | 1 + .../controlprogram/caching/CacheableData.java | 8 +- .../cp/ParamservBuiltinCPInstruction.java | 1 + .../instructions/fed/InitFEDInstruction.java | 4 + .../python/systemds/context/systemds_context.py | 72 +++++++- .../python/systemds/examples/tutorials/adult.py | 4 +- .../python/systemds/operator/operation_node.py | 2 + .../python/tests/examples/tutorials/test_adult.py | 4 +- .../tests/federated/test_federated_adult_neural.py | 201 +++++++++++++++++++++ .../python/tests/federated/test_federated_mnist.py | 123 +++++++++++++ 10 files changed, 413 insertions(+), 7 deletions(-) diff --git a/src/main/java/org/apache/sysds/api/jmlc/Connection.java b/src/main/java/org/apache/sysds/api/jmlc/Connection.java index 7037e2172b..64cb504360 100644 --- a/src/main/java/org/apache/sysds/api/jmlc/Connection.java +++ b/src/main/java/org/apache/sysds/api/jmlc/Connection.java @@ -150,6 +150,7 @@ public class Connection implements Closeable _cconf.set(ConfigType.IGNORE_TEMPORARY_FILENAMES, true); _cconf.set(ConfigType.REJECT_READ_WRITE_UNKNOWNS, false); _cconf.set(ConfigType.ALLOW_CSE_PERSISTENT_READS, false); + _cconf.set(ConfigType.ALLOW_INDIVIDUAL_SB_SPECIFIC_OPS, false); //disable caching globally CacheableData.disableCaching(); diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java index 8cb108d478..6dd726db6f 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java @@ -392,8 +392,12 @@ public abstract class CacheableData<T extends CacheBlock> extends Data if(_fedMapping == null && _metaData instanceof MetaDataFormat){ MetaDataFormat mdf = (MetaDataFormat) _metaData; if(mdf.getFileFormat() == FileFormat.FEDERATED){ - InitFEDInstruction.federateMatrix( - this, ReaderWriterFederated.read(_hdfsFileName, mdf.getDataCharacteristics())); + if (this instanceof FrameObject) + InitFEDInstruction.federateFrame((FrameObject) this, + ReaderWriterFederated.read(_hdfsFileName, mdf.getDataCharacteristics())); + else + InitFEDInstruction.federateMatrix( + this, ReaderWriterFederated.read(_hdfsFileName, mdf.getDataCharacteristics())); return true; } } diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/ParamservBuiltinCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/ParamservBuiltinCPInstruction.java index ef45a9c2b3..b4c3c64553 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/cp/ParamservBuiltinCPInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/ParamservBuiltinCPInstruction.java @@ -87,6 +87,7 @@ public class ParamservBuiltinCPInstruction extends ParameterizedBuiltinCPInstruc @Override public void processInstruction(ExecutionContext ec) { // check if the input is federated + // FIXME: does not work if features are federated, but labels are not if(ec.getMatrixObject(getParam(PS_FEATURES)).isFederated() || ec.getMatrixObject(getParam(PS_LABELS)).isFederated()) { runFederated(ec); diff --git a/src/main/java/org/apache/sysds/runtime/instructions/fed/InitFEDInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/fed/InitFEDInstruction.java index 3e648bbe3b..68965db4c3 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/fed/InitFEDInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/fed/InitFEDInstruction.java @@ -451,6 +451,10 @@ public class InitFEDInstruction extends FEDInstruction implements LineageTraceab LOG.debug("Fed map Inited:" + output.getFedMapping()); } + public static void federateFrame(FrameObject output, List<Pair<FederatedRange, FederatedData>> workers) { + federateFrame(output, workers, null); + } + public static void federateFrame(FrameObject output, List<Pair<FederatedRange, FederatedData>> workers, CacheBlock[] blocks) { List<Pair<FederatedRange, FederatedData>> fedMapping = new ArrayList<>(); for(Pair<FederatedRange, FederatedData> e : workers) diff --git a/src/main/python/systemds/context/systemds_context.py b/src/main/python/systemds/context/systemds_context.py index d25805cf96..e467a4d65a 100644 --- a/src/main/python/systemds/context/systemds_context.py +++ b/src/main/python/systemds/context/systemds_context.py @@ -31,6 +31,7 @@ from subprocess import PIPE, Popen from threading import Thread from time import sleep from typing import Dict, Iterable, Sequence, Tuple, Union +from contextlib import contextmanager import numpy as np import pandas as pd @@ -51,8 +52,10 @@ class SystemDSContext(object): """ java_gateway: JavaGateway + _capture_statistics: bool + _statistics: str - def __init__(self, port: int = -1): + def __init__(self, port: int = -1, capture_statistics: bool = False): """Starts a new instance of SystemDSContext, in which the connection to a JVM systemds instance is handled Any new instance of this SystemDS Context, would start a separate new JVM. @@ -61,6 +64,8 @@ class SystemDSContext(object): """ actual_port = self.__start(port) process = self.__process + self._statistics = "" + self._capture_statistics = capture_statistics if process.poll() is None: self.__start_gateway(actual_port) else: @@ -306,6 +311,71 @@ class SystemDSContext(object): s.close() return port + def _execution_completed(self, script: 'DMLScript'): + """ + Should/will be called after execution of a script. + Used to update statistics. + :param script: The script that got executed + """ + if self._capture_statistics: + self._statistics += script.prepared_script.statistics() + + def capture_stats(self, enable: bool = True): + """ + Enable (or disable) capturing of execution statistics. + :param enable: if `True` enable capturing, else disable it + """ + self._capture_statistics = enable + self.java_gateway.entry_point.getConnection().setStatistics(enable) + + @contextmanager + def capture_stats_context(self): + """ + Context for capturing statistics. Should be used in a `with` statement. + Afterwards capturing will be reset to the state it was before. + + Example: + ```Python + with sds.capture_stats_context(): + a = some_computation.compute() + b = another_computation.compute() + print(sds.take_stats()) + ``` + :return: a context object to be used in a `with` statement + """ + was_enabled = self._capture_statistics + try: + self.capture_stats(True) + yield None + finally: + self.capture_stats(was_enabled) + + def get_stats(self): + """ + Get the captured statistics. Will not clear the captured statistics. + + See `take_stats()` for an option that also clears the captured statistics. + :return: The captured statistics + """ + return self._statistics + + def take_stats(self): + """ + Get the captured statistics and clear the captured statistics. + + See `get_stats()` for an option that does not clear the captured statistics. + :return: The captured statistics + """ + stats = self.get_stats() + self.clear_stats() + return stats + + def clear_stats(self): + """ + Clears the captured statistics. + """ + self._statistics = "" + def full(self, shape: Tuple[int, int], value: Union[float, int]) -> 'Matrix': """Generates a matrix completely filled with a value diff --git a/src/main/python/systemds/examples/tutorials/adult.py b/src/main/python/systemds/examples/tutorials/adult.py index f15ed382e9..575697ae01 100644 --- a/src/main/python/systemds/examples/tutorials/adult.py +++ b/src/main/python/systemds/examples/tutorials/adult.py @@ -63,7 +63,7 @@ class DataManager: def get_train_labels_pandas(self) -> pd.DataFrame: self._get_data(self._train_data_loc) - return self._parse_data(self._train_data_loc)["income"] + return self._parse_data(self._train_data_loc)[["income"]] def get_train_labels(self, sds: SystemDSContext) -> 'Frame': self._get_data(self._train_data_loc) @@ -80,7 +80,7 @@ class DataManager: def get_test_labels_pandas(self) -> pd.DataFrame: self._get_data(self._test_data_loc) - return self._parse_data(self._test_data_loc)["income"] + return self._parse_data(self._test_data_loc)[["income"]] def get_test_labels(self, sds: SystemDSContext) -> 'Frame': self._get_data(self._test_data_loc) diff --git a/src/main/python/systemds/operator/operation_node.py b/src/main/python/systemds/operator/operation_node.py index 99d823bc14..5db90b3a9f 100644 --- a/src/main/python/systemds/operator/operation_node.py +++ b/src/main/python/systemds/operator/operation_node.py @@ -100,6 +100,8 @@ class OperationNode(DAGNode): else: result_variables = self._script.execute() + self.sds_context._execution_completed(self._script) + if result_variables is not None: self._result_var = self._parse_output_result_variables( result_variables) diff --git a/src/main/python/tests/examples/tutorials/test_adult.py b/src/main/python/tests/examples/tutorials/test_adult.py index d327676977..c6b1018658 100644 --- a/src/main/python/tests/examples/tutorials/test_adult.py +++ b/src/main/python/tests/examples/tutorials/test_adult.py @@ -58,7 +58,7 @@ class TestAdultStandardML(unittest.TestCase): def test_train_labels(self): y = self.d.get_train_labels_pandas() - self.assertEqual((32561,), y.shape) + self.assertEqual((32561, 1), y.shape) def test_test_data(self): x_l = self.d.get_test_data_pandas() @@ -66,7 +66,7 @@ class TestAdultStandardML(unittest.TestCase): def test_test_labels(self): y_l = self.d.get_test_labels_pandas() - self.assertEqual((16281,), y_l.shape) + self.assertEqual((16281, 1), y_l.shape) def test_train_data_pandas_vs_systemds(self): pandas = self.d.get_train_data_pandas() diff --git a/src/main/python/tests/federated/test_federated_adult_neural.py b/src/main/python/tests/federated/test_federated_adult_neural.py new file mode 100644 index 0000000000..df65565c0b --- /dev/null +++ b/src/main/python/tests/federated/test_federated_adult_neural.py @@ -0,0 +1,201 @@ +# ------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# ------------------------------------------------------------- +import itertools +import shutil +import unittest +import io +import json +import pandas as pd +from os import path, makedirs + +from systemds.context import SystemDSContext +from systemds.examples.tutorials.adult import DataManager +from systemds.operator.algorithm.builtin.scale import scale +from systemds.operator.algorithm.builtin.scaleApply import scaleApply + + +def create_schema(dataset): + schema = [] + for dtype in dataset.dtypes: + if pd.api.types.is_integer_dtype(dtype): + schema.append('int64') + elif pd.api.types.is_float_dtype(dtype): + schema.append('fp64') + elif pd.api.types.is_bool_dtype(dtype): + schema.append('bool') + else: + schema.append('string') + return ','.join(schema) + + +def create_row_federated_dataset(name, dataset, num_parts=2, + federated_workers=None): + if federated_workers is None: + federated_workers = ["localhost:8001", "localhost:8002"] + tempdir = "./tests/federated/tmp/test_federated_adult_neural/" + federated_file = path.join(tempdir, f"{name}.fed") + makedirs(tempdir, exist_ok=True) + + schema = create_schema(dataset) + r = dataset.shape[0] // num_parts + rs = [r for _ in range(num_parts - 1)] + [dataset.shape[0] - r * (num_parts - 1)] + c = dataset.shape[1] + + fed_file_content = [] + rows_processed = 0 + for worker_id, address, rows in zip(range(num_parts), itertools.cycle(federated_workers), rs): + dataset_part_path = path.join(tempdir, f"{name}{worker_id}.csv") + mtd = {"format": "csv", "header": True, "rows": rows, "cols": c, + "data_type": "frame", "schema": schema} + + dataset_part = dataset[rows_processed:rows_processed + rows] + dataset_part.to_csv(dataset_part_path, index=False) + with io.open(f"{dataset_part_path}.mtd", "w", encoding="utf-8") as f: + json.dump(mtd, f, ensure_ascii=False) + + fed_file_content.append({ + "address": address, + "dataType": "FRAME", + "filepath": dataset_part_path, + "begin": [rows_processed, 0], + "end": [rows_processed + rows, c], + }) + rows_processed += rows + + with open(federated_file, "w", encoding="utf-8") as f: + json.dump(fed_file_content, f) + with open(federated_file + '.mtd', "w", encoding="utf-8") as f: + json.dump({"format": "federated", "rows": dataset.shape[0], "cols": c, + "data_type": "frame", "schema": schema}, f) + + return federated_file + + +class TestFederatedAdultNeural(unittest.TestCase): + """ + Test class for adult neural network code + """ + + sds: SystemDSContext = None + d: DataManager = None + neural_net_src_path: str = "tests/examples/tutorials/neural_net_source.dml" + preprocess_src_path: str = "tests/examples/tutorials/preprocess.dml" + data_path_train: str = "" + data_path_test: str = "" + labels_path_train: str = "" + labels_path_test: str = "" + dataset_jspec: str = "../../test/resources/datasets/adult/jspec.json" + + train_count: int = 15000 + test_count: int = 300 + + network_dir: str = "tests/examples/tutorials/model" + network: str = network_dir + "/fnn" + + @classmethod + def setUpClass(cls): + cls.sds = SystemDSContext() + cls.d = DataManager() + cls.data_path_train = create_row_federated_dataset("train_data", + cls.d.get_train_data_pandas()[0:cls.train_count]) + cls.labels_path_train = create_row_federated_dataset("train_labels", + cls.d.get_train_labels_pandas()[0:cls.train_count]) + cls.data_path_test = create_row_federated_dataset("test_data", + cls.d.get_test_data_pandas()[0:cls.test_count]) + cls.labels_path_test = create_row_federated_dataset("test_labels", + cls.d.get_test_labels_pandas()[0:cls.test_count]) + shutil.rmtree(cls.network_dir, ignore_errors=True) + + @classmethod + def tearDownClass(cls): + cls.sds.close() + shutil.rmtree(cls.network_dir, ignore_errors=True) + + # Tests + + @unittest.skip("`toOneHot()` won't be federated -> param-server won't work") + def test_train_neural_net(self): + self.train_neural_net_and_save() + self.eval_neural_net() + + @unittest.skip("`toOneHot()` won't be federated -> param-server won't work") + def test_train_predict(self): + self.train_neural_net_and_predict() + + # Helper methods + + def prepare_x(self): + jspec = self.d.get_jspec(self.sds) + train_x_frame = self.sds.read(self.data_path_train) + train_x, M1 = train_x_frame.transform_encode(spec=jspec) + test_x_frame = self.sds.read(self.data_path_test) + test_x = test_x_frame.transform_apply(spec=jspec, meta=M1) + # Scale and shift .... not needed because of sigmoid layer, + # could be useful therefore tested. + [train_x, ce, sc] = scale(train_x) + test_x = scaleApply(test_x, ce, sc) + return [train_x, test_x] + + def prepare_y(self): + jspec_dict = {"recode": ["income"]} + jspec_labels = self.sds.scalar(f'"{jspec_dict}"') + train_y_frame = self.sds.read(self.labels_path_train) + train_y, M2 = train_y_frame.transform_encode(spec=jspec_labels) + test_y_frame = self.sds.read(self.labels_path_test) + test_y = test_y_frame.transform_apply(spec=jspec_labels, meta=M2) + labels = 2 + train_y = train_y.to_one_hot(labels) + test_y = test_y.to_one_hot(labels) + return [train_y, test_y] + + def prepare(self): + x = self.prepare_x() + y = self.prepare_y() + return [x[0], x[1], y[0], y[1]] + + def train_neural_net_and_save(self): + [train_x, _, train_y, _] = self.prepare() + FFN_package = self.sds.source(self.neural_net_src_path, "fnn") + network = FFN_package.train(train_x, train_y, 4, 16, 0.01, 1) + network.write(self.network).compute() + + def train_neural_net_and_predict(self): + [train_x, test_x, train_y, test_y] = self.prepare() + FFN_package = self.sds.source(self.neural_net_src_path, "fnn") + network = FFN_package.train_paramserv( + train_x, train_y, 1, 16, 0.01, 2, 1) + probs = FFN_package.predict(test_x, network) + accuracy = FFN_package.eval(probs, test_y).compute() + # accuracy is returned in percent + self.assertTrue(accuracy > 0.80) + + def eval_neural_net(self): + [_, test_x, _, test_y] = self.prepare() + network = self.sds.read(self.network) + FFN_package = self.sds.source(self.neural_net_src_path, "fnn") + probs = FFN_package.predict(test_x, network) + accuracy = FFN_package.eval(probs, test_y).compute() + # accuracy is returned in percent + self.assertTrue(accuracy > 0.80) + + +if __name__ == "__main__": + unittest.main(exit=False) diff --git a/src/main/python/tests/federated/test_federated_mnist.py b/src/main/python/tests/federated/test_federated_mnist.py new file mode 100644 index 0000000000..3b11bd3194 --- /dev/null +++ b/src/main/python/tests/federated/test_federated_mnist.py @@ -0,0 +1,123 @@ +# ------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# ------------------------------------------------------------- + +import unittest +import itertools +import io +import json +import pandas as pd +from os import path, makedirs + +from systemds.context import SystemDSContext +from systemds.examples.tutorials.mnist import DataManager +from systemds.operator.algorithm import kmeans, multiLogReg, multiLogRegPredict + + +def create_row_federated_dataset(name, dataset, num_parts=2, + federated_workers=None): + if federated_workers is None: + federated_workers = ["localhost:8001", "localhost:8002"] + tempdir = "./tests/federated/tmp/test_federated_mnist/" + federated_file = path.join(tempdir, f"{name}.fed") + makedirs(tempdir, exist_ok=True) + + r = dataset.shape[0] // num_parts + rs = [r for _ in range(num_parts - 1)] + [dataset.shape[0] - r * (num_parts - 1)] + c = dataset.shape[1] + + fed_file_content = [] + rows_processed = 0 + for worker_id, address, rows in zip(range(num_parts), itertools.cycle(federated_workers), rs): + dataset_part_path = path.join(tempdir, f"{name}{worker_id}.csv") + mtd = {"format": "csv", "rows": rows, "cols": c, + "data_type": "matrix", "value_type": "double"} + + dataset_part = dataset[rows_processed:rows_processed + rows] + pd.DataFrame(dataset_part).to_csv(dataset_part_path, index=False, header=False) + with io.open(f"{dataset_part_path}.mtd", "w", encoding="utf-8") as f: + json.dump(mtd, f, ensure_ascii=False) + + fed_file_content.append({ + "address": address, + "dataType": "MATRIX", + "filepath": dataset_part_path, + "begin": [rows_processed, 0], + "end": [rows_processed + rows, c], + }) + rows_processed += rows + + with open(federated_file, "w", encoding="utf-8") as f: + json.dump(fed_file_content, f) + with open(federated_file + '.mtd', "w", encoding="utf-8") as f: + json.dump({"format": "federated", "rows": dataset.shape[0], "cols": c, + "data_type": "matrix", "value_type": "double"}, f) + + return federated_file + + +class TestFederatedMnist(unittest.TestCase): + """ + Test class for mnist dml script tutorial code. + """ + + sds: SystemDSContext = None + d: DataManager = None + base_path = "systemds/examples/tutorials/mnist/" + + @classmethod + def setUpClass(cls): + cls.sds = SystemDSContext() + cls.d = DataManager() + + @classmethod + def tearDownClass(cls): + cls.sds.close() + + def test_multi_log_reg(self): + # Reduced because we want the tests to finish a bit faster. + train_count = 15000 + test_count = 5000 + # Train data + np_train_data = self.d.get_train_data().reshape(60000, 28 * 28)[0:train_count] + data_path_train = create_row_federated_dataset("train_data", np_train_data) + X = self.sds.read(data_path_train) + Y = self.sds.from_numpy(self.d.get_train_labels()[:train_count]) + Y = Y + 1.0 + + # Test data + np_test_data = self.d.get_test_data().reshape(10000, 28 * 28)[0:test_count] + data_path_test = create_row_federated_dataset("test_data", np_test_data) + Xt = self.sds.read(data_path_test) + Yt = self.sds.from_numpy(self.d.get_test_labels()[:test_count]) + Yt = Yt + 1.0 + + bias = multiLogReg(X, Y) + + with self.sds.capture_stats_context(): + [_, _, acc] = multiLogRegPredict(Xt, bias, Yt).compute() + stats = self.sds.take_stats() + for fed_instr in ["fed_isnan", "fed_*", "fed_-", "fed_uark+", "fed_r'", "fed_rightIndex"]: + self.assertIn(fed_instr, stats) + self.assertGreater(acc, 80) + + +if __name__ == "__main__": + unittest.main(exit=False)
