This is an automated email from the ASF dual-hosted git repository. baunsgaard pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/systemds.git
commit de50269f820b570841d5a79c60161b41c5b59db5 Author: baunsgaard <[email protected]> AuthorDate: Fri Sep 25 19:42:46 2020 +0200 [SYSTEMDS-2677] Allow Reading unknown dimensions for algorithm input --- .../python/systemds/context/systemds_context.py | 22 ++++++++++++++++---- src/main/python/systemds/operator/algorithm.py | 8 ++++++-- .../python/systemds/operator/operation_node.py | 19 +++++++++++++++-- src/main/python/systemds/script_building/dag.py | 1 + .../python/tests/examples/tutorials/test_mnist.py | 24 ++++++++++++++++++++++ 5 files changed, 66 insertions(+), 8 deletions(-) diff --git a/src/main/python/systemds/context/systemds_context.py b/src/main/python/systemds/context/systemds_context.py index d80fdfa..5160c2b 100644 --- a/src/main/python/systemds/context/systemds_context.py +++ b/src/main/python/systemds/context/systemds_context.py @@ -39,6 +39,7 @@ from systemds.utils.helpers import get_module_dir from systemds.operator import OperationNode from systemds.script_building import OutputType + class SystemDSContext(object): """A context with a connection to a java instance with which SystemDS operations are executed. The java process is started and is running using a random tcp port for instruction parsing.""" @@ -274,8 +275,21 @@ class SystemDSContext(object): return OperationNode(self, 'rand', [], named_input_nodes=named_input_nodes) - def read(self, path: os.PathLike, **kwargs: Dict[str, VALID_INPUT_TYPES]): - return OperationNode(self, 'read', [f'"{path}"'], named_input_nodes=kwargs) + def read(self, path: os.PathLike, **kwargs: Dict[str, VALID_INPUT_TYPES]) -> 'OperationNode': + """ Read a file from disk. Supported types include: + CSV, Matrix Market(coordinate), Text(i,j,v), SystemDS Binary + See: http://apache.github.io/systemds/site/dml-language-reference#readwrite-built-in-functions for more details + :return: an Operation Node, containing the read data. 
+ """ return OperationNode(self, 'read', [f'"{path}"'], named_input_nodes=kwargs, shape=(-1,)) - def scalar(self, v: Dict[str, VALID_INPUT_TYPES]): - return OperationNode(self, v, output_type=OutputType.SCALAR) \ No newline at end of file + def scalar(self, v: Dict[str, VALID_INPUT_TYPES]) -> 'OperationNode': + """ Construct a scalar value; this can contain str, float, double, integers, and booleans. + :return: An `OperationNode` containing the scalar value. + """ + if type(v) is str: + if not ((v[0] == '"' and v[-1] == '"') or (v[0] == "'" and v[-1] == "'")): + v = f'"{v}"' + # output type assign simply assigns the given variable to the value + # therefore the output type is assign. + return OperationNode(self, v, output_type=OutputType.ASSIGN) diff --git a/src/main/python/systemds/operator/algorithm.py b/src/main/python/systemds/operator/algorithm.py index 2af261f..b29caf1 100644 --- a/src/main/python/systemds/operator/algorithm.py +++ b/src/main/python/systemds/operator/algorithm.py @@ -156,10 +156,14 @@ def multiLogReg(x: OperationNode, y: OperationNode, **kwargs: Dict[str, VALID_IN if y.shape[0] == 0: raise ValueError("Found array with 0 feature(s) (shape={s}) while a minimum of 1 is required." 
.format(s=y.shape)) - + if -1 in x.shape: + output_shape = (-1,) + else: + output_shape = (x.shape[1],) + + params_dict = {'X': x, 'Y': y} params_dict.update(kwargs) - return OperationNode(x.sds_context, 'multiLogReg', named_input_nodes=params_dict, shape = (x.shape[1],)) + return OperationNode(x.sds_context, 'multiLogReg', named_input_nodes=params_dict, shape = output_shape) def multiLogRegPredict(x: OperationNode, b: OperationNode, y: OperationNode, **kwargs: Dict[str, VALID_INPUT_TYPES]) -> OperationNode: diff --git a/src/main/python/systemds/operator/operation_node.py b/src/main/python/systemds/operator/operation_node.py index ebe5804..4d01a8b 100644 --- a/src/main/python/systemds/operator/operation_node.py +++ b/src/main/python/systemds/operator/operation_node.py @@ -161,7 +161,7 @@ class OperationNode(DAGNode): return f'{output}={self.operation}({inputs_comma_sep});' elif self.output_type == OutputType.NONE: return f'{self.operation}({inputs_comma_sep});' - elif self.output_type == OutputType.SCALAR: + elif self.output_type == OutputType.ASSIGN: return f'{var_name}={self.operation};' else: return f'{var_name}={self.operation}({inputs_comma_sep});' @@ -341,15 +341,30 @@ class OperationNode(DAGNode): return OperationNode(self.sds_context, 'moment', unnamed_inputs, output_type=OutputType.DOUBLE) def write(self, destination: str, format:str = "binary", **kwargs: Dict[str, VALID_INPUT_TYPES]) -> 'OperationNode': + """ Write input to disk. + The written format is easily read by SystemDSContext.read(). + There is no return on write. + + :param destination: The location where the file is stored. Defaulting to HDFS paths if available. + :param format: The format in which the file is saved. Default is binary to improve SystemDS reading times. 
+ :param kwargs: Contains multiple extra specific arguments, can be seen at http://apache.github.io/systemds/site/dml-language-reference#readwrite-built-in-functions + """ unnamed_inputs = [self, f'"{destination}"'] named_parameters = {"format":f'"{format}"'} named_parameters.update(kwargs) return OperationNode(self.sds_context, 'write', unnamed_inputs, named_parameters, output_type= OutputType.NONE) def to_string(self, **kwargs: Dict[str, VALID_INPUT_TYPES]) -> 'OperationNode': - return OperationNode(self.sds_context, 'toString', [self], kwargs, output_type= OutputType.DOUBLE) + """ Converts the input to a string representation. + :return: `OperationNode` containing the string. + """ + return OperationNode(self.sds_context, 'toString', [self], kwargs, output_type= OutputType.SCALAR) def print(self, **kwargs: Dict[str, VALID_INPUT_TYPES]) -> 'OperationNode': + """ Prints the given Operation Node. + There is no return on calling. + To get the returned string look at the stdout of SystemDSContext. 
+ """ return OperationNode(self.sds_context, 'print', [self], kwargs, output_type= OutputType.NONE) def rev(self) -> 'OperationNode': diff --git a/src/main/python/systemds/script_building/dag.py b/src/main/python/systemds/script_building/dag.py index 9e027e6..fac8d17 100644 --- a/src/main/python/systemds/script_building/dag.py +++ b/src/main/python/systemds/script_building/dag.py @@ -34,6 +34,7 @@ class OutputType(Enum): MATRIX = auto() DOUBLE = auto() SCALAR = auto() + ASSIGN = auto() LIST = auto() NONE = auto() diff --git a/src/main/python/tests/examples/tutorials/test_mnist.py b/src/main/python/tests/examples/tutorials/test_mnist.py index c4e9258..9843f4d 100644 --- a/src/main/python/tests/examples/tutorials/test_mnist.py +++ b/src/main/python/tests/examples/tutorials/test_mnist.py @@ -36,6 +36,7 @@ class Test_DMLScript(unittest.TestCase): sds: SystemDSContext = None d: DataManager = None + base_path = "systemds/examples/tutorials/mnist/" @classmethod def setUpClass(cls): @@ -84,6 +85,29 @@ class Test_DMLScript(unittest.TestCase): self.assertGreater(acc, 80) + def test_multi_log_reg_with_read(self): + train_count = 100 + test_count = 100 + X = Matrix(self.sds, self.d.get_train_data().reshape( + (60000, 28*28))[:train_count]) + X.write(self.base_path + "train_data").compute() + Y = Matrix(self.sds, self.d.get_train_labels()[:train_count]) + 1 + Y.write(self.base_path + "train_labels").compute() + + Xr = self.sds.read(self.base_path + "train_data") + Yr = self.sds.read(self.base_path + "train_labels") + + bias = multiLogReg(Xr, Yr, verbose=False) + # Test data + Xt = Matrix(self.sds, self.d.get_test_data().reshape( + (10000, 28*28))[:test_count]) + Yt = Matrix(self.sds, self.d.get_test_labels()[:test_count]) + Yt = Yt + 1.0 + + [_, _, acc] = multiLogRegPredict(Xt, bias, Yt).compute(verbose=True) + + self.assertGreater(acc, 70) + if __name__ == "__main__": unittest.main(exit=False)
