This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git

commit efb9cf0d2f4167d6d7c7ac87be547457bb98f69c
Author: ally <[email protected]>
AuthorDate: Sun Jun 16 12:48:18 2024 +0200

    [SYSTEMDS-3548] Optimize IO Path via column vectorization
    
    This commit optimize the IO path from Python via column
    vectorization and transfer. Making the overall conversion
    from python Pandas into SystemDS FrameBlock faster.
    
    LDE project SoSe'24
    
    Closes #2032
---
 .../sysds/runtime/util/Py4jConverterUtils.java     |  78 ++++++++++++++
 src/main/python/systemds/utils/converters.py       |  79 ++++++++++----
 .../tests/iotests/test_io_pandas_systemds.py       |  86 +++++++++++++++
 src/main/python/tests/matrix/test_print.py         |   2 +-
 .../frame/array/Py4jConverterUtilsTest.java        | 115 +++++++++++++++++++++
 5 files changed, 338 insertions(+), 22 deletions(-)

diff --git 
a/src/main/java/org/apache/sysds/runtime/util/Py4jConverterUtils.java 
b/src/main/java/org/apache/sysds/runtime/util/Py4jConverterUtils.java
index 3cd645cf6e..abc9abb4fd 100644
--- a/src/main/java/org/apache/sysds/runtime/util/Py4jConverterUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/util/Py4jConverterUtils.java
@@ -21,9 +21,13 @@ package org.apache.sysds.runtime.util;
 
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
+import java.nio.charset.StandardCharsets;
 
 import org.apache.sysds.common.Types;
 import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.frame.data.columns.Array;
