This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new 9e23ad8bd8 [SYSTEMDS-3899] Fix incorrect barrier in unary OOC operations
9e23ad8bd8 is described below
commit 9e23ad8bd80c2b4e8e94c941887e0c8155e8b781
Author: Janardhan Pulivarthi <[email protected]>
AuthorDate: Mon Aug 11 09:38:52 2025 +0200
[SYSTEMDS-3899] Fix incorrect barrier in unary OOC operations
Closes #2306.
---
.../instructions/ooc/UnaryOOCInstruction.java | 91 +++++++++++-----------
1 file changed, 44 insertions(+), 47 deletions(-)
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java
index d2fccd5fd6..13cd5463ed 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java
@@ -30,61 +30,58 @@ import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.operators.UnaryOperator;
import org.apache.sysds.runtime.util.CommonThreadPool;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
public class UnaryOOCInstruction extends ComputationOOCInstruction {
- private UnaryOperator _uop = null;
+ private UnaryOperator _uop = null;
- protected UnaryOOCInstruction(OOCType type, UnaryOperator op, CPOperand
in1, CPOperand out, String opcode, String istr) {
- super(type, op, in1, out, opcode, istr);
+ protected UnaryOOCInstruction(OOCType type, UnaryOperator op, CPOperand
in1, CPOperand out, String opcode, String istr) {
+ super(type, op, in1, out, opcode, istr);
- _uop = op;
- }
+ _uop = op;
+ }
- public static UnaryOOCInstruction parseInstruction(String str) {
- String[] parts =
InstructionUtils.getInstructionPartsWithValueType(str);
- InstructionUtils.checkNumFields(parts, 2);
- String opcode = parts[0];
- CPOperand in1 = new CPOperand(parts[1]);
- CPOperand out = new CPOperand(parts[2]);
+ public static UnaryOOCInstruction parseInstruction(String str) {
+ String[] parts =
InstructionUtils.getInstructionPartsWithValueType(str);
+ InstructionUtils.checkNumFields(parts, 2);
+ String opcode = parts[0];
+ CPOperand in1 = new CPOperand(parts[1]);
+ CPOperand out = new CPOperand(parts[2]);
- UnaryOperator uopcode = InstructionUtils.parseUnaryOperator(opcode);
- return new UnaryOOCInstruction(OOCType.Unary, uopcode, in1, out,
opcode, str);
- }
+ UnaryOperator uopcode =
InstructionUtils.parseUnaryOperator(opcode);
+ return new UnaryOOCInstruction(OOCType.Unary, uopcode, in1,
out, opcode, str);
+ }
- public void processInstruction( ExecutionContext ec ) {
- UnaryOperator uop = (UnaryOperator) _uop;
- // Create thread and process the unary operation
- MatrixObject min = ec.getMatrixObject(input1);
- LocalTaskQueue<IndexedMatrixValue> qIn = min.getStreamHandle();
- LocalTaskQueue<IndexedMatrixValue> qOut = new LocalTaskQueue<>();
- ec.getMatrixObject(output).setStreamHandle(qOut);
+ public void processInstruction( ExecutionContext ec ) {
+ UnaryOperator uop = (UnaryOperator) _uop;
+ // Create thread and process the unary operation
+ MatrixObject min = ec.getMatrixObject(input1);
+ LocalTaskQueue<IndexedMatrixValue> qIn = min.getStreamHandle();
+ LocalTaskQueue<IndexedMatrixValue> qOut = new
LocalTaskQueue<>();
+ ec.getMatrixObject(output).setStreamHandle(qOut);
- ExecutorService pool = CommonThreadPool.get();
- try {
- Future<?> task =pool.submit(() -> {
- IndexedMatrixValue tmp = null;
- try {
- while ((tmp = qIn.dequeueTask()) !=
LocalTaskQueue.NO_MORE_TASKS) {
- IndexedMatrixValue tmpOut = new IndexedMatrixValue();
- tmpOut.set(tmp.getIndexes(),
- tmp.getValue().unaryOperations(uop, new
MatrixBlock()));
- qOut.enqueueTask(tmpOut);
- }
- qOut.closeInput();
- }
- catch(Exception ex) {
- throw new DMLRuntimeException(ex);
- }
- });
- task.get();
- } catch (ExecutionException | InterruptedException e) {
- throw new RuntimeException(e);
- } finally {
- pool.shutdown();
- }
- }
+ ExecutorService pool = CommonThreadPool.get();
+ try {
+ pool.submit(() -> {
+ IndexedMatrixValue tmp = null;
+ try {
+ while ((tmp = qIn.dequeueTask()) !=
LocalTaskQueue.NO_MORE_TASKS) {
+ IndexedMatrixValue tmpOut = new
IndexedMatrixValue();
+ tmpOut.set(tmp.getIndexes(),
+
tmp.getValue().unaryOperations(uop, new MatrixBlock()));
+ qOut.enqueueTask(tmpOut);
+ }
+ qOut.closeInput();
+ }
+ catch(Exception ex) {
+ throw new DMLRuntimeException(ex);
+ }
+ });
+ } catch (Exception ex) {
+ throw new DMLRuntimeException(ex);
+ } finally {
+ pool.shutdown();
+ }
+ }
}