This is an automated email from the ASF dual-hosted git repository.
estrauss pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new 0e8e96685e [SYSTEMDS-3902] Faster data transfer for Python Frames to Java Runtime by using pipes instead of py4j
0e8e96685e is described below
commit 0e8e96685ed282f6f85b48b0b5d6a8f8a84c870a
Author: e-strauss <[email protected]>
AuthorDate: Tue Dec 9 12:56:52 2025 +0100
[SYSTEMDS-3902] Faster data transfer for Python Frames to Java Runtime by using pipes instead of py4j

This patch extends the previously added Unix-pipe data transfer to SystemDS frame transfers. In addition, the matrix transfer was further improved by fusing the non-zero count into the data-reading pass and by reducing unnecessary array allocations.

Closes #2363.
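For reviewers unfamiliar with the transfer protocol touched here: every payload on a pipe is framed by a 4-byte little-endian handshake token of (pipe_id + 1000), sent before and after the data. A minimal reader-side sketch of that framing (class and method names are illustrative, not part of this commit):

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

class HandshakeSketch {
	// Reads the 4-byte token and validates it against (pipeId + 1000),
	// mirroring UnixPipeUtils.readHandshake / Python's _pipe_receive_header.
	static void expectHandshake(InputStream in, int pipeId) throws IOException {
		byte[] token = in.readNBytes(4);
		if (token.length < 4)
			throw new EOFException("pipe closed before handshake");
		int received = ByteBuffer.wrap(token).order(ByteOrder.LITTLE_ENDIAN).getInt();
		if (received != pipeId + 1000)
			throw new IOException("handshake mismatch: expected " + (pipeId + 1000) + ", got " + received);
	}
}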
---
.gitignore | 2 +-
.../java/org/apache/sysds/api/PythonDMLScript.java | 99 ++-
.../runtime/frame/data/columns/FloatArray.java | 2 +-
...ltiReturnParameterizedBuiltinCPInstruction.java | 4 +-
.../apache/sysds/runtime/util/UnixPipeUtils.java | 717 +++++++++++++++---
.../python/systemds/context/systemds_context.py | 1 +
src/main/python/systemds/utils/converters.py | 840 +++++++++++++++++----
.../tests/matrix/test_block_converter_unix_pipe.py | 104 ---
.../tests/python_java_data_transfer/__init__.py | 20 +
.../test_dense_numpy_matrix.py | 246 ++++++
.../python_java_data_transfer/test_pandas_frame.py | 265 +++++++
src/main/python/tests/test_utils.py | 58 ++
.../test/component/utils/UnixPipeUtilsTest.java | 313 +++++++-
.../sysds/test/usertest/pythonapi/StartupTest.java | 98 ++-
14 files changed, 2318 insertions(+), 451 deletions(-)
diff --git a/.gitignore b/.gitignore
index d2fcdb9a4d..5de697a37e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -146,7 +146,7 @@
src/test/scripts/functions/pipelines/intermediates/classification/*
venv
venv/*
-
+.venv
# resource optimization
scripts/resource/output
*.pem
diff --git a/src/main/java/org/apache/sysds/api/PythonDMLScript.java
b/src/main/java/org/apache/sysds/api/PythonDMLScript.java
index 3b1864d71d..1a74ba0ea4 100644
--- a/src/main/java/org/apache/sysds/api/PythonDMLScript.java
+++ b/src/main/java/org/apache/sysds/api/PythonDMLScript.java
@@ -1,18 +1,18 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
+ * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
+ * KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
@@ -24,9 +24,11 @@ import org.apache.commons.logging.LogFactory;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.sysds.api.jmlc.Connection;
+import org.apache.sysds.common.Types.ValueType;
-import org.apache.sysds.common.Types;
import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.frame.data.FrameBlock;
+import org.apache.sysds.runtime.frame.data.columns.Array;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.util.CommonThreadPool;
import org.apache.sysds.runtime.util.UnixPipeUtils;
@@ -79,7 +81,7 @@ public class PythonDMLScript {
			 * therefore use logging framework and terminate the program.
			 */
			LOG.info("failed startup", p4e);
-			System.exit(-1);
+			exitHandler.exit(-1);
		}
		catch(Exception e) {
			throw new DMLException("Failed startup and maintaining Python gateway", e);
@@ -116,59 +118,59 @@ public class PythonDMLScript {
}
}
-	public MatrixBlock startReadingMbFromPipe(int id, int rlen, int clen, Types.ValueType type) throws IOException {
+	public MatrixBlock startReadingMbFromPipe(int id, int rlen, int clen, ValueType type) throws IOException {
		long limit = (long) rlen * clen;
		LOG.debug("trying to read matrix from "+id+" with "+rlen+" rows and "+clen+" columns. Total size: "+limit);
		if(limit > Integer.MAX_VALUE)
			throw new DMLRuntimeException("Dense NumPy array of size " + limit + " cannot be converted to MatrixBlock");
-		MatrixBlock mb = new MatrixBlock(rlen, clen, false, -1);
+		MatrixBlock mb;
		if(fromPython != null){
			BufferedInputStream pipe = fromPython.get(id);
			double[] denseBlock = new double[(int) limit];
-			UnixPipeUtils.readNumpyArrayInBatches(pipe, id, BATCH_SIZE, (int) limit, type, denseBlock, 0);
-			mb.init(denseBlock, rlen, clen);
+			long nnz = UnixPipeUtils.readNumpyArrayInBatches(pipe, id, BATCH_SIZE, (int) limit, type, denseBlock, 0);
+			mb = new MatrixBlock(rlen, clen, denseBlock);
+			mb.setNonZeros(nnz);
		} else {
			throw new DMLRuntimeException("FIFO Pipes are not initialized.");
		}
-		mb.recomputeNonZeros();
-		mb.examSparsity();
		LOG.debug("Reading from Python finished");
+		mb.examSparsity();
		return mb;
	}
-	public MatrixBlock startReadingMbFromPipes(int[] blockSizes, int rlen, int clen, Types.ValueType type) throws ExecutionException, InterruptedException {
+	public MatrixBlock startReadingMbFromPipes(int[] blockSizes, int rlen, int clen, ValueType type) throws ExecutionException, InterruptedException {
		long limit = (long) rlen * clen;
		if(limit > Integer.MAX_VALUE)
			throw new DMLRuntimeException("Dense NumPy array of size " + limit + " cannot be converted to MatrixBlock");
-		MatrixBlock mb = new MatrixBlock(rlen, clen, false, -1);
+		MatrixBlock mb = new MatrixBlock(rlen, clen, false, rlen*clen);
		if(fromPython != null){
			ExecutorService pool = CommonThreadPool.get();
			double[] denseBlock = new double[(int) limit];
			int offsetOut = 0;
-			List<Future<Void>> futures = new ArrayList<>();
+			List<Future<Long>> futures = new ArrayList<>();
			for (int i = 0; i < blockSizes.length; i++) {
				BufferedInputStream pipe = fromPython.get(i);
				int id = i, blockSize = blockSizes[i], _offsetOut = offsetOut;
-				Callable<Void> task = () -> {
-					UnixPipeUtils.readNumpyArrayInBatches(pipe, id, BATCH_SIZE, blockSize, type, denseBlock, _offsetOut);
-					return null;
+				Callable<Long> task = () -> {
+					return UnixPipeUtils.readNumpyArrayInBatches(pipe, id, BATCH_SIZE, blockSize, type, denseBlock, _offsetOut);
				};
				futures.add(pool.submit(task));
				offsetOut += blockSize;
			}
-			// Wait for all tasks and propagate exceptions
-			for (Future<Void> f : futures) {
-				f.get();
+			// Wait for all tasks and propagate exceptions, sum up nonzeros
+			long nnz = 0;
+			for (Future<Long> f : futures) {
+				nnz += f.get();
			}
-			mb.init(denseBlock, rlen, clen);
+			mb = new MatrixBlock(rlen, clen, denseBlock);
+			mb.setNonZeros(nnz);
		} else {
			throw new DMLRuntimeException("FIFO Pipes are not initialized.");
		}
-		mb.recomputeNonZeros();
		mb.examSparsity();
		return mb;
	}
@@ -181,7 +183,7 @@ public class PythonDMLScript {
			LOG.debug("Trying to write matrix ["+baseDir + "-"+ id+"] with "+rlen+" rows and "+clen+" columns. Total size: "+numElem*8);
			BufferedOutputStream out = toPython.get(id);
-			long bytes = UnixPipeUtils.writeNumpyArrayInBatches(out, id, BATCH_SIZE, numElem, Types.ValueType.FP64, mb);
+			long bytes = UnixPipeUtils.writeNumpyArrayInBatches(out, id, BATCH_SIZE, numElem, ValueType.FP64, mb);
			LOG.debug("Writing of " + bytes +" Bytes to Python ["+baseDir + "-"+ id+"] finished");
		} else {
@@ -189,6 +191,43 @@ public class PythonDMLScript {
}
}
+	public void startReadingColFromPipe(int id, FrameBlock fb, int rows, int totalBytes, int col, ValueType type, boolean any) throws IOException {
+		if (fromPython == null) {
+			throw new DMLRuntimeException("FIFO Pipes are not initialized.");
+		}
+
+		BufferedInputStream pipe = fromPython.get(id);
+		LOG.debug("Start reading FrameBlock column from pipe #" + id + " with type " + type);
+
+		// Delegate to UnixPipeUtils
+		Array<?> arr = UnixPipeUtils.readFrameColumnFromPipe(pipe, id, rows, totalBytes, BATCH_SIZE, type);
+		// Set column into FrameBlock
+		fb.setColumn(col, arr);
+		ValueType[] schema = fb.getSchema();
+		// In-place update of the schema for widened types (e.g., int8 -> int32)
+		schema[col] = arr.getValueType();
+
+		LOG.debug("Finished reading FrameBlock column from pipe #" + id);
+	}
+
+	public void startWritingColToPipe(int id, FrameBlock fb, int col) throws IOException {
+		if (toPython == null) {
+			throw new DMLRuntimeException("FIFO Pipes are not initialized.");
+		}
+
+		BufferedOutputStream pipe = toPython.get(id);
+		ValueType type = fb.getSchema()[col];
+		int rows = fb.getNumRows();
+		Array<?> array = fb.getColumn(col);
+
+		LOG.debug("Start writing FrameBlock column #" + col + " to pipe #" + id + " with type " + type + " and " + rows + " rows");
+
+		// Delegate to UnixPipeUtils
+		long bytes = UnixPipeUtils.writeFrameColumnToPipe(pipe, id, BATCH_SIZE, array, type);
+
+		LOG.debug("Finished writing FrameBlock column #" + col + " to pipe #" + id + ". Total bytes: " + bytes);
+	}
+
public void closePipes() throws IOException {
LOG.debug("Closing all pipes in Java");
for (BufferedInputStream pipe : fromPython.values())
@@ -198,6 +237,20 @@ public class PythonDMLScript {
LOG.debug("Closed all pipes in Java");
}
+ @FunctionalInterface
+ public interface ExitHandler {
+ void exit(int status);
+ }
+
+ private static volatile ExitHandler exitHandler = System::exit;
+
+ public static void setExitHandler(ExitHandler handler) {
+ exitHandler = handler == null ? System::exit : handler;
+ }
+
+ public static void resetExitHandler() {
+ exitHandler = System::exit;
+ }
	protected static class DMLGateWayListener extends DefaultGatewayServerListener {
		private static final Log LOG = LogFactory.getLog(DMLGateWayListener.class.getName());
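Note on the non-zero fusion above: readNumpyArrayInBatches now returns the nnz count so the callers can call setNonZeros(nnz) instead of running a second recomputeNonZeros() pass over the dense block. A standalone sketch of the idea (illustrative only, not code from this commit):

class FusedNnzSketch {
	// Counts non-zeros over the freshly decoded batch [from, to) while it is
	// still cache-hot, instead of re-scanning the full array afterwards.
	static long countNonZeros(double[] out, int from, int to) {
		long nnz = 0;
		for (int i = from; i < to; i++)
			if (out[i] != 0.0)
				nnz++;
		return nnz;
	}
}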
diff --git a/src/main/java/org/apache/sysds/runtime/frame/data/columns/FloatArray.java b/src/main/java/org/apache/sysds/runtime/frame/data/columns/FloatArray.java
index d0ab7a5630..f838eadc1d 100644
--- a/src/main/java/org/apache/sysds/runtime/frame/data/columns/FloatArray.java
+++ b/src/main/java/org/apache/sysds/runtime/frame/data/columns/FloatArray.java
@@ -174,7 +174,7 @@ public class FloatArray extends Array<Float> {
@Override
public byte[] getAsByteArray() {
- ByteBuffer floatBuffer = ByteBuffer.allocate(8 * _size);
+ ByteBuffer floatBuffer = ByteBuffer.allocate(4 * _size);
floatBuffer.order(ByteOrder.nativeOrder());
for(int i = 0; i < _size; i++)
floatBuffer.putFloat(_data[i]);
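The one-line FloatArray fix above corrects an over-allocation: a float column needs 4 bytes per element, not 8. A minimal sketch of the corrected sizing (helper name is illustrative):

import java.nio.ByteBuffer;
import java.nio.ByteOrder;

class FloatBufferSizingSketch {
	static byte[] toBytes(float[] data) {
		// Float.BYTES == 4; allocating 8 * length would waste half the buffer
		ByteBuffer bb = ByteBuffer.allocate(Float.BYTES * data.length).order(ByteOrder.nativeOrder());
		for (float f : data)
			bb.putFloat(f);
		return bb.array();
	}
}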
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnParameterizedBuiltinCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnParameterizedBuiltinCPInstruction.java
index 28bd01f08d..98101348da 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnParameterizedBuiltinCPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnParameterizedBuiltinCPInstruction.java
@@ -8,7 +8,7 @@
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -91,7 +91,7 @@ public class MultiReturnParameterizedBuiltinCPInstruction extends ComputationCPI
		FrameBlock fin = ec.getFrameInput(input1.getName());
		String spec = ec.getScalarInput(input2).getStringValue();
		String[] colnames = fin.getColumnNames();
-
+
		// execute block transform encode
		MultiColumnEncoder encoder = EncoderFactory.createEncoder(spec, colnames, fin.getNumColumns(), null);
		// TODO: Assign #threads in compiler and pass via the instruction string
diff --git a/src/main/java/org/apache/sysds/runtime/util/UnixPipeUtils.java b/src/main/java/org/apache/sysds/runtime/util/UnixPipeUtils.java
index 69014acc0f..9ed1f73b7f 100644
--- a/src/main/java/org/apache/sysds/runtime/util/UnixPipeUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/util/UnixPipeUtils.java
@@ -1,29 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
+ * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
+ * KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sysds.runtime.util;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.sysds.common.Types;
-import org.apache.sysds.runtime.matrix.data.MatrixBlock;
-
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.EOFException;
@@ -37,10 +32,33 @@ import java.nio.ByteOrder;
import java.nio.DoubleBuffer;
import java.nio.FloatBuffer;
import java.nio.IntBuffer;
+import java.nio.LongBuffer;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.frame.data.columns.Array;
+import org.apache.sysds.runtime.frame.data.columns.ArrayFactory;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+
public class UnixPipeUtils {
	private static final Log LOG = LogFactory.getLog(UnixPipeUtils.class.getName());
+	public static int getElementSize(Types.ValueType type) {
+		return switch (type) {
+			case UINT8, BOOLEAN -> 1;
+			case INT32, FP32 -> 4;
+			case INT64, FP64 -> 8;
+			default -> throw new UnsupportedOperationException("Unsupported type: " + type);
+		};
+	}
+
+	private static ByteBuffer newLittleEndianBuffer(byte[] buffer, int length) {
+		return ByteBuffer.wrap(buffer, 0, length).order(ByteOrder.LITTLE_ENDIAN);
+	}
+
+
	/**
	 * Opens a named pipe for input, reads 4 bytes as an int, compares it to the expected ID.
	 * If matched, returns the InputStream for further use.
@@ -74,7 +92,10 @@ public class UnixPipeUtils {
			bis.close();
			throw new IOException("Failed to read handshake integer from pipe");
		}
+		compareHandshakeIds(expectedId, bis, buffer);
+	}
+
+	private static void compareHandshakeIds(int expectedId, BufferedInputStream bis, byte[] buffer) throws IOException {
		// Convert bytes to int (assuming little-endian to match typical Python struct.pack)
		int receivedId = ByteBuffer.wrap(buffer).order(ByteOrder.LITTLE_ENDIAN).getInt();
		expectedId += 1000;
@@ -106,15 +127,65 @@ public class UnixPipeUtils {
bos.flush();
}
-	public static void readNumpyArrayInBatches(BufferedInputStream in, int id, int batchSize, int numElem,
-			Types.ValueType type, double[] out, int offsetOut)
-			throws IOException {
-		int elemSize;
-		switch (type){
-			case UINT8 -> elemSize = 1;
-			case INT32, FP32 -> elemSize = 4;
-			default -> elemSize = 8;
+	@FunctionalInterface
+	private interface BufferReader {
+		int readTo(Object dest, int offset, ByteBuffer bb);
+	}
+
+	private static BufferReader getBufferReader(Types.ValueType type) {
+		return switch (type) {
+			case FP64 -> (dest, offset, bb) -> {
+				DoubleBuffer db = bb.asDoubleBuffer();
+				double[] out = (double[]) dest;
+				int remaining = db.remaining();
+				db.get(out, offset, remaining);
+				return offset + remaining;
+			};
+			case FP32 -> (dest, offset, bb) -> {
+				FloatBuffer fb = bb.asFloatBuffer();
+				double[] out = (double[]) dest;
+				int n = fb.remaining();
+				for (int i = 0; i < n; i++) out[offset++] = fb.get();
+				return offset;
+			};
+			case INT64 -> (dest, offset, bb) -> {
+				LongBuffer lb = bb.asLongBuffer();
+				double[] out = (double[]) dest;
+				int n = lb.remaining();
+				for (int i = 0; i < n; i++) out[offset++] = lb.get();
+				return offset;
+			};
+			case INT32 -> (dest, offset, bb) -> {
+				IntBuffer ib = bb.asIntBuffer();
+				double[] out = (double[]) dest;
+				int n = ib.remaining();
+				for (int i = 0; i < n; i++) out[offset++] = ib.get();
+				return offset;
+			};
+			case UINT8 -> (dest, offset, bb) -> {
+				double[] out = (double[]) dest;
+				for (int i = 0; i < bb.limit(); i++) out[offset++] = bb.get(i) & 0xFF;
+				return offset;
+			};
+			default -> throw new UnsupportedOperationException("Unsupported type: " + type);
+		};
+	}
+
+	private static void readFully(BufferedInputStream in, byte[] buffer, int len) throws IOException {
+		int total = 0;
+		while (total < len) {
+			int read = in.read(buffer, total, len - total);
+			if (read == -1)
+				throw new EOFException("Unexpected end of stream");
+			total += read;
		}
+	}
+
+	public static long readNumpyArrayInBatches(BufferedInputStream in, int id, int batchSize, int numElem,
+			Types.ValueType type, double[] out, int offsetOut)
+			throws IOException {
+		int elemSize = getElementSize(type);
+		long nonZeros = 0;
try {
// Read start header
@@ -122,26 +193,26 @@ public class UnixPipeUtils {
			long bytesRemaining = ((long) numElem) * elemSize;
			byte[] buffer = new byte[batchSize];
+			BufferReader reader = getBufferReader(type);
+			int prevOffset = offsetOut;
			while (bytesRemaining > 0) {
-				int currentBatchSize = (int) Math.min(batchSize, bytesRemaining);
-				int totalRead = 0;
-
-				while (totalRead < currentBatchSize) {
-					int bytesRead = in.read(buffer, totalRead, currentBatchSize - totalRead);
-					if (bytesRead == -1) {
-						throw new EOFException("Unexpected end of stream in pipe #" + id +
-							": expected " + currentBatchSize + " bytes, got " + totalRead);
+				int chunk = (int) Math.min(batchSize, bytesRemaining);
+				readFully(in, buffer, chunk);
+				offsetOut = reader.readTo(out, offsetOut, newLittleEndianBuffer(buffer, chunk));
+
+				// Count nonzeros in the batch we just read (single pass)
+				for (int i = prevOffset; i < offsetOut; i++) {
+					if (out[i] != 0.0) {
+						nonZeros++;
					}
-					totalRead += bytesRead;
				}
-
-				// Interpret bytes with value type and fill the dense MB
-				offsetOut = fillDoubleArrayFromByteArray(type, out, offsetOut, buffer, currentBatchSize);
-				bytesRemaining -= currentBatchSize;
+				prevOffset = offsetOut;
+				bytesRemaining -= chunk;
			}
			// Read end header
			readHandshake(id, in);
+			return nonZeros;
		} catch (Exception e) {
			LOG.error("Error occurred while reading data from pipe #" + id, e);
@@ -149,120 +220,540 @@ public class UnixPipeUtils {
}
}
-	private static int fillDoubleArrayFromByteArray(Types.ValueType type, double[] out, int offsetOut, byte[] buffer,
-			int currentBatchSize) {
-		ByteBuffer bb = ByteBuffer.wrap(buffer, 0, currentBatchSize).order(ByteOrder.LITTLE_ENDIAN);
-		switch (type){
-			default -> {
-				DoubleBuffer doubleBuffer = bb.asDoubleBuffer();
-				int numDoubles = doubleBuffer.remaining();
-				doubleBuffer.get(out, offsetOut, numDoubles);
-				offsetOut += numDoubles;
+
+	@FunctionalInterface
+	private interface BufferWriter {
+		int writeFrom(Object src, int offset, ByteBuffer bb);
+	}
+
+	private static BufferWriter getBufferWriter(Types.ValueType type) {
+		return switch (type) {
+			case FP64 -> (src, offset, bb) -> {
+				MatrixBlock mb = (MatrixBlock) src;
+				DoubleBuffer db = bb.asDoubleBuffer();
+				int n = Math.min(db.remaining(), mb.getNumRows() * mb.getNumColumns() - offset);
+				for (int i = 0; i < n; i++) {
+					int r = (offset + i) / mb.getNumColumns();
+					int c = (offset + i) % mb.getNumColumns();
+					db.put(mb.getDouble(r, c));
+				}
+				return n * 8;
+			};
+			case FP32 -> (src, offset, bb) -> {
+				MatrixBlock mb = (MatrixBlock) src;
+				FloatBuffer fb = bb.asFloatBuffer();
+				int n = Math.min(fb.remaining(), mb.getNumRows() * mb.getNumColumns() - offset);
+				for (int i = 0; i < n; i++) {
+					int r = (offset + i) / mb.getNumColumns();
+					int c = (offset + i) % mb.getNumColumns();
+					fb.put((float) mb.getDouble(r, c));
+				}
+				return n * 4;
+			};
+			case INT64 -> (src, offset, bb) -> {
+				MatrixBlock mb = (MatrixBlock) src;
+				LongBuffer lb = bb.asLongBuffer();
+				int n = Math.min(lb.remaining(), mb.getNumRows() * mb.getNumColumns() - offset);
+				for (int i = 0; i < n; i++) {
+					int r = (offset + i) / mb.getNumColumns();
+					int c = (offset + i) % mb.getNumColumns();
+					lb.put((long) mb.getDouble(r, c));
+				}
+				return n * 8;
+			};
+			case INT32 -> (src, offset, bb) -> {
+				MatrixBlock mb = (MatrixBlock) src;
+				IntBuffer ib = bb.asIntBuffer();
+				int n = Math.min(ib.remaining(), mb.getNumRows() * mb.getNumColumns() - offset);
+				for (int i = 0; i < n; i++) {
+					int r = (offset + i) / mb.getNumColumns();
+					int c = (offset + i) % mb.getNumColumns();
+					ib.put((int) mb.getDouble(r, c));
+				}
+				return n * 4;
+			};
+			case UINT8 -> (src, offset, bb) -> {
+				MatrixBlock mb = (MatrixBlock) src;
+				int n = Math.min(bb.limit(), mb.getNumRows() * mb.getNumColumns() - offset);
+				for (int i = 0; i < n; i++) {
+					int r = (offset + i) / mb.getNumColumns();
+					int c = (offset + i) % mb.getNumColumns();
+					bb.put(i, (byte) ((int) mb.getDouble(r, c) & 0xFF));
+				}
+				return n;
+			};
+			default -> throw new UnsupportedOperationException("Unsupported type: " + type);
+		};
+	}
+
+	/**
+	 * Symmetric with readNumpyArrayInBatches — writes data in batches with handshake.
+	 */
+	public static long writeNumpyArrayInBatches(BufferedOutputStream out, int id, int batchSize,
+			int numElem, Types.ValueType type, MatrixBlock mb)
+			throws IOException {
+		int elemSize = getElementSize(type);
+		long totalBytesWritten = 0;
+
+		try {
+			writeHandshake(id, out);
+			long bytesRemaining = ((long) numElem) * elemSize;
+			byte[] buffer = new byte[batchSize];
+			BufferWriter writer = getBufferWriter(type);
+
+			int offset = 0;
+			while (bytesRemaining > 0) {
+				int chunk = (int) Math.min(batchSize, bytesRemaining);
+				ByteBuffer bb = newLittleEndianBuffer(buffer, chunk);
+				int bytesFilled = writer.writeFrom(mb, offset, bb);
+				out.write(buffer, 0, bytesFilled);
+				totalBytesWritten += bytesFilled;
+				bytesRemaining -= bytesFilled;
+				offset += bytesFilled / elemSize;
+			}
+
+			out.flush();
+			writeHandshake(id, out);
+			return totalBytesWritten;
+		} catch (Exception e) {
+			LOG.error("Error occurred while writing data to pipe #" + id, e);
+			throw e;
+		}
+	}
+
+	public static Array<?> readFrameColumnFromPipe(
+			BufferedInputStream in, int id, int rows, int totalBytes, int batchSize,
+			Types.ValueType type) throws IOException {
+
+		long tStart = System.nanoTime();
+		long tIoStart, tIoTotal = 0;
+		long tDecodeTotal = 0;
+		int numStrings = 0;
+
+		readHandshake(id, in);
+		Array<?> array = ArrayFactory.allocate(type, rows);
+		byte[] buffer = new byte[batchSize];
+		try {
+			if (type != Types.ValueType.STRING) {
+				tIoStart = System.nanoTime();
+				readFixedTypeColumn(in, array, type, rows, totalBytes, buffer);
+				tIoTotal = System.nanoTime() - tIoStart;
+				readHandshake(id, in);
+			} else {
+				tIoStart = System.nanoTime();
+				VarFillTiming timing = readVariableTypeColumn(in, id, array, type, rows, buffer);
+				tIoTotal = System.nanoTime() - tIoStart;
+				tDecodeTotal = timing.decodeTime;
+				numStrings = timing.numStrings;
+			}
+		} catch (Exception e) {
+			LOG.error("Error occurred while reading FrameBlock column from pipe #" + id, e);
+			throw e;
+		}
+
+		long tTotal = System.nanoTime() - tStart;
+		if (type == Types.ValueType.STRING) {
+			LOG.debug(String.format(
+				"Java readFrameColumnFromPipe timing: total=%.3fs, I/O=%.3fs (%.1f%%), decode=%.3fs (%.1f%%), strings=%d",
+				tTotal / 1e9, tIoTotal / 1e9, 100.0 * tIoTotal / tTotal,
+				tDecodeTotal / 1e9, 100.0 * tDecodeTotal / tTotal, numStrings));
+		}
+		return array;
+	}
+
+	private static class VarFillTiming {
+		long decodeTime;
+		int numStrings;
+		VarFillTiming(long decodeTime, int numStrings) {
+			this.decodeTime = decodeTime;
+			this.numStrings = numStrings;
+		}
+	}
+
+	private static void readFixedTypeColumn(
+			BufferedInputStream in, Array<?> array,
+			Types.ValueType type, int rows, int totalBytes, byte[] buffer) throws IOException {
+
+		int elemSize = getElementSize(type);
+		int expected = rows * elemSize;
+		if (totalBytes != expected)
+			throw new IOException("Expected " + expected + " bytes but got " + totalBytes);
+
+		int offset = 0;
+		long bytesRemaining = totalBytes;
+
+		while (bytesRemaining > 0) {
+			int chunk = (int) Math.min(buffer.length, bytesRemaining);
+			readFully(in, buffer, chunk);
+			offset = fillFixedArrayFromBytes(array, type, offset, buffer, chunk);
+			bytesRemaining -= chunk;
+		}
+	}
+
+	private static int fillFixedArrayFromBytes(
+			Array<?> array, Types.ValueType type, int offsetOut,
+			byte[] buffer, int currentBatchSize) {
+
+		ByteBuffer bb = newLittleEndianBuffer(buffer, currentBatchSize);
+
+		switch (type) {
+			case FP64 -> {
+				DoubleBuffer db = bb.asDoubleBuffer();
+				while (db.hasRemaining())
+					array.set(offsetOut++, db.get());
			}
			case FP32 -> {
-				FloatBuffer floatBuffer = bb.asFloatBuffer();
-				int numFloats = floatBuffer.remaining();
-				for (int i = 0; i < numFloats; i++) {
-					out[offsetOut++] = floatBuffer.get();
-				}
+				FloatBuffer fb = bb.asFloatBuffer();
+				while (fb.hasRemaining())
+					array.set(offsetOut++, fb.get());
+			}
+			case INT64 -> {
+				LongBuffer lb = bb.asLongBuffer();
+				while (lb.hasRemaining())
+					array.set(offsetOut++, lb.get());
			}
			case INT32 -> {
-				IntBuffer intBuffer = bb.asIntBuffer();
-				int numInts = intBuffer.remaining();
-				for (int i = 0; i < numInts; i++) {
-					out[offsetOut++] = intBuffer.get();
-				}
+				IntBuffer ib = bb.asIntBuffer();
+				while (ib.hasRemaining())
+					array.set(offsetOut++, ib.get());
			}
			case UINT8 -> {
-				for (int i = 0; i < currentBatchSize; i++) {
-					out[offsetOut++] = bb.get(i) & 0xFF;
-				}
+				for (int i = 0; i < currentBatchSize; i++)
+					array.set(offsetOut++, (int) (bb.get(i) & 0xFF));
			}
+			case BOOLEAN -> {
+				for (int i = 0; i < currentBatchSize; i++)
+					array.set(offsetOut++, bb.get(i) != 0 ? 1.0 : 0.0);
+			}
+			default -> throw new UnsupportedOperationException("Unsupported fixed type: " + type);
		}
		return offsetOut;
	}
-	public static long writeNumpyArrayInBatches(BufferedOutputStream out, int id, int batchSize, int numElem,
-			Types.ValueType type, MatrixBlock mb) throws IOException {
-		int elemSize;
-		switch (type) {
-			case UINT8 -> elemSize = 1;
-			case INT32, FP32 -> elemSize = 4;
-			default -> elemSize = 8;
+	private static VarFillTiming readVariableTypeColumn(
+			BufferedInputStream in, int id, Array<?> array,
+			Types.ValueType type, int elems, byte[] buffer) throws IOException {
+
+		long tDecodeTotal = 0;
+		int numStrings = 0;
+
+		int offset = 0;
+		// Use a reusable growable byte array to avoid repeated toByteArray() allocations
+		byte[] combined = new byte[32 * 1024]; // Start with 32KB
+		int combinedLen = 0;
+
+		// Keep reading until all expected elements are filled
+		while (offset < elems) {
+			int chunk = in.read(buffer);
+			if (chunk == -1)
+				throw new EOFException("Unexpected end of stream in pipe #" + id);
+
+			// Ensure combined array is large enough
+			if (combinedLen + chunk > combined.length) {
+				// Grow array (double size, but at least accommodate new data)
+				int newSize = Math.max(combined.length * 2, combinedLen + chunk);
+				byte[] newCombined = new byte[newSize];
+				System.arraycopy(combined, 0, newCombined, 0, combinedLen);
+				combined = newCombined;
+			}
+
+			// Append newly read bytes
+			System.arraycopy(buffer, 0, combined, combinedLen, chunk);
+			combinedLen += chunk;
+
+			// Try decoding as many complete elements as possible
+			long tDecodeStart = System.nanoTime();
+			VarFillResult res = fillVariableArrayFromBytes(array, offset, elems, combined, combinedLen, type);
+			tDecodeTotal += System.nanoTime() - tDecodeStart;
+			int stringsDecoded = res.offsetOut() - offset;
+			numStrings += stringsDecoded;
+			offset = res.offsetOut();
+
+			// Retain any incomplete trailing bytes by shifting them to the start
+			int remainingBytes = res.remainingBytes();
+			if (remainingBytes > 0) {
+				// Move remaining bytes to the start of the buffer
+				System.arraycopy(combined, combinedLen - remainingBytes, combined, 0, remainingBytes);
+				combinedLen = remainingBytes;
+			} else {
+				combinedLen = 0;
+			}
		}
-		long totalBytesWritten = 0;
-		// Write start header
-		writeHandshake(id, out);
+		// ---- handshake check ----
+		if(combinedLen == 0)
+			readHandshake(id, in);
+		else if (combinedLen == 4) {
+			byte[] tail = new byte[4];
+			System.arraycopy(combined, 0, tail, 0, 4);
+			compareHandshakeIds(id, in, tail);
+		}
+		else
+			throw new IOException("Expected 4-byte handshake after last element, found " + combinedLen + " bytes");
-		int bytesRemaining = numElem * elemSize;
-		int offset = 0;
+		return new VarFillTiming(tDecodeTotal, numStrings);
+	}
-		byte[] buffer = new byte[batchSize];
+ /**
+ * Result container for variable-length decoding.
+ *
+ * @param offsetOut number of elements written to the
output array
+ * @param remainingBytes number of unconsumed tail bytes (partial
element)
+ */
+ private record VarFillResult(int offsetOut, int remainingBytes) {
+ }
-		while (bytesRemaining > 0) {
-			int currentBatchSize = Math.min(batchSize, bytesRemaining);
+	private static VarFillResult fillVariableArrayFromBytes(
+			Array<?> array, int offsetOut, int maxOffset, byte[] buffer,
+			int currentBatchSize, Types.ValueType type) {
+
+		ByteBuffer bb = newLittleEndianBuffer(buffer, currentBatchSize);
+		int bytesConsumed = 0;
+
+		// Each variable-length element = [int32 length][payload...]
+		while (bb.remaining() >= 4 && offsetOut < maxOffset) {
+			bb.mark();
+			int len = bb.getInt();
+
+			if (len < 0) {
+				// null string
+				array.set(offsetOut++, (String) null);
+				bytesConsumed = bb.position();
+				continue;
+			}
+			if (bb.remaining() < len) {
+				// Not enough bytes for full payload → rollback and stop
+				bb.reset();
+				break;
+			}
+
+			switch (type) {
+				case STRING -> {
+					int stringStart = bb.position();
+					byte[] backingArray = bb.array();
+					int arrayOffset = bb.arrayOffset() + stringStart;
+					String s = new String(backingArray, arrayOffset, len, StandardCharsets.UTF_8);
+					array.set(offsetOut++, s);
+					bb.position(stringStart + len);
+				}
-			// Fill buffer from MatrixBlock into byte[] (typed)
-			int bytesWritten = fillByteArrayFromDoubleArray(type, mb, offset, buffer, currentBatchSize);
-			totalBytesWritten += bytesWritten;
+				default -> throw new UnsupportedOperationException(
+					"Unsupported variable-length type: " + type);
+			}
-			out.write(buffer, 0, currentBatchSize);
-			offset += currentBatchSize / elemSize;
-			bytesRemaining -= currentBatchSize;
+			bytesConsumed = bb.position();
		}
-		out.flush();
+		int remainingBytes = currentBatchSize - bytesConsumed;
+		return new VarFillResult(offsetOut, remainingBytes);
+	}
+
+	/**
+	 * Symmetric with readFrameColumnFromPipe — writes FrameBlock column data to pipe.
+	 * Supports both fixed-size types and variable-length types (strings).
+	 */
+	public static long writeFrameColumnToPipe(
+			BufferedOutputStream out, int id, int batchSize,
+			Array<?> array, Types.ValueType type) throws IOException {
+
+		long tStart = System.nanoTime();
+		long tIoStart, tIoTotal = 0;
+		long tEncodeTotal = 0;
+		int numStrings = 0;
+		long totalBytesWritten = 0;
-		// Write end header
-		writeHandshake(id, out);
-		return totalBytesWritten;
+		try {
+			writeHandshake(id, out);
+
+			if (type != Types.ValueType.STRING) {
+				tIoStart = System.nanoTime();
+				totalBytesWritten = writeFixedTypeColumn(out, array, type, batchSize);
+				tIoTotal = System.nanoTime() - tIoStart;
+			} else {
+				tIoStart = System.nanoTime();
+				VarWriteTiming timing = writeVariableTypeColumn(out, array, type, batchSize);
+				tIoTotal = System.nanoTime() - tIoStart;
+				tEncodeTotal = timing.encodeTime;
+				numStrings = timing.numStrings;
+				totalBytesWritten = timing.totalBytes;
+			}
+
+			out.flush();
+			writeHandshake(id, out);
+
+			long tTotal = System.nanoTime() - tStart;
+			if (type == Types.ValueType.STRING) {
+				LOG.debug(String.format(
+					"Java writeFrameColumnToPipe timing: total=%.3fs, I/O=%.3fs (%.1f%%), encode=%.3fs (%.1f%%), strings=%d",
+					tTotal / 1e9, tIoTotal / 1e9, 100.0 * tIoTotal / tTotal,
+					tEncodeTotal / 1e9, 100.0 * tEncodeTotal / tTotal, numStrings));
+			}
+
+			return totalBytesWritten;
+		} catch (Exception e) {
+			LOG.error("Error occurred while writing FrameBlock column to pipe #" + id, e);
+			throw e;
+		}
	}
-	private static int fillByteArrayFromDoubleArray(Types.ValueType type, MatrixBlock mb, int offsetIn,
-			byte[] buffer, int maxBytes) {
-		ByteBuffer bb = ByteBuffer.wrap(buffer, 0, maxBytes).order(ByteOrder.LITTLE_ENDIAN);
-		int r,c;
-		switch (type) {
-			default -> { // FP64
-				DoubleBuffer doubleBuffer = bb.asDoubleBuffer();
-				int count = Math.min(doubleBuffer.remaining(), mb.getNumRows() * mb.getNumColumns() - offsetIn);
-				for (int i = 0; i < count; i++) {
-					r = (offsetIn + i) / mb.getNumColumns();
-					c = (offsetIn + i) % mb.getNumColumns();
-					doubleBuffer.put(mb.getDouble(r,c));
-				}
-				return count * 8;
+	private static class VarWriteTiming {
+		long encodeTime;
+		int numStrings;
+		long totalBytes;
+		VarWriteTiming(long encodeTime, int numStrings, long totalBytes) {
+			this.encodeTime = encodeTime;
+			this.numStrings = numStrings;
+			this.totalBytes = totalBytes;
+		}
+	}
+
+	private static long writeFixedTypeColumn(
+			BufferedOutputStream out, Array<?> array,
+			Types.ValueType type, int batchSize) throws IOException {
+
+		int elemSize = getElementSize(type);
+		int rows = array.size();
+		long totalBytes = (long) rows * elemSize;
+
+		byte[] buffer = new byte[batchSize];
+		int arrayIndex = 0;
+		int bufferPos = 0;
+
+		while (arrayIndex < rows) {
+			// Calculate how many elements can fit in the remaining buffer space
+			int remainingBufferSpace = batchSize - bufferPos;
+			int elementsToWrite = Math.min((remainingBufferSpace / elemSize), rows - arrayIndex);
+
+			if (elementsToWrite == 0) {
+				// Buffer is full, flush it
+				out.write(buffer, 0, bufferPos);
+				bufferPos = 0;
+				continue;
			}
-			case FP32 -> {
-				FloatBuffer floatBuffer = bb.asFloatBuffer();
-				int count = Math.min(floatBuffer.remaining(), mb.getNumRows() * mb.getNumColumns() - offsetIn);
-				for (int i = 0; i < count; i++) {
-					r = (offsetIn + i) / mb.getNumColumns();
-					c = (offsetIn + i) % mb.getNumColumns();
-					floatBuffer.put((float) mb.getDouble(r,c));
+
+			// Convert elements to bytes directly into the buffer
+			ByteBuffer bb = ByteBuffer.wrap(buffer, bufferPos, elementsToWrite * elemSize)
+				.order(ByteOrder.LITTLE_ENDIAN);
+
+			switch (type) {
+				case FP64 -> {
+					DoubleBuffer db = bb.asDoubleBuffer();
+					for (int i = 0; i < elementsToWrite; i++) {
+						db.put(array.getAsDouble(arrayIndex++));
+					}
+					bufferPos += elementsToWrite * 8;
+				}
+				case FP32 -> {
+					FloatBuffer fb = bb.asFloatBuffer();
+					for (int i = 0; i < elementsToWrite; i++) {
+						fb.put((float) array.getAsDouble(arrayIndex++));
+					}
+					bufferPos += elementsToWrite * 4;
+				}
+				case INT64 -> {
+					LongBuffer lb = bb.asLongBuffer();
+					for (int i = 0; i < elementsToWrite; i++) {
+						lb.put((long) array.getAsDouble(arrayIndex++));
+					}
+					bufferPos += elementsToWrite * 8;
				}
-				return count * 4;
-			}
			case INT32 -> {
-				IntBuffer intBuffer = bb.asIntBuffer();
-				int count = Math.min(intBuffer.remaining(), mb.getNumRows() * mb.getNumColumns() - offsetIn);
-				for (int i = 0; i < count; i++) {
-					r = (offsetIn + i) / mb.getNumColumns();
-					c = (offsetIn + i) % mb.getNumColumns();
-					intBuffer.put((int) mb.getDouble(r,c));
+					IntBuffer ib = bb.asIntBuffer();
+					for (int i = 0; i < elementsToWrite; i++) {
+						ib.put((int) array.getAsDouble(arrayIndex++));
					}
-				return count * 4;
+					bufferPos += elementsToWrite * 4;
				}
-			case UINT8 -> {
-				int count = Math.min(maxBytes, mb.getNumRows() * mb.getNumColumns() - offsetIn);
-				for (int i = 0; i < count; i++) {
-					r = (offsetIn + i) / mb.getNumColumns();
-					c = (offsetIn + i) % mb.getNumColumns();
-					buffer[i] = (byte) ((int) mb.getDouble(r,c) & 0xFF);
+				case BOOLEAN -> {
+					for (int i = 0; i < elementsToWrite; i++) {
+						buffer[bufferPos++] = (byte) (array.getAsDouble(arrayIndex++) != 0.0 ? 1 : 0);
					}
-				return count;
+				}
+				default -> throw new UnsupportedOperationException("Unsupported type: " + type);
			}
		}
+
+		out.write(buffer, 0, bufferPos);
+		return totalBytes;
+	}
+
+	private static VarWriteTiming writeVariableTypeColumn(
+			BufferedOutputStream out, Array<?> array,
+			Types.ValueType type, int batchSize) throws IOException {
+
+		long tEncodeTotal = 0;
+		int numStrings = 0;
+		long totalBytesWritten = 0;
+
+		byte[] buffer = new byte[batchSize]; // Reusable write buffer (the Python reader side uses a 2x batch-size buffer)
+		int pos = 0;
+
+		int rows = array.size();
+
+		for (int i = 0; i < rows; i++) {
+			numStrings++;
+
+			// Get string value
+			Object value = array.get(i);
+			boolean isNull = (value == null);
+
+			int length;
+			byte[] encoded;
+
+			if (isNull) {
+				// Use -1 as marker for null values
+				length = -1;
+				encoded = new byte[0];
+			} else {
+				// Encode to UTF-8
+				long tEncodeStart = System.nanoTime();
+				String str = value.toString();
+				encoded = str.getBytes(StandardCharsets.UTF_8);
+				tEncodeTotal += System.nanoTime() - tEncodeStart;
+				length = encoded.length;
+			}
+
+			int entrySize = 4 + (length >= 0 ? length : 0); // length prefix + data (or just prefix for null)
+
+			// If the next entry does not fit, flush the buffer first
+			if (pos + entrySize > batchSize) {
+				out.write(buffer, 0, pos);
+				totalBytesWritten += pos;
+				pos = 0;
+			}
+
+			// Write length prefix (little-endian) - use -1 for null
+			ByteBuffer bb = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN);
+			bb.putInt(length);
+			System.arraycopy(bb.array(), 0, buffer, pos, 4);
+			pos += 4;
+
+			// Write the encoded bytes (skip for null)
+			if (length > 0) {
+				int remainingBytes = length;
+				int encodedOffset = 0;
+				while (remainingBytes > 0) {
+					int chunk = Math.min(remainingBytes, batchSize - pos);
+					System.arraycopy(encoded, encodedOffset, buffer, pos, chunk);
+					pos += chunk;
+					if (pos == batchSize) {
+						out.write(buffer, 0, pos);
+						totalBytesWritten += pos;
+						pos = 0;
+					}
+					encodedOffset += chunk;
+					remainingBytes -= chunk;
+				}
+			}
+		}
+
+		// Flush the tail
+		if (pos > 0) {
+			out.write(buffer, 0, pos);
+			totalBytesWritten += pos;
+		}
+
+		return new VarWriteTiming(tEncodeTotal, numStrings, totalBytesWritten);
+	}
}
\ No newline at end of file
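For reference, the variable-length column format implemented above is: per entry, a little-endian int32 length prefix followed by the UTF-8 payload, with length -1 marking a null; the stream is framed by the usual handshake tokens. A minimal encoder sketch under those assumptions (class and method names illustrative):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

class StringEntrySketch {
	static byte[] encodeEntry(String s) throws IOException {
		ByteArrayOutputStream out = new ByteArrayOutputStream();
		byte[] payload = (s == null) ? new byte[0] : s.getBytes(StandardCharsets.UTF_8);
		int length = (s == null) ? -1 : payload.length; // -1 marks null
		ByteBuffer prefix = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN);
		prefix.putInt(length);
		out.write(prefix.array()); // int32 LE length prefix
		out.write(payload); // UTF-8 bytes (empty for null)
		return out.toByteArray();
	}
}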
diff --git a/src/main/python/systemds/context/systemds_context.py b/src/main/python/systemds/context/systemds_context.py
index 41cfdfc698..99a6cba57b 100644
--- a/src/main/python/systemds/context/systemds_context.py
+++ b/src/main/python/systemds/context/systemds_context.py
@@ -126,6 +126,7 @@ class SystemDSContext(object):
self._FIFO_PY2JAVA_PIPES = out_pipes
self._FIFO_JAVA2PY_PIPES = in_pipes
else:
+ self._log.info("Using py4j for data transfer")
self._data_transfer_mode = 0
def __init_pipes(self, num_pipes):
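The converters.py changes below split a flattened array across multiple pipes; the partitioning gives the first (num_elems % num_pipes) blocks one extra element, matching the offsets expected by startReadingMbFromPipes on the Java side. A sketch of that computation (illustrative only):

class BlockPartitionSketch {
	static int[] blockSizes(int numElem, int numPipes) {
		int base = numElem / numPipes;
		int leftOver = numElem % numPipes;
		int[] sizes = new int[numPipes];
		for (int i = 0; i < numPipes; i++)
			sizes[i] = base + (i < leftOver ? 1 : 0); // first blocks take the remainder
		return sizes;
	}
}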
diff --git a/src/main/python/systemds/utils/converters.py b/src/main/python/systemds/utils/converters.py
index 93744a267e..ab7b7ffd8d 100644
--- a/src/main/python/systemds/utils/converters.py
+++ b/src/main/python/systemds/utils/converters.py
@@ -20,16 +20,21 @@
# -------------------------------------------------------------
import struct
-import tempfile
-import mmap
-import time
-
+from time import time
import numpy as np
import pandas as pd
import concurrent.futures
from py4j.java_gateway import JavaClass, JavaGateway, JavaObject, JVMView
import os
+# Constants
+_HANDSHAKE_OFFSET = 1000
+_DEFAULT_BATCH_SIZE_BYTES = 32 * 1024 # 32 KB
+_FRAME_BATCH_SIZE_BYTES = 16 * 1024 # 16 KB
+_MIN_BYTES_PER_PIPE = 1024 * 1024 * 1024 # 1 GB
+_STRING_LENGTH_PREFIX_SIZE = 4 # int32
+_MAX_ROWS_FOR_OPTIMIZED_CONVERSION = 4
+
def format_bytes(size):
for unit in ["Bytes", "KB", "MB", "GB", "TB", "PB"]:
@@ -38,36 +43,39 @@ def format_bytes(size):
size /= 1024.0
-def pipe_transfer_header(pipe, pipe_id):
- handshake = struct.pack("<i", pipe_id + 1000)
+def _pipe_transfer_header(pipe, pipe_id):
+ """Sends a handshake header to the pipe."""
+ handshake = struct.pack("<i", pipe_id + _HANDSHAKE_OFFSET)
os.write(pipe.fileno(), handshake)
-def pipe_transfer_bytes(pipe, offset, end, batch_size_bytes, mem_view):
+def _pipe_transfer_bytes(pipe, offset, end, batch_size_bytes, mem_view):
+ """Transfers bytes from memoryview to pipe in batches."""
while offset < end:
# Slice the memoryview without copying
slice_end = min(offset + batch_size_bytes, end)
chunk = mem_view[offset:slice_end]
written = os.write(pipe.fileno(), chunk)
if written == 0:
- raise Exception("Buffer issue")
+ raise IOError("Buffer issue: wrote 0 bytes")
offset += written
-def pipe_receive_header(pipe, pipe_id, logger):
- expected_handshake = pipe_id + 1000
- header = os.read(pipe.fileno(), 4) # pipe.read(4)
- if len(header) < 4:
+def _pipe_receive_header(pipe, pipe_id, logger):
+ """Receives and validates a handshake header from the pipe."""
+ expected_handshake = pipe_id + _HANDSHAKE_OFFSET
+ header = os.read(pipe.fileno(), _STRING_LENGTH_PREFIX_SIZE)
+ if len(header) < _STRING_LENGTH_PREFIX_SIZE:
raise IOError("Failed to read handshake header")
received = struct.unpack("<i", header)[0]
if received != expected_handshake:
        raise ValueError(
            f"Handshake mismatch: expected {expected_handshake}, got {received}"
        )
- logger.debug("Read handshake successfully")
-def pipe_receive_bytes(pipe, view, offset, end, batch_size_bytes, logger):
+def _pipe_receive_bytes(pipe, view, offset, end, batch_size_bytes, logger):
+ """Receives bytes from pipe into memoryview in batches."""
while offset < end:
slice_end = min(offset + batch_size_bytes, end)
chunk = os.read(pipe.fileno(), slice_end - offset)
@@ -78,6 +86,200 @@ def pipe_receive_bytes(pipe, view, offset, end, batch_size_bytes, logger):
         offset += actual_size
+def _pipe_receive_strings(
+    pipe, num_strings, batch_size=_DEFAULT_BATCH_SIZE_BYTES, pipe_id=0, logger=None
+):
+    """
+    Reads UTF-8 encoded strings from the pipe in batches.
+    Format: <i (little-endian int32) length prefix, followed by UTF-8 bytes;
+    a length of -1 marks a null value.
+
+    Returns: tuple of (strings_list, total_time, decode_time, io_time,
+    num_strings, header_received)
+    """
+    t_total_start = time()
+ t_total_start = time()
+ t_decode = 0.0
+ t_io = 0.0
+
+ strings = []
+ fd = pipe.fileno() # Cache file descriptor
+
+ # Use a reusable buffer to avoid repeated allocations
+ buf = bytearray(batch_size * 2)
+ buf_pos = 0
+ buf_remaining = 0 # Number of bytes already in buffer
+
+ i = 0
+ while i < num_strings:
+ # If we don't have enough bytes for the length prefix, read more
+ if buf_remaining < _STRING_LENGTH_PREFIX_SIZE:
+ # Shift remaining bytes to start of buffer
+ if buf_remaining > 0:
+ buf[:buf_remaining] = buf[buf_pos : buf_pos + buf_remaining]
+
+ # Read more data
+ t0 = time()
+ chunk = os.read(fd, batch_size)
+ t_io += time() - t0
+ if not chunk:
+ raise IOError("Pipe read returned empty data unexpectedly")
+
+ # Append new data to buffer
+ chunk_len = len(chunk)
+ if buf_remaining + chunk_len > len(buf):
+ # Grow buffer if needed
+ new_buf = bytearray(len(buf) * 2)
+ new_buf[:buf_remaining] = buf[:buf_remaining]
+ buf = new_buf
+
+ buf[buf_remaining : buf_remaining + chunk_len] = chunk
+ buf_remaining += chunk_len
+ buf_pos = 0
+
+ # Read length prefix (little-endian int32)
+ # Note: length can be -1 (0xFFFFFFFF) to indicate null value
+ length = struct.unpack(
+ "<i", buf[buf_pos : buf_pos + _STRING_LENGTH_PREFIX_SIZE]
+ )[0]
+ buf_pos += _STRING_LENGTH_PREFIX_SIZE
+ buf_remaining -= _STRING_LENGTH_PREFIX_SIZE
+
+ # Handle null value (marked by -1)
+ if length == -1:
+ strings.append(None)
+ i += 1
+ continue
+
+ # If we don't have enough bytes for the string data, read more
+ if buf_remaining < length:
+ # Shift remaining bytes to start of buffer
+ if buf_remaining > 0:
+ buf[:buf_remaining] = buf[buf_pos : buf_pos + buf_remaining]
+ buf_pos = 0
+
+ # Read more data until we have enough
+ bytes_needed = length - buf_remaining
+ while bytes_needed > 0:
+ t0 = time()
+ chunk = os.read(fd, min(batch_size, bytes_needed))
+ t_io += time() - t0
+ if not chunk:
+ raise IOError("Pipe read returned empty data unexpectedly")
+
+ chunk_len = len(chunk)
+ if buf_remaining + chunk_len > len(buf):
+ # Grow buffer if needed
+ new_buf = bytearray(len(buf) * 2)
+ new_buf[:buf_remaining] = buf[:buf_remaining]
+ buf = new_buf
+
+ buf[buf_remaining : buf_remaining + chunk_len] = chunk
+ buf_remaining += chunk_len
+ bytes_needed -= chunk_len
+
+ # Decode the string
+ t0 = time()
+ if length == 0:
+ decoded_str = ""
+ else:
+ decoded_str = buf[buf_pos : buf_pos + length].decode("utf-8")
+ t_decode += time() - t0
+
+ strings.append(decoded_str)
+ buf_pos += length
+ buf_remaining -= length
+ i += 1
+ header_received = False
+ if buf_remaining == _STRING_LENGTH_PREFIX_SIZE:
+ # There is still data in the buffer, probably the handshake header
+ received = struct.unpack(
+ "<i", buf[buf_pos : buf_pos + _STRING_LENGTH_PREFIX_SIZE]
+ )[0]
+ if received != pipe_id + _HANDSHAKE_OFFSET:
+ raise ValueError(
+ "Handshake mismatch: expected {}, got {}".format(
+ pipe_id + _HANDSHAKE_OFFSET, received
+ )
+ )
+ header_received = True
+ elif buf_remaining > _STRING_LENGTH_PREFIX_SIZE:
+ raise ValueError(
+ "Unexpected number of bytes in buffer: {}".format(buf_remaining)
+ )
+
+ t_total = time() - t_total_start
+ return (strings, t_total, t_decode, t_io, num_strings, header_received)
+
+
+def _get_numpy_value_type(jvm, dtype):
+ """Maps numpy dtype to SystemDS ValueType."""
+ if dtype is np.dtype(np.uint8):
+ return jvm.org.apache.sysds.common.Types.ValueType.UINT8
+ elif dtype is np.dtype(np.int32):
+ return jvm.org.apache.sysds.common.Types.ValueType.INT32
+ elif dtype is np.dtype(np.float32):
+ return jvm.org.apache.sysds.common.Types.ValueType.FP32
+ else:
+ return jvm.org.apache.sysds.common.Types.ValueType.FP64
+
+
+def _transfer_matrix_block_single_pipe(
+    sds, pipe_id, pipe, mv, total_bytes, rows, cols, value_type, ep
+):
+    """Transfers matrix block data using a single pipe."""
+    sds._log.debug(
+        "Using single FIFO pipe for transferring {}".format(format_bytes(total_bytes))
+    )
+ fut = sds._executor_pool.submit(
+ ep.startReadingMbFromPipe, pipe_id, rows, cols, value_type
+ )
+
+ _pipe_transfer_header(pipe, pipe_id) # start
+ _pipe_transfer_bytes(pipe, 0, total_bytes, _DEFAULT_BATCH_SIZE_BYTES, mv)
+ _pipe_transfer_header(pipe, pipe_id) # end
+
+ return fut.result() # Java returns MatrixBlock
+
+
+def _transfer_matrix_block_multi_pipe(
+    sds, mv, arr, np_arr, total_bytes, rows, cols, value_type, ep, jvm
+):
+    """Transfers matrix block data using multiple pipes in parallel."""
+    num_pipes = min(len(sds._FIFO_PY2JAVA_PIPES), total_bytes // _MIN_BYTES_PER_PIPE)
+ # Align blocks per element
+ num_elems = len(arr)
+ elem_size = np_arr.dtype.itemsize
+ min_elems_block = num_elems // num_pipes
+ left_over = num_elems % num_pipes
+ block_sizes = sds.java_gateway.new_array(jvm.int, num_pipes)
+ for i in range(num_pipes):
+ block_sizes[i] = min_elems_block + int(i < left_over)
+
+ # Run java readers in parallel
+ fut_java = sds._executor_pool.submit(
+ ep.startReadingMbFromPipes, block_sizes, rows, cols, value_type
+ )
+
+    # Run writers in parallel
+    def _pipe_write_task(_pipe_id, _pipe, memview, start, end):
+        _pipe_transfer_header(_pipe, _pipe_id)
+        _pipe_transfer_bytes(_pipe, start, end, _DEFAULT_BATCH_SIZE_BYTES, memview)
+        _pipe_transfer_header(_pipe, _pipe_id)
+
+ cur = 0
+ futures = []
+ for i, size in enumerate(block_sizes):
+ pipe = sds._FIFO_PY2JAVA_PIPES[i]
+ start_byte = cur * elem_size
+ cur += size
+ end_byte = cur * elem_size
+
+ fut = sds._executor_pool.submit(
+ _pipe_write_task, i, pipe, mv, start_byte, end_byte
+ )
+ futures.append(fut)
+
+ return fut_java.result() # Java returns MatrixBlock
+
+
def numpy_to_matrix_block(sds, np_arr: np.array):
"""Converts a given numpy array, to internal matrix block representation.
@@ -89,7 +291,7 @@ def numpy_to_matrix_block(sds, np_arr: np.array):
cols = np_arr.shape[1] if np_arr.ndim == 2 else 1
if rows > 2147483647:
- raise Exception("")
+ raise ValueError("Matrix rows exceed maximum value (2147483647)")
# If not numpy array then convert to numpy array
if not isinstance(np_arr, np.ndarray):
@@ -98,90 +300,45 @@ def numpy_to_matrix_block(sds, np_arr: np.array):
jvm: JVMView = sds.java_gateway.jvm
ep = sds.java_gateway.entry_point
- # flatten and set value type
+ # Flatten and set value type
if np_arr.dtype is np.dtype(np.uint8):
arr = np_arr.ravel()
- value_type = jvm.org.apache.sysds.common.Types.ValueType.UINT8
elif np_arr.dtype is np.dtype(np.int32):
arr = np_arr.ravel()
- value_type = jvm.org.apache.sysds.common.Types.ValueType.INT32
elif np_arr.dtype is np.dtype(np.float32):
arr = np_arr.ravel()
- value_type = jvm.org.apache.sysds.common.Types.ValueType.FP32
else:
arr = np_arr.ravel().astype(np.float64)
- value_type = jvm.org.apache.sysds.common.Types.ValueType.FP64
+
+ value_type = _get_numpy_value_type(jvm, np_arr.dtype)
if sds._data_transfer_mode == 1:
mv = memoryview(arr).cast("B")
total_bytes = mv.nbytes
- min_bytes_per_pipe = 1024 * 1024 * 1024 * 1
- batch_size_bytes = 32 * 1024 # pipe's ring buffer is 64KB
# Using multiple pipes is disabled by default
        use_single_pipe = (
-            not sds._multi_pipe_enabled or total_bytes < 2 * min_bytes_per_pipe
+            not sds._multi_pipe_enabled or total_bytes < 2 * _MIN_BYTES_PER_PIPE
        )
if use_single_pipe:
- sds._log.debug(
- "Using single FIFO pipe for reading {}".format(
- format_bytes(total_bytes)
- )
- )
- pipe_id = 0
- pipe = sds._FIFO_PY2JAVA_PIPES[pipe_id]
- fut = sds._executor_pool.submit(
- ep.startReadingMbFromPipe, pipe_id, rows, cols, value_type
+ return _transfer_matrix_block_single_pipe(
+ sds,
+ 0,
+ sds._FIFO_PY2JAVA_PIPES[0],
+ mv,
+ total_bytes,
+ rows,
+ cols,
+ value_type,
+ ep,
)
-
- pipe_transfer_header(pipe, pipe_id) # start
- pipe_transfer_bytes(pipe, 0, total_bytes, batch_size_bytes, mv)
- pipe_transfer_header(pipe, pipe_id) # end
-
- return fut.result() # Java returns MatrixBlock
else:
- num_pipes = min(
- len(sds._FIFO_PY2JAVA_PIPES), total_bytes // min_bytes_per_pipe
- )
- # align blocks per element
- num_elems = len(arr)
- elem_size = np_arr.dtype.itemsize
- min_elems_block = num_elems // num_pipes
- left_over = num_elems % num_pipes
- block_sizes = sds.java_gateway.new_array(jvm.int, num_pipes)
- for i in range(num_pipes):
- block_sizes[i] = min_elems_block + int(i < left_over)
-
- # run java readers in parallel
- fut_java = sds._executor_pool.submit(
- ep.startReadingMbFromPipes, block_sizes, rows, cols, value_type
+            return _transfer_matrix_block_multi_pipe(
+                sds, mv, arr, np_arr, total_bytes, rows, cols, value_type, ep, jvm
+            )
-
- # run writers in parallel
- def _pipe_write_task(_pipe_id, _pipe, memview, start, end):
- pipe_transfer_header(_pipe, _pipe_id)
- pipe_transfer_bytes(_pipe, start, end, batch_size_bytes,
memview)
- pipe_transfer_header(_pipe, _pipe_id)
-
- cur = 0
- futures = []
- for i, size in enumerate(block_sizes):
- pipe = sds._FIFO_PY2JAVA_PIPES[i]
- start_byte = cur * elem_size
- cur += size
- end_byte = cur * elem_size
-
- fut = sds._executor_pool.submit(
- _pipe_write_task, i, pipe, mv, start_byte, end_byte
- )
- futures.append(fut)
-
- return fut_java.result() # Java returns MatrixBlock
else:
- # prepare byte buffer.
+ # Prepare byte buffer and send data to java via Py4J
buf = arr.tobytes()
-
- # Send data to java.
        j_class: JavaClass = jvm.org.apache.sysds.runtime.util.Py4jConverterUtils
        return j_class.convertPy4JArrayToMB(buf, rows, cols, value_type)
@@ -213,7 +370,7 @@ def matrix_block_to_numpy(sds, mb: JavaObject):
pipe = sds._FIFO_JAVA2PY_PIPES[pipe_id]
sds._log.debug(
- "Using single FIFO pipe for reading {}".format(
+ "Using single FIFO pipe for transferring {}".format(
format_bytes(total_bytes)
)
)
@@ -221,14 +378,9 @@ def matrix_block_to_numpy(sds, mb: JavaObject):

            # Java starts writing to pipe in background
            fut = sds._executor_pool.submit(ep.startWritingMbToPipe, pipe_id, mb)

-            pipe_receive_header(pipe, pipe_id, sds._log)
-            sds._log.debug(
-                "Py4j task for writing {} [{}] is: done=[{}], running=[{}]".format(
-                    format_bytes(total_bytes), sds._FIFO_PATH, fut.done(), fut.running()
-                )
-            )
-            pipe_receive_bytes(pipe, mv, 0, total_bytes, batch_size_bytes, sds._log)
-            pipe_receive_header(pipe, pipe_id, sds._log)
+            _pipe_receive_header(pipe, pipe_id, sds._log)
+            _pipe_receive_bytes(pipe, mv, 0, total_bytes, batch_size_bytes, sds._log)
+            _pipe_receive_header(pipe, pipe_id, sds._log)
            fut.result()

            sds._log.debug("Reading is done for {}".format(format_bytes(total_bytes)))
@@ -246,7 +398,9 @@ def matrix_block_to_numpy(sds, mb: JavaObject):
return None
-def convert(jvm, fb, idx, num_elements, value_type, pd_series, conversion="column"):
+def _convert_pandas_series_to_frameblock(
+    jvm, fb, idx, num_elements, value_type, pd_series, conversion="column"
+):
"""Converts a given pandas column or row to a FrameBlock representation.
:param jvm: The JVMView of the current SystemDS context.
@@ -326,59 +480,411 @@ def pandas_to_frame_block(sds, pd_df: pd.DataFrame):
try:
jc_String = jvm.java.lang.String
jc_FrameBlock = jvm.org.apache.sysds.runtime.frame.data.FrameBlock
-        # execution speed increases with optimized code when the number of rows exceeds 4
-        if rows > 4:
-            # Row conversion if more columns than rows and all columns have the same type, otherwise column
-            conversion_type = (
-                "row" if cols > rows and len(set(pd_df.dtypes)) == 1 else "column"
-            )
-            if conversion_type == "row":
-                pd_df = pd_df.transpose()
-                col_names = pd_df.columns.tolist()  # re-calculate col names
-            fb = jc_FrameBlock(
+        if sds._data_transfer_mode == 1:
+            return pandas_to_frame_block_pipe(
+                col_names,
+                j_colNameArray,
                 j_valueTypeArray,
+                jc_FrameBlock,
+                pd_df,
+                rows,
+                schema,
+                sds,
+            )
+        else:
+            return pandas_to_frame_block_py4j(
+                col_names,
                 j_colNameArray,
-                rows if conversion_type == "column" else None,
+                j_valueTypeArray,
+                jc_FrameBlock,
+                jc_String,
+                pd_df,
+                rows,
+                cols,
+                schema,
+                sds,
+            )
+
+    except Exception as e:
+        sds.exception_and_close(e)
+
+
+def pandas_to_frame_block_py4j(
+ col_names: list,
+ j_colNameArray,
+ j_valueTypeArray,
+ jc_FrameBlock,
+ jc_String,
+ pd_df: pd.DataFrame,
+ rows: int,
+ cols: int,
+ schema: list,
+ sds,
+):
+ java_gate = sds.java_gateway
+ jvm = java_gate.jvm
+
+    # Execution speed increases with optimized code when the number of rows exceeds the threshold
+    if rows > _MAX_ROWS_FOR_OPTIMIZED_CONVERSION:
+        # Row conversion if more columns than rows and all columns have the same type, otherwise column
+        conversion_type = (
+            "row" if cols > rows and len(set(pd_df.dtypes)) == 1 else "column"
+        )
+ if conversion_type == "row":
+ pd_df = pd_df.transpose()
+ col_names = pd_df.columns.tolist() # re-calculate col names
+
+ fb = jc_FrameBlock(
+ j_valueTypeArray,
+ j_colNameArray,
+ rows if conversion_type == "column" else None,
+ )
+ if conversion_type == "row":
+ fb.ensureAllocatedColumns(rows)
+
+    # We use .submit() with explicit .result() calls to properly propagate exceptions
+    with concurrent.futures.ThreadPoolExecutor() as executor:
+ futures = [
+ executor.submit(
+ _convert_pandas_series_to_frameblock,
+ jvm,
+ fb,
+ i,
+ rows if conversion_type == "column" else cols,
+ schema[i],
+ pd_df[col_name],
+ conversion_type,
+ )
+ for i, col_name in enumerate(col_names)
+ ]
+
+ for future in concurrent.futures.as_completed(futures):
+ future.result()
+
+ return fb
+ else:
+ j_dataArray = java_gate.new_array(jc_String, rows, cols)
+
+        for j, col_name in enumerate(col_names):
+ col_data = pd_df[col_name].fillna("").to_numpy(dtype=str)
+
+ for i in range(col_data.shape[0]):
+ if col_data[i]:
+ j_dataArray[i][j] = col_data[i]
+
+ fb = jc_FrameBlock(j_valueTypeArray, j_colNameArray, j_dataArray)
+ return fb
+
+
+def _transfer_string_column_to_pipe(
+ sds, pipe, pipe_id, pd_series, col_name, rows, fb, col_idx, schema, ep
+):
+ """Transfers a string column to FrameBlock via pipe."""
+ t0 = time()
+
+ # Start Java reader in background
+    fut = sds._executor_pool.submit(
+        ep.startReadingColFromPipe, pipe_id, fb, rows, -1, col_idx, schema, True
+    )
+
+    _pipe_transfer_header(pipe, pipe_id)  # start
+    py_timing = _pipe_transfer_strings(pipe, pd_series, _FRAME_BATCH_SIZE_BYTES)
+    _pipe_transfer_header(pipe, pipe_id)  # end
+
+ fut.result()
+
+ t1 = time()
+
+    # Log aggregated timing breakdown
+ py_total, py_encoding, py_packing, py_io, num_strings = py_timing
+ total_time = t1 - t0
+
+ sds._log.debug(
+ f"""
+ === TO FrameBlock - Timing Breakdown (Strings) ===
+ Column: {col_name}
+ Total time: {total_time:.3f}s
+ Python side (writing):
+ Total: {py_total:.3f}s
+ Encoding: {py_encoding:.3f}s ({100*py_encoding/py_total:.1f}%)
+ Struct packing: {py_packing:.3f}s ({100*py_packing/py_total:.1f}%)
+ I/O writes: {py_io:.3f}s ({100*py_io/py_total:.1f}%)
+ Other: {py_total - py_encoding - py_packing - py_io:.3f}s
+ Strings processed: {num_strings:,}
+ """
+ )
+
+
+def _transfer_numeric_column_to_pipe(
+ sds, pipe, pipe_id, byte_data, col_name, rows, fb, col_idx, schema, ep
+):
+ """Transfers a numeric column to FrameBlock via pipe."""
+ mv = memoryview(byte_data).cast("B")
+ total_bytes = mv.nbytes
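+    # The column's backing buffer is streamed as raw bytes in its native
+    # in-memory layout (typically little-endian); the Java reader rebuilds
+    # the typed column from the schema's value type.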
+ sds._log.debug(
+ "TO FrameBlock - Using single FIFO pipe for transferring {} | {} bytes
| Column: {}".format(
+ format_bytes(total_bytes), total_bytes, col_name
+ )
+ )
+
+ fut = sds._executor_pool.submit(
+ ep.startReadingColFromPipe,
+ pipe_id,
+ fb,
+ rows,
+ total_bytes,
+ col_idx,
+ schema,
+ True,
+ )
+
+ _pipe_transfer_header(pipe, pipe_id) # start
+ _pipe_transfer_bytes(pipe, 0, total_bytes, _FRAME_BATCH_SIZE_BYTES, mv)
+ _pipe_transfer_header(pipe, pipe_id) # end
+
+ fut.result()
+
+
+def pandas_to_frame_block_pipe(
+ col_names: list,
+ j_colNameArray,
+ j_valueTypeArray,
+ jc_FrameBlock,
+ pd_df: pd.DataFrame,
+ rows: int,
+ schema: list,
+ sds,
+):
+ ep = sds.java_gateway.entry_point
+ fb = jc_FrameBlock(
+ j_valueTypeArray,
+ j_colNameArray,
+ rows,
+ )
+
+ pipe_id = 0
+ pipe = sds._FIFO_PY2JAVA_PIPES[pipe_id]
+
+ for i, col_name in enumerate(col_names):
+ pd_series = pd_df[col_name]
+
+ if pd_series.dtype == "string" or pd_series.dtype == "object":
+ _transfer_string_column_to_pipe(
+ sds, pipe, pipe_id, pd_series, col_name, rows, fb, i,
schema[i], ep
)
- if conversion_type == "row":
- fb.ensureAllocatedColumns(rows)
-
- # We use .submit() with explicit .result() calling to properly
propagate exceptions
- with concurrent.futures.ThreadPoolExecutor() as executor:
- futures = [
- executor.submit(
- convert,
- jvm,
- fb,
- i,
- rows if conversion_type == "column" else cols,
- schema[i],
- pd_df[col_name],
- conversion_type,
- )
- for i, col_name in enumerate(col_names)
- ]
-
- for future in concurrent.futures.as_completed(futures):
- future.result()
-
- return fb
+ continue
+
+ # Prepare numeric data
+ if pd_series.dtype == "bool":
+ # Convert boolean to uint8 (0/1) for proper byte representation
+ byte_data = pd_series.fillna(False).astype(np.uint8).to_numpy()
else:
- j_dataArray = java_gate.new_array(jc_String, rows, cols)
+ byte_data = pd_series.fillna("").to_numpy()
- for j, col_name in enumerate(col_names):
- col_data = pd_df[col_name].fillna("").to_numpy(dtype=str)
+ _transfer_numeric_column_to_pipe(
+ sds, pipe, pipe_id, byte_data, col_name, rows, fb, i, schema[i], ep
+ )
- for i in range(col_data.shape[0]):
- if col_data[i]:
- j_dataArray[i][j] = col_data[i]
+ return fb
- fb = jc_FrameBlock(j_valueTypeArray, j_colNameArray, j_dataArray)
- return fb
- except Exception as e:
- sds.exception_and_close(e)
+def _pipe_transfer_strings(pipe, pd_series,
batch_size=_DEFAULT_BATCH_SIZE_BYTES):
+ """
+ Streams UTF-8 encoded strings to the pipe in batches without building the
full bytearray first.
+ Uses a 2×batch_size buffer to accommodate long strings without frequent
flushes.
+
+ Returns: tuple of (total_time, encoding_time, packing_time, io_time,
num_strings)
+ """
+ t_total_start = time()
+ t_encoding = 0.0
+ t_packing = 0.0
+ t_io = 0.0
+ num_strings = 0
+
+ buf = bytearray(batch_size * 2)
+ view = memoryview(buf)
+ pos = 0
+ fd = pipe.fileno() # Cache file descriptor to avoid repeated lookups
+
+ # Convert pandas Series to list/array for faster iteration (avoids pandas
overhead)
+ # Use .values for numpy array or .tolist() for Python list - tolist() is
often faster for strings
+ values = pd_series.tolist() if hasattr(pd_series, "tolist") else
list(pd_series)
+
+ for value in values:
+ num_strings += 1
+
+ # Check for null values (None, pd.NA, np.nan)
+ is_null = value is None or pd.isna(value)
+
+ if is_null:
+ # Use -1 as marker for null values (signed int32)
+ length = -1
+ entry_size = _STRING_LENGTH_PREFIX_SIZE # Only length prefix, no
data bytes
+ else:
+ # Encode and get length - len() on bytes is very fast (O(1)
attribute access)
+ t0 = time()
+ encoded = value.encode("utf-8")
+ t_encoding += time() - t0
+
+ length = len(encoded) # Fast O(1) operation on bytes
+ entry_size = _STRING_LENGTH_PREFIX_SIZE + length # length prefix
+ data
+
+        # if the next entry would overflow batch_size, flush the buffered bytes first
+ if pos + entry_size > batch_size:
+ # write everything up to 'pos'
+ t0 = time()
+ written = os.write(fd, view[:pos])
+ t_io += time() - t0
+ if written != pos:
+ raise IOError(f"Expected to write {pos} bytes, wrote
{written}")
+ pos = 0
+
+ # Write length prefix (little-endian, signed int32 for -1 null marker)
+ t0 = time()
+ struct.pack_into("<i", buf, pos, length)
+ t_packing += time() - t0
+ pos += _STRING_LENGTH_PREFIX_SIZE
+
+ # write the bytes - skip for null values
+ if not is_null:
+ buf[pos : pos + length] = encoded
+ pos += length
+
+ # flush the tail
+ if pos > 0:
+ t0 = time()
+ written = os.write(fd, view[:pos])
+ t_io += time() - t0
+ if written != pos:
+ raise IOError(f"Expected to write {pos} bytes, wrote {written}")
+
+ t_total = time() - t_total_start
+ return (t_total, t_encoding, t_packing, t_io, num_strings)
+
+
+def _get_elem_size_for_type(d_type):
+ """Returns the element size in bytes for a given SystemDS type."""
+ return {
+ "INT32": 4,
+ "INT64": 8,
+ "FP64": 8,
+ "BOOLEAN": 1,
+ "FP32": 4,
+ "UINT8": 1,
+ "CHARACTER": 1,
+ }.get(d_type, 8)
+
+
+def _get_numpy_dtype_for_type(d_type):
+ """Returns the numpy dtype for a given SystemDS type."""
+ dtype_map = {
+ "INT32": np.int32,
+ "INT64": np.int64,
+ "FP64": np.float64,
+ "BOOLEAN": np.dtype("?"),
+ "FP32": np.float32,
+ "UINT8": np.uint8,
+ "CHARACTER": np.char,
+ }
+ return dtype_map.get(d_type, np.float64)
+
+
+def _receive_string_column_from_pipe(
+ sds, pipe, pipe_id, num_rows, batch_size_bytes, col_name
+):
+ """Receives a string column from FrameBlock via pipe."""
+ py_strings, py_total, py_decode, py_io, num_strings, header_received = (
+ _pipe_receive_strings(pipe, num_rows, batch_size_bytes, pipe_id,
sds._log)
+ )
+
+ sds._log.debug(
+ f"""
+ === FROM FrameBlock - Timing Breakdown (Strings) ===
+ Column: {col_name}
+ Total time: {py_total:.3f}s
+ Python side (reading):
+ Total: {py_total:.3f}s
+ Decoding: {py_decode:.3f}s ({100*py_decode/py_total:.1f}%)
+ I/O reads: {py_io:.3f}s ({100*py_io/py_total:.1f}%)
+ Other: {py_total - py_decode - py_io:.3f}s
+ Strings processed: {num_strings:,}
+ """
+ )
+
+ if not header_received:
+ _pipe_receive_header(pipe, pipe_id, sds._log)
+
+ return py_strings
+
+
+def _receive_numeric_column_from_pipe(
+ sds, pipe, pipe_id, d_type, num_rows, batch_size_bytes, col_name
+):
+ """Receives a numeric column from FrameBlock via pipe."""
+ elem_size = _get_elem_size_for_type(d_type)
+ total_bytes = num_rows * elem_size
+ numpy_dtype = _get_numpy_dtype_for_type(d_type)
+
+ sds._log.debug(
+ "FROM FrameBlock - Using single FIFO pipe for transferring {} | {}
bytes | Column: {} | Type: {}".format(
+ format_bytes(total_bytes),
+ total_bytes,
+ col_name,
+ d_type,
+ )
+ )
+
+ if d_type == "BOOLEAN":
+ # Read as uint8 first, then convert to boolean
+ # This ensures proper interpretation of 0/1 bytes
+ arr_uint8 = np.empty(num_rows, dtype=np.uint8)
+ mv = memoryview(arr_uint8).cast("B")
+ _pipe_receive_bytes(pipe, mv, 0, total_bytes, batch_size_bytes,
sds._log)
+ ret = arr_uint8.astype(bool)
+ else:
+ arr = np.empty(num_rows, dtype=numpy_dtype)
+ mv = memoryview(arr).cast("B")
+ _pipe_receive_bytes(pipe, mv, 0, total_bytes, batch_size_bytes,
sds._log)
+ ret = arr
+
+ _pipe_receive_header(pipe, pipe_id, sds._log)
+ return ret
+
+
+def _receive_column_py4j(fb, col_array, c_index, d_type, num_rows):
+ """Receives a column from FrameBlock using Py4J (fallback method)."""
+ if d_type == "STRING":
+ ret = []
+ for row in range(num_rows):
+ ent = col_array.getIndexAsBytes(row)
+ if ent:
+ ent = ent.decode()
+ ret.append(ent)
+ else:
+ ret.append(None)
+ elif d_type == "INT32":
+ byteArray = fb.getColumn(c_index).getAsByteArray()
+ ret = np.frombuffer(byteArray, dtype=np.int32)
+ elif d_type == "INT64":
+ byteArray = fb.getColumn(c_index).getAsByteArray()
+ ret = np.frombuffer(byteArray, dtype=np.int64)
+ elif d_type == "FP64":
+ byteArray = fb.getColumn(c_index).getAsByteArray()
+ ret = np.frombuffer(byteArray, dtype=np.float64)
+ elif d_type == "BOOLEAN":
+ # TODO maybe it is more efficient to bit pack the booleans.
+ #
https://stackoverflow.com/questions/5602155/numpy-boolean-array-with-1-bit-entries
+ byteArray = fb.getColumn(c_index).getAsByteArray()
+ ret = np.frombuffer(byteArray, dtype=np.dtype("?"))
+ elif d_type == "CHARACTER":
+ byteArray = fb.getColumn(c_index).getAsByteArray()
+ ret = np.frombuffer(byteArray, dtype=np.char)
+ else:
+ raise NotImplementedError(
+ f"Not Implemented {d_type} for systemds to pandas parsing"
+ )
+ return ret
def frame_block_to_pandas(sds, fb: JavaObject):
@@ -387,45 +893,55 @@ def frame_block_to_pandas(sds, fb: JavaObject):
:param sds: The current systemds context.
:param fb: A pointer to the JVM's FrameBlock object.
"""
-
num_rows = fb.getNumRows()
num_cols = fb.getNumColumns()
df = pd.DataFrame()
+ ep = sds.java_gateway.entry_point
+ jvm = sds.java_gateway.jvm
+
for c_index in range(num_cols):
col_array = fb.getColumn(c_index)
-
d_type = col_array.getValueType().toString()
- if d_type == "STRING":
- ret = []
- for row in range(num_rows):
- ent = col_array.getIndexAsBytes(row)
- if ent:
- ent = ent.decode()
- ret.append(ent)
- else:
- ret.append(None)
- elif d_type == "INT32":
- byteArray = fb.getColumn(c_index).getAsByteArray()
- ret = np.frombuffer(byteArray, dtype=np.int32)
- elif d_type == "INT64":
- byteArray = fb.getColumn(c_index).getAsByteArray()
- ret = np.frombuffer(byteArray, dtype=np.int64)
- elif d_type == "FP64":
- byteArray = fb.getColumn(c_index).getAsByteArray()
- ret = np.frombuffer(byteArray, dtype=np.float64)
- elif d_type == "BOOLEAN":
- # TODO maybe it is more efficient to bit pack the booleans.
- #
https://stackoverflow.com/questions/5602155/numpy-boolean-array-with-1-bit-entries
- byteArray = fb.getColumn(c_index).getAsByteArray()
- ret = np.frombuffer(byteArray, dtype=np.dtype("?"))
- elif d_type == "CHARACTER":
- byteArray = fb.getColumn(c_index).getAsByteArray()
- ret = np.frombuffer(byteArray, dtype=np.char)
- else:
- raise NotImplementedError(
- f"Not Implemented {d_type} for systemds to pandas parsing"
+
+ if sds._data_transfer_mode == 1:
+ # Use pipe transfer for faster data transfer
+ batch_size_bytes = _DEFAULT_BATCH_SIZE_BYTES
+ pipe_id = 0
+ pipe = sds._FIFO_JAVA2PY_PIPES[pipe_id]
+
+ # Java starts writing to pipe in background
+ fut = sds._executor_pool.submit(
+ ep.startWritingColToPipe, pipe_id, fb, c_index
)
+
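+            # Java writes the column into the FIFO from a background thread
+            # while Python reads concurrently; the start header below keeps
+            # both sides in sync before any payload bytes are consumed.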
+ _pipe_receive_header(pipe, pipe_id, sds._log)
+
+ if d_type == "STRING":
+ ret = _receive_string_column_from_pipe(
+ sds,
+ pipe,
+ pipe_id,
+ num_rows,
+ batch_size_bytes,
+ fb.getColumnName(c_index),
+ )
+ else:
+ ret = _receive_numeric_column_from_pipe(
+ sds,
+ pipe,
+ pipe_id,
+ d_type,
+ num_rows,
+ batch_size_bytes,
+ fb.getColumnName(c_index),
+ )
+
+ fut.result()
+ else:
+ # Use Py4J transfer (original method)
+ ret = _receive_column_py4j(fb, col_array, c_index, d_type,
num_rows)
+
df[fb.getColumnName(c_index)] = ret
return df
diff --git a/src/main/python/tests/matrix/test_block_converter_unix_pipe.py
b/src/main/python/tests/matrix/test_block_converter_unix_pipe.py
deleted file mode 100644
index c24a9357c8..0000000000
--- a/src/main/python/tests/matrix/test_block_converter_unix_pipe.py
+++ /dev/null
@@ -1,104 +0,0 @@
-# -------------------------------------------------------------
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-# -------------------------------------------------------------
-
-
-import os
-import shutil
-import unittest
-import pandas as pd
-import numpy as np
-from systemds.context import SystemDSContext
-
-
-class TestMatrixBlockConverterUnixPipe(unittest.TestCase):
-
- sds: SystemDSContext = None
- temp_dir: str = "tests/iotests/temp_write_csv/"
-
- @classmethod
- def setUpClass(cls):
- cls.sds = SystemDSContext(
- data_transfer_mode=1, logging_level=50, capture_stdout=True
- )
- if not os.path.exists(cls.temp_dir):
- os.makedirs(cls.temp_dir)
-
- @classmethod
- def tearDownClass(cls):
- cls.sds.close()
- shutil.rmtree(cls.temp_dir, ignore_errors=True)
-
- def test_python_to_java(self):
- combinations = [ # (n_rows, n_cols)
- (5, 0),
- (5, 1),
- (10, 10),
- ]
-
- for n_rows, n_cols in combinations:
- matrix = (
- np.random.random((n_rows, n_cols))
- if n_cols != 0
- else np.random.random(n_rows)
- )
- # Transfer into SystemDS and write to CSV
- matrix_sds = self.sds.from_numpy(matrix)
- matrix_sds.write(
- self.temp_dir + "into_systemds_matrix.csv", format="csv",
header=False
- ).compute()
-
- # Read the CSV file using pandas
- result_df = pd.read_csv(
- self.temp_dir + "into_systemds_matrix.csv", header=None
- )
- matrix_out = result_df.to_numpy()
- if n_cols == 0:
- matrix_out = matrix_out.flatten()
- # Verify the data
- self.assertTrue(np.allclose(matrix_out, matrix))
-
- def test_java_to_python(self):
- combinations = [ # (n_rows, n_cols)
- (5, 1),
- (10, 10),
- ]
-
- for n_rows, n_cols in combinations:
- matrix = np.random.random((n_rows, n_cols))
-
- # Create a CSV file to read into SystemDS
- pd.DataFrame(matrix).to_csv(
- self.temp_dir + "out_of_systemds_matrix.csv", header=False,
index=False
- )
-
- matrix_sds = self.sds.read(
- self.temp_dir + "out_of_systemds_matrix.csv",
- data_type="matrix",
- format="csv",
- )
- matrix_out = matrix_sds.compute()
-
- # Verify the data
- self.assertTrue(np.allclose(matrix_out, matrix))
-
-
-if __name__ == "__main__":
- unittest.main(exit=False)
diff --git a/src/main/python/tests/python_java_data_transfer/__init__.py
b/src/main/python/tests/python_java_data_transfer/__init__.py
new file mode 100644
index 0000000000..e66abb4646
--- /dev/null
+++ b/src/main/python/tests/python_java_data_transfer/__init__.py
@@ -0,0 +1,20 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
diff --git
a/src/main/python/tests/python_java_data_transfer/test_dense_numpy_matrix.py
b/src/main/python/tests/python_java_data_transfer/test_dense_numpy_matrix.py
new file mode 100644
index 0000000000..fcfe683dc7
--- /dev/null
+++ b/src/main/python/tests/python_java_data_transfer/test_dense_numpy_matrix.py
@@ -0,0 +1,246 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
+
+
+import os
+import shutil
+import unittest
+import pandas as pd
+import numpy as np
+from systemds.context import SystemDSContext
+from tests.test_utils import timeout
+
+
+class TestMatrixBlockConverterUnixPipe(unittest.TestCase):
+
+ sds: SystemDSContext = None
+ temp_dir: str = "tests/iotests/temp_write_csv/"
+
+ @classmethod
+ def setUpClass(cls):
+ cls.sds = SystemDSContext(
+ data_transfer_mode=1, logging_level=10, capture_stdout=True
+ )
+ if not os.path.exists(cls.temp_dir):
+ os.makedirs(cls.temp_dir)
+
+ @classmethod
+ def tearDownClass(cls):
+ cls.sds.close()
+ shutil.rmtree(cls.temp_dir, ignore_errors=True)
+
+ @timeout(60)
+ def test_python_to_java(self):
+ combinations = [ # (n_rows, n_cols)
+ (5, 0),
+ (5, 1),
+ (10, 10),
+ ]
+
+ for n_rows, n_cols in combinations:
+ matrix = (
+ np.random.random((n_rows, n_cols))
+ if n_cols != 0
+ else np.random.random(n_rows)
+ )
+ # Transfer into SystemDS and write to CSV
+ matrix_sds = self.sds.from_numpy(matrix)
+ matrix_sds.write(
+ self.temp_dir + "into_systemds_matrix.csv", format="csv",
header=False
+ ).compute()
+
+ # Read the CSV file using pandas
+ result_df = pd.read_csv(
+ self.temp_dir + "into_systemds_matrix.csv", header=None
+ )
+ matrix_out = result_df.to_numpy()
+ if n_cols == 0:
+ matrix_out = matrix_out.flatten()
+ # Verify the data
+ self.assertTrue(np.allclose(matrix_out, matrix))
+
+ @timeout(60)
+ def test_java_to_python(self):
+ """Test reading matrices from SystemDS back to Python with various
dtypes."""
+ # (dtype, shapes, data_type, tolerance)
+ configs = [
+ (np.float64, [(5, 1), (10, 10), (100, 5)], "random", 1e-9),
+ (np.float32, [(10, 10), (50, 3)], "random", 1e-6),
+ (np.int32, [(10, 10), (20, 5)], "randint", 0.0),
+ (np.uint8, [(10, 10), (15, 8)], "randuint8", 0.0),
+ ]
+
+ def _gen_data(dtype, data_type):
+ if data_type == "random":
+ return lambda s: np.random.random(s).astype(dtype)
+ elif data_type == "randint":
+ return lambda s: np.random.randint(-10000, 10000,
s).astype(dtype)
+ elif data_type == "randuint8":
+ return lambda s: np.random.randint(0, 255, s).astype(dtype)
+
+ test_cases = [
+ {
+ "dtype": dt,
+ "shape": sh,
+ "data": _gen_data(dt, data_type),
+ "tolerance": tol,
+ }
+ for dt, shapes, data_type, tol in configs
+ for sh in shapes
+ ] + [
+ # Edge cases
+ {
+ "dtype": np.float64,
+ "shape": (1, 1),
+ "data": lambda s: np.random.random(s).astype(np.float64),
+ "tolerance": 1e-9,
+ },
+ {
+ "dtype": np.float64,
+ "shape": (1, 10),
+ "data": lambda s: np.random.random(s).astype(np.float64),
+ "tolerance": 1e-9,
+ },
+ {
+ "dtype": np.float64,
+ "shape": (10, 10),
+ "data": lambda s: np.zeros(s, dtype=np.float64),
+ "tolerance": 0.0,
+ },
+ {
+ "dtype": np.float64,
+ "shape": (10, 5),
+ "data": lambda s: np.random.uniform(-100.0, 100.0, s).astype(
+ np.float64
+ ),
+ "tolerance": 1e-9,
+ },
+ ]
+
+ for i, test_case in enumerate(test_cases):
+ with self.subTest(i=i, dtype=test_case["dtype"],
shape=test_case["shape"]):
+ matrix = test_case["data"](test_case["shape"])
+
+ # Create a CSV file to read into SystemDS
+ csv_path = self.temp_dir + f"out_of_systemds_matrix_{i}.csv"
+ pd.DataFrame(matrix).to_csv(csv_path, header=False,
index=False)
+
+ matrix_sds = self.sds.read(
+ csv_path,
+ data_type="matrix",
+ format="csv",
+ )
+ matrix_out = matrix_sds.compute()
+
+ # Verify the data
+ # Note: SystemDS reads all matrices as FP64, so we compare
accordingly
+ if test_case["tolerance"] == 0.0:
+ # Exact match for integer types
+ self.assertTrue(
+ np.array_equal(
+ matrix.astype(np.float64),
matrix_out.astype(np.float64)
+ ),
+ f"Matrix with dtype {test_case['dtype']} and shape
{test_case['shape']} doesn't match exactly",
+ )
+ else:
+ # Approximate match for float types
+ self.assertTrue(
+ np.allclose(
+ matrix.astype(np.float64),
+ matrix_out.astype(np.float64),
+ atol=test_case["tolerance"],
+ ),
+ f"Matrix with dtype {test_case['dtype']} and shape
{test_case['shape']} doesn't match within tolerance",
+ )
+
+ @timeout(60)
+ def test_java_to_python_unsupported_dtypes(self):
+ """Test that unsupported dtypes are handled gracefully or converted."""
+ # Note: SystemDS will convert unsupported dtypes to FP64 when reading
from CSV
+ # So these should still work, just with type conversion
+
+ test_cases = [
+ # INT64 - not directly supported for MatrixBlock, but CSV reads as
FP64
+ {
+ "dtype": np.int64,
+ "shape": (10, 5),
+ "data": lambda s: np.random.randint(-1000000, 1000000,
s).astype(
+ np.int64
+ ),
+ },
+ # Complex types - not supported, should fail or be converted
+ {
+ "dtype": np.complex128,
+ "shape": (5, 5),
+ "data": lambda s: np.random.random(s) + 1j *
np.random.random(s),
+ "should_fail": True, # Complex numbers not supported in
matrices
+ },
+ ]
+
+ for i, test_case in enumerate(test_cases):
+ with self.subTest(i=i, dtype=test_case["dtype"],
shape=test_case["shape"]):
+ if test_case.get("should_fail", False):
+ # Test that unsupported types fail gracefully
+ matrix = test_case["data"](test_case["shape"])
+ csv_path = self.temp_dir + f"unsupported_matrix_{i}.csv"
+
+ # Writing complex numbers to CSV might fail or convert to
real part
+ try:
+ pd.DataFrame(matrix).to_csv(csv_path, header=False,
index=False)
+ # If writing succeeds, reading might fail or behave
unexpectedly
+ with self.assertRaises(Exception):
+ matrix_sds = self.sds.read(
+ csv_path,
+ data_type="matrix",
+ format="csv",
+ )
+ matrix_sds.compute()
+ except Exception:
+ # Writing failed, which is expected
+ pass
+ else:
+ # Type should be converted to FP64
+ matrix = test_case["data"](test_case["shape"])
+ csv_path = self.temp_dir + f"converted_matrix_{i}.csv"
+
+ # Write as the original dtype (pandas will handle
conversion for CSV)
+ pd.DataFrame(matrix).to_csv(csv_path, header=False,
index=False)
+
+ matrix_sds = self.sds.read(
+ csv_path,
+ data_type="matrix",
+ format="csv",
+ )
+ matrix_out = matrix_sds.compute()
+
+ # Should be converted to FP64 and match values
+ self.assertTrue(
+ np.allclose(
+ matrix.astype(np.float64),
+ matrix_out.astype(np.float64),
+ atol=1e-9,
+ ),
+ f"Converted matrix with dtype {test_case['dtype']}
doesn't match",
+ )
+
+
+if __name__ == "__main__":
+ unittest.main(exit=False)
diff --git
a/src/main/python/tests/python_java_data_transfer/test_pandas_frame.py
b/src/main/python/tests/python_java_data_transfer/test_pandas_frame.py
new file mode 100644
index 0000000000..a841795363
--- /dev/null
+++ b/src/main/python/tests/python_java_data_transfer/test_pandas_frame.py
@@ -0,0 +1,265 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
+
+
+import os
+import shutil
+import unittest
+import pandas as pd
+import numpy as np
+from systemds.context import SystemDSContext
+from tests.test_utils import timeout
+
+
+class TestFrameConverterUnixPipe(unittest.TestCase):
+
+ sds: SystemDSContext = None
+ temp_dir: str = "tests/iotests/temp_write_csv/"
+
+ @classmethod
+ def setUpClass(cls):
+ cls.sds = SystemDSContext(
+ data_transfer_mode=1, logging_level=10, capture_stdout=True
+ )
+ if not os.path.exists(cls.temp_dir):
+ os.makedirs(cls.temp_dir)
+
+ @classmethod
+ def tearDownClass(cls):
+ cls.sds.close()
+ shutil.rmtree(cls.temp_dir, ignore_errors=True)
+
+ @timeout(60)
+ def test_frame_python_to_java(self):
+ """Test converting pandas DataFrame to SystemDS FrameBlock and writing
to CSV."""
+ combinations = [
+ # Float32 column
+ {"float32_col": np.random.random(50).astype(np.float32)},
+ # Float64 column
+ {"float64_col": np.random.random(50).astype(np.float64)},
+ # Int32 column
+ {"int32_col": np.random.randint(-1000, 1000, 50).astype(np.int32)},
+ # Int64 column
+ {"int64_col": np.random.randint(-1000000, 1000000,
50).astype(np.int64)},
+ # Uint8 column
+ {"uint8_col": np.random.randint(0, 255, 50).astype(np.uint8)},
+ # All numeric types together
+ {
+ "float32_col": np.random.random(30).astype(np.float32),
+ "float64_col": np.random.random(30).astype(np.float64),
+ "int32_col": np.random.randint(-1000, 1000,
30).astype(np.int32),
+ "int64_col": np.random.randint(-1000000, 1000000,
30).astype(np.int64),
+ "uint8_col": np.random.randint(0, 255, 30).astype(np.uint8),
+ },
+ # Mixed numeric types with strings
+ {
+ "float32_col": np.random.random(25).astype(np.float32),
+ "float64_col": np.random.random(25).astype(np.float64),
+ "int32_col": np.random.randint(-1000, 1000,
25).astype(np.int32),
+ "int64_col": np.random.randint(-1000000, 1000000,
25).astype(np.int64),
+ "uint8_col": np.random.randint(0, 255, 25).astype(np.uint8),
+ "string_col": [f"string_{i}" for i in range(25)],
+ },
+ ]
+
+ for frame_dict in combinations:
+ frame = pd.DataFrame(frame_dict)
+ # Transfer into SystemDS and write to CSV
+ frame_sds = self.sds.from_pandas(frame)
+ frame_sds.write(
+ self.temp_dir + "into_systemds_frame.csv", format="csv",
header=False
+ ).compute()
+
+ # Read the CSV file using pandas
+ result_df = pd.read_csv(
+ self.temp_dir + "into_systemds_frame.csv", header=None
+ )
+
+ # For numeric columns, verify with allclose for floats, exact
match for integers
+ # For string columns, verify exact match
+ for col_idx, col_name in enumerate(frame.columns):
+ original_col = frame[col_name]
+ result_col = result_df.iloc[:, col_idx]
+
+ if pd.api.types.is_numeric_dtype(original_col):
+ original_dtype = original_col.dtype
+ # For integer types (int32, int64, uint8), use exact
equality
+ if original_dtype in [np.int32, np.int64, np.uint8]:
+ self.assertTrue(
+ np.array_equal(
+ original_col.values.astype(original_dtype),
+ result_col.values.astype(original_dtype),
+ ),
+ f"Column {col_name} (dtype: {original_dtype})
integer values don't match exactly",
+ )
+ else:
+ # For float types (float32, float64), use allclose
+ self.assertTrue(
+ np.allclose(
+ original_col.values.astype(float),
+ result_col.values.astype(float),
+ equal_nan=True,
+ ),
+ f"Column {col_name} (dtype: {original_dtype})
float values don't match",
+ )
+ else:
+ # For string columns, compare as strings
+ self.assertTrue(
+ (
+ original_col.astype(str).values
+ == result_col.astype(str).values
+ ).all(),
+ f"Column {col_name} string values don't match",
+ )
+
+ @timeout(60)
+ def test_frame_java_to_python(self):
+ """Test reading CSV into SystemDS FrameBlock and converting back to
pandas DataFrame."""
+ combinations = [
+ {"float32_col": np.random.random(50).astype(np.float32)},
+ {"float64_col": np.random.random(50).astype(np.float64)},
+ {"int32_col": np.random.randint(-1000, 1000, 50).astype(np.int32)},
+ {"int64_col": np.random.randint(-1000000, 1000000,
50).astype(np.int64)},
+ {"uint8_col": np.random.randint(0, 255, 50).astype(np.uint8)},
+ # String column only
+ {"text_col": [f"text_value_{i}" for i in range(30)]},
+ # All numeric types together
+ {
+ "float32_col": np.random.random(30).astype(np.float32),
+ "float64_col": np.random.random(30).astype(np.float64),
+ "int32_col": np.random.randint(-1000, 1000,
30).astype(np.int32),
+ "int64_col": np.random.randint(-1000000, 1000000,
30).astype(np.int64),
+ "uint8_col": np.random.randint(0, 255, 30).astype(np.uint8),
+ },
+ # Mixed numeric types with strings
+ {
+ "float32_col": np.random.random(25).astype(np.float32),
+ "float64_col": np.random.random(25).astype(np.float64),
+ "int32_col": np.random.randint(-1000, 1000,
25).astype(np.int32),
+ "int64_col": np.random.randint(-1000000, 1000000,
25).astype(np.int64),
+ "uint8_col": np.random.randint(0, 255, 25).astype(np.uint8),
+ "string_col": [f"string_{i}" for i in range(25)],
+ },
+ ]
+ print("Running frame conversion test\n\n!!!!")
+ for frame_dict in combinations:
+ frame = pd.DataFrame(frame_dict)
+ # Create a CSV file to read into SystemDS
+ frame_sds = self.sds.from_pandas(frame)
+ frame_sds = frame_sds.rbind(frame_sds)
+ frame_out = frame_sds.compute()
+
+ frame = pd.concat([frame, frame], ignore_index=True)
+
+ # Verify it's a DataFrame
+ self.assertIsInstance(frame_out, pd.DataFrame)
+
+ # Verify shape matches
+ self.assertEqual(frame.shape, frame_out.shape, "Frame shapes don't
match")
+
+ # Verify column data
+ for col_name in frame.columns:
+ original_col = frame[col_name]
+ # FrameBlock to pandas may not preserve column names, so
compare by position
+ col_idx = list(frame.columns).index(col_name)
+ result_col = frame_out.iloc[:, col_idx]
+
+ if pd.api.types.is_numeric_dtype(original_col):
+ original_dtype = original_col.dtype
+ # For integer types (int32, int64, uint8), use exact
equality
+ if original_dtype in [np.int32, np.int64, np.uint8]:
+ self.assertTrue(
+ np.array_equal(
+ original_col.values.astype(original_dtype),
+ result_col.values.astype(original_dtype),
+ ),
+ f"Column {col_name} (dtype: {original_dtype})
integer values don't match exactly",
+ )
+ else:
+ # For float types (float32, float64), use allclose
+ # print difference in case of failure
+ if not np.allclose(
+ original_col.values.astype(float),
+ result_col.values.astype(float),
+ equal_nan=True,
+ atol=1e-6,
+ ):
+ print(
+ f"Column {col_name} (dtype: {original_dtype})
float values don't match: {np.abs(original_col.values.astype(float) -
result_col.values.astype(float))}"
+ )
+                            self.fail(
+                                f"Column {col_name} (dtype: {original_dtype}) float values don't match"
+                            )
+
+ else:
+ # For string columns, compare as strings
+ original_str = original_col.astype(str).values
+ result_str = result_col.astype(str).values
+ self.assertTrue(
+ (original_str == result_str).all(),
+ f"Column {col_name} string values don't match",
+ )
+
+ @timeout(60)
+ def test_frame_string_with_nulls(self):
+ """Test converting pandas DataFrame with null string values."""
+ # Create a simple DataFrame with 5 string values, 2 of them None
+ df = pd.DataFrame({"string_col": ["hello", None, "world", None,
"test"]})
+
+ # Transfer into SystemDS and back
+ frame_sds = self.sds.from_pandas(df)
+ frame_sds = frame_sds.rbind(frame_sds)
+ frame_out = frame_sds.compute()
+ df = pd.concat([df, df], ignore_index=True)
+
+ # Verify it's a DataFrame
+ self.assertIsInstance(frame_out, pd.DataFrame)
+
+ # Verify shape matches
+ self.assertEqual(df.shape, frame_out.shape, "Frame shapes don't match")
+
+ # Verify column data - check that None values are preserved
+ original_col = df["string_col"]
+ result_col = frame_out.iloc[:, 0]
+
+ # Check each value
+ for i in range(len(original_col)):
+ original_val = original_col.iloc[i]
+ result_val = result_col.iloc[i]
+
+ if pd.isna(original_val):
+ # Original is null, result should also be null
+ self.assertTrue(
+ pd.isna(result_val),
+ f"Row {i}: Expected null but got '{result_val}'",
+ )
+ else:
+ # Original is not null, result should match
+ self.assertEqual(
+ str(original_val),
+ str(result_val),
+ f"Row {i}: Expected '{original_val}' but got
'{result_val}'",
+ )
+
+
+if __name__ == "__main__":
+ unittest.main(exit=False)
diff --git a/src/main/python/tests/test_utils.py
b/src/main/python/tests/test_utils.py
new file mode 100644
index 0000000000..b4aec71d4b
--- /dev/null
+++ b/src/main/python/tests/test_utils.py
@@ -0,0 +1,58 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
+
+import functools
+import threading
+
+
+def timeout(seconds):
+ """Decorator to add timeout to test methods."""
+
+ def decorator(func):
+ @functools.wraps(func)
+ def wrapper(*args, **kwargs):
+ result = [None]
+ exception = [None]
+
+ def target():
+ try:
+ result[0] = func(*args, **kwargs)
+ except Exception as e:
+ exception[0] = e
+
+ thread = threading.Thread(target=target)
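+            # Daemon thread: a test that never returns cannot block interpreter exit.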
+ thread.daemon = True
+ thread.start()
+ thread.join(seconds)
+
+ if thread.is_alive():
+ raise TimeoutError(
+ f"Test {func.__name__} exceeded timeout of {seconds}
seconds"
+ )
+
+ if exception[0]:
+ raise exception[0]
+
+ return result[0]
+
+ return wrapper
+
+ return decorator
diff --git
a/src/test/java/org/apache/sysds/test/component/utils/UnixPipeUtilsTest.java
b/src/test/java/org/apache/sysds/test/component/utils/UnixPipeUtilsTest.java
index 650d6c1053..424d513a99 100644
--- a/src/test/java/org/apache/sysds/test/component/utils/UnixPipeUtilsTest.java
+++ b/src/test/java/org/apache/sysds/test/component/utils/UnixPipeUtilsTest.java
@@ -20,6 +20,8 @@
package org.apache.sysds.test.component.utils;
import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.frame.data.columns.Array;
+import org.apache.sysds.runtime.frame.data.columns.ArrayFactory;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.util.UnixPipeUtils;
import org.junit.Rule;
@@ -44,6 +46,8 @@ import java.util.Collection;
import static org.junit.Assert.assertArrayEquals;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
@RunWith(Enclosed.class)
public class UnixPipeUtilsTest {
@@ -59,6 +63,7 @@ public class UnixPipeUtilsTest {
{Types.ValueType.FP64, 6, 48, 99, new
MatrixBlock(2, 3, new double[]{1.0, 2.0, 3.0, 4.0, 5.0, 6.0})},
{Types.ValueType.FP32, 6, 24, 88, new
MatrixBlock(3, 2, new double[]{1.0, 2.0, 3.0, 4.0, 5.0, 6.0})},
{Types.ValueType.INT32, 4, 16, 77, new
MatrixBlock(2, 2, new double[]{0, -1, 2, -3})},
+ {Types.ValueType.INT64, 4, 64, 55, new
MatrixBlock(2, 2, new double[]{0, 1, 2, 3})},
{Types.ValueType.UINT8, 4, 4, 66, new
MatrixBlock(2, 2, new double[]{0, 1, 2, 3})}
});
}
@@ -81,6 +86,7 @@ public class UnixPipeUtilsTest {
@Test
public void testReadWriteNumpyArrayBatch() throws IOException {
File tempFile = folder.newFile("pipe_test_" +
type.name());
+ matrixBlock.recomputeNonZeros();
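+            // Refresh the nnz count so the value returned by the batched read below can be checked.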
try (BufferedOutputStream out =
UnixPipeUtils.openOutput(tempFile.getAbsolutePath(), id)) {
UnixPipeUtils.writeNumpyArrayInBatches(out, id,
batchSize, numElem, type, matrixBlock);
@@ -88,35 +94,105 @@ public class UnixPipeUtilsTest {
double[] output = new double[numElem];
try (BufferedInputStream in =
UnixPipeUtils.openInput(tempFile.getAbsolutePath(), id)) {
- UnixPipeUtils.readNumpyArrayInBatches(in, id,
batchSize, numElem, type, output, 0);
+ long nonZeros =
UnixPipeUtils.readNumpyArrayInBatches(in, id, batchSize, numElem, type, output,
0);
+ // Verify nonzero count matches MatrixBlock
+
org.junit.Assert.assertEquals(matrixBlock.getNonZeros(), nonZeros);
}
assertArrayEquals(matrixBlock.getDenseBlockValues(),
output, 1e-9);
}
}
+ @RunWith(Parameterized.class)
+ public static class FrameColumnParameterizedTest {
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder();
+
+ @Parameterized.Parameters(name = "{index}: frameType={0}")
+ public static Collection<Object[]> data() {
+ return Arrays.asList(new Object[][]{
+ {Types.ValueType.FP64, new Object[]{1.0, -2.5,
3.25, 4.75}, 64, 201},
+ {Types.ValueType.FP32, new Object[]{1.0f,
-2.25f, 3.5f, -4.125f}, 48, 202},
+ {Types.ValueType.INT32, new Object[]{0, -1, 5,
42}, 32, 203},
+ {Types.ValueType.BOOLEAN, new Object[]{true,
false, true, false}, 8, 205},
+ {Types.ValueType.STRING, new Object[]{"alpha",
"beta", "gamma",null, "delta"}, 64, 205},
+ {Types.ValueType.STRING, new
Object[]{"alphaalphaalphaalphaalpha", "beta", "gamma",null, "delta"}, 16, 205},
+ {Types.ValueType.FP64, new Object[]{1.0, -2.5,
3.25, 4.75}, 8, 201},
+ });
+ }
+
+ private final Types.ValueType type;
+ private final Object[] values;
+ private final int batchSize;
+ private final int id;
+
+ public FrameColumnParameterizedTest(Types.ValueType type,
Object[] values, int batchSize, int id) {
+ this.type = type;
+ this.values = values;
+ this.batchSize = batchSize;
+ this.id = id;
+ }
+
+ @Test
+ public void testReadWriteFrameColumn() throws IOException {
+ File tempFile = folder.newFile("frame_pipe_" +
type.name());
+ Array<?> column = createColumn(type, values);
+
+ long bytesWritten;
+ try(BufferedOutputStream out =
UnixPipeUtils.openOutput(tempFile.getAbsolutePath(), id)) {
+ bytesWritten =
UnixPipeUtils.writeFrameColumnToPipe(out, id, batchSize, column, type);
+ }
+
+ int totalBytes = Math.toIntExact(bytesWritten);
+ try(BufferedInputStream in =
UnixPipeUtils.openInput(tempFile.getAbsolutePath(), id)) {
+ Array<?> read =
UnixPipeUtils.readFrameColumnFromPipe(in, id, values.length, totalBytes,
batchSize, type);
+ assertFrameColumnEquals(column, read, type);
+ }
+ }
+
+ private static Array<?> createColumn(Types.ValueType type,
Object[] values) {
+ Array<?> array = ArrayFactory.allocate(type,
values.length);
+ for(int i = 0; i < values.length; i++) {
+ switch(type) {
+ case STRING -> array.set(i, (String)
values[i]);
+ case BOOLEAN -> array.set(i, ((Boolean)
values[i]) ? 1.0 : 0.0);
+ default -> array.set(i, ((Number)
values[i]).doubleValue());
+ }
+ }
+ return array;
+ }
+
+ private static void assertFrameColumnEquals(Array<?> expected,
Array<?> actual, Types.ValueType type) {
+ org.junit.Assert.assertEquals(expected.size(),
actual.size());
+ for(int i = 0; i < expected.size(); i++) {
+ switch(type) {
+ case FP64 ->
org.junit.Assert.assertEquals(
+ ((Number)
expected.get(i)).doubleValue(),
+ ((Number)
actual.get(i)).doubleValue(), 1e-9);
+ case FP32 ->
org.junit.Assert.assertEquals(
+ ((Number)
expected.get(i)).floatValue(),
+ ((Number)
actual.get(i)).floatValue(), 1e-6f);
+ case STRING ->
org.junit.Assert.assertEquals(expected.get(i), actual.get(i));
+ default ->
org.junit.Assert.assertEquals(expected.get(i), actual.get(i));
+ }
+ }
+ }
+ }
+
public static class NonParameterizedTest {
@Rule
public TemporaryFolder folder = new TemporaryFolder();
@Test(expected = FileNotFoundException.class)
public void testOpenInputFileNotFound() throws IOException {
- // instantiate class once for coverage
new UnixPipeUtils();
-
- // Create a path that does not exist
File nonExistentFile = new File(folder.getRoot(),
"nonexistent.pipe");
-
- // This should throw FileNotFoundException
UnixPipeUtils.openInput(nonExistentFile.getAbsolutePath(), 123);
}
@Test(expected = FileNotFoundException.class)
public void testOpenOutputFileNotFound() throws IOException {
- // Create a path that does not exist
File nonExistentFile = new File(folder.getRoot(),
"nonexistent.pipe");
-
- // This should throw FileNotFoundException
UnixPipeUtils.openOutput(nonExistentFile.getAbsolutePath(), 123);
}
@@ -125,14 +201,9 @@ public class UnixPipeUtilsTest {
public void testOpenInputAndOutputHandshakeMatch() throws
IOException {
File tempFile = folder.newFile("pipe_test1");
int id = 42;
-
- // Write expected handshake
+
try (BufferedOutputStream bos =
UnixPipeUtils.openOutput(tempFile.getAbsolutePath(), id)) {}
-
- // Read and validate handshake
- try (BufferedInputStream bis =
UnixPipeUtils.openInput(tempFile.getAbsolutePath(), id)) {
- // success: no exception = handshake passed
- }
+ try (BufferedInputStream bis =
UnixPipeUtils.openInput(tempFile.getAbsolutePath(), id)) {}
}
@Test(expected = IllegalStateException.class)
@@ -142,8 +213,6 @@ public class UnixPipeUtilsTest {
int wrongReadId = 456;
try (BufferedOutputStream bos =
UnixPipeUtils.openOutput(tempFile.getAbsolutePath(), writeId)) {}
-
- // Will throw due to ID mismatch
UnixPipeUtils.openInput(tempFile.getAbsolutePath(),
wrongReadId);
}
@@ -159,6 +228,18 @@ public class UnixPipeUtilsTest {
UnixPipeUtils.openInput(tempFile.getAbsolutePath(),
100);
}
+ @Test(expected = IOException.class)
+ public void testReadColumnFromPipeError() throws IOException {
+ File tempFile = folder.newFile("pipe_test3");
+ int id = 42;
+
+ BufferedOutputStream bos =
UnixPipeUtils.openOutput(tempFile.getAbsolutePath(), id);
+ BufferedInputStream bis =
UnixPipeUtils.openInput(tempFile.getAbsolutePath(), id);
+ Array<?> column =
ArrayFactory.allocate(Types.ValueType.INT64, 4);
+ UnixPipeUtils.writeFrameColumnToPipe(bos, id, 16,
column, Types.ValueType.INT64);
+ UnixPipeUtils.readFrameColumnFromPipe(bis, 42, 4, 12,
32 * 1024, Types.ValueType.INT32);
+ }
+
@Test(expected = EOFException.class)
public void testReadNumpyArrayUnexpectedEOF() throws
IOException {
File tempFile = folder.newFile("pipe_test5");
@@ -187,5 +268,201 @@ public class UnixPipeUtilsTest {
UnixPipeUtils.readNumpyArrayInBatches(in, id,
batchSize, numElem, type, outArr, 0);
}
}
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testGetElementSizeUnsupportedType() {
+ UnixPipeUtils.getElementSize(Types.ValueType.STRING);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testReadNumpyArrayUnsupportedType() throws
IOException {
+ File file = folder.newFile("unsupported_type.pipe");
+ int id = 7;
+ try (BufferedOutputStream out =
UnixPipeUtils.openOutput(file.getAbsolutePath(), id)) {
+ UnixPipeUtils.writeHandshake(id, out); // start
handshake
+ out.flush();
+ UnixPipeUtils.writeHandshake(id, out); // end
handshake, no payload
+ }
+ double[] outArr = new double[0];
+ try (BufferedInputStream in =
UnixPipeUtils.openInput(file.getAbsolutePath(), id)) {
+ UnixPipeUtils.readNumpyArrayInBatches(in, id,
32, 0, Types.ValueType.STRING, outArr, 0);
+ }
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testWriteNumpyArrayInBatchesError() throws
IOException {
+ UnixPipeUtils.writeNumpyArrayInBatches(null, 0, 0, 0,
Types.ValueType.INT32, null);
+ }
+
+ @Test
+ public void testGetBufferReaderUnsupportedType() throws
Exception {
+ Method m =
UnixPipeUtils.class.getDeclaredMethod("getBufferReader", Types.ValueType.class);
+ m.setAccessible(true);
+
+ try {
+ m.invoke(null, Types.ValueType.STRING);
+ org.junit.Assert.fail("Expected
UnsupportedOperationException");
+ } catch (InvocationTargetException e) {
+ org.junit.Assert.assertTrue(e.getCause()
instanceof UnsupportedOperationException);
+ }
+ }
+
+ @Test
+ public void testGetBufferWriterUnsupportedType() throws
Exception {
+ Method m =
UnixPipeUtils.class.getDeclaredMethod("getBufferWriter", Types.ValueType.class);
+ m.setAccessible(true);
+
+ try {
+ m.invoke(null, Types.ValueType.STRING);
+ org.junit.Assert.fail("Expected
UnsupportedOperationException");
+ } catch (InvocationTargetException e) {
+ org.junit.Assert.assertTrue(e.getCause()
instanceof UnsupportedOperationException);
+ }
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testReadWriteFrameColumnUINT8() throws IOException {
+ File tempFile = folder.newFile("frame_pipe_UINT8");
+ int id = 204;
+
+ BufferedOutputStream out =
UnixPipeUtils.openOutput(tempFile.getAbsolutePath(), id);
+ UnixPipeUtils.writeHandshake(id, out);
+ out.write(new byte[]{(byte) 0x00, (byte) 0x01, (byte)
0x02, (byte) 0x03});
+ UnixPipeUtils.writeHandshake(id, out);
+ out.flush();
+
+ Array<?> read = null;
+ try(BufferedInputStream in =
UnixPipeUtils.openInput(tempFile.getAbsolutePath(), id)) {
+ read =
UnixPipeUtils.readFrameColumnFromPipe(in, id, 4, 4, 32 * 1024,
Types.ValueType.UINT8);
+ for(int i = 0; i < 4; i++) {
+ org.junit.Assert.assertEquals(i,
read.get(i));
+ }
+ }
+ try(BufferedOutputStream out2 =
UnixPipeUtils.openOutput(tempFile.getAbsolutePath(), id)) {
+ UnixPipeUtils.writeFrameColumnToPipe(out2, id,
16, read, Types.ValueType.UINT8);
+ }
+ }
+
+ @Test
+ public void testReadWriteFrameColumnINT64() throws IOException {
+ File tempFile = folder.newFile("frame_pipe_INT32");
+ int id = 204;
+
+ BufferedOutputStream out =
UnixPipeUtils.openOutput(tempFile.getAbsolutePath(), id);
+ UnixPipeUtils.writeHandshake(id, out);
+ // write 4 int64 values
+ ByteBuffer bb = ByteBuffer.allocate(8 *
4).order(ByteOrder.LITTLE_ENDIAN);
+ for(int i = 0; i < 4; i++) {
+ bb.putLong(i);
+ }
+ out.write(bb.array());
+ UnixPipeUtils.writeHandshake(id, out);
+ out.flush();
+
+ Array<?> read = null;
+ try(BufferedInputStream in =
UnixPipeUtils.openInput(tempFile.getAbsolutePath(), id)) {
+ read =
UnixPipeUtils.readFrameColumnFromPipe(in, id, 4, 32, 32 * 1024,
Types.ValueType.INT64);
+ for(int i = 0; i < 4; i++) {
+ org.junit.Assert.assertEquals(i,
((Number) read.get(i)).longValue());
+ }
+ }
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testReadWriteFrameColumnUnsupportedType() throws
IOException {
+ File tempFile = folder.newFile("frame_pipe_HASH64");
+ int id = 204;
+
+ BufferedOutputStream out =
UnixPipeUtils.openOutput(tempFile.getAbsolutePath(), id);
+ UnixPipeUtils.writeHandshake(id, out);
+ out.flush();
+
+ try(BufferedInputStream in =
UnixPipeUtils.openInput(tempFile.getAbsolutePath(), id)) {
+ UnixPipeUtils.readFrameColumnFromPipe(in, id,
4, 32, 4, Types.ValueType.HASH64);
+ }
+ }
+
+ @Test
+ public void testReadWriteFrameColumnLongString1() throws
IOException {
+ File tempFile =
folder.newFile("frame_pipe_long_string");
+ Array<?> column =
FrameColumnParameterizedTest.createColumn(Types.ValueType.STRING,
+ new Object[]{"alphaalphaalphaalphaalphaa",
"beta", "gamma",null, "delta"});
+ int id = 205;
+ int batchSize = 16;
+
+ long bytesWritten;
+ try(BufferedOutputStream out =
UnixPipeUtils.openOutput(tempFile.getAbsolutePath(), id)) {
+ bytesWritten =
UnixPipeUtils.writeFrameColumnToPipe(out, id, batchSize, column,
Types.ValueType.STRING);
+ }
+
+ int totalBytes = Math.toIntExact(bytesWritten);
+ try(BufferedInputStream in =
UnixPipeUtils.openInput(tempFile.getAbsolutePath(), id)) {
+ Array<?> read =
UnixPipeUtils.readFrameColumnFromPipe(in, id, column.size(), totalBytes,
batchSize, Types.ValueType.STRING);
+
FrameColumnParameterizedTest.assertFrameColumnEquals(column, read,
Types.ValueType.STRING);
+ }
+ }
+
+ @Test
+ public void testReadWriteFrameColumnLongString2() throws
IOException {
+ File tempFile =
folder.newFile("frame_pipe_long_string");
+ StringBuilder sb = new StringBuilder();
+ for(int i = 0; i < 35*1024; i++) {
+ sb.append("a");
+ }
+ Array<?> column =
FrameColumnParameterizedTest.createColumn(Types.ValueType.STRING,
+ new Object[]{sb.toString()});
+ int id = 205;
+ int batchSize = 16*1024;
+
+ long bytesWritten;
+ try(BufferedOutputStream out =
UnixPipeUtils.openOutput(tempFile.getAbsolutePath(), id)) {
+ bytesWritten =
UnixPipeUtils.writeFrameColumnToPipe(out, id, batchSize, column,
Types.ValueType.STRING);
+ }
+
+ int totalBytes = Math.toIntExact(bytesWritten);
+ try(BufferedInputStream in =
UnixPipeUtils.openInput(tempFile.getAbsolutePath(), id)) {
+ Array<?> read =
UnixPipeUtils.readFrameColumnFromPipe(in, id, column.size(), totalBytes,
batchSize, Types.ValueType.STRING);
+
FrameColumnParameterizedTest.assertFrameColumnEquals(column, read,
Types.ValueType.STRING);
+ }
+ }
+
+ @Test
+ public void testReadWriteFrameColumnString() throws IOException
{
+            File tempFile = folder.newFile("frame_pipe_string");
+ Array<?> column =
FrameColumnParameterizedTest.createColumn(Types.ValueType.STRING,
+ new Object[]{"alphabet"});
+ int id = 205;
+ int batchSize = 12;
+
+ long bytesWritten;
+ try(BufferedOutputStream out =
UnixPipeUtils.openOutput(tempFile.getAbsolutePath(), id)) {
+ bytesWritten =
UnixPipeUtils.writeFrameColumnToPipe(out, id, batchSize, column,
Types.ValueType.STRING);
+ }
+
+ int totalBytes = Math.toIntExact(bytesWritten);
+ try(BufferedInputStream in =
UnixPipeUtils.openInput(tempFile.getAbsolutePath(), id)) {
+ Array<?> read =
UnixPipeUtils.readFrameColumnFromPipe(in, id, column.size(), totalBytes,
batchSize, Types.ValueType.STRING);
+
FrameColumnParameterizedTest.assertFrameColumnEquals(column, read,
Types.ValueType.STRING);
+ }
+ }
+
+ @Test
+ public void testWriteFrameColumnINT32() throws IOException {
+ File tempFile = folder.newFile("frame_pip2_INT32");
+ int id = 204;
+ Array<?> column =
FrameColumnParameterizedTest.createColumn(Types.ValueType.INT32,
+ new Object[]{0, 1, 2, 3});
+
+ try(BufferedOutputStream out =
UnixPipeUtils.openOutput(tempFile.getAbsolutePath(), id)) {
+ UnixPipeUtils.writeFrameColumnToPipe(out, id,
4, column, Types.ValueType.INT32);
+ }
+
+ Array<?> read = null;
+ try(BufferedInputStream in =
UnixPipeUtils.openInput(tempFile.getAbsolutePath(), id)) {
+ read =
UnixPipeUtils.readFrameColumnFromPipe(in, id, 4, 16, 4, Types.ValueType.INT32);
+
FrameColumnParameterizedTest.assertFrameColumnEquals(column, read,
Types.ValueType.INT32);
+ }
+
+ }
}
}
diff --git
a/src/test/java/org/apache/sysds/test/usertest/pythonapi/StartupTest.java
b/src/test/java/org/apache/sysds/test/usertest/pythonapi/StartupTest.java
index f08dd63c65..4365dd629f 100644
--- a/src/test/java/org/apache/sysds/test/usertest/pythonapi/StartupTest.java
+++ b/src/test/java/org/apache/sysds/test/usertest/pythonapi/StartupTest.java
@@ -1,18 +1,18 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
+ * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
+ * KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
@@ -24,7 +24,10 @@ import org.apache.log4j.Logger;
import org.apache.log4j.spi.LoggingEvent;
import org.apache.sysds.api.PythonDMLScript;
import org.apache.sysds.common.Types;
+import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.frame.data.FrameBlock;
+import org.apache.sysds.runtime.frame.data.columns.Array;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.util.UnixPipeUtils;
import org.apache.sysds.test.LoggingUtils;
@@ -39,7 +42,6 @@ import py4j.GatewayServer;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
-import java.security.Permission;
import java.util.List;
import static org.junit.Assert.assertArrayEquals;
@@ -48,29 +50,19 @@ import static org.junit.Assert.assertArrayEquals;
/** Simple tests to verify startup of Python Gateway server happens without
crashes */
public class StartupTest {
private LoggingUtils.TestAppender appender;
- @SuppressWarnings("removal")
- private SecurityManager sm;
@Before
- @SuppressWarnings("removal")
public void setUp() {
appender = LoggingUtils.overwrite();
- sm = System.getSecurityManager();
- System.setSecurityManager(new NoExitSecurityManager());
+ PythonDMLScript.setExitHandler(new ExitCalled());
PythonDMLScript.setDMLGateWayListenerLoggerLevel(Level.ALL);
Logger.getLogger(PythonDMLScript.class.getName()).setLevel(Level.ALL);
}
@After
- @SuppressWarnings("removal")
public void tearDown() {
LoggingUtils.reinsert(appender);
- System.setSecurityManager(sm);
- }
-
- @SuppressWarnings("unused")
- private void assertLogMessages(String... expectedMessages) {
- assertLogMessages(true, expectedMessages);
+ PythonDMLScript.resetExitHandler();
}
private void assertLogMessages(boolean strict, String...
expectedMessages) {
@@ -92,9 +84,9 @@ public class StartupTest {
// order does not matter
boolean found = false;
- for (LoggingEvent loggingEvent : log) {
- found |=
loggingEvent.getMessage().toString().startsWith(message);
- }
+ for (LoggingEvent loggingEvent : log) {
+ found |=
loggingEvent.getMessage().toString().startsWith(message);
+ }
Assert.assertTrue("Expected log message not
found: " + message,found);
}
}
@@ -137,7 +129,7 @@ public class StartupTest {
Thread.sleep(200);
PythonDMLScript.main(new String[]{"-python", "4001"});
Thread.sleep(200);
- } catch (SecurityException e) {
+ } catch (ExitCalled e) {
assertLogMessages(false,
"GatewayServer started",
"failed startup"
@@ -185,8 +177,9 @@ public class StartupTest {
MatrixBlock mb = new MatrixBlock(2, 3, data);
script.startWritingMbToPipe(0, mb);
double[] rcv_data = new double[data.length];
- UnixPipeUtils.readNumpyArrayInBatches(java2py, 0, 32,
data.length, Types.ValueType.FP64, rcv_data, 0);
+ long nonZeros = UnixPipeUtils.readNumpyArrayInBatches(java2py,
0, 32, data.length, Types.ValueType.FP64, rcv_data, 0);
assertArrayEquals(data, rcv_data, 1e-9);
+ Assert.assertEquals((long) data.length, nonZeros); // All
values are non-zero
// Read Test
UnixPipeUtils.writeNumpyArrayInBatches(py2java, 0, 32,
data.length, Types.ValueType.FP64, mb);
@@ -230,6 +223,46 @@ public class StartupTest {
PythonDMLScript.GwS.shutdown();
Thread.sleep(200);
}
+
+
+ @Test
+ public void testDataFrameTransfer() throws Exception {
+ PythonDMLScript.main(new String[]{"-python", "4003"});
+ Thread.sleep(200);
+ PythonDMLScript script = (PythonDMLScript)
PythonDMLScript.GwS.getGateway().getEntryPoint();
+
+ File in = folder.newFile("py2java-0");
+ File out = folder.newFile("java2py-0");
+
+ // Init Test
+ BufferedOutputStream py2java =
UnixPipeUtils.openOutput(in.getAbsolutePath(), 0);
+ script.openPipes(folder.getRoot().getPath(), 1);
+ BufferedInputStream java2py =
UnixPipeUtils.openInput(out.getAbsolutePath(), 0);
+
+ // Write Test
+ String[][] data = new String[][]{{"1", "2", "3"}, {"4", "5",
"6"}};
+ ValueType[] schema = new ValueType[]{Types.ValueType.STRING,
Types.ValueType.STRING, Types.ValueType.STRING};
+ FrameBlock fb = new FrameBlock(schema, data);
+
+ FrameBlock rcv_fb = new FrameBlock(schema, 2);
+
+ for (int i = 0; i < 3; i++) {
+ script.startWritingColToPipe(0, fb, i);
+ Array<?> rcv_arr =
UnixPipeUtils.readFrameColumnFromPipe(java2py, 0, 2, -1, 32 * 1024,
Types.ValueType.STRING);
+ rcv_fb.setColumn(i, rcv_arr);
+ }
+
+ for (int i = 0; i < 3; i++) {
+ UnixPipeUtils.writeFrameColumnToPipe(py2java, 0, 32,
fb.getColumn(i), Types.ValueType.STRING);
+ script.startReadingColFromPipe(0, rcv_fb, 2, -1, i,
Types.ValueType.STRING, false);
+ }
+
+ script.closePipes();
+
+ PythonDMLScript.GwS.shutdown();
+ Thread.sleep(200);
+ }
+
@Test(expected = DMLRuntimeException.class)
public void testDataTransferNotInit1() throws Exception {
@@ -255,14 +288,25 @@ public class StartupTest {
script.startReadingMbFromPipes(new int[]{3,3}, 2, 3,
Types.ValueType.FP64);
}
- @SuppressWarnings("removal")
- class NoExitSecurityManager extends SecurityManager {
- @Override
- public void checkPermission(Permission perm) { }
+ @Test(expected = Exception.class)
+ public void testDataTransferNotInit4() throws Exception {
+ PythonDMLScript.main(new String[]{"-python", "4007"});
+ Thread.sleep(200);
+ PythonDMLScript script = (PythonDMLScript)
PythonDMLScript.GwS.getGateway().getEntryPoint();
+ script.startReadingColFromPipe(0, null, 2, -1, 0,
Types.ValueType.STRING, false);
+ }
+ @Test(expected = Exception.class)
+ public void testDataTransferNotInit5() throws Exception {
+ PythonDMLScript.main(new String[]{"-python", "4007"});
+ Thread.sleep(200);
+ PythonDMLScript script = (PythonDMLScript)
PythonDMLScript.GwS.getGateway().getEntryPoint();
+ script.startWritingColToPipe(0, null, 0);
+ }
+ private static class ExitCalled extends RuntimeException implements
PythonDMLScript.ExitHandler {
@Override
- public void checkExit(int status) {
- throw new SecurityException("Intercepted exit()");
+ public void exit(int status) {
+ throw this;
}
}