+import org.apache.sysds.runtime.frame.data.columns.ArrayFactory;
+import org.apache.sysds.runtime.frame.data.columns.BooleanArray;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 
 /**
@@ -114,6 +118,80 @@ public class Py4jConverterUtils {
                return mb;
        }
 
+       public static Array<?> convert(byte[] data, int numElements, 
Types.ValueType valueType) {
+               if(data == null || valueType == null) {
+                       throw new DMLRuntimeException("Invalid input data or 
value type.");
+               }
+
+               ByteBuffer buffer = ByteBuffer.wrap(data);
+               buffer.order(ByteOrder.LITTLE_ENDIAN);
+
+               Array<?> array = ArrayFactory.allocate(valueType, numElements);
+
+               // Process the data based on the value type
+               switch(valueType) {
+                       case UINT4:
+                               for(int i = 0; i < numElements; i++) {
+                                       array.set(i, (int) (buffer.get() & 
0xFF));
+                               }
+                               break;
+                       case UINT8:
+                               for(int i = 0; i < numElements; i++) {
+                                       array.set(i, (int) (buffer.get() & 
0xFF));
+                               }
+                               break;
+                       case INT32:
+                               for(int i = 0; i < numElements; i++) {
+                                       array.set(i, buffer.getInt());
+                               }
+                               break;
+                       case INT64:
+                               for(int i = 0; i < numElements; i++) {
+                                       array.set(i, buffer.getLong());
+                               }
+                               break;
+                       case FP32:
+                               for(int i = 0; i < numElements; i++) {
+                                       array.set(i, buffer.getFloat());
+                               }
+                               break;
+                       case FP64:
+                               for(int i = 0; i < numElements; i++) {
+                                       array.set(i, buffer.getDouble());
+                               }
+                               break;
+                       case BOOLEAN:
+                               for(int i = 0; i < numElements; i++) {
+                                       ((BooleanArray) array).set(i, 
buffer.get() != 0);
+                               }
+                               break;
+                       case STRING:
+                               for(int i = 0; i < numElements; i++) {
+                                       buffer.order(ByteOrder.BIG_ENDIAN);
+                                       int strLen = buffer.getInt();
+                                       buffer.order(ByteOrder.LITTLE_ENDIAN);
+                                       byte[] strBytes = new byte[strLen];
+                                       buffer.get(strBytes);
+                                       array.set(i, new String(strBytes, 
StandardCharsets.UTF_8));
+                               }
+                               break;
+                       case CHARACTER:
+                               for(int i = 0; i < numElements; i++) {
+                                       array.set(i, (char) buffer.get());
+                               }
+                               break;
+                       case HASH64:
+                               for(int i = 0; i < numElements; i++) {
+                                       array.set(i, buffer.getLong());
+                               }
+                               break;
+                       default:
+                               throw new DMLRuntimeException("Unsupported 
value type: " + valueType);
+               }
+
+               return array;
+       }
+
        public static byte[] convertMBtoPy4JDenseArr(MatrixBlock mb) {
                byte[] ret = null;
                if(mb.isInSparseFormat()) {
diff --git a/src/main/python/systemds/utils/converters.py 
b/src/main/python/systemds/utils/converters.py
index 136e3470ca..5ce3fbde57 100644
--- a/src/main/python/systemds/utils/converters.py
+++ b/src/main/python/systemds/utils/converters.py
@@ -19,6 +19,7 @@
 #
 # -------------------------------------------------------------
 
+import struct
 
 import numpy as np
 import pandas as pd
@@ -31,7 +32,7 @@ def numpy_to_matrix_block(sds, np_arr: np.array):
     :param sds: The current systemds context.
     :param np_arr: the numpy array to convert to matrixblock.
     """
-    assert (np_arr.ndim <= 2), "np_arr invalid, because it has more than 2 
dimensions"
+    assert np_arr.ndim <= 2, "np_arr invalid, because it has more than 2 
dimensions"
     rows = np_arr.shape[0]
     cols = np_arr.shape[1] if np_arr.ndim == 2 else 1
 
@@ -81,10 +82,10 @@ def matrix_block_to_numpy(jvm: JVMView, mb: JavaObject):
 
 
 def pandas_to_frame_block(sds, pd_df: pd.DataFrame):
-    """Converts a given numpy array, to internal matrix block representation.
+    """Converts a given pandas DataFrame to an internal FrameBlock 
representation.
 
-    :param sds: The current systemds context.
-    :param np_arr: the numpy array to convert to matrixblock.
+    :param sds: The current SystemDS context.
+    :param pd_df: The pandas DataFrame to convert to FrameBlock.
     """
     assert pd_df.ndim <= 2, "pd_df invalid, because it has more than 2 
dimensions"
     rows = pd_df.shape[0]
@@ -100,6 +101,10 @@ def pandas_to_frame_block(sds, pd_df: pd.DataFrame):
         np.dtype(np.float64): jvm.org.apache.sysds.common.Types.ValueType.FP64,
         np.dtype(np.bool_): 
jvm.org.apache.sysds.common.Types.ValueType.BOOLEAN,
         np.dtype("<M8[ns]"): 
jvm.org.apache.sysds.common.Types.ValueType.STRING,
+        np.dtype(np.int32): jvm.org.apache.sysds.common.Types.ValueType.INT32,
+        np.dtype(np.float32): jvm.org.apache.sysds.common.Types.ValueType.FP32,
+        np.dtype(np.uint8): jvm.org.apache.sysds.common.Types.ValueType.UINT8,
+        np.dtype(np.character): 
jvm.org.apache.sysds.common.Types.ValueType.CHARACTER,
     }
     schema = []
     col_names = []
@@ -116,20 +121,51 @@ def pandas_to_frame_block(sds, pd_df: pd.DataFrame):
         jc_FrameBlock = jvm.org.apache.sysds.runtime.frame.data.FrameBlock
         j_valueTypeArray = java_gate.new_array(jc_ValueType, len(schema))
         j_colNameArray = java_gate.new_array(jc_String, len(col_names))
-        j_dataArray = java_gate.new_array(jc_String, rows, cols)
-        for i in range(len(schema)):
-            j_valueTypeArray[i] = schema[i]
-        for i in range(len(col_names)):
-            j_colNameArray[i] = str(col_names[i])
-        j = 0
-        for j, col_name in enumerate(col_names):
-            col_data = pd_df[col_name].fillna("").to_numpy(dtype=str)
-            for i in range(col_data.shape[0]):
-                if col_data[i]:
-                    j_dataArray[i][j] = col_data[i]
-        fb = jc_FrameBlock(j_valueTypeArray, j_colNameArray, j_dataArray)
-
-        return fb
+
+        # execution speed increases with optimized code when the number of 
rows exceeds 4
+        if rows > 4:
+            for i in range(len(schema)):
+                j_valueTypeArray[i] = schema[i]
+            for i in range(len(col_names)):
+                j_colNameArray[i] = str(col_names[i])
+
+            fb = jc_FrameBlock(j_valueTypeArray, j_colNameArray, rows)
+
+            # convert and set data for each column
+            for j, col_name in enumerate(col_names):
+                col_type = schema[j]
+                if col_type == 
jvm.org.apache.sysds.common.Types.ValueType.STRING:
+                    byte_data = bytearray()
+                    for value in pd_df[col_name].astype(str):
+                        encoded_value = value.encode("utf-8")
+                        byte_data.extend(struct.pack(">I", len(encoded_value)))
+                        byte_data.extend(encoded_value)
+                else:
+                    col_data = pd_df[col_name].fillna("").to_numpy()
+                    byte_data = bytearray(col_data.tobytes())
+
+                converted_array = (
+                    
jvm.org.apache.sysds.runtime.util.Py4jConverterUtils.convert(
+                        byte_data, rows, col_type
+                    )
+                )
+                fb.setColumn(j, converted_array)
+            return fb
+        else:
+            j_dataArray = java_gate.new_array(jc_String, rows, cols)
+            for i in range(len(schema)):
+                j_valueTypeArray[i] = schema[i]
+            for i in range(len(col_names)):
+                j_colNameArray[i] = str(col_names[i])
+            j = 0
+            for j, col_name in enumerate(col_names):
+                col_data = pd_df[col_name].fillna("").to_numpy(dtype=str)
+                for i in range(col_data.shape[0]):
+                    if col_data[i]:
+                        j_dataArray[i][j] = col_data[i]
+            fb = jc_FrameBlock(j_valueTypeArray, j_colNameArray, j_dataArray)
+            return fb
+
     except Exception as e:
         sds.exception_and_close(e)
 
@@ -146,7 +182,7 @@ def frame_block_to_pandas(sds, fb: JavaObject):
     df = pd.DataFrame()
 
     for c_index in range(num_cols):
-        col_array = fb.getColumn(c_index);
+        col_array = fb.getColumn(c_index)
 
         d_type = col_array.getValueType().toString()
         if d_type == "STRING":
@@ -174,10 +210,11 @@ def frame_block_to_pandas(sds, fb: JavaObject):
             ret = np.frombuffer(byteArray, dtype=np.dtype("?"))
         elif d_type == "CHARACTER":
             byteArray = fb.getColumn(c_index).getAsByteArray()
-            ret = np.frombuffer(byteArray , dtype=np.char)
+            ret = np.frombuffer(byteArray, dtype=np.char)
         else:
             raise NotImplementedError(
-                f'Not Implemented {d_type} for systemds to pandas parsing')
+                f"Not Implemented {d_type} for systemds to pandas parsing"
+            )
         df[fb.getColumnName(c_index)] = ret
 
     return df
diff --git a/src/main/python/tests/iotests/test_io_pandas_systemds.py 
b/src/main/python/tests/iotests/test_io_pandas_systemds.py
new file mode 100644
index 0000000000..0ddbf63a5d
--- /dev/null
+++ b/src/main/python/tests/iotests/test_io_pandas_systemds.py
@@ -0,0 +1,86 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
+
+
+import os
+import shutil
+import unittest
+import pandas as pd
+from systemds.context import SystemDSContext
+
+
+class TestPandasFromToSystemds(unittest.TestCase):
+
+    sds: SystemDSContext = None
+    temp_dir: str = "tests/iotests/temp_write_csv/"
+    n_cols = 3
+    n_rows = 5
+    df = pd.DataFrame(
+        {
+            "C1": [f"col1_string_{i}" for i in range(n_rows)],
+            "C2": [i for i in range(n_rows)],
+        }
+    )
+
+    @classmethod
+    def setUpClass(cls):
+        cls.sds = SystemDSContext()
+        if not os.path.exists(cls.temp_dir):
+            os.makedirs(cls.temp_dir)
+
+    @classmethod
+    def tearDownClass(cls):
+        cls.sds.close()
+        shutil.rmtree(cls.temp_dir, ignore_errors=True)
+
+    def test_into_systemds(self):
+        # Transfer into SystemDS and write to CSV
+        frame = self.sds.from_pandas(self.df)
+        frame.write(
+            self.temp_dir + "into_systemds.csv", format="csv", header=True
+        ).compute(verbose=True)
+
+        # Read the CSV file using pandas
+        result_df = pd.read_csv(self.temp_dir + "into_systemds.csv")
+
+        # Verify the data
+        self.assertTrue(isinstance(result_df, pd.DataFrame))
+        self.assertTrue(self.df.equals(result_df))
+
+    def test_out_of_systemds(self):
+        # Create a CSV file to read into SystemDS
+        self.df.to_csv(self.temp_dir + "out_of_systemds.csv", header=False, 
index=False)
+
+        # Read the CSV file into SystemDS and then compute back to pandas
+        frame = self.sds.read(
+            self.temp_dir + "out_of_systemds.csv", data_type="frame", 
format="csv"
+        )
+        result_df = frame.replace("xyz", "yzx").compute()
+
+        # Verify the data
+        result_df["C2"] = result_df["C2"].astype(int)
+
+        self.assertTrue(isinstance(result_df, pd.DataFrame))
+        self.assertTrue(self.df.equals(result_df))
+
+
+if __name__ == "__main__":
+    unittest.main(exit=False)
diff --git a/src/main/python/tests/matrix/test_print.py 
b/src/main/python/tests/matrix/test_print.py
index 5a345a18d7..dec3db2fa9 100644
--- a/src/main/python/tests/matrix/test_print.py
+++ b/src/main/python/tests/matrix/test_print.py
@@ -46,7 +46,7 @@ class TestPrint(unittest.TestCase):
     def test_print_01(self):
         self.sds.from_numpy(np.array([1])).to_string().print().compute()
         sleep(0.2)
-        self.assertEqual(1,float(self.sds.get_stdout()[0]))
+        self.assertEqual(1,float(self.sds.get_stdout()[0].replace(",", ".")))
 
     def test_print_02(self):
         self.sds.scalar(1).print().compute()
diff --git 
a/src/test/java/org/apache/sysds/test/component/frame/array/Py4jConverterUtilsTest.java
 
b/src/test/java/org/apache/sysds/test/component/frame/array/Py4jConverterUtilsTest.java
new file mode 100644
index 0000000000..965be8d71a
--- /dev/null
+++ 
b/src/test/java/org/apache/sysds/test/component/frame/array/Py4jConverterUtilsTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.component.frame.array;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.sysds.common.Types;
+
+import org.apache.sysds.runtime.util.Py4jConverterUtils;
+import org.apache.sysds.runtime.frame.data.columns.Array;
+import org.junit.Test;
+
+public class Py4jConverterUtilsTest {
+
+       @Test
+       public void testConvertUINT8() {
+               int numElements = 4;
+               byte[] data = {1, 2, 3, 4};
+               Array<?> result = Py4jConverterUtils.convert(data, numElements, 
Types.ValueType.UINT8);
+               assertNotNull(result);
+               assertEquals(4, result.size());
+               assertEquals(1, result.get(0));
+               assertEquals(2, result.get(1));
+               assertEquals(3, result.get(2));
+               assertEquals(4, result.get(3));
+       }
+
+       @Test
+       public void testConvertINT32() {
+               int numElements = 4;
+               ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES * 
numElements);
+               buffer.order(ByteOrder.nativeOrder());
+               for(int i = 1; i <= numElements; i++) {
+                       buffer.putInt(i);
+               }
+               Array<?> result = Py4jConverterUtils.convert(buffer.array(), 
numElements, Types.ValueType.INT32);
+               assertNotNull(result);
+               assertEquals(4, result.size());
+               assertEquals(1, result.get(0));
+               assertEquals(2, result.get(1));
+               assertEquals(3, result.get(2));
+               assertEquals(4, result.get(3));
+       }
+
+       @Test
+       public void testConvertFP64() {
+               int numElements = 4;
+               ByteBuffer buffer = ByteBuffer.allocate(Double.BYTES * 
numElements);
+               buffer.order(ByteOrder.nativeOrder());
+               for(double i = 1.1; i <= numElements + 1; i += 1.0) {
+                       buffer.putDouble(i);
+               }
+               Array<?> result = Py4jConverterUtils.convert(buffer.array(), 
numElements, Types.ValueType.FP64);
+               assertNotNull(result);
+               assertEquals(4, result.size());
+               assertEquals(1.1, result.get(0));
+               assertEquals(2.1, result.get(1));
+               assertEquals(3.1, result.get(2));
+               assertEquals(4.1, result.get(3));
+       }
+
+       @Test
+       public void testConvertBoolean() {
+               int numElements = 4;
+               byte[] data = {1, 0, 1, 0};
+               Array<?> result = Py4jConverterUtils.convert(data, numElements, 
Types.ValueType.BOOLEAN);
+               assertNotNull(result);
+               assertEquals(4, result.size());
+               assertEquals(true, result.get(0));
+               assertEquals(false, result.get(1));
+               assertEquals(true, result.get(2));
+               assertEquals(false, result.get(3));
+       }
+
+       @Test
+       public void testConvertString() {
+               int numElements = 2;
+               String[] strings = {"hello", "world"};
+               ByteBuffer buffer = ByteBuffer.allocate(4 + strings[0].length() 
+ 4 + strings[1].length());
+               buffer.order(ByteOrder.LITTLE_ENDIAN);
+               for(String s : strings) {
+                       buffer.order(ByteOrder.BIG_ENDIAN);
+                       buffer.putInt(s.length());
+                       buffer.order(ByteOrder.LITTLE_ENDIAN);
+                       buffer.put(s.getBytes(StandardCharsets.UTF_8));
+               }
+               Array<?> result = Py4jConverterUtils.convert(buffer.array(), 
numElements, Types.ValueType.STRING);
+               assertNotNull(result);
+               assertEquals(2, result.size());
+               assertEquals("hello", result.get(0));
+               assertEquals("world", result.get(1));
+       }
+}

Reply via email to