This is an automated email from the ASF dual-hosted git repository. baunsgaard pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/systemds.git
commit 3d6190b2ac7ae82b47166736c3ba46c17677dcfa Author: Bernhard Leder <[email protected]> AuthorDate: Thu Apr 22 17:15:12 2021 +0200 [SYSTEMDS-2882] Python Frame Support This commit adds frame support in our python API. - Add support for Pandas to Frame - Add support for reading Frames from python API - Add support for transform encode - Add support for transform decode Closes #1239 --- .github/workflows/federatedPython.yml | 2 +- .github/workflows/python.yml | 2 +- src/main/python/setup.py | 3 +- .../python/systemds/context/systemds_context.py | 25 +++- src/main/python/systemds/frame/__init__.py | 24 ++++ src/main/python/systemds/frame/frame.py | 128 ++++++++++++++++++++ .../python/systemds/operator/algorithm/__init__.py | 3 - .../python/systemds/operator/operation_node.py | 100 ++++++++++++---- src/main/python/systemds/utils/converters.py | 75 +++++++++++- src/main/python/tests/frame/data/homes.csv | 21 ++++ .../python/tests/frame/data/homes.tfspec_bin2.json | 6 + .../tests/frame/data/homes.tfspec_recode2.json | 3 + src/main/python/tests/frame/test_hyperband.py | 86 ++++++++++++++ src/main/python/tests/frame/test_r_c_bind.py | 130 +++++++++++++++++++++ .../python/tests/frame/test_transform_apply.py | 86 ++++++++++++++ .../python/tests/frame/test_transform_encode.py | 75 ++++++++++++ src/main/python/tests/frame/test_write_read.py | 74 ++++++++++++ 17 files changed, 811 insertions(+), 32 deletions(-) diff --git a/.github/workflows/federatedPython.yml b/.github/workflows/federatedPython.yml index 2c4dcdb..ed192d0 100644 --- a/.github/workflows/federatedPython.yml +++ b/.github/workflows/federatedPython.yml @@ -74,7 +74,7 @@ jobs: ${{ runner.os }}-pip-${{ matrix.python-version }}- - name: Install pip Dependencies - run: pip install numpy py4j wheel + run: pip install numpy py4j wheel pandas - name: Build Python Package run: | diff --git a/.github/workflows/python.yml b/.github/workflows/python.yml index 9947135..a34a757 100644 --- a/.github/workflows/python.yml +++ 
b/.github/workflows/python.yml @@ -85,7 +85,7 @@ jobs: run: sudo apt-get install protobuf-compiler libprotoc-dev - name: Install pip Dependencies - run: pip install numpy py4j wheel scipy sklearn requests + run: pip install numpy py4j wheel scipy sklearn requests pandas - name: Build Python Package run: | diff --git a/src/main/python/setup.py b/src/main/python/setup.py index 5939101..f79d220 100755 --- a/src/main/python/setup.py +++ b/src/main/python/setup.py @@ -38,7 +38,8 @@ ARTIFACT_VERSION_SHORT = ARTIFACT_VERSION.split("-")[0] REQUIRED_PACKAGES = [ 'numpy >= 1.8.2', 'py4j >= 0.10.9', - 'requests >= 2.24.0' + 'requests >= 2.24.0', + 'pandas >= 1.2.2' ] python_dir = 'systemds' diff --git a/src/main/python/systemds/context/systemds_context.py b/src/main/python/systemds/context/systemds_context.py index 107d875..f274a25 100644 --- a/src/main/python/systemds/context/systemds_context.py +++ b/src/main/python/systemds/context/systemds_context.py @@ -40,6 +40,8 @@ from systemds.script_building import OutputType from systemds.utils.consts import VALID_INPUT_TYPES from systemds.utils.helpers import get_module_dir +from systemds.frame import Frame + class SystemDSContext(object): """A context with a connection to a java instance with which SystemDS operations are executed. @@ -299,7 +301,28 @@ class SystemDSContext(object): See: http://apache.github.io/systemds/site/dml-language-reference#readwrite-built-in-functions for more details :return: an Operation Node, containing the read data. 
""" - return OperationNode(self, 'read', [f'"{path}"'], named_input_nodes=kwargs, shape=(-1,)) + data_type = kwargs.get("data_type", None) + file_format = kwargs.get("format", None) + if data_type == "frame": + kwargs["data_type"] = f'"{data_type}"' + if isinstance(file_format, str): + kwargs["format"] = f'"{kwargs["format"]}"' + return Frame(self, None, f'"{path}"', **kwargs) + elif data_type == "scalar": + kwargs["data_type"] = f'"{data_type}"' + value_type = kwargs.get("value_type", None) + if value_type == "string": + kwargs["value_type"] = f'"{kwargs["value_type"]}"' + return OperationNode( + self, + "read", + [f'"{path}"'], + named_input_nodes=kwargs, + shape=(-1,), + output_type=OutputType.SCALAR, + ) + return OperationNode(self, "read", [f'"{path}"'], + named_input_nodes=kwargs, shape=(-1,)) def scalar(self, v: Dict[str, VALID_INPUT_TYPES]) -> 'OperationNode': """ Construct an scalar value, this can contain str, float, double, integers and booleans. diff --git a/src/main/python/systemds/frame/__init__.py b/src/main/python/systemds/frame/__init__.py new file mode 100644 index 0000000..47b9b6c --- /dev/null +++ b/src/main/python/systemds/frame/__init__.py @@ -0,0 +1,24 @@ +# ------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. +# +# ------------------------------------------------------------- + +from systemds.frame.frame import Frame + +__all__ = [Frame] diff --git a/src/main/python/systemds/frame/frame.py b/src/main/python/systemds/frame/frame.py new file mode 100644 index 0000000..5cc4554 --- /dev/null +++ b/src/main/python/systemds/frame/frame.py @@ -0,0 +1,128 @@ +# ------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +# ------------------------------------------------------------- + +import os +from typing import Dict, Optional, Sequence, Tuple, Union + +import numpy as np +import pandas as pd +from py4j.java_gateway import JavaObject, JVMView +from systemds.operator import OperationNode +from systemds.utils.consts import VALID_INPUT_TYPES +from systemds.utils.converters import pandas_to_frame_block +from systemds.script_building.dag import OutputType, DAGNode + + +class Frame(OperationNode): + def __init__( + self, + sds_context: "SystemDSContext", + df: pd.DataFrame, + *args: Sequence[VALID_INPUT_TYPES], + **kwargs: Dict[str, VALID_INPUT_TYPES] + ) -> None: + + if args: + unnamed_params = args + else: + unnamed_params = ["'./tmp/{file_name}'"] + unnamed_params.extend(args) + + named_params = {"data_type": '"frame"'} + + self._pd_dataframe = df + self.shape = None + if isinstance(self._pd_dataframe, pd.DataFrame): + self.shape = self._pd_dataframe.shape + named_params["rows"] = self.shape[0] + named_params["cols"] = self.shape[1] + + named_params.update(kwargs) + super().__init__( + sds_context, + "read", + unnamed_params, + named_params, + output_type=OutputType.FRAME, + is_python_local_data=self._is_pandas(), + shape=self.shape, + ) + + def pass_python_data_to_prepared_script( + self, sds, var_name: str, prepared_script: JavaObject + ) -> None: + assert ( + self.is_python_local_data + ), "Can only pass data to prepared script if it is python local!" 
+ if self._is_pandas(): + prepared_script.setFrame( + var_name, pandas_to_frame_block(sds, self._pd_dataframe), True + ) # True for reuse + + def code_line( + self, + var_name: str, + unnamed_input_vars: Sequence[str], + named_input_vars: Dict[str, str], + ) -> str: + code_line = super().code_line(var_name, unnamed_input_vars, named_input_vars) + if self._is_pandas(): + code_line = code_line.format(file_name=var_name) + return code_line + + def compute( + self, verbose: bool = False, lineage: bool = False + ) -> Union[pd.DataFrame]: + if self._is_pandas(): + if verbose: + print("[Pandas Frame - No Compilation necessary]") + return self._pd_dataframe + else: + return super().compute(verbose, lineage) + + def _is_pandas(self) -> bool: + return self._pd_dataframe is not None + + def transform_encode(self, spec): + self._check_frame_op() + self._check_other(spec, OutputType.SCALAR) + params_dict = {"target": self, "spec": spec} + return OperationNode( + self.sds_context, + "transformencode", + named_input_nodes=params_dict, + output_type=OutputType.LIST, + number_of_outputs=2, + output_types=[OutputType.MATRIX, OutputType.FRAME], + ) + + def transform_apply(self, spec: "OperationNode", meta: "OperationNode"): + self._check_frame_op() + self._check_other(spec, OutputType.SCALAR) + self._check_other(meta, OutputType.FRAME) + params_dict = {"target": self, "spec": spec, "meta": meta} + return OperationNode( + self.sds_context, + "transformapply", + named_input_nodes=params_dict, + output_type=OutputType.MATRIX, + number_of_outputs=1, + ) diff --git a/src/main/python/systemds/operator/algorithm/__init__.py b/src/main/python/systemds/operator/algorithm/__init__.py index b3bcd3d..25d4602 100644 --- a/src/main/python/systemds/operator/algorithm/__init__.py +++ b/src/main/python/systemds/operator/algorithm/__init__.py @@ -41,7 +41,6 @@ from .builtin.dbscan import dbscan from .builtin.decisionTree import decisionTree from .builtin.discoverFD import discoverFD from .builtin.dist 
import dist -from .builtin.gaussianClassifier import gaussianClassifier from .builtin.getAccuracy import getAccuracy from .builtin.glm import glm from .builtin.gmm import gmm @@ -62,7 +61,6 @@ from .builtin.kmeans import kmeans from .builtin.kmeansPredict import kmeansPredict from .builtin.knnbf import knnbf from .builtin.l2svm import l2svm -from .builtin.l2svmPredict import l2svmPredict from .builtin.lasso import lasso from .builtin.lm import lm from .builtin.lmCG import lmCG @@ -95,7 +93,6 @@ from .builtin.splitBalanced import splitBalanced from .builtin.statsNA import statsNA from .builtin.steplm import steplm from .builtin.toOneHot import toOneHot -from .builtin.tomeklink import tomeklink from .builtin.univar import univar from .builtin.vectorToCsv import vectorToCsv from .builtin.winsorize import winsorize diff --git a/src/main/python/systemds/operator/operation_node.py b/src/main/python/systemds/operator/operation_node.py index e2ac681..62f9f53 100644 --- a/src/main/python/systemds/operator/operation_node.py +++ b/src/main/python/systemds/operator/operation_node.py @@ -27,7 +27,7 @@ from py4j.java_gateway import JVMView, JavaObject from systemds.utils.consts import VALID_INPUT_TYPES, BINARY_OPERATIONS, VALID_ARITHMETIC_TYPES from systemds.utils.helpers import create_params_string -from systemds.utils.converters import matrix_block_to_numpy +from systemds.utils.converters import matrix_block_to_numpy, frame_block_to_pandas from systemds.script_building.script import DMLScript from systemds.script_building.dag import OutputType, DAGNode @@ -39,6 +39,7 @@ if TYPE_CHECKING: class OperationNode(DAGNode): """A Node representing an operation in SystemDS""" + shape: Optional[Tuple[int]] _result_var: Optional[Union[float, np.array]] _lineage_trace: Optional[str] @@ -99,7 +100,7 @@ class OperationNode(DAGNode): else: result_variables = self._script.execute() - self._result_var = self.__parse_output_result_variables(result_variables) + self._result_var = 
self.__parse_output_result_variables(result_variables) if verbose: for x in self.sds_context.get_stdout(): @@ -119,6 +120,10 @@ class OperationNode(DAGNode): return self.__parse_output_result_matrix(result_variables, self._script.out_var_name[0]) elif self.output_type == OutputType.LIST: return self.__parse_output_result_list(result_variables) + elif self.output_type == OutputType.FRAME: + return self.__parse_output_result_frame( + result_variables, self._script.out_var_name[0] + ) def __parse_output_result_double(self, result_variables, var_name): return result_variables.getDouble(var_name) @@ -127,11 +132,19 @@ class OperationNode(DAGNode): return matrix_block_to_numpy(self.sds_context.java_gateway.jvm, result_variables.getMatrixBlock(var_name)) + def __parse_output_result_frame(self, result_variables, var_name): + return frame_block_to_pandas( + self.sds_context, result_variables.getFrameBlock(var_name) + ) + def __parse_output_result_list(self, result_variables): result_var = [] for idx, v in enumerate(self._script.out_var_name): if(self._output_types == None or self._output_types[idx] == OutputType.MATRIX): result_var.append(self.__parse_output_result_matrix(result_variables,v)) + elif self._output_types[idx] == OutputType.FRAME: + result_var.append(self.__parse_output_result_frame(result_variables, v)) + else: result_var.append(result_variables.getDouble( self._script.out_var_name[idx])) @@ -184,7 +197,43 @@ class OperationNode(DAGNode): :raise: AssertionError """ assert self.output_type == OutputType.MATRIX, f'{self.operation} only supported for matrices' + + def _check_frame_op(self): + """Perform checks to assure operation is allowed to be performed on data type of this `OperationNode` + + :raise: AssertionError + """ + assert ( + self.output_type == OutputType.FRAME + ), f"{self.operation} only supported for frames" + + def _check_matrix_or_frame_op(self): + """Perform checks to assure operation is allowed to be performed on data type of this 
`OperationNode` + + :raise: AssertionError + """ + assert ( + self.output_type == OutputType.FRAME + or self.output_type == OutputType.MATRIX + ), f"{self.operation} only supported for frames or matrices" + + def _check_equal_op_type_as(self, other: "OperationNode"): + """Perform checks to assure operation is equal to 'other'. Used for rBind and cBind type equality check. + + :raise: AssertionError + """ + assert ( + self.output_type == other.output_type + ), f"{self.operation} only supported for Nodes of equal output-type. Got self: {self.output_type} and other: {other.output_type}" + + def _check_other(self, other: "OperationNode", expectedOutputType: OutputType): + """Perform check to assure other operation has expected output type. + + :raise: AssertionError + """ + assert other.output_type == expectedOutputType + def __add__(self, other: VALID_ARITHMETIC_TYPES) -> 'OperationNode': return OperationNode(self.sds_context, '+', [self, other], shape=self.shape) @@ -500,32 +549,39 @@ class OperationNode(DAGNode): def rbind(self, other) -> 'OperationNode': """ - Row-wise matrix concatenation, by concatenating the second matrix as additional rows to the first matrix. - :param: The other matrix to bind to the right hand side - :return: The OperationNode containing the concatenated matrices. + Row-wise matrix/frame concatenation, by concatenating the second matrix/frame as additional rows to the first matrix/frame. + :param: The other matrix/frame to bind to the right hand side + :return: The OperationNode containing the concatenated matrices/frames. 
""" - - self._check_matrix_op() - other._check_matrix_op() - + self._check_equal_op_type_as(other) + self._check_matrix_or_frame_op() if self.shape[1] != other.shape[1]: raise ValueError( - "The input matrices to rbind does not have the same number of columns") - - return OperationNode(self.sds_context, 'rbind', [self, other], shape=(self.shape[0] + other.shape[0], self.shape[1])) + "The inputs to rbind do not have the same number of columns" + ) + return OperationNode( + self.sds_context, + "rbind", + [self, other], + shape=(self.shape[0] + other.shape[0], self.shape[1]), + output_type=self._output_type, + ) def cbind(self, other) -> 'OperationNode': """ - Column-wise matrix concatenation, by concatenating the second matrix as additional columns to the first matrix. - :param: The other matrix to bind to the right hand side. - :return: The OperationNode containing the concatenated matrices. + Column-wise matrix/frame concatenation, by concatenating the second matrix/frame as additional columns to the first matrix/frame. + :param: The other matrix/frame to bind to the right hand side. + :return: The OperationNode containing the concatenated matrices/frames. 
""" - - self._check_matrix_op() - other._check_matrix_op() - + self._check_equal_op_type_as(other) + self._check_matrix_or_frame_op() if self.shape[0] != other.shape[0]: - raise ValueError( - "The input matrices to cbind does not have the same number of columns") + raise ValueError("The inputs to cbind do not have the same number of rows") + return OperationNode( + self.sds_context, + "cbind", + [self, other], + shape=(self.shape[0], self.shape[1] + other.shape[1],), + output_type=self._output_type, + ) - return OperationNode(self.sds_context, 'cbind', [self, other], shape=(self.shape[0], self.shape[1] + other.shape[1])) diff --git a/src/main/python/systemds/utils/converters.py b/src/main/python/systemds/utils/converters.py index 3079a55..61a5957 100644 --- a/src/main/python/systemds/utils/converters.py +++ b/src/main/python/systemds/utils/converters.py @@ -20,7 +20,8 @@ # ------------------------------------------------------------- import numpy as np -from py4j.java_gateway import JavaClass, JavaObject, JVMView +import pandas as pd +from py4j.java_gateway import JavaClass, JavaObject, JVMView, JavaGateway def numpy_to_matrix_block(sds: 'SystemDSContext', np_arr: np.array): """Converts a given numpy array, to internal matrix block representation. @@ -69,5 +70,73 @@ def matrix_block_to_numpy(jvm: JVMView, mb: JavaObject): num_ros = mb.getNumRows() num_cols = mb.getNumColumns() buf = jvm.org.apache.sysds.runtime.util.Py4jConverterUtils.convertMBtoPy4JDenseArr( - mb) - return np.frombuffer(buf, count=num_ros * num_cols, dtype=np.float64).reshape((num_ros, num_cols)) + mb + ) + return np.frombuffer(buf, count=num_ros * num_cols, dtype=np.float64).reshape( + (num_ros, num_cols) + ) + + +def pandas_to_frame_block(sds: "SystemDSContext", pd_df: pd.DataFrame): + """Converts a given numpy array, to internal matrix block representation. + + :param sds: The current systemds context. + :param np_arr: the numpy array to convert to matrixblock. 
+ """ + assert pd_df.ndim <= 2, "pd_df invalid, because it has more than 2 dimensions" + rows = pd_df.shape[0] + cols = pd_df.shape[1] + + jvm: JVMView = sds.java_gateway.jvm + java_gate: JavaGateway = sds.java_gateway + + # pandas type mapping to systemds Valuetypes + data_type_mapping = { + np.dtype(np.object_): jvm.org.apache.sysds.common.Types.ValueType.STRING, + np.dtype(np.int64): jvm.org.apache.sysds.common.Types.ValueType.INT64, + np.dtype(np.float64): jvm.org.apache.sysds.common.Types.ValueType.FP64, + np.dtype(np.bool_): jvm.org.apache.sysds.common.Types.ValueType.BOOLEAN, + np.dtype("<M8[ns]"): jvm.org.apache.sysds.common.Types.ValueType.STRING, + } + schema = [] + col_names = [] + + for col_name, dtype in dict(pd_df.dtypes).items(): + col_names.append(col_name) + if dtype in data_type_mapping.keys(): + schema.append(data_type_mapping[dtype]) + else: + schema.append(jvm.org.apache.sysds.common.Types.ValueType.STRING) + try: + jc_ValueType = jvm.org.apache.sysds.common.Types.ValueType + jc_String = jvm.java.lang.String + jc_FrameBlock = jvm.org.apache.sysds.runtime.matrix.data.FrameBlock + j_valueTypeArray = java_gate.new_array(jc_ValueType, len(schema)) + j_colNameArray = java_gate.new_array(jc_String, len(col_names)) + j_dataArray = java_gate.new_array(jc_String, rows, cols) + for i in range(len(schema)): + j_valueTypeArray[i] = schema[i] + for i in range(len(col_names)): + j_colNameArray[i] = col_names[i] + j = 0 + for j, col_name in enumerate(col_names): + col_data = pd_df[col_name].fillna("").to_numpy(dtype=str) + for i in range(col_data.shape[0]): + if col_data[i]: + j_dataArray[i][j] = col_data[i] + fb = jc_FrameBlock(j_valueTypeArray, j_colNameArray, j_dataArray) + + return fb + except Exception as e: + sds.exception_and_close(e) + + +def frame_block_to_pandas(sds: "SystemDSContext", fb: JavaObject): + num_rows = fb.getNumRows() + num_cols = fb.getNumColumns() + df = pd.DataFrame() + for c_index in range(num_cols): + col_data = 
fb.getColumnData(c_index) + df[fb.getColumnName(c_index)] = np.array(col_data[:num_rows]) + + return df diff --git a/src/main/python/tests/frame/data/homes.csv b/src/main/python/tests/frame/data/homes.csv new file mode 100644 index 0000000..a8d6fab --- /dev/null +++ b/src/main/python/tests/frame/data/homes.csv @@ -0,0 +1,21 @@ +zipcode,district,sqft,numbedrooms,numbathrooms,floors,view,saleprice,askingprice +95141,west,1373,7,1,3,FALSE,695,698 +91312,south,3261,6,2,2,FALSE,902,906 +94555,north,1835,3,3,3,TRUE,888,892 +95141,east,2833,6,2.5,2,TRUE,927,932 +96334,south,2742,6,2.5,2,FALSE,872,876 +96334,north,2195,5,2.5,2,FALSE,799,803 +98755,north,3469,7,2.5,2,FALSE,958,963 +96334,west,1685,7,1.5,2,TRUE,757,760 +95141,west,2238,4,3,3,FALSE,894,899 +91312,west,1245,4,1,1,FALSE,547,549 +98755,south,3702,7,3,1,FALSE,959,964 +98755,north,1865,7,1,2,TRUE,742,745 +94555,north,3837,3,1,1,FALSE,839,842 +91312,west,2139,3,1,3,TRUE,820,824 +95141,north,3824,4,3,1,FALSE,954,958 +98755,east,2858,5,1.5,1,FALSE,759,762 +91312,south,1827,7,3,1,FALSE,735,738 +91312,south,3557,2,2.5,1,FALSE,888,892 +91312,south,2553,2,2.5,2,TRUE,884,889 +96334,west,1682,3,1.5,1,FALSE,625,628 \ No newline at end of file diff --git a/src/main/python/tests/frame/data/homes.tfspec_bin2.json b/src/main/python/tests/frame/data/homes.tfspec_bin2.json new file mode 100644 index 0000000..910593a --- /dev/null +++ b/src/main/python/tests/frame/data/homes.tfspec_bin2.json @@ -0,0 +1,6 @@ +{ + "recode": ["zipcode", "district", "view"], + "bin": [ + { "name": "saleprice", "method": "equi-width", "numbins": 3 }, { "name": "sqft", "method": "equi-width", "numbins": 4 } + ] +} \ No newline at end of file diff --git a/src/main/python/tests/frame/data/homes.tfspec_recode2.json b/src/main/python/tests/frame/data/homes.tfspec_recode2.json new file mode 100644 index 0000000..394dfc7 --- /dev/null +++ b/src/main/python/tests/frame/data/homes.tfspec_recode2.json @@ -0,0 +1,3 @@ +{ + "recode": ["zipcode", "district", 
"view"] +} \ No newline at end of file diff --git a/src/main/python/tests/frame/test_hyperband.py b/src/main/python/tests/frame/test_hyperband.py new file mode 100644 index 0000000..b470ed1 --- /dev/null +++ b/src/main/python/tests/frame/test_hyperband.py @@ -0,0 +1,86 @@ +# ------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +# ------------------------------------------------------------- + +import os +import shutil +import sys +import unittest + +import pandas as pd +import numpy as np +from systemds.context import SystemDSContext +from systemds.frame import Frame +from systemds.matrix import Matrix +from systemds.operator.algorithm import hyperband + + + +class TestHyperband(unittest.TestCase): + + sds: SystemDSContext = None + np.random.seed(42) + X_train = np.random.rand(50, 10) + y_train = np.sum(X_train, axis=1, keepdims=True) + np.random.rand(50, 1) + X_val = np.random.rand(50, 10) + y_val = np.sum(X_val, axis=1, keepdims=True) + np.random.rand(50, 1) + params = 'list("reg", "tol", "maxi")' + min_max_params = [[0, 20],[0.0001, 0.1],[5, 10]] + param_ranges = np.array(min_max_params) + + @classmethod + def setUpClass(cls): + cls.sds = SystemDSContext() + + @classmethod + def tearDownClass(cls): + cls.sds.close() + + def tearDown(self): + pass + + def test_hyperband(self): + x_train = Matrix(self.sds, self.X_train) + y_train = Matrix(self.sds, self.y_train) + x_val = Matrix(self.sds, self.X_val) + y_val = Matrix(self.sds, self.y_val) + paramRanges = Matrix(self.sds, self.param_ranges) + params = self.params + [best_weights_mat, opt_hyper_params_df] = hyperband( + X_train=x_train, + y_train=y_train, + X_val=x_val, + y_val=y_val, + params=params, + paramRanges=paramRanges, + ).compute() + self.assertTrue(isinstance(best_weights_mat, np.ndarray)) + self.assertTrue(best_weights_mat.shape[0] == self.X_train.shape[1]) + self.assertTrue(best_weights_mat.shape[1] == self.y_train.shape[1]) + + self.assertTrue(isinstance(opt_hyper_params_df, pd.DataFrame)) + self.assertTrue(opt_hyper_params_df.shape[0] == paramRanges.shape[0]) + self.assertTrue(opt_hyper_params_df.shape[1] == 1) + for i, hyper_param in enumerate(opt_hyper_params_df.values.flatten().tolist()): + self.assertTrue(self.min_max_params[i][0] <= hyper_param <= self.min_max_params[i][1]) + + +if __name__ == "__main__": + 
unittest.main(exit=False) diff --git a/src/main/python/tests/frame/test_r_c_bind.py b/src/main/python/tests/frame/test_r_c_bind.py new file mode 100644 index 0000000..c16b4bf --- /dev/null +++ b/src/main/python/tests/frame/test_r_c_bind.py @@ -0,0 +1,130 @@ +# ------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +# ------------------------------------------------------------- + +import unittest + +import pandas as pd +from systemds.context import SystemDSContext +from systemds.frame import Frame + + +class TestRCBind(unittest.TestCase): + + sds: SystemDSContext = None + + # shape (2, 3) + df_cb_1 = pd.DataFrame( + {"col1": ["col1_hello", "col1_world"], "col2": [0, 1], "col3": [0.0, 0.1]} + ) + # shape (2, 2) + df_cb_2 = pd.DataFrame({"col4": ["col4_hello", "col4_world"], "col5": [0, 1]}) + df_cb_3 = pd.DataFrame({"col6": ["col6_hello", "col6_world"], "col7": [0, 1]}) + + + #shape (2, 3) + df_rb_1 = pd.DataFrame( + {"col1": ["col1_hello_1", "col1_world_1"], "col2": [0, 1], "col3": [0.0, 0.1]} + ) + #shape (4, 3) + df_rb_2 = pd.DataFrame( + { + "col1": ["col1_hello_2", "col1_world_2", "col1_hello_2", "col1_world_2"], + "col2": [2, 3, 4, 5], + "col3": [0.2, 0.3, 0.4, 0.5], + } + ) + #shape (3, 3) + df_rb_3 = pd.DataFrame( + { + "col1": ["col1_hello_3", "col1_world_3", "col1_hello_3"], + "col2": [6, 7, 8], + "col3": [0.6, 0.7, 0.8], + } + ) + + + @classmethod + def setUpClass(cls): + cls.sds = SystemDSContext() + + @classmethod + def tearDownClass(cls): + cls.sds.close() + + def test_r_bind_pair(self): + f1 = Frame(self.sds, self.df_rb_1) + f2 = Frame(self.sds, self.df_rb_2) + result_df = f1.rbind(f2).compute() + self.assertTrue(isinstance(result_df, pd.DataFrame)) + target_df = pd.concat([self.df_rb_1, self.df_rb_2], ignore_index=True) + self.assertTrue(target_df.equals(result_df)) + + def test_r_bind_triple(self): + f1 = Frame(self.sds, self.df_rb_1) + f2 = Frame(self.sds, self.df_rb_2) + f3 = Frame(self.sds, self.df_rb_3) + result_df = f1.rbind(f2).rbind(f3).compute() + self.assertTrue(isinstance(result_df, pd.DataFrame)) + target_df = pd.concat([self.df_rb_1, self.df_rb_2, self.df_rb_3], ignore_index=True) + self.assertTrue(target_df.equals(result_df)) + + def test_r_bind_triple_twostep(self): + f1 = Frame(self.sds, self.df_rb_1) + f2 = Frame(self.sds, self.df_rb_2) + 
f3 = Frame(self.sds, self.df_rb_3) + tmp_df = f1.rbind(f2).compute() + result_df = Frame(self.sds, tmp_df).rbind(f3).compute() + self.assertTrue(isinstance(result_df, pd.DataFrame)) + target_df = pd.concat([self.df_rb_1, self.df_rb_2, self.df_rb_3], ignore_index=True) + self.assertTrue(target_df.equals(result_df)) + + def test_c_bind_pair(self): + f1 = Frame(self.sds, self.df_cb_1) + f2 = Frame(self.sds, self.df_cb_2) + result_df = f1.cbind(f2).compute() + self.assertTrue(isinstance(result_df, pd.DataFrame)) + target_df= pd.concat([self.df_cb_1, self.df_cb_2], axis=1) + self.assertTrue(target_df.equals(result_df)) + + def test_c_bind_triple(self): + f1 = Frame(self.sds, self.df_cb_1) + f2 = Frame(self.sds, self.df_cb_2) + f3 = Frame(self.sds, self.df_cb_3) + result_df = f1.cbind(f2).cbind(f3).compute() + self.assertTrue(isinstance(result_df, pd.DataFrame)) + target_df = pd.concat([self.df_cb_1, self.df_cb_2, self.df_cb_3], axis=1) + self.assertTrue(target_df.equals(result_df)) + + def test_c_bind_triple_twostep(self): + f1 = Frame(self.sds, self.df_cb_1) + f2 = Frame(self.sds, self.df_cb_2) + f3 = Frame(self.sds, self.df_cb_3) + tmp_df = f1.cbind(f2).compute() + result_df = Frame(self.sds, tmp_df).cbind(f3).compute() + self.assertTrue(isinstance(result_df, pd.DataFrame)) + target_df = pd.concat([self.df_cb_1, self.df_cb_2, self.df_cb_3], axis=1) + self.assertTrue(target_df.equals(result_df)) + + + + +if __name__ == "__main__": + unittest.main(exit=False) diff --git a/src/main/python/tests/frame/test_transform_apply.py b/src/main/python/tests/frame/test_transform_apply.py new file mode 100644 index 0000000..8b41efa --- /dev/null +++ b/src/main/python/tests/frame/test_transform_apply.py @@ -0,0 +1,86 @@ +# ------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
class TestTransformApply(unittest.TestCase):
    """Integration tests for Frame.transform_apply in the SystemDS Python API.

    Encodes the homes dataset with a recode+bin specification, then re-applies
    the resulting metadata frame with transform_apply and checks that the
    second encoding is shape-compatible and numeric.
    """

    # Shared SystemDS context for all tests in this class (created once).
    sds: SystemDSContext = None
    HOMES_PATH = "tests/frame/data/homes.csv"
    HOMES_SCHEMA = '"int,string,int,int,double,int,boolean,int,int"'

    @classmethod
    def setUpClass(cls):
        cls.sds = SystemDSContext()

    @classmethod
    def tearDownClass(cls):
        cls.sds.close()

    def tearDown(self):
        pass

    def test_apply_recode_bin(self):
        """transform_encode then transform_apply with the produced metadata.

        Verifies:
        - encode returns a numeric numpy matrix X and a metadata DataFrame M,
        - recoded columns keep their distinct-value counts,
        - binned columns expose exactly ``numbins`` distinct codes,
        - re-applying the spec with M yields a matrix of the same shape.
        """
        JSPEC_PATH = "tests/frame/data/homes.tfspec_bin2.json"
        with open(JSPEC_PATH) as jspec_file:
            JSPEC = json.load(jspec_file)
        F1 = self.sds.read(
            self.HOMES_PATH,
            data_type="frame",
            schema=self.HOMES_SCHEMA,
            format="csv",
            header=True,
        )
        pd_F1 = F1.compute()
        # The spec is read as a scalar string so SystemDS parses it natively.
        jspec = self.sds.read(JSPEC_PATH, data_type="scalar", value_type="string")
        X, M = F1.transform_encode(spec=jspec).compute()
        self.assertTrue(isinstance(X, np.ndarray))
        self.assertTrue(isinstance(M, pd.DataFrame))
        self.assertTrue(X.shape == pd_F1.shape)
        self.assertTrue(np.all(np.isreal(X)))
        # Recode: distinct codes in metadata must match distinct input values.
        for col_name in JSPEC["recode"]:
            self.assertTrue(M[col_name].nunique() == pd_F1[col_name].nunique())
        # Binning: metadata must contain exactly the requested number of bins.
        for binning in JSPEC["bin"]:
            col_name = binning["name"]
            self.assertTrue(M[col_name].nunique() == binning["numbins"])

        # Applying the same spec with the encode metadata must reproduce the
        # shape of the first encoding and stay purely numeric.
        X2 = F1.transform_apply(spec=jspec, meta=Frame(self.sds, M)).compute()
        self.assertTrue(X.shape == X2.shape)
        self.assertTrue(np.all(np.isreal(X2)))


if __name__ == "__main__":
    unittest.main(exit=False)
+# +# ------------------------------------------------------------- + +import os +import shutil +import sys +import unittest + +import pandas as pd +import numpy as np +import json +from systemds.context import SystemDSContext +from systemds.frame import Frame +from systemds.matrix import Matrix + + +class TestTransformEncode(unittest.TestCase): + + sds: SystemDSContext = None + HOMES_PATH = "tests/frame/data/homes.csv" + HOMES_SCHEMA = '"int,string,int,int,double,int,boolean,int,int"' + + @classmethod + def setUpClass(cls): + cls.sds = SystemDSContext() + + @classmethod + def tearDownClass(cls): + cls.sds.close() + + def tearDown(self): + pass + + def test_encode_recode(self): + JSPEC_PATH = "tests/frame/data/homes.tfspec_recode2.json" + with open(JSPEC_PATH) as jspec_file: + JSPEC = json.load(jspec_file) + F1 = self.sds.read( + self.HOMES_PATH, + data_type="frame", + schema=self.HOMES_SCHEMA, + format="csv", + header=True, + ) + pd_F1 = F1.compute() + jspec = self.sds.read(JSPEC_PATH, data_type="scalar", value_type="string") + X, M = F1.transform_encode(spec=jspec).compute() + self.assertTrue(isinstance(X, np.ndarray)) + self.assertTrue(isinstance(M, pd.DataFrame)) + self.assertTrue(X.shape == pd_F1.shape) + self.assertTrue(np.all(np.isreal(X))) + for col_name in JSPEC["recode"]: + self.assertTrue(M[col_name].nunique() == pd_F1[col_name].nunique()) + + +if __name__ == "__main__": + unittest.main(exit=False) diff --git a/src/main/python/tests/frame/test_write_read.py b/src/main/python/tests/frame/test_write_read.py new file mode 100644 index 0000000..712efb9 --- /dev/null +++ b/src/main/python/tests/frame/test_write_read.py @@ -0,0 +1,74 @@ +# ------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
class TestWriteRead(unittest.TestCase):
    """Round-trip tests: write a Frame to disk and read it back."""

    # Shared SystemDS context for the whole class.
    sds: SystemDSContext = None
    # Scratch directory; wiped after every test.
    temp_dir: str = "tests/frame/temp_write/"
    n_cols = 3
    n_rows = 100
    # Mixed-type fixture: string, int, and float columns.
    df = pd.DataFrame(
        {
            "col1": ["col1_string_%d" % i for i in range(n_rows)],
            "col2": list(range(n_rows)),
            "col3": [0.1 * i for i in range(n_rows)],
        }
    )

    @classmethod
    def setUpClass(cls):
        cls.sds = SystemDSContext()

    @classmethod
    def tearDownClass(cls):
        cls.sds.close()

    def tearDown(self):
        # Remove any files the test wrote; missing dir is fine.
        shutil.rmtree(self.temp_dir, ignore_errors=True)

    def test_write_read_binary(self):
        """Binary write followed by read must preserve all cell values."""
        target = self.temp_dir + "01"
        Frame(self.sds, self.df).write(target).compute()
        round_tripped = self.sds.read(target, data_type="frame").compute()
        self.assertTrue((self.df.values == round_tripped.values).all())

    def test_write_read_csv(self):
        """CSV write followed by read must yield an equal DataFrame."""
        target = self.temp_dir + "02"
        Frame(self.sds, self.df).write(target, header=True, format="csv").compute()
        round_tripped = self.sds.read(target, data_type="frame", format="csv").compute()
        self.assertTrue(isinstance(round_tripped, pd.DataFrame))
        self.assertTrue(self.df.equals(round_tripped))


if __name__ == "__main__":
    unittest.main(exit=False)
