This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new a5b298cda0 [SYSTEMDS-3899] Unary out-of-core-operations
a5b298cda0 is described below

commit a5b298cda05e436e2339aa913be8d0217c0690d7
Author: Janardhan Pulivarthi <[email protected]>
AuthorDate: Sun Jul 27 12:32:06 2025 +0200

    [SYSTEMDS-3899] Unary out-of-core-operations
    
    Closes #2298.
---
 .../runtime/instructions/OOCInstructionParser.java |   3 +
 .../runtime/instructions/ooc/OOCInstruction.java   |   2 +-
 .../instructions/ooc/UnaryOOCInstruction.java      |  90 ++++++++++++++++
 .../apache/sysds/test/functions/ooc/UnaryTest.java | 114 +++++++++++++++++++++
 src/test/scripts/functions/ooc/Unary.dml           |  29 ++++++
 5 files changed, 237 insertions(+), 1 deletion(-)

diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java 
b/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java
index 0e5b3f1f51..a744b5d813 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java
@@ -27,6 +27,7 @@ import 
org.apache.sysds.runtime.instructions.ooc.AggregateUnaryOOCInstruction;
 import org.apache.sysds.runtime.instructions.ooc.BinaryOOCInstruction;
 import org.apache.sysds.runtime.instructions.ooc.OOCInstruction;
 import org.apache.sysds.runtime.instructions.ooc.ReblockOOCInstruction;
