This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new 9e23ad8bd8 [SYSTEMDS-3899] Fix incorrect barrier in unary OOC operations
9e23ad8bd8 is described below

commit 9e23ad8bd80c2b4e8e94c941887e0c8155e8b781
Author: Janardhan Pulivarthi <[email protected]>
AuthorDate: Mon Aug 11 09:38:52 2025 +0200

    [SYSTEMDS-3899] Fix incorrect barrier in unary OOC operations
    
    Closes #2306.
---
 .../instructions/ooc/UnaryOOCInstruction.java      | 91 +++++++++++-----------
 1 file changed, 44 insertions(+), 47 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java
index d2fccd5fd6..13cd5463ed 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java
@@ -30,61 +30,58 @@ import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.operators.UnaryOperator;
 import org.apache.sysds.runtime.util.CommonThreadPool;
 
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 
 public class UnaryOOCInstruction extends ComputationOOCInstruction {
-    private UnaryOperator _uop = null;
+       private UnaryOperator _uop = null;
 
-    protected UnaryOOCInstruction(OOCType type, UnaryOperator op, CPOperand 
in1, CPOperand out, String opcode, String istr) {
-        super(type, op, in1, out, opcode, istr);
+       protected UnaryOOCInstruction(OOCType type, UnaryOperator op, CPOperand 
in1, CPOperand out, String opcode, String istr) {
+               super(type, op, in1, out, opcode, istr);
 
-        _uop = op;
-    }
+               _uop = op;
+       }
 
-    public static UnaryOOCInstruction parseInstruction(String str) {
-        String[] parts = 
InstructionUtils.getInstructionPartsWithValueType(str);
-        InstructionUtils.checkNumFields(parts, 2);
-        String opcode = parts[0];
-        CPOperand in1 = new CPOperand(parts[1]);
-        CPOperand out = new CPOperand(parts[2]);
+       public static UnaryOOCInstruction parseInstruction(String str) {
+               String[] parts = 
InstructionUtils.getInstructionPartsWithValueType(str);
+               InstructionUtils.checkNumFields(parts, 2);
+               String opcode = parts[0];
+               CPOperand in1 = new CPOperand(parts[1]);
+               CPOperand out = new CPOperand(parts[2]);
 
-        UnaryOperator uopcode = InstructionUtils.parseUnaryOperator(opcode);
-        return new UnaryOOCInstruction(OOCType.Unary, uopcode, in1, out, 
opcode, str);
-    }
+               UnaryOperator uopcode = 
InstructionUtils.parseUnaryOperator(opcode);
+               return new UnaryOOCInstruction(OOCType.Unary, uopcode, in1, 
out, opcode, str);
+       }
 
-    public void processInstruction( ExecutionContext ec ) {
-        UnaryOperator uop = (UnaryOperator) _uop;
-        // Create thread and process the unary operation
-        MatrixObject min = ec.getMatrixObject(input1);
-        LocalTaskQueue<IndexedMatrixValue> qIn = min.getStreamHandle();
-        LocalTaskQueue<IndexedMatrixValue> qOut = new LocalTaskQueue<>();
-        ec.getMatrixObject(output).setStreamHandle(qOut);
+       public void processInstruction( ExecutionContext ec ) {
+               UnaryOperator uop = (UnaryOperator) _uop;
+               // Create thread and process the unary operation
+               MatrixObject min = ec.getMatrixObject(input1);
+               LocalTaskQueue<IndexedMatrixValue> qIn = min.getStreamHandle();
+               LocalTaskQueue<IndexedMatrixValue> qOut = new 
LocalTaskQueue<>();
+               ec.getMatrixObject(output).setStreamHandle(qOut);
 
 
-        ExecutorService pool = CommonThreadPool.get();
-        try {
-            Future<?> task =pool.submit(() -> {
-                IndexedMatrixValue tmp = null;
-                try {
-                    while ((tmp = qIn.dequeueTask()) != 
LocalTaskQueue.NO_MORE_TASKS) {
-                        IndexedMatrixValue tmpOut = new IndexedMatrixValue();
-                        tmpOut.set(tmp.getIndexes(),
-                                tmp.getValue().unaryOperations(uop, new 
MatrixBlock()));
-                        qOut.enqueueTask(tmpOut);
-                    }
-                    qOut.closeInput();
-                }
-                catch(Exception ex) {
-                    throw new DMLRuntimeException(ex);
-                }
-            });
-            task.get();
-        } catch (ExecutionException | InterruptedException e) {
-            throw new RuntimeException(e);
-        } finally {
-            pool.shutdown();
-        }
-    }
+               ExecutorService pool = CommonThreadPool.get();
+               try {
+                       pool.submit(() -> {
+                               IndexedMatrixValue tmp = null;
+                               try {
+                                       while ((tmp = qIn.dequeueTask()) != 
LocalTaskQueue.NO_MORE_TASKS) {
+                                               IndexedMatrixValue tmpOut = new 
IndexedMatrixValue();
+                                               tmpOut.set(tmp.getIndexes(),
+                                                               
tmp.getValue().unaryOperations(uop, new MatrixBlock()));
+                                               qOut.enqueueTask(tmpOut);
+                                       }
+                                       qOut.closeInput();
+                               }
+                               catch(Exception ex) {
+                                       throw new DMLRuntimeException(ex);
+                               }
+                       });
+               } catch (Exception ex) {
+                       throw new DMLRuntimeException(ex);
+               } finally {
+                       pool.shutdown();
+               }
+       }
 }

Reply via email to