This is an automated email from the ASF dual-hosted git repository. baunsgaard pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/systemds.git
commit fad79545a670e7f81848a763ae3bebc5ba8587f8 Author: baunsgaard <[email protected]> AuthorDate: Tue Sep 14 21:13:53 2021 +0200 [SYSTEMDS-3133] Right indexing on Frames and Matrix This commit adds support for right indexing of frames and matrices in the Python API. --- .../sysds/runtime/matrix/data/FrameBlock.java | 46 +++++++++- src/main/python/systemds/operator/nodes/frame.py | 11 ++- src/main/python/systemds/operator/nodes/matrix.py | 9 +- .../python/systemds/operator/operation_node.py | 11 ++- src/main/python/systemds/utils/converters.py | 12 ++- src/main/python/systemds/utils/helpers.py | 22 +++++ src/main/python/tests/frame/test_rIndexing.py | 100 +++++++++++++++++++++ src/main/python/tests/matrix/test_rIndexing.py | 62 +++++++++++-- 8 files changed, 256 insertions(+), 17 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java b/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java index 2e39cef..9ff106c 100644 --- a/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java +++ b/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java @@ -27,6 +27,8 @@ import java.io.ObjectInput; import java.io.ObjectOutput; import java.io.Serializable; import java.lang.ref.SoftReference; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -577,8 +579,9 @@ public class FrameBlock implements CacheBlock, Externalizable { switch(_schema[c]) { case STRING: return ((StringArray)_coldata[c])._data; case BOOLEAN: return ((BooleanArray)_coldata[c])._data; - case INT64: return ((LongArray)_coldata[c])._data; - case FP64: return ((DoubleArray)_coldata[c])._data; + case INT64: return ((LongArray)_coldata[c])._data; + case INT32: return ((IntegerArray)_coldata[c])._data; + case FP64: return ((DoubleArray)_coldata[c])._data; default: return null; } } @@ -588,6 +591,7 @@ public class FrameBlock implements CacheBlock, Externalizable { case STRING: return "String"; case BOOLEAN: return "Boolean"; case INT64: return "Long"; + case INT32: return "Int"; case FP64: return "Double"; default: return null; } @@ -617,6 +621,27 @@ public class FrameBlock implements CacheBlock, Externalizable { } } + public byte[] getColumnAsBytes(int c){ + switch(_schema[c]){ + case INT64: + long[] colLong = ((LongArray)_coldata[c])._data; + ByteBuffer longBuffer = ByteBuffer.allocate(8 * getNumRows()); + longBuffer.order(ByteOrder.LITTLE_ENDIAN); + for(int i = 0; i < getNumRows(); i++) + longBuffer.putLong(colLong[i]); + return longBuffer.array(); + case INT32: + int[] colInt = ((IntegerArray)_coldata[c])._data; + ByteBuffer intBuffer = ByteBuffer.allocate(4 * getNumRows()); + intBuffer.order(ByteOrder.LITTLE_ENDIAN); + for(int i = 0; i < getNumRows(); i++) + intBuffer.putInt(colInt[i]); + return intBuffer.array(); + default: + throw new NotImplementedException(); + } + } + public Array getColumn(int c) { return _coldata[c]; } @@ -1547,6 +1572,11 @@ public class FrameBlock implements CacheBlock, Externalizable { public abstract Array clone(); public abstract Array slice(int rl, int ru); public abstract void reset(int size); + + @Override + public String toString(){ + return this.getClass().getSimpleName().toString() + ":" + _size; + } } private static class StringArray extends Array<String> { @@ -2291,4 +2321,16 @@ public class FrameBlock implements CacheBlock, Externalizable { } return ret; } + + @Override + public String toString(){ + StringBuilder sb = new StringBuilder(); + sb.append("FrameBlock"); + sb.append("\n"); + sb.append(Arrays.toString(_schema)); + sb.append("\n"); + sb.append(Arrays.toString(_coldata)); + + return sb.toString(); + } } diff --git a/src/main/python/systemds/operator/nodes/frame.py b/src/main/python/systemds/operator/nodes/frame.py index 6551332..5fe02cc 100644 --- a/src/main/python/systemds/operator/nodes/frame.py +++ b/src/main/python/systemds/operator/nodes/frame.py @@ -29,6 +29,7 @@ import pandas as pd from py4j.java_gateway import JavaObject, JVMView from systemds.operator import OperationNode, Matrix, MultiReturn from systemds.utils.consts import VALID_INPUT_TYPES +from systemds.utils.helpers import get_slice_string from systemds.utils.converters import pandas_to_frame_block, frame_block_to_pandas from systemds.script_building.dag import OutputType, DAGNode @@ -45,7 +46,7 @@ class Frame(OperationNode): unnamed_input_nodes: Union[str, Iterable[VALID_INPUT_TYPES]] = None, named_input_nodes: Dict[str, VALID_INPUT_TYPES] = None, - local_data: pd.DataFrame = None) -> "Frame": + local_data: pd.DataFrame = None, brackets:bool = False) -> "Frame": is_python_local_data = False if local_data is not None: self._pd_dataframe = local_data @@ -54,7 +55,7 @@ class Frame(OperationNode): self._pd_dataframe = None super().__init__(sds_context, operation, unnamed_input_nodes, - named_input_nodes, OutputType.FRAME, is_python_local_data) + named_input_nodes, OutputType.FRAME, is_python_local_data, brackets) def pass_python_data_to_prepared_script(self, sds, var_name: str, prepared_script: JavaObject) -> None: assert ( @@ -134,4 +135,8 @@ class Frame(OperationNode): return Frame(self.sds_context, "replace", named_input_nodes={"target": self, "pattern": f"'{pattern}'", "replacement":f"'{replacement}'"}) def __str__(self): - return "FrameNode" \ No newline at end of file + return "FrameNode" + + def __getitem__(self, i) -> 'Frame': + sliceIns = get_slice_string(i) + return Frame(self.sds_context, '', [self, sliceIns], brackets=True) \ No newline at end of file diff --git a/src/main/python/systemds/operator/nodes/matrix.py b/src/main/python/systemds/operator/nodes/matrix.py index f90218b..2603dd5 100644 --- a/src/main/python/systemds/operator/nodes/matrix.py +++ b/src/main/python/systemds/operator/nodes/matrix.py @@ -29,6 +29,7 @@ from py4j.java_gateway import JavaObject, JVMView from systemds.operator import OperationNode, Scalar from systemds.utils.consts import VALID_INPUT_TYPES from systemds.utils.converters import numpy_to_matrix_block, matrix_block_to_numpy +from systemds.utils.helpers import get_slice_string from systemds.script_building.dag import OutputType from systemds.utils.consts import VALID_INPUT_TYPES, BINARY_OPERATIONS, VALID_ARITHMETIC_TYPES @@ -41,7 +42,7 @@ class Matrix(OperationNode): unnamed_input_nodes: Union[str, Iterable[VALID_INPUT_TYPES]] = None, named_input_nodes: Dict[str, VALID_INPUT_TYPES] = None, - local_data: np.array = None) -> 'Matrix': + local_data: np.array = None, brackets:bool = False ) -> 'Matrix': is_python_local_data = False if local_data is not None: @@ -51,7 +52,7 @@ class Matrix(OperationNode): self._np_array = None super().__init__(sds_context, operation, unnamed_input_nodes, - named_input_nodes, OutputType.MATRIX, is_python_local_data) + named_input_nodes, OutputType.MATRIX, is_python_local_data, brackets) def pass_python_data_to_prepared_script(self, sds, var_name: str, prepared_script: JavaObject) -> None: assert self.is_python_local_data, 'Can only pass data to prepared script if it is python local!' @@ -152,6 +153,10 @@ class Matrix(OperationNode): def __matmul__(self, other: 'Matrix') -> 'Matrix': return Matrix(self.sds_context, '%*%', [self, other]) + def __getitem__(self, i): + sliceIns = get_slice_string(i) + return Matrix(self.sds_context, '', [self, sliceIns], brackets=True) + def sum(self, axis: int = None) -> 'OperationNode': """Calculate sum of matrix. diff --git a/src/main/python/systemds/operator/operation_node.py b/src/main/python/systemds/operator/operation_node.py index 8dba0f9..acc3988 100644 --- a/src/main/python/systemds/operator/operation_node.py +++ b/src/main/python/systemds/operator/operation_node.py @@ -44,13 +44,15 @@ class OperationNode(DAGNode): _script: Optional[DMLScript] _output_types: Optional[Iterable[VALID_INPUT_TYPES]] _source_node: Optional["DAGNode"] + _brackets: bool def __init__(self, sds_context: 'SystemDSContext', operation: str, unnamed_input_nodes: Union[str, Iterable[VALID_INPUT_TYPES]] = None, named_input_nodes: Dict[str, VALID_INPUT_TYPES] = None, output_type: OutputType = OutputType.MATRIX, - is_python_local_data: bool = False): + is_python_local_data: bool = False, + brackets: bool = False): """ Create general `OperationNode` @@ -80,6 +82,7 @@ class OperationNode(DAGNode): self._script = None self._source_node = None self._already_added = False + self._brackets = brackets self.dml_name = "" def compute(self, verbose: bool = False, lineage: bool = False) -> \ @@ -134,13 +137,17 @@ class OperationNode(DAGNode): def code_line(self, var_name: str, unnamed_input_vars: Sequence[str], named_input_vars: Dict[str, str]) -> str: + + if self._brackets: + return f'{var_name}={unnamed_input_vars[0]}[{",".join(unnamed_input_vars[1:])}]' + if self.operation in BINARY_OPERATIONS: assert len( named_input_vars) == 0, 'Named parameters can not be used with binary operations' assert len( unnamed_input_vars) == 2, 'Binary Operations need exactly two input variables' return f'{var_name}={unnamed_input_vars[0]}{self.operation}{unnamed_input_vars[1]}' - + inputs_comma_sep = create_params_string( unnamed_input_vars, named_input_vars) diff --git a/src/main/python/systemds/utils/converters.py b/src/main/python/systemds/utils/converters.py index 5ef41d7..342c7a3 100644 --- a/src/main/python/systemds/utils/converters.py +++ b/src/main/python/systemds/utils/converters.py @@ -118,7 +118,7 @@ def pandas_to_frame_block(sds: "SystemDSContext", pd_df: pd.DataFrame): for i in range(len(schema)): j_valueTypeArray[i] = schema[i] for i in range(len(col_names)): - j_colNameArray[i] = col_names[i] + j_colNameArray[i] = str(col_names[i]) j = 0 for j, col_name in enumerate(col_names): col_data = pd_df[col_name].fillna("").to_numpy(dtype=str) @@ -150,9 +150,15 @@ def frame_block_to_pandas(sds: "SystemDSContext", fb: JavaObject): ret.append(ent) else: ret.append(None) - df[fb.getColumnName(c_index)] = ret + elif d_type == "Int": + byteArray = fb.getColumnAsBytes(c_index) + ret = np.frombuffer(byteArray, dtype=np.int32) + elif d_type == "Long": + byteArray = fb.getColumnAsBytes(c_index) + ret = np.frombuffer(byteArray, dtype=np.int64) else: - raise NotImplementedError("Not Implemented other types for systemds to pandas parsing") + raise NotImplementedError(f'Not Implemented {d_type} for systemds to pandas parsing') + df[fb.getColumnName(c_index)] = ret return df diff --git a/src/main/python/systemds/utils/helpers.py b/src/main/python/systemds/utils/helpers.py index 7aae17e..67b4f8d 100644 --- a/src/main/python/systemds/utils/helpers.py +++ b/src/main/python/systemds/utils/helpers.py @@ -48,3 +48,25 @@ def get_module_dir() -> os.PathLike: """ spec = find_spec(MODULE_NAME) return spec.submodule_search_locations[0] + + +def get_slice_string(i): + if isinstance(i, tuple): + if len(i) > 2: + raise ValueError(f'Invalid number of dimensions to slice {len(i)}, Only 2 dimensions allowed') + else: + return f'{get_slice_string(i[0])},{get_slice_string(i[1])}' + elif isinstance(i, slice): + if i.step: + raise ValueError("Invalid to slice with step in systemds") + elif i.start == None and i.stop == None: + return '' + elif i.start == None or i.stop == None: + raise NotImplementedError("Not Implemented slice with dynamic end") + else: + # + 1 since R and systemDS is 1 indexed. + return f'{i.start+1}:{i.stop}' + else: + # + 1 since R and systemDS is 1 indexed. + sliceIns = i+1 + return sliceIns \ No newline at end of file diff --git a/src/main/python/tests/frame/test_rIndexing.py b/src/main/python/tests/frame/test_rIndexing.py new file mode 100644 index 0000000..cb69f03 --- /dev/null +++ b/src/main/python/tests/frame/test_rIndexing.py @@ -0,0 +1,100 @@ +# ------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# ------------------------------------------------------------- + +import unittest + +import numpy as np +import pandas as pd +from systemds.context import SystemDSContext + + +class Test_rIndexing(unittest.TestCase): + + sds: SystemDSContext = None + + #shape (4, 3) + df = pd.DataFrame(np.arange(0, 100).reshape(10, 10)) + + @classmethod + def setUpClass(cls): + cls.sds = SystemDSContext() + + @classmethod + def tearDownClass(cls): + cls.sds.close() + + def test_1(self): + m1 = self.sds.from_pandas(self.df) + npres = self.df.loc[4] + res = m1[4].compute() + self.assertTrue(np.allclose(res, npres)) + + def test_2(self): + m1 = self.sds.from_pandas(self.df) + # Pandas is not consistant with numpy, since it is inclusive ranges + # therefore the tests are subtracting 1 from the end of the range. + npres = self.df.loc[4:4] + res = m1[4:5].compute() + self.assertTrue(np.allclose(res, npres)) + + def test_3(self): + m1 = self.sds.from_pandas(self.df) + # Invalid to slice with a step + with self.assertRaises(ValueError) as context: + res = m1[4:7:2].compute() + + def test_4(self): + m1 = self.sds.from_pandas(self.df) + npres = np.array(self.df.loc[:,4]) + res = np.array(m1[:,4].compute()).flatten() + self.assertTrue(np.allclose(res, npres)) + + def test_5(self): + m1 = self.sds.from_pandas(self.df) + npres = np.array(self.df.loc[:,4:5]) + res = np.array(m1[:,4:6].compute()) + self.assertTrue(np.allclose(res, npres)) + + def test_6(self): + m1 = self.sds.from_pandas(self.df) + npres = self.df.loc[1:1,4:5] + res = m1[1:2,4:6].compute() + self.assertTrue(np.allclose(res, npres)) + + def test_7(self): + m1 = self.sds.from_pandas(self.df) + npres = self.df.loc[1,4:5] + res = m1[1,4:6].compute() + self.assertTrue(np.allclose(res, npres)) + + def test_8(self): + m1 = self.sds.from_pandas(self.df) + with self.assertRaises(NotImplementedError) as context: + res = m1[1:,4:6].compute() + + def test_9(self): + m1 = self.sds.from_pandas(self.df) + with self.assertRaises(NotImplementedError) as context: + res = m1[:3,4:6].compute() + + +if __name__ == "__main__": + unittest.main(exit=False) diff --git a/src/main/python/tests/matrix/test_rIndexing.py b/src/main/python/tests/matrix/test_rIndexing.py index 1d25dab..01401ad 100644 --- a/src/main/python/tests/matrix/test_rIndexing.py +++ b/src/main/python/tests/matrix/test_rIndexing.py @@ -38,14 +38,66 @@ class Test_rIndexing(unittest.TestCase): cls.sds.close() def test_1(self): - npA = np.zeros((10, 2)) + npA = np.arange(0, 100).reshape(10, 10) m1 = self.sds.from_numpy(npA) - npres = npA[4,] - print(npres) - res = m1[4,].compute() - print(res) + npres = npA[4] + res = m1[4].compute() self.assertTrue(np.allclose(res, npres)) + def test_2(self): + npA = np.arange(0, 100).reshape(10, 10) + m1 = self.sds.from_numpy(npA) + npres = npA[4:5] + res = m1[4:5].compute() + self.assertTrue(np.allclose(res, npres)) + + def test_3(self): + npA = np.arange(0, 100).reshape(10, 10) + m1 = self.sds.from_numpy(npA) + # Invalid to slice with a step + with self.assertRaises(ValueError) as context: + res = m1[4:7:2].compute() + + def test_4(self): + npA = np.arange(0, 100).reshape(10, 10) + m1 = self.sds.from_numpy(npA) + npres = npA[:,4] + res = m1[:,4].compute().flatten() + self.assertTrue(np.allclose(res, npres)) + + def test_5(self): + npA = np.arange(0, 100).reshape(10, 10) + m1 = self.sds.from_numpy(npA) + npres = npA[:,4:6] + res = m1[:,4:6].compute() + self.assertTrue(np.allclose(res, npres)) + + def test_6(self): + npA = np.arange(0, 100).reshape(10, 10) + m1 = self.sds.from_numpy(npA) + npres = npA[1:2,4:6] + res = m1[1:2,4:6].compute() + self.assertTrue(np.allclose(res, npres)) + + def test_7(self): + npA = np.arange(0, 100).reshape(10, 10) + m1 = self.sds.from_numpy(npA) + npres = npA[1,4:6] + res = m1[1,4:6].compute() + self.assertTrue(np.allclose(res, npres)) + + def test_8(self): + npA = np.arange(0, 100).reshape(10, 10) + m1 = self.sds.from_numpy(npA) + with self.assertRaises(NotImplementedError) as context: + res = m1[1:,4:6].compute() + + def test_9(self): + npA = np.arange(0, 100).reshape(10, 10) + m1 = self.sds.from_numpy(npA) + with self.assertRaises(NotImplementedError) as context: + res = m1[:3,4:6].compute() + if __name__ == "__main__": unittest.main(exit=False)