+import org.apache.sysds.runtime.instructions.ooc.UnaryOOCInstruction;
 
 public class OOCInstructionParser extends InstructionParser {
        protected static final Log LOG = 
LogFactory.getLog(OOCInstructionParser.class.getName());
@@ -51,6 +52,8 @@ public class OOCInstructionParser extends InstructionParser {
                                return 
ReblockOOCInstruction.parseInstruction(str);
                        case AggregateUnary:
                                return 
AggregateUnaryOOCInstruction.parseInstruction(str);
+                       case Unary:
+                               return 
UnaryOOCInstruction.parseInstruction(str);
                        case Binary:
                                return 
BinaryOOCInstruction.parseInstruction(str);
                        
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCInstruction.java 
b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCInstruction.java
index fe73e57fd2..db3d2da8b1 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCInstruction.java
@@ -30,7 +30,7 @@ public abstract class OOCInstruction extends Instruction {
        protected static final Log LOG = 
LogFactory.getLog(OOCInstruction.class.getName());
 
        public enum OOCType {
-               Reblock, AggregateUnary, Binary
+               Reblock, AggregateUnary, Binary, Unary
        }
 
        protected final OOCInstruction.OOCType _ooctype;
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java
new file mode 100644
index 0000000000..d2fccd5fd6
--- /dev/null
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.instructions.ooc;
+
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue;
+import org.apache.sysds.runtime.instructions.InstructionUtils;
+import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.matrix.operators.UnaryOperator;
+import org.apache.sysds.runtime.util.CommonThreadPool;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+public class UnaryOOCInstruction extends ComputationOOCInstruction {
+    private UnaryOperator _uop = null;
+
+    protected UnaryOOCInstruction(OOCType type, UnaryOperator op, CPOperand 
in1, CPOperand out, String opcode, String istr) {
+        super(type, op, in1, out, opcode, istr);
+
+        _uop = op;
+    }
+
+    public static UnaryOOCInstruction parseInstruction(String str) {
+        String[] parts = 
InstructionUtils.getInstructionPartsWithValueType(str);
+        InstructionUtils.checkNumFields(parts, 2);
+        String opcode = parts[0];
+        CPOperand in1 = new CPOperand(parts[1]);
+        CPOperand out = new CPOperand(parts[2]);
+
+        UnaryOperator uopcode = InstructionUtils.parseUnaryOperator(opcode);
+        return new UnaryOOCInstruction(OOCType.Unary, uopcode, in1, out, 
opcode, str);
+    }
+
+    public void processInstruction( ExecutionContext ec ) {
+        UnaryOperator uop = (UnaryOperator) _uop;
+        // Submit a pool task that applies the unary operation to each streamed block
+        MatrixObject min = ec.getMatrixObject(input1);
+        LocalTaskQueue<IndexedMatrixValue> qIn = min.getStreamHandle();
+        LocalTaskQueue<IndexedMatrixValue> qOut = new LocalTaskQueue<>();
+        ec.getMatrixObject(output).setStreamHandle(qOut);
+
+
+        ExecutorService pool = CommonThreadPool.get();
+        try {
+            Future<?> task =pool.submit(() -> {
+                IndexedMatrixValue tmp = null;
+                try {
+                    while ((tmp = qIn.dequeueTask()) != 
LocalTaskQueue.NO_MORE_TASKS) {
+                        IndexedMatrixValue tmpOut = new IndexedMatrixValue();
+                        tmpOut.set(tmp.getIndexes(),
+                                tmp.getValue().unaryOperations(uop, new 
MatrixBlock()));
+                        qOut.enqueueTask(tmpOut);
+                    }
+                    qOut.closeInput();
+                }
+                catch(Exception ex) {
+                    throw new DMLRuntimeException(ex);
+                }
+            });
+            task.get();
+        } catch (ExecutionException | InterruptedException e) {
+            throw new RuntimeException(e);
+        } finally {
+            pool.shutdown();
+        }
+    }
+}
diff --git a/src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java 
b/src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java
new file mode 100644
index 0000000000..a81689af37
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.ooc;
+
+import org.apache.sysds.common.Opcodes;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.common.Types.ExecMode;
+import org.apache.sysds.common.Types.FileFormat;
+import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.hops.OptimizerUtils;
+import org.apache.sysds.runtime.instructions.Instruction;
+import org.apache.sysds.runtime.io.MatrixWriter;
+import org.apache.sysds.runtime.io.MatrixWriterFactory;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.matrix.data.MatrixValue;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
+import org.apache.sysds.runtime.util.HDFSTool;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+
+public class UnaryTest extends AutomatedTestBase {
+
+       private static final String TEST_NAME = "Unary";
+       private static final String TEST_DIR = "functions/ooc/";
+       private static final String TEST_CLASS_DIR = TEST_DIR + 
UnaryTest.class.getSimpleName() + "/";
+       private static final String INPUT_NAME = "X";
+       private static final String OUTPUT_NAME = "res";
+
+       @Override
+       public void setUp() {
+               TestUtils.clearAssertionInformation();
+               TestConfiguration config = new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME);
+               addTestConfiguration(TEST_NAME, config);
+       }
+
+       /**
+        * Test the sum of a unary operation, "sum(ceil(X))", with OOC backend.
+        */
+       @Test
+       public void testUnary() {
+               testUnaryOperation(false);
+       }
+       
+       
+       public void testUnaryOperation(boolean rewrite)
+       {
+               Types.ExecMode platformOld = setExecMode(ExecMode.SINGLE_NODE);
+               boolean oldRewrite = 
OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION;
+               OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION = rewrite;
+               
+               try {
+                       getAndLoadTestConfiguration(TEST_NAME);
+                       String HOME = SCRIPT_DIR + TEST_DIR;
+                       fullDMLScriptName = HOME + TEST_NAME + ".dml";
+                       programArgs = new String[] {"-explain", "-stats", 
"-ooc", 
+                               "-args", input(INPUT_NAME), 
output(OUTPUT_NAME)};
+
+                       int rows = 1000, cols = 1000;
+                       MatrixBlock mb = MatrixBlock.randOperations(rows, cols, 
1.0, -1, 1, "uniform", 7);
+                       MatrixWriter writer = 
MatrixWriterFactory.createMatrixWriter(FileFormat.BINARY);
+                       writer.writeMatrixToHDFS(mb, input(INPUT_NAME), rows, 
cols, 1000, rows*cols);
+                       HDFSTool.writeMetaDataFile(input(INPUT_NAME+".mtd"), 
ValueType.FP64, 
+                               new 
MatrixCharacteristics(rows,cols,1000,rows*cols), FileFormat.BINARY);
+                       
+                       runTest(true, false, null, -1);
+
+                       HashMap<MatrixValue.CellIndex, Double> dmlfile = 
readDMLMatrixFromOutputDir(OUTPUT_NAME);
+                       Double result = dmlfile.get(new 
MatrixValue.CellIndex(1, 1));
+                       double expected = 0.0;
+                       for(int i = 0; i < rows; i++) {
+                               for(int j = 0; j < cols; j++) {
+                                       expected += Math.ceil(mb.get(i, j));
+                               }
+                       }
+
+                       Assert.assertEquals(expected, result, 1e-10);
+
+                       String prefix = Instruction.OOC_INST_PREFIX;
+                       Assert.assertTrue("OOC wasn't used for RBLK",
+                               heavyHittersContainsString(prefix + 
Opcodes.RBLK));
+                       Assert.assertTrue("OOC wasn't used for CEIL",
+                               heavyHittersContainsString(prefix + 
Opcodes.CEIL));
+               }
+               catch(Exception ex) {
+                       Assert.fail(ex.getMessage());
+               }
+               finally {
+                       OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION = 
oldRewrite;
+                       resetExecMode(platformOld);
+               }
+       }
+}
diff --git a/src/test/scripts/functions/ooc/Unary.dml 
b/src/test/scripts/functions/ooc/Unary.dml
new file mode 100644
index 0000000000..6d34e8fd76
--- /dev/null
+++ b/src/test/scripts/functions/ooc/Unary.dml
@@ -0,0 +1,29 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# Read input matrix from command line args
+X = read($1);
+#print(toString(X))
+Y = ceil(X);
+#print(toString(Y))
+res = as.matrix(sum(Y));
+# Write the final matrix result
+write(res, $2);

Reply via email to