This is an automated email from the ASF dual-hosted git repository.
janardhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new 8e9aaa5e3a [SYSTEMDS-3911] OOC Transpose operation (#2316)
8e9aaa5e3a is described below
commit 8e9aaa5e3af50a6c51a20a6709c7a24ae7475956
Author: Janardhan Pulivarthi <[email protected]>
AuthorDate: Thu Aug 28 08:12:59 2025 +0530
[SYSTEMDS-3911] OOC Transpose operation (#2316)
This patch introduces the TransposeOOCInstruction, a fundamental component
of the out-of-core (OOC) backend. It performs a full transpose on a matrix
stream, enabling the composition of complex OOC pipelines required for
algorithms like lmDS.
Implementation detail:
* Asynchronous Producer: The processInstruction method submits the
transpose work as a background task to the common thread pool and returns
control to the calling thread immediately, so subsequent instructions are not
blocked. The background task consumes blocks from its input stream as they
become available and enqueues results for a downstream consumer to "pull"
from the output stream.
* Streaming Logic: The background thread consumes a stream of
IndexedMatrixValue blocks from its input. For each block, it:
- Performs an in-memory transpose using the standard reorgOperations
with a ReorgOperator.
- Crucially, it also transposes the MatrixIndexes of the block (e.g., a
block at (row=i, col=j) becomes a block at (row=j, col=i)).
- Enqueues the new, transposed IndexedMatrixValue into the output
stream.
* Integration: The new instruction is fully integrated into the system:
- A Reorg type has been added to the OOCType enum.
- The OOCInstructionParser has been updated to recognize the r' opcode
for OOC execution and route it to the TransposeOOCInstruction.
- The dispatch switch in OOCInstructionParser gains a "case Reorg" branch
that calls TransposeOOCInstruction.parseInstruction, so instructions of the
new Reorg OOC type are constructed from their string form.
---
.../runtime/instructions/OOCInstructionParser.java | 3 +
.../runtime/instructions/ooc/OOCInstruction.java | 2 +-
.../instructions/ooc/TransposeOOCInstruction.java | 90 ++++++++++++++
.../sysds/test/functions/ooc/TransposeTest.java | 133 +++++++++++++++++++++
src/test/scripts/functions/ooc/Transpose.dml | 27 +++++
5 files changed, 254 insertions(+), 1 deletion(-)
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java
b/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java
index 9b1165b819..73b5ca0261 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java
@@ -29,6 +29,7 @@ import
org.apache.sysds.runtime.instructions.ooc.OOCInstruction;
import org.apache.sysds.runtime.instructions.ooc.ReblockOOCInstruction;
import org.apache.sysds.runtime.instructions.ooc.UnaryOOCInstruction;
import
org.apache.sysds.runtime.instructions.ooc.MatrixVectorBinaryOOCInstruction;
+import org.apache.sysds.runtime.instructions.ooc.TransposeOOCInstruction;
public class OOCInstructionParser extends InstructionParser {
protected static final Log LOG =
LogFactory.getLog(OOCInstructionParser.class.getName());
@@ -60,6 +61,8 @@ public class OOCInstructionParser extends InstructionParser {
case AggregateBinary:
case MAPMM:
return
MatrixVectorBinaryOOCInstruction.parseInstruction(str);
+ case Reorg:
+ return
TransposeOOCInstruction.parseInstruction(str);
default:
throw new DMLRuntimeException("Invalid OOC
Instruction Type: " + ooctype);
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCInstruction.java
index d3c2dfcbd7..0495dcfde5 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCInstruction.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCInstruction.java
@@ -30,7 +30,7 @@ public abstract class OOCInstruction extends Instruction {
protected static final Log LOG =
LogFactory.getLog(OOCInstruction.class.getName());
public enum OOCType {
- Reblock, AggregateUnary, Binary, Unary, MAPMM, AggregateBinary
+ Reblock, AggregateUnary, Binary, Unary, MAPMM, Reorg,
AggregateBinary
}
protected final OOCInstruction.OOCType _ooctype;
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeOOCInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeOOCInstruction.java
new file mode 100644
index 0000000000..fffd7ee7ed
--- /dev/null
+++
b/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeOOCInstruction.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.instructions.ooc;
+
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue;
+import org.apache.sysds.runtime.functionobjects.SwapIndex;
+import org.apache.sysds.runtime.instructions.InstructionUtils;
+import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysds.runtime.matrix.operators.ReorgOperator;
+import org.apache.sysds.runtime.matrix.operators.UnaryOperator;
+import org.apache.sysds.runtime.util.CommonThreadPool;
+
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Out-of-core (OOC) transpose instruction.
+ *
+ * <p>Reads {@link IndexedMatrixValue} blocks from the input matrix's stream
+ * handle, transposes each block with the configured {@link ReorgOperator},
+ * swaps the block's row and column index, and enqueues the result on a new
+ * output stream that is attached to the output matrix object.</p>
+ */
+public class TransposeOOCInstruction extends ComputationOOCInstruction {
+
+	protected TransposeOOCInstruction(OOCType type, ReorgOperator op, CPOperand in1, CPOperand out, String opcode, String istr) {
+		super(type, op, in1, out, opcode, istr);
+	}
+
+	/**
+	 * Builds a transpose instruction from its string form.
+	 *
+	 * @param str instruction string; after splitting, part 0 is the opcode,
+	 *            part 1 the input operand and part 2 the output operand
+	 * @return the parsed instruction, configured with a swap-index reorg operator
+	 */
+	public static TransposeOOCInstruction parseInstruction(String str) {
+		String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
+		InstructionUtils.checkNumFields(parts, 2);
+		String opcode = parts[0];
+		CPOperand in1 = new CPOperand(parts[1]);
+		CPOperand out = new CPOperand(parts[2]);
+
+		ReorgOperator reorg = new ReorgOperator(SwapIndex.getSwapIndexFnObject());
+		return new TransposeOOCInstruction(OOCType.Reorg, reorg, in1, out, opcode, str);
+	}
+
+	/**
+	 * Wires the output stream to the output matrix object and submits the
+	 * block-wise transpose as a task to the common thread pool. This method
+	 * returns once the task is submitted; it does not wait for completion.
+	 *
+	 * @param ec execution context used to resolve the input and output matrix objects
+	 * @throws DMLRuntimeException if the task cannot be submitted
+	 */
+	@Override
+	public void processInstruction( ExecutionContext ec ) {
+		MatrixObject min = ec.getMatrixObject(input1);
+		LocalTaskQueue<IndexedMatrixValue> qIn = min.getStreamHandle();
+		LocalTaskQueue<IndexedMatrixValue> qOut = new LocalTaskQueue<>();
+		ec.getMatrixObject(output).setStreamHandle(qOut);
+
+		ExecutorService pool = CommonThreadPool.get();
+		try {
+			pool.submit(() -> {
+				IndexedMatrixValue tmp = null;
+				try {
+					while ((tmp = qIn.dequeueTask()) != LocalTaskQueue.NO_MORE_TASKS) {
+						MatrixBlock inBlock = (MatrixBlock)tmp.getValue();
+						long oldRowIdx = tmp.getIndexes().getRowIndex();
+						long oldColIdx = tmp.getIndexes().getColumnIndex();
+
+						MatrixBlock outBlock = inBlock.reorgOperations((ReorgOperator) _optr, new MatrixBlock(), -1, -1, -1);
+						// block (i,j) of the input becomes block (j,i) of the output
+						qOut.enqueueTask(new IndexedMatrixValue(new MatrixIndexes(oldColIdx, oldRowIdx), outBlock));
+					}
+				}
+				catch(Exception ex) {
+					// The Future returned by submit() is discarded, so the rethrown
+					// exception would otherwise never be reported anywhere.
+					LOG.error("OOC transpose task failed; closing output stream early", ex);
+					throw new DMLRuntimeException(ex);
+				}
+				finally {
+					// Always signal end-of-stream, also on failure, so that a
+					// downstream consumer blocked on the queue cannot hang forever.
+					qOut.closeInput();
+				}
+			});
+		} catch (Exception ex) {
+			throw new DMLRuntimeException(ex);
+		} finally {
+			pool.shutdown();
+		}
+	}
+}
diff --git
a/src/test/java/org/apache/sysds/test/functions/ooc/TransposeTest.java
b/src/test/java/org/apache/sysds/test/functions/ooc/TransposeTest.java
new file mode 100644
index 0000000000..0dc04043d4
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/ooc/TransposeTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.ooc;
+
+import org.apache.sysds.common.Opcodes;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.instructions.Instruction;
+import org.apache.sysds.runtime.io.MatrixWriter;
+import org.apache.sysds.runtime.io.MatrixWriterFactory;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
+import org.apache.sysds.runtime.util.DataConverter;
+import org.apache.sysds.runtime.util.HDFSTool;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/**
+ * Functional test for the out-of-core (OOC) transpose instruction.
+ *
+ * <p>Writes a random matrix in binary format, runs the Transpose.dml script
+ * with the {@code -ooc} flag, and checks the written result element-wise
+ * against the transposed input. It also asserts that the OOC reblock and
+ * transpose opcodes appear among the heavy hitters.</p>
+ */
+public class TransposeTest extends AutomatedTestBase {
+	private final static String TEST_NAME1 = "Transpose";
+	private final static String TEST_DIR = "functions/ooc/";
+	private final static String TEST_CLASS_DIR = TEST_DIR + TransposeTest.class.getSimpleName() + "/";
+	private final static double eps = 1e-10;
+	private static final String INPUT_NAME = "X";
+	private static final String OUTPUT_NAME = "res";
+
+	private final static int rows = 1000;
+	private final static int cols_wide = 1000;
+	private final static int cols_skinny = 500;
+
+	private final static double sparsity1 = 0.7;
+	private final static double sparsity2 = 0.1;
+
+	@Override
+	public void setUp() {
+		TestUtils.clearAssertionInformation();
+		TestConfiguration config = new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1);
+		addTestConfiguration(TEST_NAME1, config);
+	}
+
+	@Test
+	public void testTranspose1() {
+		runTransposeTest(cols_wide, false);
+	}
+
+//	@Test
+//	public void testTranspose2() {
+//		runTransposeTest(cols_skinny, false);
+//	}
+
+	/**
+	 * Runs the transpose script on a random {@code rows x cols} matrix and
+	 * verifies the result.
+	 *
+	 * @param cols   number of columns of the generated input matrix
+	 * @param sparse if true, generate the input with sparsity2, otherwise sparsity1
+	 */
+	private void runTransposeTest(int cols, boolean sparse )
+	{
+		Types.ExecMode platformOld = setExecMode(Types.ExecMode.SINGLE_NODE);
+
+		try
+		{
+			getAndLoadTestConfiguration(TEST_NAME1);
+			String HOME = SCRIPT_DIR + TEST_DIR;
+			fullDMLScriptName = HOME + TEST_NAME1 + ".dml";
+			programArgs = new String[]{"-explain", "-stats", "-ooc",
+				"-args", input(INPUT_NAME), output(OUTPUT_NAME)};
+
+			// 1. Generate the data as a double array
+			double[][] A_data = getRandomMatrix(rows, cols, 0, 1, sparse?sparsity2:sparsity1, 10);
+
+			// 2. Convert the double array to a MatrixBlock object
+			MatrixBlock A_mb = DataConverter.convertToMatrixBlock(A_data);
+
+			// 3. Create a binary matrix writer
+			MatrixWriter writer = MatrixWriterFactory.createMatrixWriter(Types.FileFormat.BINARY);
+
+			// 4. Write matrix A and its metadata file in binary format
+			writer.writeMatrixToHDFS(A_mb, input(INPUT_NAME), rows, cols, 1000, A_mb.getNonZeros());
+			HDFSTool.writeMetaDataFile(input(INPUT_NAME + ".mtd"), Types.ValueType.FP64,
+				new MatrixCharacteristics(rows, cols, 1000, A_mb.getNonZeros()), Types.FileFormat.BINARY);
+
+			boolean exceptionExpected = false;
+			runTest(true, exceptionExpected, null, -1);
+
+			// The result is t(X), so it has cols rows and rows columns; read it
+			// back with swapped dimensions so non-square inputs are handled too.
+			double[][] C1 = readMatrix(output(OUTPUT_NAME), Types.FileFormat.BINARY, cols, rows, 1000, 1000);
+			for(int i = 0; i < rows; i++) { // verify the results with Java
+				for(int j = 0; j < cols; j++) {
+					Assert.assertEquals(A_mb.get(i, j), C1[j][i], eps);
+				}
+			}
+
+			String prefix = Instruction.OOC_INST_PREFIX;
+			Assert.assertTrue("OOC wasn't used for RBLK",
+				heavyHittersContainsString(prefix + Opcodes.RBLK));
+			Assert.assertTrue("OOC wasn't used for TRANSPOSE",
+				heavyHittersContainsString(prefix + Opcodes.TRANSPOSE));
+		}
+		catch (IOException e) {
+			throw new RuntimeException(e);
+		}
+		finally {
+			resetExecMode(platformOld);
+		}
+	}
+
+	/**
+	 * Reads a matrix file and converts it to a dense double array.
+	 *
+	 * @param fname path of the matrix file
+	 * @param fmt   file format of the matrix
+	 * @param rows  number of rows of the stored matrix
+	 * @param cols  number of columns of the stored matrix
+	 * @param brows block size in rows
+	 * @param bcols block size in columns
+	 * @return the matrix contents as a two-dimensional double array
+	 * @throws IOException if reading the file fails
+	 */
+	private static double[][] readMatrix(String fname, Types.FileFormat fmt, long rows, long cols, int brows, int bcols )
+		throws IOException
+	{
+		MatrixBlock mb = DataConverter.readMatrixFromHDFS(fname, fmt, rows, cols, brows, bcols);
+		double[][] C = DataConverter.convertToDoubleMatrix(mb);
+		return C;
+	}
+}
diff --git a/src/test/scripts/functions/ooc/Transpose.dml
b/src/test/scripts/functions/ooc/Transpose.dml
new file mode 100644
index 0000000000..9b38939a2e
--- /dev/null
+++ b/src/test/scripts/functions/ooc/Transpose.dml
@@ -0,0 +1,27 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# Read the input matrix from the path passed as the first script argument
+X = read($1);
+# Transpose X; the accompanying Java test runs this script with -ooc
+res = t(X);
+# Write the transposed matrix in binary format to the second script argument
+write(res, $2, format="binary");