This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git
commit bfc58f15aec8d65ab01e8488678c6eb9838f19d1
Author: TMaddox <[email protected]>
AuthorDate: Fri May 7 22:14:03 2021 +0200

    [SYSTEMDS-2828] Python Multi Return Continuation
    
    This commit adds support for Python multi-return continuation: the ability
    to continue processing variables that are returned from a function call
    that returns more than one variable.
---
 .../python/systemds/context/systemds_context.py    |  16 +++-
 src/main/python/systemds/operator/__init__.py      |   3 +-
 src/main/python/systemds/operator/nodes/list.py    |  88 +++++++++++++++++
 .../python/systemds/operator/operation_node.py     |  33 +------
 src/main/python/systemds/script_building/dag.py    |  41 ++++++--
 src/main/python/systemds/script_building/script.py |  36 ++++---
 src/main/python/tests/algorithms/test_pca.py       |   5 +-
 src/main/python/tests/list/test_operations.py      | 106 +++++++++++++++++++++
 8 files changed, 275 insertions(+), 53 deletions(-)

diff --git a/src/main/python/systemds/context/systemds_context.py b/src/main/python/systemds/context/systemds_context.py
index f7f8199..80db697 100644
--- a/src/main/python/systemds/context/systemds_context.py
+++ b/src/main/python/systemds/context/systemds_context.py
@@ -38,7 +38,7 @@ import numpy as np
 import pandas as pd
 from py4j.java_gateway import GatewayParameters, JavaGateway
 from py4j.protocol import Py4JNetworkError
-from systemds.operator import Frame, Matrix, OperationNode, Scalar, Source
+from systemds.operator import Frame, Matrix, OperationNode, Scalar, Source, List
 from systemds.script_building import OutputType
 from systemds.utils.consts import VALID_INPUT_TYPES
 from systemds.utils.helpers import get_module_dir
@@ -458,3 +458,17 @@ class SystemDSContext(object):
         :param print_imported_methods: boolean specifying if the imported methods should be printed.
         """
         return Source(self, path, name, print_imported_methods)
+
+    def list(self, *args: Sequence[VALID_INPUT_TYPES], **kwargs: Dict[str, VALID_INPUT_TYPES]) -> 'List':
+        if len(kwargs) != 0 and len(args) != 0:
+            raise Exception("Accepts either args or kwargs")
+        elif len(kwargs) != 0:
+            out = []
+            for key, arg in kwargs.items():
+                out.append((key, OutputType.from_type(arg)))
+            return List(self, 'list', named_input_nodes=kwargs, outputs=out)
+        elif len(args) != 0:
+            out = []
+            for idx, arg in enumerate(args):
+                out.append((f"_{idx}", OutputType.from_type(arg)))
+            return List(self, 'list', unnamed_input_nodes=args, outputs=out)
diff --git a/src/main/python/systemds/operator/__init__.py b/src/main/python/systemds/operator/__init__.py
index fcecc9e..cda9ba2 100644
--- a/src/main/python/systemds/operator/__init__.py
+++ b/src/main/python/systemds/operator/__init__.py
@@ -24,6 +24,7 @@ from systemds.operator.nodes.scalar import Scalar
 from systemds.operator.nodes.matrix import Matrix
 from systemds.operator.nodes.frame import Frame
 from systemds.operator.nodes.source import Source
+from systemds.operator.nodes.list import List
 from systemds.operator import algorithm
 
-__all__ = [OperationNode, algorithm, Scalar, Matrix, Frame, Source]
+__all__ = [OperationNode, algorithm, Scalar, List, Matrix, Frame, Source]
diff --git a/src/main/python/systemds/operator/nodes/list.py b/src/main/python/systemds/operator/nodes/list.py
new file mode 100644
index 0000000..64e37eb
--- /dev/null
+++ b/src/main/python/systemds/operator/nodes/list.py
@@ -0,0 +1,88 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
+
+__all__ = ["List"]
+
+from typing import Dict, Sequence, Tuple, Union, Iterable, List
+
+import numpy as np
+from py4j.java_gateway import JavaObject
+
+from systemds.operator import OperationNode, Matrix
+from systemds.script_building.dag import OutputType
+from systemds.utils.consts import VALID_INPUT_TYPES
+from systemds.utils.converters import numpy_to_matrix_block
+from systemds.utils.helpers import create_params_string
+
+
+class List(OperationNode):
+
+    def __init__(self, sds_context: 'SystemDSContext', operation: str,
+                 unnamed_input_nodes: Union[str, Iterable[VALID_INPUT_TYPES]] = None,
+                 named_input_nodes: Dict[str, VALID_INPUT_TYPES] = None,
+                 outputs: List[Tuple[str, OutputType]] = [("_1", OutputType.MATRIX)]):
+
+        is_python_local_data = False
+        self._outputs = outputs
+        self._named_output_nodes = {}
+        for idx, output in enumerate(outputs):
+            if output[1] == OutputType.MATRIX:
+                self.named_output_nodes[output[0]] = Matrix(sds_context, operation='list', named_input_nodes={f"_{idx}": self})
+            # TODO add output types
+
+        super().__init__(sds_context, operation, unnamed_input_nodes,
+                         named_input_nodes, OutputType.LIST, is_python_local_data)
+
+    def __getitem__(self, key):
+        if isinstance(key, int):
+            return self.named_output_nodes[self._outputs[key][0]]
+        return self.named_output_nodes[key]
+
+    def pass_python_data_to_prepared_script(self, sds, var_name: str, prepared_script: JavaObject) -> None:
+        assert self.is_python_local_data, 'Can only pass data to prepared script if it is python local!'
+        if self._is_numpy():
+            prepared_script.setMatrix(var_name, numpy_to_matrix_block(
+                sds, self._np_array), True)  # True for reuse
+
+    def __parse_output_result_list(self, result_variables):
+        result_var = []
+        named_output_nodes_types_list = [type(named_output_node).__name__ for named_output_node in list(self.named_output_nodes.values())]
+        for idx, v in enumerate(self._script.out_var_name):
+            if named_output_nodes_types_list[idx] == "Matrix":
+                result_var.append(self.__parse_output_result_matrix(result_variables, v))
+            elif named_output_nodes_types_list[idx] == "Frame":
+                result_var.append(self.__parse_output_result_frame(result_variables, v))
+            else:
+                result_var.append(result_variables.getDouble(self._script.out_var_name[idx]))
+        return result_var
+
+    def code_line(self, var_name: str, unnamed_input_vars: Sequence[str],
+                  named_input_vars: Dict[str, str]) -> str:
+
+        inputs_comma_sep = create_params_string(unnamed_input_vars, named_input_vars)
+        output = "["
+        for idx, output_node in enumerate(self.named_output_nodes):
+            output += f'{var_name}_{idx},'
+        output = output[:-1] + "]"
+        return f'{output}={self.operation}({inputs_comma_sep});'
+
+    def compute(self, verbose: bool = False, lineage: bool = False) -> Union[np.array]:
+        return super().compute(verbose, lineage)
diff --git a/src/main/python/systemds/operator/operation_node.py b/src/main/python/systemds/operator/operation_node.py
index 3f98598..6dcd56c 100644
--- a/src/main/python/systemds/operator/operation_node.py
+++ b/src/main/python/systemds/operator/operation_node.py
@@ -50,9 +50,7 @@ class OperationNode(DAGNode):
                  Iterable[VALID_INPUT_TYPES]] = None,
                  named_input_nodes: Dict[str, VALID_INPUT_TYPES] = None,
                  output_type: OutputType = OutputType.MATRIX,
-                 is_python_local_data: bool = False,
-                 number_of_outputs=1,
-                 output_types: Iterable[OutputType] = None):
+                 is_python_local_data: bool = False):
         """
         Create general `OperationNode`
 
@@ -81,10 +79,9 @@ class OperationNode(DAGNode):
         self._result_var = None
         self._lineage_trace = None
         self._script = None
-        self._number_of_outputs = number_of_outputs
-        self._output_types = output_types
         self._source_node = None
         self._already_added = False
+        self.dml_name = ""
 
     def compute(self, verbose: bool = False, lineage: bool = False) -> \
             Union[float, np.array, Tuple[Union[float, np.array], str]]:
@@ -138,21 +135,6 @@ class OperationNode(DAGNode):
             self.sds_context, result_variables.getFrameBlock(var_name)
         )
 
-    def __parse_output_result_list(self, result_variables):
-        result_var = []
-        for idx, v in enumerate(self._script.out_var_name):
-            if(self._output_types == None or self._output_types[idx] == OutputType.MATRIX):
-                result_var.append(
-                    self.__parse_output_result_matrix(result_variables, v))
-            elif self._output_types[idx] == OutputType.FRAME:
-                result_var.append(
-                    self.__parse_output_result_frame(result_variables, v))
-
-            else:
-                result_var.append(result_variables.getDouble(
-                    self._script.out_var_name[idx]))
-        return result_var
-
     def get_lineage_trace(self) -> str:
         """Get the lineage trace for this node.
 
@@ -174,16 +156,9 @@
                 unnamed_input_vars) == 2, 'Binary Operations need exactly two input variables'
             return f'{var_name}={unnamed_input_vars[0]}{self.operation}{unnamed_input_vars[1]}'
 
-        inputs_comma_sep = create_params_string(
-            unnamed_input_vars, named_input_vars)
+        inputs_comma_sep = create_params_string(unnamed_input_vars, named_input_vars)
 
-        if self.output_type == OutputType.LIST:
-            output = "["
-            for idx in range(self._number_of_outputs):
-                output += f'{var_name}_{idx},'
-            output = output[:-1] + "]"
-            return f'{output}={self.operation}({inputs_comma_sep});'
-        elif self.output_type == OutputType.NONE:
+        if self.output_type == OutputType.NONE:
             return f'{self.operation}({inputs_comma_sep});'
         # elif self.output_type == OutputType.ASSIGN:
         #     return f'{var_name}={self.operation};'
diff --git a/src/main/python/systemds/script_building/dag.py b/src/main/python/systemds/script_building/dag.py
index 3abdc74..2ca2e8f 100644
--- a/src/main/python/systemds/script_building/dag.py
+++ b/src/main/python/systemds/script_building/dag.py
@@ -24,6 +24,8 @@ from enum import Enum, auto
 from typing import TYPE_CHECKING, Any, Dict, Sequence, Union, Optional
 
 from py4j.java_gateway import JavaObject, JVMView
+
+import systemds.operator
 from systemds.utils.consts import VALID_INPUT_TYPES
 
 if TYPE_CHECKING:
@@ -68,18 +70,37 @@ class OutputType(Enum):
 
         return OutputType.NONE
 
+    @staticmethod
+    def from_type(obj):
+        if obj is not None:
+            if isinstance(obj, systemds.operator.Matrix):
+                return OutputType.MATRIX
+            elif isinstance(obj, systemds.operator.Frame):
+                return OutputType.FRAME
+            elif isinstance(obj, systemds.operator.Scalar):
+                return OutputType.SCALAR
+            elif isinstance(obj, float):  # TODO is this correct?
+                return OutputType.DOUBLE
+            elif isinstance(obj, str):
+                return OutputType.STRING
+            elif isinstance(obj, systemds.operator.List):
+                return OutputType.LIST
+
+        return OutputType.NONE
+
 
 class DAGNode(ABC):
     """A Node in the directed-acyclic-graph (DAG) defining all operations."""
     sds_context: 'SystemDSContext'
     _unnamed_input_nodes: Sequence[Union['DAGNode', str, int, float, bool]]
     _named_input_nodes: Dict[str, Union['DAGNode', str, int, float, bool]]
+    _named_output_nodes: Dict[str, Union['DAGNode', str, int, float, bool]]
     _source_node: Optional["DAGNode"]
     _output_type: OutputType
     _script: Optional["DMLScript"]
     _is_python_local_data: bool
-    _number_of_outputs: int
     _already_added: bool
+    _dml_name: str
 
     def compute(self, verbose: bool = False, lineage: bool = False) -> Any:
         """Get result of this operation. Builds the dml script and executes it in SystemDS, before this method is called
@@ -126,12 +147,12 @@
         return self._named_input_nodes
 
     @property
-    def is_python_local_data(self):
-        return self._is_python_local_data
+    def named_output_nodes(self):
+        return self._named_output_nodes
 
     @property
-    def number_of_outputs(self):
-        return self._number_of_outputs
+    def is_python_local_data(self):
+        return self._is_python_local_data
 
     @property
     def output_type(self):
@@ -147,4 +168,12 @@
 
     @property
     def script_str(self):
-        return self._script.dml_script
\ No newline at end of file
+        return self._script.dml_script
+
+    @property
+    def dml_name(self):
+        return self._dml_name
+
+    @dml_name.setter
+    def dml_name(self, value):
+        self._dml_name = value
diff --git a/src/main/python/systemds/script_building/script.py b/src/main/python/systemds/script_building/script.py
index eb145b0..3eed51c 100644
--- a/src/main/python/systemds/script_building/script.py
+++ b/src/main/python/systemds/script_building/script.py
@@ -158,10 +158,10 @@ class DMLScript:
         :param dag_root: the topmost operation of our DAG, result of operation will be output
         """
         baseOutVarString = self._dfs_dag_nodes(dag_root)
-        if(dag_root.output_type != OutputType.NONE):
-            if(dag_root.number_of_outputs > 1):
+        if dag_root.output_type != OutputType.NONE:
+            if dag_root.output_type == OutputType.LIST:
                 self.out_var_name = []
-                for idx in range(dag_root.number_of_outputs):
+                for idx, output_node in enumerate(dag_root.named_output_nodes):
                     self.add_code(
                         f'write({baseOutVarString}_{idx}, \'./tmp_{idx}\');')
                     self.out_var_name.append(f'{baseOutVarString}_{idx}')
@@ -179,31 +179,37 @@
             if isinstance(dag_node, bool):
                 return 'TRUE' if dag_node else 'FALSE'
             return str(dag_node)
+
+        if dag_node.dml_name != "":
+            return dag_node.dml_name
+
         if dag_node._output_type == OutputType.IMPORT:
             if not dag_node.already_added:
                 self.add_code(dag_node.code_line(None, None))
             return None
+
         if dag_node._source_node is not None:
             self._dfs_dag_nodes(dag_node._source_node)
         # for each node do the dfs operation and save the variable names in `input_var_names`
         # get variable names of unnamed parameters
-        unnamed_input_vars = [self._dfs_dag_nodes(
-            input_node) for input_node in dag_node.unnamed_input_nodes]
-
-        # get variable names of named parameters
-        named_input_vars = {name: self._dfs_dag_nodes(input_node) for name, input_node in
-                            dag_node.named_input_nodes.items()}
+        unnamed_input_vars = [self._dfs_dag_nodes(input_node) for input_node in dag_node.unnamed_input_nodes]
 
-        curr_var_name = self._next_unique_var()
+        named_input_vars = {}
+        for name, input_node in dag_node.named_input_nodes.items():
+            named_input_vars[name] = self._dfs_dag_nodes(input_node)
+            if isinstance(input_node, DAGNode) and input_node._output_type == OutputType.LIST:
+                dag_node.dml_name = named_input_vars[name] + name
+                return dag_node.dml_name
+
+        dag_node.dml_name = self._next_unique_var()
 
         if dag_node.is_python_local_data:
-            self.add_input_from_python(curr_var_name, dag_node)
-
-        code_line = dag_node.code_line(
-            curr_var_name, unnamed_input_vars, named_input_vars)
+            self.add_input_from_python(dag_node.dml_name, dag_node)
+
+        code_line = dag_node.code_line(dag_node.dml_name, unnamed_input_vars, named_input_vars)
         self.add_code(code_line)
 
-        return curr_var_name
+        return dag_node.dml_name
 
     def _next_unique_var(self) -> str:
         """Gets the next unique variable name
diff --git a/src/main/python/tests/algorithms/test_pca.py b/src/main/python/tests/algorithms/test_pca.py
index bf5bb8c..0f774c0 100644
--- a/src/main/python/tests/algorithms/test_pca.py
+++ b/src/main/python/tests/algorithms/test_pca.py
@@ -25,6 +25,9 @@
 import numpy as np
 from systemds.context import SystemDSContext
 from systemds.operator.algorithm import pca
+from systemds.operator import List
+from systemds.script_building.dag import OutputType
+
 
 
 class TestPCA(unittest.TestCase):
@@ -48,7 +51,7 @@ class TestPCA(unittest.TestCase):
         m1 = self.generate_matrices_for_pca(30, seed=1304)
         X = self.sds.from_numpy(
             m1)  # print(features)
-        [res, model, _, _ ] = pca(X, K=1, scale="FALSE", center="FALSE").compute()
+        [res, model, _, _] = pca(X, K=1, scale="FALSE", center="FALSE").compute()
 
         for (x, y) in zip(m1, res):
             self.assertTrue((x[0] > 0 and y > 0) or (x[0] < 0 and y < 0))
diff --git a/src/main/python/tests/list/test_operations.py b/src/main/python/tests/list/test_operations.py
new file mode 100644
index 0000000..818042f
--- /dev/null
+++ b/src/main/python/tests/list/test_operations.py
@@ -0,0 +1,106 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
+
+import unittest
+
+import numpy as np
+from systemds.context import SystemDSContext
+from systemds.operator.algorithm import pca
+
+from systemds.operator import List
+from systemds.script_building.dag import OutputType
+
+
+class TestListOperations(unittest.TestCase):
+
+    sds: SystemDSContext = None
+
+    @classmethod
+    def setUpClass(cls):
+        cls.sds = SystemDSContext()
+
+    @classmethod
+    def tearDownClass(cls):
+        cls.sds.close()
+
+    def test_creation(self):
+        """
+        Tests the creation of a List object via the SystemDSContext
+        """
+        m1 = self.sds.from_numpy(np.array([1, 2, 3]))
+        m2 = self.sds.from_numpy(np.array([4, 5, 6]))
+        list_obj = self.sds.list(m1, m2)
+        tmp = list_obj[0] + list_obj[1]
+        res = tmp.compute()
+        self.assertTrue(np.allclose(m2, res))
+
+    def test_addition(self):
+        """
+        Tests the creation of a List object via the SystemDSContext and adds a value
+        """
+        m1 = self.sds.from_numpy(np.array([1, 2, 3]))
+        m2 = self.sds.from_numpy(np.array([4, 5, 6]))
+        list_obj = self.sds.list(m1, m2)
+        tmp = list_obj[0] + 2
+        res = tmp.compute()
+        self.assertTrue(np.allclose(m2 + 2, res))
+
+    def test_500x2b(self):
+        """
+        The purpose of this test is to show that an operation can be performed on the output of a multi output list node,
+        without the need of calculating the result first.
+        """
+        m1 = self.generate_matrices_for_pca(30, seed=1304)
+        node0 = self.sds.from_numpy(m1)
+        # print(features)
+        node1 = List(node0.sds_context, 'pca', named_input_nodes={"X": node0, "K": 1, "scale": "FALSE", "center": "FALSE"},
+                     outputs=[("res", OutputType.MATRIX), ("model", OutputType.MATRIX), ("scale", OutputType.MATRIX), ("center", OutputType.MATRIX)])
+        node2 = node1["res"].abs()
+        res = node2.compute(verbose=False)
+
+    def test_multiple_outputs(self):
+        """
+        The purpose of this test is to show that we can use multiple outputs
+        of a single list node in the DAG in one script
+        """
+        node0 = self.sds.from_numpy(np.array([1, 2, 3, 4, 5, 6, 7, 8, 9]))
+        node1 = self.sds.from_numpy(np.array([10, 20, 30, 40, 50, 60, 70, 80, 90]))
+        params_dict = {'X': node0, 'Y': node1}
+        node2 = List(self.sds, 'split', named_input_nodes=params_dict,
+                     outputs=[("X_train", OutputType.MATRIX), ("X_test", OutputType.MATRIX), ("Y_train", OutputType.MATRIX), ("Y_test", OutputType.MATRIX)])
+        node3 = node2["X_train"] + node2["Y_train"]
+        # X_train and Y_train are of the same shape because node0 and node1 have both only one dimension.
+        # Therefore they can be added together
+        res = node3.compute(verbose=False)
+
+    def generate_matrices_for_pca(self, dims: int, seed: int = 1234):
+        np.random.seed(seed)
+
+        mu, sigma = 0, 0.1
+        s = np.random.normal(mu, sigma, dims)
+
+        m1 = np.array(np.c_[np.copy(s) * 1, np.copy(s)*0.3], dtype=np.double)
+
+        return m1
+
+
+if __name__ == "__main__":
+    unittest.main(exit=False)
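
For readers skimming this diff, the following is a minimal usage sketch of the multi-return continuation described in the commit message, pieced together from the new tests in src/main/python/tests/list/test_operations.py. The variable names and the random input data are illustrative only, and the sketch assumes a SystemDS build that already includes this commit:

    # Sketch only: mirrors test_creation and test_500x2b from the new test file.
    import numpy as np

    from systemds.context import SystemDSContext
    from systemds.operator import List
    from systemds.script_building.dag import OutputType

    sds = SystemDSContext()

    # A list node built from two matrices: its entries stay lazy operation
    # nodes, so they can be combined further before anything is computed.
    m1 = sds.from_numpy(np.array([1, 2, 3]))
    m2 = sds.from_numpy(np.array([4, 5, 6]))
    list_obj = sds.list(m1, m2)
    summed = (list_obj[0] + list_obj[1]).compute()  # elementwise sum of m1 and m2

    # The same continuation works for a multi-output call such as pca:
    # index the List node by output name and keep operating on the result.
    X = sds.from_numpy(np.random.rand(30, 2))  # illustrative input data
    pca_out = List(sds, 'pca',
                   named_input_nodes={"X": X, "K": 1, "scale": "FALSE", "center": "FALSE"},
                   outputs=[("res", OutputType.MATRIX), ("model", OutputType.MATRIX),
                            ("scale", OutputType.MATRIX), ("center", OutputType.MATRIX)])
    projected = pca_out["res"].abs().compute()

    sds.close()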
