This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new b0ef87544e [SYSTEMDS-3931] Out-of-core right indexing operations
b0ef87544e is described below
commit b0ef87544ec38384ab2d41aa1d2a387fbf7b7a36
Author: Jannik Lindemann <[email protected]>
AuthorDate: Sun Nov 16 11:21:46 2025 +0100
[SYSTEMDS-3931] Out-of-core right indexing operations
Closes #2351.
---
.../runtime/instructions/OOCInstructionParser.java | 7 +-
.../runtime/instructions/ooc/CachingStream.java | 45 +--
.../instructions/ooc/IndexingOOCInstruction.java | 338 +++++++++++++++++++++
.../ooc/MatrixIndexingOOCInstruction.java | 254 ++++++++++++++++
.../instructions/ooc/SubscribableTaskQueue.java | 2 +-
.../instructions/ooc/UnaryOOCInstruction.java | 6 +
.../org/apache/sysds/runtime/util/IndexRange.java | 12 +
.../functions/ooc/RightIndexingCoordsTest.java | 133 ++++++++
.../test/functions/ooc/RightIndexingTest.java | 215 +++++++++++++
src/test/scripts/functions/ooc/RightIndexing.dml | 31 ++
.../scripts/functions/ooc/RightIndexingCoords.dml | 29 ++
11 files changed, 1047 insertions(+), 25 deletions(-)
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java
b/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java
index a14079160f..938615eb9c 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java
@@ -28,6 +28,7 @@ import
org.apache.sysds.runtime.instructions.ooc.BinaryOOCInstruction;
import org.apache.sysds.runtime.instructions.ooc.CSVReblockOOCInstruction;
import org.apache.sysds.runtime.instructions.ooc.CentralMomentOOCInstruction;
import org.apache.sysds.runtime.instructions.ooc.CtableOOCInstruction;
+import org.apache.sysds.runtime.instructions.ooc.IndexingOOCInstruction;
import org.apache.sysds.runtime.instructions.ooc.OOCInstruction;
import
org.apache.sysds.runtime.instructions.ooc.ParameterizedBuiltinOOCInstruction;
import org.apache.sysds.runtime.instructions.ooc.ReblockOOCInstruction;
@@ -75,12 +76,14 @@ public class OOCInstructionParser extends InstructionParser
{
return
TransposeOOCInstruction.parseInstruction(str);
case Tee:
return TeeOOCInstruction.parseInstruction(str);
- case CentralMoment:
- return CentralMomentOOCInstruction.parseInstruction(str);
+ case CentralMoment:
+ return
CentralMomentOOCInstruction.parseInstruction(str);
case Ctable:
return
CtableOOCInstruction.parseInstruction(str);
case ParameterizedBuiltin:
return
ParameterizedBuiltinOOCInstruction.parseInstruction(str);
+ case MatrixIndexing:
+ return
IndexingOOCInstruction.parseInstruction(str);
default:
throw new DMLRuntimeException("Invalid OOC
Instruction Type: " + ooctype);
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/ooc/CachingStream.java
b/src/main/java/org/apache/sysds/runtime/instructions/ooc/CachingStream.java
index 1a54030280..b74f7ed5e1 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/CachingStream.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/CachingStream.java
@@ -80,36 +80,37 @@ public class CachingStream implements
OOCStreamable<IndexedMatrixValue> {
});
}
- private boolean fetchFromStream() throws InterruptedException {
- synchronized (this) {
- if(!_cacheInProgress)
- throw new DMLRuntimeException("Stream is
closed");
- }
+ private synchronized boolean fetchFromStream() throws
InterruptedException {
+ // Pulls one block from the source stream and registers it in the cache;
+ // returns true once the source signalled NO_MORE_TASKS (caching complete).
+ if(!_cacheInProgress)
+ throw new DMLRuntimeException("Stream is closed");
+ // NOTE(review): unlike the removed version, the monitor is now held across
+ // _source.dequeue(). If dequeue() blocks waiting for the producer, the
+ // synchronized get() cannot serve already-cached blocks meanwhile -- confirm
+ // this serialization is intended and that no producer needs this monitor.
IndexedMatrixValue task = _source.dequeue();
- synchronized (this) {
- if(task != LocalTaskQueue.NO_MORE_TASKS) {
- OOCEvictionManager.put(_streamId, _numBlocks,
task);
- if (_index != null)
- _index.put(task.getIndexes(),
_numBlocks);
- _numBlocks++;
- notifyAll();
- return false;
- }
- else {
- _cacheInProgress = false; // caching is complete
- notifyAll();
- return true;
- }
+ if(task != LocalTaskQueue.NO_MORE_TASKS) {
+ OOCEvictionManager.put(_streamId, _numBlocks, task);
+ if (_index != null)
+ _index.put(task.getIndexes(), _numBlocks);
+ _numBlocks++;
+ notifyAll();
+ return false;
+ }
+ else {
+ _cacheInProgress = false; // caching is complete
+ notifyAll();
+ return true;
}
}
public synchronized IndexedMatrixValue get(int idx) throws
InterruptedException {
while (true) {
- if (idx < _numBlocks)
- return OOCEvictionManager.get(_streamId, idx);
- else if (!_cacheInProgress)
+ if (idx < _numBlocks) {
+ IndexedMatrixValue out =
OOCEvictionManager.get(_streamId, idx);
+
+ if (_index != null) // Ensure index is up to
date
+ _index.putIfAbsent(out.getIndexes(),
idx);
+
+ return out;
+ } else if (!_cacheInProgress)
return
(IndexedMatrixValue)LocalTaskQueue.NO_MORE_TASKS;
wait();
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/ooc/IndexingOOCInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/ooc/IndexingOOCInstruction.java
new file mode 100644
index 0000000000..1d555da8d6
--- /dev/null
+++
b/src/main/java/org/apache/sysds/runtime/instructions/ooc/IndexingOOCInstruction.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.instructions.ooc;
+
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.sysds.common.Opcodes;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.instructions.cp.IndexingCPInstruction;
+import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysds.runtime.util.IndexRange;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+
+/**
+ * Base class for out-of-core indexing instructions. Holds the four index operands
+ * (row lower/upper, column lower/upper) and provides the {@link BlockAligner} utility
+ * used to re-assemble output blocks when the requested range is not block-aligned.
+ */
+public abstract class IndexingOOCInstruction extends UnaryOOCInstruction {
+	protected final CPOperand rowLower, rowUpper, colLower, colUpper;
+
+	/**
+	 * Parses an indexing instruction string by delegating to the CP parser and converting the result.
+	 *
+	 * @param str the instruction string
+	 * @return the corresponding OOC indexing instruction
+	 */
+	public static IndexingOOCInstruction parseInstruction(String str) {
+		IndexingCPInstruction cpInst = IndexingCPInstruction.parseInstruction(str);
+		return parseInstruction(cpInst);
+	}
+
+	/**
+	 * Converts a parsed CP indexing instruction into its OOC counterpart.
+	 * Only right indexing with a matrix input is currently supported.
+	 *
+	 * @param cpInst the parsed CP indexing instruction
+	 * @return a {@link MatrixIndexingOOCInstruction} for matrix right indexing
+	 * @throws NotImplementedException for any other opcode or input data type
+	 */
+	public static IndexingOOCInstruction parseInstruction(IndexingCPInstruction cpInst) {
+		String opcode = cpInst.getOpcode();
+
+		if(opcode.equalsIgnoreCase(Opcodes.RIGHT_INDEX.toString())) {
+			if(cpInst.input1.getDataType().isMatrix()) {
+				return new MatrixIndexingOOCInstruction(cpInst.input1, cpInst.getRowLower(), cpInst.getRowUpper(),
+					cpInst.getColLower(), cpInst.getColUpper(), cpInst.output, cpInst.getOpcode(),
+					cpInst.getInstructionString());
+			}
+			else {
+				throw new NotImplementedException();
+			}
+		}
+
+		throw new NotImplementedException();
+	}
+
+	protected IndexingOOCInstruction(CPOperand in, CPOperand rl, CPOperand ru, CPOperand cl, CPOperand cu,
+		CPOperand out, String opcode, String istr) {
+		super(OOCInstruction.OOCType.MatrixIndexing, null, in, out, opcode, istr);
+		rowLower = rl;
+		rowUpper = ru;
+		colLower = cl;
+		colUpper = cu;
+	}
+
+	protected IndexingOOCInstruction(CPOperand lhsInput, CPOperand rhsInput, CPOperand rl, CPOperand ru, CPOperand cl,
+		CPOperand cu, CPOperand out, String opcode, String istr) {
+		super(OOCInstruction.OOCType.MatrixIndexing, null, lhsInput, rhsInput, out, opcode, istr);
+		rowLower = rl;
+		rowUpper = ru;
+		colLower = cl;
+		colUpper = cu;
+	}
+
+	/**
+	 * Evaluates the (1-based) index operands and returns them as a 0-based IndexRange.
+	 * The values stay {@code long}: the previous {@code (int)} narrowing silently truncated
+	 * indices beyond Integer.MAX_VALUE, which out-of-core inputs can reach, even though
+	 * IndexRange itself stores longs.
+	 *
+	 * @param ec the execution context used to resolve the scalar index operands
+	 * @return the 0-based range [rl-1, ru-1] x [cl-1, cu-1]
+	 */
+	protected IndexRange getIndexRange(ExecutionContext ec) {
+		return new IndexRange( //rl, ru, cl, cu
+			ec.getScalarInput(rowLower).getLongValue() - 1,
+			ec.getScalarInput(rowUpper).getLongValue() - 1,
+			ec.getScalarInput(colLower).getLongValue() - 1,
+			ec.getScalarInput(colUpper).getLongValue() - 1);
+	}
+
+	/**
+	 * Collects the (up to 2x2) source blocks that overlap each output block of an
+	 * unaligned index range, and invokes an emitter as soon as an output block has all
+	 * of its contributing source blocks.
+	 *
+	 * @param <T> per-source-block payload stored in the sectors
+	 */
+	public static class BlockAligner<T> {
+		private final int _blocksize;
+		private final IndexRange _indexRange;
+		// 0-based source block coordinates; end is exclusive to match IndexRange.isWithin
+		private final IndexRange _blockRange;
+		private final int _outRows;
+		private final int _outCols;
+		// row-major grid of pending output sectors; an entry is cleared once emitted
+		private final Sector<T>[] _blocks;
+		private final AtomicInteger _emitCtr;
+
+		@SuppressWarnings("unchecked")
+		public BlockAligner(IndexRange range, int blocksize) {
+			_indexRange = range;
+			_blocksize = blocksize;
+
+			long firstBlockRow = range.rowStart / blocksize;
+			long lastBlockRow = range.rowEnd / blocksize;
+			long firstBlockCol = range.colStart / blocksize;
+			long lastBlockCol = range.colEnd / blocksize;
+			_blockRange = new IndexRange(firstBlockRow, lastBlockRow + 1, firstBlockCol, lastBlockCol + 1);
+
+			long totalRows = range.rowSpan() + 1;
+			long totalCols = range.colSpan() + 1;
+			_outRows = (int) ((totalRows + blocksize - 1) / blocksize);
+			_outCols = (int) ((totalCols + blocksize - 1) / blocksize);
+
+			_blocks = (Sector<T>[]) new Sector[_outRows * _outCols];
+			_emitCtr = new AtomicInteger(0);
+		}
+
+		/** @return true if the range starts on a block boundary in both dimensions */
+		public boolean isAligned() {
+			return (_indexRange.rowStart % _blocksize) == 0 && (_indexRange.colStart % _blocksize) == 0;
+		}
+
+		/**
+		 * Registers one source block and emits every output block it completes.
+		 *
+		 * @param index   1-based index of the source block
+		 * @param data    payload to store for this source block
+		 * @param emitter called with the 1-based output index and the completed sector
+		 * @return true if this call emitted the last outstanding output block
+		 */
+		public boolean putNext(MatrixIndexes index, T data, BiConsumer<MatrixIndexes, Sector<T>> emitter) {
+			long blockRow = index.getRowIndex() - 1;
+			long blockCol = index.getColumnIndex() - 1;
+
+			if(!_blockRange.isWithin(blockRow, blockCol))
+				return false;
+
+			long blockRowStart = blockRow * _blocksize;
+			long blockRowEnd = blockRowStart + _blocksize - 1;
+			long blockColStart = blockCol * _blocksize;
+			long blockColEnd = blockColStart + _blocksize - 1;
+
+			// global cells of this source block that fall into the requested range
+			long overlapRowStart = Math.max(_indexRange.rowStart, blockRowStart);
+			long overlapRowEnd = Math.min(_indexRange.rowEnd, blockRowEnd);
+			long overlapColStart = Math.max(_indexRange.colStart, blockColStart);
+			long overlapColEnd = Math.min(_indexRange.colEnd, blockColEnd);
+
+			// output blocks touched by that overlap
+			int outRowStart = (int) ((overlapRowStart - _indexRange.rowStart) / _blocksize);
+			int outRowEnd = (int) ((overlapRowEnd - _indexRange.rowStart) / _blocksize);
+			int outColStart = (int) ((overlapColStart - _indexRange.colStart) / _blocksize);
+			int outColEnd = (int) ((overlapColEnd - _indexRange.colStart) / _blocksize);
+
+			int emitCtr = -1;
+
+			for(int outRow = outRowStart; outRow <= outRowEnd; outRow++) {
+				long targetRowStartGlobal = _indexRange.rowStart + (long) outRow * _blocksize;
+				long targetRowEndGlobal = Math.min(_indexRange.rowEnd, targetRowStartGlobal + _blocksize - 1);
+				long targetStartBlockRow = targetRowStartGlobal / _blocksize;
+				long targetEndBlockRow = targetRowEndGlobal / _blocksize;
+				int rowSegments = (int) (targetEndBlockRow - targetStartBlockRow + 1);
+
+				for(int outCol = outColStart; outCol <= outColEnd; outCol++) {
+					long targetColStartGlobal = _indexRange.colStart + (long) outCol * _blocksize;
+					long targetColEndGlobal = Math.min(_indexRange.colEnd, targetColStartGlobal + _blocksize - 1);
+					long targetStartBlockCol = targetColStartGlobal / _blocksize;
+					long targetEndBlockCol = targetColEndGlobal / _blocksize;
+					int colSegments = (int) (targetEndBlockCol - targetStartBlockCol + 1);
+
+					// position of this source block within the output block's 1x1 / 1x2 / 2x1 / 2x2 sector
+					int rowOffset = (rowSegments == 1) ? 0 : (blockRow == targetStartBlockRow ? 0 : 1);
+					int colOffset = (colSegments == 1) ? 0 : (blockCol == targetStartBlockCol ? 0 : 1);
+
+					Sector<T> sector = getOrCreate(outRow, outCol, rowSegments, colSegments);
+					if(sector == null)
+						continue;
+
+					boolean emit = sector.place(rowOffset, colOffset, data);
+					if(emit) {
+						int idxPos = resolveIndex(outRow, outCol);
+						_blocks[idxPos] = null;
+						emitCtr = _emitCtr.incrementAndGet();
+						emitter.accept(new MatrixIndexes(outRow + 1, outCol + 1), sector);
+					}
+				}
+			}
+
+			return emitCtr >= _blocks.length;
+		}
+
+		/** @return the row-major slot of an output block, or -1 if out of bounds */
+		private int resolveIndex(int row, int col) {
+			if(row < 0 || row >= _outRows || col < 0 || col >= _outCols)
+				return -1;
+			return row * _outCols + col;
+		}
+
+		/** Returns the pending sector of an output block, creating it with the right shape if needed. */
+		private synchronized Sector<T> getOrCreate(int outRow, int outCol, int rowSegments, int colSegments) {
+			int idx = resolveIndex(outRow, outCol);
+			if(idx == -1)
+				return null;
+
+			Sector<T> s = _blocks[idx];
+			if(s == null) {
+				if(rowSegments == 1 && colSegments == 1)
+					s = new Sector1<>();
+				else if(rowSegments == 1 && colSegments == 2)
+					s = new Sector2Col<>();
+				else if(rowSegments == 2 && colSegments == 1)
+					s = new Sector2Row<>();
+				else
+					s = new Sector4<>();
+				_blocks[idx] = s;
+			}
+
+			return s;
+		}
+
+		/**
+		 * Verifies that every output block has been emitted.
+		 *
+		 * @throws DMLRuntimeException if some output blocks are still incomplete
+		 */
+		public synchronized void close() {
+			if(_emitCtr.get() != _blocks.length)
+				throw new DMLRuntimeException("BlockAligner still has some unfinished sectors");
+		}
+	}
+
+	/**
+	 * Holder for the source-block payloads contributing to one output block.
+	 * {@link #place} returns true once the sector is complete.
+	 */
+	public static abstract class Sector<T> {
+		public abstract boolean place(int rowOffset, int colOffset, T data);
+
+		public abstract T get(int rowOffset, int colOffset);
+
+		public abstract int count();
+	}
+
+	/** Sector fed by a single source block; complete on the first valid placement. */
+	public static class Sector1<T> extends Sector<T> {
+		private T _data;
+
+		@Override
+		public synchronized boolean place(int rowOffset, int colOffset, T data) {
+			if(rowOffset != 0 || colOffset != 0)
+				return false;
+
+			_data = data;
+			return true;
+		}
+
+		@Override
+		public synchronized T get(int rowOffset, int colOffset) {
+			return (rowOffset == 0 && colOffset == 0) ? _data : null;
+		}
+
+		@Override
+		public synchronized int count() {
+			return _data == null ? 0 : 1;
+		}
+	}
+
+	/** Sector fed by a 2x2 neighbourhood of source blocks (row-major slots). */
+	public static class Sector4<T> extends Sector<T> {
+		private int _count;
+		private final T[] _data;
+
+		@SuppressWarnings("unchecked")
+		public Sector4() {
+			_count = 0;
+			_data = (T[]) new Object[4];
+		}
+
+		@Override
+		public synchronized boolean place(int rowOffset, int colOffset, T data) {
+			int pos = rowOffset * 2 + colOffset;
+			if(_data[pos] == null) {
+				_data[pos] = data;
+				_count++;
+			}
+			return _count == 4;
+		}
+
+		@Override
+		public synchronized T get(int rowOffset, int colOffset) {
+			return _data[rowOffset * 2 + colOffset];
+		}
+
+		@Override
+		public synchronized int count() {
+			return _count;
+		}
+	}
+
+	/** Sector fed by two horizontally adjacent source blocks. */
+	public static class Sector2Col<T> extends Sector<T> {
+		private int _count;
+		private final T[] _data;
+
+		@SuppressWarnings("unchecked")
+		public Sector2Col() {
+			_count = 0;
+			_data = (T[]) new Object[2];
+		}
+
+		@Override
+		public synchronized boolean place(int rowOffset, int colOffset, T data) {
+			if(rowOffset != 0 || colOffset < 0 || colOffset > 1)
+				return false;
+
+			if(_data[colOffset] == null) {
+				_data[colOffset] = data;
+				_count++;
+			}
+
+			return _count == 2;
+		}
+
+		@Override
+		public synchronized T get(int rowOffset, int colOffset) {
+			return (rowOffset == 0 && colOffset >= 0 && colOffset < 2) ? _data[colOffset] : null;
+		}
+
+		@Override
+		public synchronized int count() {
+			return _count;
+		}
+	}
+
+	/** Sector fed by two vertically adjacent source blocks. */
+	public static class Sector2Row<T> extends Sector<T> {
+		private int _count;
+		private final T[] _data;
+
+		@SuppressWarnings("unchecked")
+		public Sector2Row() {
+			_count = 0;
+			_data = (T[]) new Object[2];
+		}
+
+		@Override
+		public synchronized boolean place(int rowOffset, int colOffset, T data) {
+			if(colOffset != 0 || rowOffset < 0 || rowOffset > 1)
+				return false;
+
+			if(_data[rowOffset] == null) {
+				_data[rowOffset] = data;
+				_count++;
+			}
+
+			return _count == 2;
+		}
+
+		@Override
+		public synchronized T get(int rowOffset, int colOffset) {
+			return (colOffset == 0 && rowOffset >= 0 && rowOffset < 2) ? _data[rowOffset] : null;
+		}
+
+		@Override
+		public synchronized int count() {
+			return _count;
+		}
+	}
+}
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/ooc/MatrixIndexingOOCInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/ooc/MatrixIndexingOOCInstruction.java
new file mode 100644
index 0000000000..a04a77677c
--- /dev/null
+++
b/src/main/java/org/apache/sysds/runtime/instructions/ooc/MatrixIndexingOOCInstruction.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.instructions.ooc;
+
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.sysds.common.Opcodes;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue;
+import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.instructions.cp.DoubleObject;
+import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysds.runtime.util.IndexRange;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Out-of-core matrix indexing. Right indexing X[rl:ru, cl:cu] is computed by streaming the
+ * input blocks, either forwarding/slicing them directly (block-aligned ranges) or re-assembling
+ * output blocks from up to 2x2 cached source blocks via {@link BlockAligner} (unaligned ranges).
+ */
+public class MatrixIndexingOOCInstruction extends IndexingOOCInstruction {
+
+	public MatrixIndexingOOCInstruction(CPOperand in, CPOperand rl, CPOperand ru, CPOperand cl, CPOperand cu,
+		CPOperand out, String opcode, String istr) {
+		super(in, rl, ru, cl, cu, out, opcode, istr);
+	}
+
+	protected MatrixIndexingOOCInstruction(CPOperand lhsInput, CPOperand rhsInput, CPOperand rl, CPOperand ru,
+		CPOperand cl, CPOperand cu, CPOperand out, String opcode, String istr) {
+		super(lhsInput, rhsInput, rl, ru, cl, cu, out, opcode, istr);
+	}
+
+	/**
+	 * Executes the indexing operation. Only right indexing is implemented.
+	 *
+	 * @param ec the execution context
+	 * @throws DMLRuntimeException for a scalar result whose source block never arrives, or an invalid opcode
+	 * @throws NotImplementedException for left indexing
+	 */
+	@Override
+	public void processInstruction(ExecutionContext ec) {
+		String opcode = getOpcode();
+		IndexRange ix = getIndexRange(ec);
+
+		MatrixObject mo = ec.getMatrixObject(input1.getName());
+		int blocksize = mo.getBlocksize();
+		// 0-based block coordinates of the range; MatrixIndexes of streamed blocks are 1-based
+		long firstBlockRow = ix.rowStart / blocksize;
+		long lastBlockRow = ix.rowEnd / blocksize;
+		long firstBlockCol = ix.colStart / blocksize;
+		long lastBlockCol = ix.colEnd / blocksize;
+
+		boolean inRange = ix.rowStart < mo.getNumRows() && ix.colStart < mo.getNumColumns();
+
+		OOCStream<IndexedMatrixValue> qIn = mo.getStreamHandle();
+
+		//right indexing
+		if(opcode.equalsIgnoreCase(Opcodes.RIGHT_INDEX.toString())) {
+			if(output.isScalar() && inRange) {
+				// Scalar result: no output matrix/stream exists, so ec.getMatrixObject(output) must not
+				// be called here. Scan for the single source block holding cell (rowStart, colStart).
+				addInStream(qIn);
+				IndexedMatrixValue tmp;
+
+				while((tmp = qIn.dequeue()) != LocalTaskQueue.NO_MORE_TASKS) {
+					MatrixIndexes inIdx = tmp.getIndexes();
+					if(inIdx.getRowIndex() == firstBlockRow + 1 && inIdx.getColumnIndex() == firstBlockCol + 1) {
+						// modulo before narrowing, and column offset taken from colStart (not rowEnd)
+						ec.setScalarOutput(output.getName(), new DoubleObject(
+							tmp.getValue().get((int) (ix.rowStart % blocksize), (int) (ix.colStart % blocksize))));
+						return;
+					}
+				}
+
+				throw new DMLRuntimeException("Desired block not found");
+			}
+
+			OOCStream<IndexedMatrixValue> qOut = createWritableStream();
+
+			addInStream(qIn);
+			addOutStream(qOut);
+
+			MatrixObject mOut = ec.getMatrixObject(output);
+			mOut.setStreamHandle(qOut);
+
+			final AtomicReference<CompletableFuture<Void>> futureRef = new AtomicReference<>();
+
+			if(ix.rowStart % blocksize == 0 && ix.colStart % blocksize == 0) {
+				// Aligned case: interior blocks can be forwarded directly, borders may require slicing
+				final int outBlockRows = (int) Math.ceil((double) (ix.rowSpan() + 1) / blocksize);
+				final int outBlockCols = (int) Math.ceil((double) (ix.colSpan() + 1) / blocksize);
+				final int totalBlocks = outBlockRows * outBlockCols;
+				final AtomicInteger producedBlocks = new AtomicInteger(0);
+
+				CompletableFuture<Void> future = filterOOC(qIn, tmp -> {
+					MatrixIndexes inIdx = tmp.getIndexes();
+					long blockRow = inIdx.getRowIndex() - 1;
+					long blockCol = inIdx.getColumnIndex() - 1;
+
+					MatrixBlock block = (MatrixBlock) tmp.getValue();
+
+					int rowStartLocal = (blockRow == firstBlockRow) ? (int) (ix.rowStart % blocksize) : 0;
+					int rowEndLocal = (blockRow == lastBlockRow) ? Math.min(block.getNumRows() - 1,
+						(int) (ix.rowEnd % blocksize)) : block.getNumRows() - 1;
+					int colStartLocal = (blockCol == firstBlockCol) ? (int) (ix.colStart % blocksize) : 0;
+					int colEndLocal = (blockCol == lastBlockCol) ? Math.min(block.getNumColumns() - 1,
+						(int) (ix.colEnd % blocksize)) : block.getNumColumns() - 1;
+
+					MatrixBlock outBlock;
+					if(rowStartLocal == 0 && rowEndLocal == block.getNumRows() - 1 && colStartLocal == 0 &&
+						colEndLocal == block.getNumColumns() - 1) {
+						outBlock = block;
+					}
+					else {
+						outBlock = block.slice(rowStartLocal, rowEndLocal, colStartLocal, colEndLocal);
+					}
+
+					long outBlockRow = blockRow - firstBlockRow + 1;
+					long outBlockCol = blockCol - firstBlockCol + 1;
+					qOut.enqueue(new IndexedMatrixValue(new MatrixIndexes(outBlockRow, outBlockCol), outBlock));
+
+					if(producedBlocks.incrementAndGet() >= totalBlocks) {
+						CompletableFuture<Void> f = futureRef.get();
+						if(f != null)
+							f.cancel(true);
+					}
+				}, tmp -> {
+					long blockRow = tmp.getIndexes().getRowIndex() - 1;
+					long blockCol = tmp.getIndexes().getColumnIndex() - 1;
+					return blockRow >= firstBlockRow && blockRow <= lastBlockRow && blockCol >= firstBlockCol &&
+						blockCol <= lastBlockCol;
+				}, qOut::closeInput);
+				futureRef.set(future);
+				return;
+			}
+
+			final BlockAligner<IndexedBlockMeta> aligner = new BlockAligner<>(ix, blocksize);
+
+			// We may need to construct our own intermediate stream to properly manage the cached items.
+			// Otherwise reuse the input's existing cache: the blocks read from qIn live there, whereas
+			// the freshly created qOut has no cache holding them.
+			boolean hasIntermediateStream = !qIn.hasStreamCache();
+			final CachingStream cachedStream = hasIntermediateStream ? new CachingStream(new SubscribableTaskQueue<>()) :
+				qIn.getStreamCache();
+			cachedStream.activateIndexing();
+
+			CompletableFuture<Void> future = filterOOC(qIn.getReadStream(), tmp -> {
+				if (hasIntermediateStream) {
+					// We write to an intermediate stream to ensure that these matrix blocks are properly cached
+					cachedStream.getWriteStream().enqueue(tmp);
+				}
+
+				boolean completed = aligner.putNext(tmp.getIndexes(), new IndexedBlockMeta(tmp), (idx, sector) -> {
+					int targetBlockRow = (int) (idx.getRowIndex() - 1);
+					int targetBlockCol = (int) (idx.getColumnIndex() - 1);
+
+					long targetRowStartGlobal = ix.rowStart + (long) targetBlockRow * blocksize;
+					long targetRowEndGlobal = Math.min(ix.rowEnd, targetRowStartGlobal + blocksize - 1);
+					long targetColStartGlobal = ix.colStart + (long) targetBlockCol * blocksize;
+					long targetColEndGlobal = Math.min(ix.colEnd, targetColStartGlobal + blocksize - 1);
+
+					int nRows = (int) (targetRowEndGlobal - targetRowStartGlobal + 1);
+					int nCols = (int) (targetColEndGlobal - targetColStartGlobal + 1);
+
+					long firstSrcBlockRow = targetRowStartGlobal / blocksize;
+					long lastSrcBlockRow = targetRowEndGlobal / blocksize;
+					int rowSegments = (int) (lastSrcBlockRow - firstSrcBlockRow + 1);
+
+					long firstSrcBlockCol = targetColStartGlobal / blocksize;
+					long lastSrcBlockCol = targetColEndGlobal / blocksize;
+					int colSegments = (int) (lastSrcBlockCol - firstSrcBlockCol + 1);
+
+					MatrixBlock target = null;
+
+					// copy the overlapping part of every contributing source block into the output block
+					for(int r = 0; r < rowSegments; r++) {
+						for(int c = 0; c < colSegments; c++) {
+							IndexedBlockMeta ibm = sector.get(r, c);
+							if(ibm == null)
+								continue;
+
+							IndexedMatrixValue mv = cachedStream.findCached(ibm.idx);
+							MatrixBlock srcBlock = (MatrixBlock) mv.getValue();
+
+							if(target == null)
+								target = new MatrixBlock(nRows, nCols, srcBlock.isInSparseFormat());
+
+							long srcBlockRowStart = (ibm.idx.getRowIndex() - 1) * blocksize;
+							long srcBlockColStart = (ibm.idx.getColumnIndex() - 1) * blocksize;
+							long sliceRowStartGlobal = Math.max(targetRowStartGlobal, srcBlockRowStart);
+							long sliceRowEndGlobal = Math.min(targetRowEndGlobal,
+								srcBlockRowStart + srcBlock.getNumRows() - 1);
+							long sliceColStartGlobal = Math.max(targetColStartGlobal, srcBlockColStart);
+							long sliceColEndGlobal = Math.min(targetColEndGlobal,
+								srcBlockColStart + srcBlock.getNumColumns() - 1);
+
+							int sliceRowStart = (int) (sliceRowStartGlobal - srcBlockRowStart);
+							int sliceRowEnd = (int) (sliceRowEndGlobal - srcBlockRowStart);
+							int sliceColStart = (int) (sliceColStartGlobal - srcBlockColStart);
+							int sliceColEnd = (int) (sliceColEndGlobal - srcBlockColStart);
+
+							int targetRowOffset = (int) (sliceRowStartGlobal - targetRowStartGlobal);
+							int targetColOffset = (int) (sliceColStartGlobal - targetColStartGlobal);
+
+							MatrixBlock sliced = srcBlock.slice(sliceRowStart, sliceRowEnd, sliceColStart, sliceColEnd);
+							sliced.putInto(target, targetRowOffset, targetColOffset, true);
+						}
+					}
+
+					qOut.enqueue(new IndexedMatrixValue(idx, target));
+				});
+
+				if(completed) {
+					// All blocks have been processed; we can cancel the future
+					// Currently, this does not affect processing (predicates prevent task submission anyway).
+					// However, a cancelled future may allow early file read aborts once implemented.
+					CompletableFuture<Void> f = futureRef.get();
+					if(f != null)
+						f.cancel(true);
+				}
+			}, tmp -> {
+				// Pre-filter incoming blocks to avoid unnecessary task submission
+				long blockRow = tmp.getIndexes().getRowIndex() - 1;
+				long blockCol = tmp.getIndexes().getColumnIndex() - 1;
+				return blockRow >= firstBlockRow && blockRow <= lastBlockRow && blockCol >= firstBlockCol &&
+					blockCol <= lastBlockCol;
+			}, () -> {
+				aligner.close();
+				qOut.closeInput();
+			});
+			futureRef.set(future);
+		}
+		//left indexing
+		else if(opcode.equalsIgnoreCase(Opcodes.LEFT_INDEX.toString())) {
+			throw new NotImplementedException();
+		}
+		else
+			throw new DMLRuntimeException(
+				"Invalid opcode (" + opcode + ") encountered in MatrixIndexingOOCInstruction.");
+	}
+
+	/** Lightweight handle stored in the aligner; the block data itself is fetched from the cache. */
+	private static class IndexedBlockMeta {
+		public final MatrixIndexes idx;
+
+		public IndexedBlockMeta(IndexedMatrixValue mv) {
+			this.idx = mv.getIndexes();
+		}
+	}
+}
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/ooc/SubscribableTaskQueue.java
b/src/main/java/org/apache/sysds/runtime/instructions/ooc/SubscribableTaskQueue.java
index 5f97bd99e9..f136ffc2bb 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/ooc/SubscribableTaskQueue.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/ooc/SubscribableTaskQueue.java
@@ -26,7 +26,7 @@ public class SubscribableTaskQueue<T> extends
LocalTaskQueue<T> implements OOCSt
private Runnable _subscriber;
@Override
- public void enqueue(T t) {
+ public synchronized void enqueue(T t) {
try {
super.enqueueTask(t);
}
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java
index 08f00f86d2..d45d00db59 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java
@@ -36,6 +36,12 @@ public class UnaryOOCInstruction extends
ComputationOOCInstruction {
_uop = op;
}
+	/**
+	 * Creates a unary OOC instruction that carries two input operands.
+	 * The operator may be null; IndexingOOCInstruction, for example, passes null.
+	 */
+	protected UnaryOOCInstruction(OOCType type, UnaryOperator op, CPOperand in1, CPOperand in2, CPOperand out, String opcode, String istr) {
+		super(type, op, in1, in2, out, opcode, istr);
+		this._uop = op;
+	}
+
public static UnaryOOCInstruction parseInstruction(String str) {
String[] parts =
InstructionUtils.getInstructionPartsWithValueType(str);
InstructionUtils.checkNumFields(parts, 2);
diff --git a/src/main/java/org/apache/sysds/runtime/util/IndexRange.java
b/src/main/java/org/apache/sysds/runtime/util/IndexRange.java
index 4a8d999147..44fa8320e3 100644
--- a/src/main/java/org/apache/sysds/runtime/util/IndexRange.java
+++ b/src/main/java/org/apache/sysds/runtime/util/IndexRange.java
@@ -30,6 +30,10 @@ public class IndexRange implements Serializable
public long rowEnd = 0;
public long colStart = 0;
public long colEnd = 0;
+
+	/**
+	 * Returns the cell-wise overlap of two ranges: the larger of the starts and the smaller of the ends.
+	 * No emptiness check is done here; for disjoint inputs a start may exceed its end.
+	 */
+	public static IndexRange intersect(IndexRange a, IndexRange b) {
+		long rs = Math.max(a.rowStart, b.rowStart);
+		long re = Math.min(a.rowEnd, b.rowEnd);
+		long cs = Math.max(a.colStart, b.colStart);
+		long ce = Math.min(a.colEnd, b.colEnd);
+		return new IndexRange(rs, re, cs, ce);
+	}
public IndexRange(long rs, long re, long cs, long ce) {
set(rs, re, cs, ce);
@@ -52,6 +56,10 @@ public class IndexRange implements Serializable
colStart + delta, colEnd + delta);
}
+	/** Returns a new range shifted by rowDelta rows and colDelta columns; this range is unchanged. */
+	public IndexRange add(long rowDelta, long colDelta) {
+		long rs = rowStart + rowDelta;
+		long re = rowEnd + rowDelta;
+		long cs = colStart + colDelta;
+		long ce = colEnd + colDelta;
+		return new IndexRange(rs, re, cs, ce);
+	}
+
public boolean inColRange(long col) {
return col >= colStart && col < colEnd;
}
@@ -68,6 +76,10 @@ public class IndexRange implements Serializable
return rowEnd - rowStart;
}
+	/**
+	 * Checks whether (row, col) lies inside this range via inRowRange/inColRange.
+	 * inColRange treats colEnd as exclusive; presumably inRowRange does the same for rowEnd (verify).
+	 * Callers therefore pass an end one past the last valid index.
+	 */
+	public boolean isWithin(long row, long col) {
+		return inRowRange(row) && inColRange(col);
+	}
+
@Override
public String toString() {
return "["+rowStart+":"+rowEnd+","+colStart+":"+colEnd+"]";
diff --git a/src/test/java/org/apache/sysds/test/functions/ooc/RightIndexingCoordsTest.java b/src/test/java/org/apache/sysds/test/functions/ooc/RightIndexingCoordsTest.java
new file mode 100644
index 0000000000..0dab096e0c
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/ooc/RightIndexingCoordsTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.ooc;
+
+import org.apache.sysds.common.Opcodes;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.instructions.Instruction;
+import org.apache.sysds.runtime.io.MatrixWriter;
+import org.apache.sysds.runtime.io.MatrixWriterFactory;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
+import org.apache.sysds.runtime.util.DataConverter;
+import org.apache.sysds.runtime.util.HDFSTool;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/**
+ * Out-of-core (OOC) test for single-cell right indexing X[rs, cs]: runs RightIndexingCoords.dml
+ * once with -ooc and once without, and compares the two 1x1 results.
+ */
+public class RightIndexingCoordsTest extends AutomatedTestBase {
+ private final static String TEST_NAME1 = "RightIndexingCoords";
+ private final static String TEST_DIR = "functions/ooc/";
+ private final static String TEST_CLASS_DIR = TEST_DIR + RightIndexingCoordsTest.class.getSimpleName() + "/";
+ private final static double eps = 1e-8;
+ private static final String INPUT_NAME = "X";
+ private static final String OUTPUT_NAME = "res";
+
+ private final static int nrows = 2300;
+ private final static int ncols = 1200;
+ private final static int maxVal = 7;
+ private final static double sparsity1 = 1;
+ private final static double sparsity2 = 0.05;
+
+ @Override
+ public void setUp() {
+ TestUtils.clearAssertionInformation();
+ TestConfiguration config = new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1);
+ addTestConfiguration(TEST_NAME1, config);
+ }
+
+ @Test
+ public void testRightIndexingDense1() {
+ runRightIndexingTest(1, 1, false);
+ }
+
+ @Test
+ public void testRightIndexingSparse1() {
+ runRightIndexingTest(1, 1, true);
+ }
+
+ @Test
+ public void testRightIndexingDense2() {
+ runRightIndexingTest(1000, 1, false);
+ }
+
+ @Test
+ public void testRightIndexingSparse2() {
+ runRightIndexingTest(1000, 1, true);
+ }
+
+ /**
+ * Writes a random nrows x ncols input matrix, evaluates X[rs, cs] with and without -ooc,
+ * and checks that the OOC right-indexing instruction was used and that both results match.
+ *
+ * @param rs 1-based row index of the selected cell
+ * @param cs 1-based column index of the selected cell
+ * @param sparse whether to generate the input with the sparse (0.05) or dense (1.0) sparsity
+ */
+ private void runRightIndexingTest(int rs, int cs, boolean sparse) {
+ Types.ExecMode platformOld = setExecMode(Types.ExecMode.SINGLE_NODE);
+
+ try {
+ getAndLoadTestConfiguration(TEST_NAME1);
+
+ String HOME = SCRIPT_DIR + TEST_DIR;
+ fullDMLScriptName = HOME + TEST_NAME1 + ".dml";
+ programArgs = new String[] {"-explain", "-stats", "-ooc", "-args", input(INPUT_NAME), "" + rs, "" + cs, output(OUTPUT_NAME)};
+
+ // 1. Generate a random nrows x ncols input matrix as a 2D array
+ double[][] X_data = getRandomMatrix(nrows, ncols, 1, maxVal, sparse ? sparsity2 : sparsity1, 7);
+
+ // 2. Convert the array to a MatrixBlock
+ MatrixBlock X_mb = DataConverter.convertToMatrixBlock(X_data);
+
+ // 3. Create a binary matrix writer
+ MatrixWriter writer = MatrixWriterFactory.createMatrixWriter(Types.FileFormat.BINARY);
+
+ // 4. Write X in binary format with 1000x1000 blocks, plus its metadata file
+ writer.writeMatrixToHDFS(X_mb, input(INPUT_NAME), nrows, ncols, 1000, X_mb.getNonZeros());
+ HDFSTool.writeMetaDataFile(input(INPUT_NAME + ".mtd"), Types.ValueType.FP64,
+ new MatrixCharacteristics(nrows, ncols, 1000, X_mb.getNonZeros()), Types.FileFormat.BINARY);
+
+ runTest(true, false, null, -1);
+
+ // check that the OOC right-indexing instruction was actually executed
+ Assert.assertTrue("OOC wasn't used for right indexing",
+ heavyHittersContainsString(Instruction.OOC_INST_PREFIX + Opcodes.RIGHT_INDEX));
+
+ // compute the reference result: rerun the same script without the -ooc flag
+ programArgs = new String[] {"-explain", "-stats", "-args", input(INPUT_NAME), "" + rs, "" + cs, output(OUTPUT_NAME + "_target")};
+ runTest(true, false, null, -1);
+
+ // compare the 1x1 OOC result against the non-OOC reference
+ MatrixBlock ret1 = DataConverter.readMatrixFromHDFS(output(OUTPUT_NAME),
+ Types.FileFormat.BINARY, 1, 1, 1000);
+ MatrixBlock ret2 = DataConverter.readMatrixFromHDFS(output(OUTPUT_NAME + "_target"),
+ Types.FileFormat.BINARY, 1, 1, 1000);
+ TestUtils.compareMatrices(ret2, ret1, eps);
+ }
+ catch(IOException e) {
+ throw new RuntimeException(e);
+ }
+ finally {
+ resetExecMode(platformOld);
+ }
+ }
+}
diff --git a/src/test/java/org/apache/sysds/test/functions/ooc/RightIndexingTest.java b/src/test/java/org/apache/sysds/test/functions/ooc/RightIndexingTest.java
new file mode 100644
index 0000000000..5bd061c646
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/ooc/RightIndexingTest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.ooc;
+
+import org.apache.sysds.common.Opcodes;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.instructions.Instruction;
+import org.apache.sysds.runtime.io.MatrixWriter;
+import org.apache.sysds.runtime.io.MatrixWriterFactory;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
+import org.apache.sysds.runtime.util.DataConverter;
+import org.apache.sysds.runtime.util.HDFSTool;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/**
+ * Out-of-core (OOC) tests for range right indexing X[rs:re, cs:ce]. The ranges are chosen to be
+ * block-aligned, to start or end inside 1000x1000 input blocks, and to span block boundaries.
+ */
+public class RightIndexingTest extends AutomatedTestBase {
+ private final static String TEST_NAME1 = "RightIndexing";
+ private final static String TEST_DIR = "functions/ooc/";
+ private final static String TEST_CLASS_DIR = TEST_DIR + RightIndexingTest.class.getSimpleName() + "/";
+ private final static double eps = 1e-8;
+ private static final String INPUT_NAME = "X";
+ private static final String OUTPUT_NAME = "res";
+
+ private final static int maxVal = 7;
+ private final static double sparsity1 = 1;
+ private final static double sparsity2 = 0.05;
+
+ @Override
+ public void setUp() {
+ TestUtils.clearAssertionInformation();
+ TestConfiguration config = new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1);
+ addTestConfiguration(TEST_NAME1, config);
+ }
+
+ @Test
+ public void testRightIndexingDense1() {
+ runRightIndexingTest(2, 2002, 100, 1150, 2100, 1200, false);
+ }
+
+ @Test
+ public void testRightIndexingSparse1() {
+ runRightIndexingTest(2, 2002, 100, 1150, 2100, 1200, true);
+ }
+
+ @Test
+ public void testRightIndexingAlignedDense() {
+ runRightIndexingTest(1, 2002, 1, 1150, 2100, 1200, false);
+ }
+
+ @Test
+ public void testRightIndexingAlignedSparse() {
+ runRightIndexingTest(1, 2002, 1, 1150, 2100, 1200, true);
+ }
+
+ @Test
+ public void testRightIndexingRowAlignedDense() {
+ runRightIndexingTest(1, 2002, 100, 1150, 2100, 1200, false);
+ }
+
+ @Test
+ public void testRightIndexingRowAlignedSparse() {
+ runRightIndexingTest(1, 2002, 100, 1150, 2100, 1200, true);
+ }
+
+ @Test
+ public void testRightIndexingSmallDense1() {
+ runRightIndexingTest(1, 700, 150, 1020, 3000, 3000, false);
+ }
+
+ @Test
+ public void testRightIndexingSmallSparse1() {
+ runRightIndexingTest(1, 700, 150, 1020, 3000, 3000, true);
+ }
+
+ @Test
+ public void testRightIndexingSmallDense2() {
+ runRightIndexingTest(150, 1020, 1, 700, 3000, 3000, false);
+ }
+
+ @Test
+ public void testRightIndexingSmallSparse2() {
+ runRightIndexingTest(150, 1020, 1, 700, 3000, 3000, true);
+ }
+
+ @Test
+ public void testRightIndexingSingleElementDense() {
+ runRightIndexingTest(1111, 1111, 2222, 2222, 3000, 3000, false);
+ }
+
+ @Test
+ public void testRightIndexingSingleElementSparse() {
+ runRightIndexingTest(1111, 1111, 2222, 2222, 3000, 3000, true);
+ }
+
+ @Test
+ public void testRightIndexingCrossBlockBothDense() {
+ runRightIndexingTest(950, 1050, 995, 1005, 3000, 3000, false);
+ }
+
+ @Test
+ public void testRightIndexingCrossBlockBothSparse() {
+ runRightIndexingTest(950, 1050, 995, 1005, 3000, 3000, true);
+ }
+
+ @Test
+ public void testRightIndexingSingleRowMultiBlockDense() {
+ runRightIndexingTest(1001, 1001, 800, 1205, 3000, 3000, false);
+ }
+
+ @Test
+ public void testRightIndexingSingleRowMultiBlockSparse() {
+ runRightIndexingTest(1001, 1001, 800, 1205, 3000, 3000, true);
+ }
+
+ @Test
+ public void testRightIndexingSingleColumnMultiBlockDense() {
+ runRightIndexingTest(800, 1205, 1001, 1001, 3000, 3000, false);
+ }
+
+ @Test
+ public void testRightIndexingSingleColumnMultiBlockSparse() {
+ runRightIndexingTest(800, 1205, 1001, 1001, 3000, 3000, true);
+ }
+
+ @Test
+ public void testRightIndexingTrailingBlocksDense() {
+ runRightIndexingTest(2501, 3000, 1500, 2100, 3000, 3000, false);
+ }
+
+ @Test
+ public void testRightIndexingTrailingBlocksSparse() {
+ runRightIndexingTest(2501, 3000, 1500, 2100, 3000, 3000, true);
+ }
+
+ /**
+ * Writes a random nrows x ncols input matrix (stored in 1000x1000 blocks), evaluates
+ * X[rs:re, cs:ce] with and without -ooc, and checks that the OOC right-indexing
+ * instruction was used and that both results match.
+ *
+ * @param rs first selected row (1-based, inclusive)
+ * @param re last selected row (1-based, inclusive)
+ * @param cs first selected column (1-based, inclusive)
+ * @param ce last selected column (1-based, inclusive)
+ * @param nrows number of rows of the generated input matrix
+ * @param ncols number of columns of the generated input matrix
+ * @param sparse whether to generate the input with the sparse (0.05) or dense (1.0) sparsity
+ */
+ private void runRightIndexingTest(int rs, int re, int cs, int ce, int nrows, int ncols, boolean sparse) {
+ Types.ExecMode platformOld = setExecMode(Types.ExecMode.SINGLE_NODE);
+
+ try {
+ getAndLoadTestConfiguration(TEST_NAME1);
+
+ String HOME = SCRIPT_DIR + TEST_DIR;
+ fullDMLScriptName = HOME + TEST_NAME1 + ".dml";
+ programArgs = new String[] {"-explain", "-stats", "-ooc", "-args", input(INPUT_NAME), "" + rs, "" + re, "" + cs, "" + ce, output(OUTPUT_NAME)};
+
+ // 1. Generate a random nrows x ncols input matrix as a 2D array
+ double[][] X_data = getRandomMatrix(nrows, ncols, 1, maxVal, sparse ? sparsity2 : sparsity1, 7);
+
+ // 2. Convert the array to a MatrixBlock
+ MatrixBlock X_mb = DataConverter.convertToMatrixBlock(X_data);
+
+ // 3. Create a binary matrix writer
+ MatrixWriter writer = MatrixWriterFactory.createMatrixWriter(Types.FileFormat.BINARY);
+
+ // 4. Write X in binary format with 1000x1000 blocks, plus its metadata file
+ writer.writeMatrixToHDFS(X_mb, input(INPUT_NAME), nrows, ncols, 1000, X_mb.getNonZeros());
+ HDFSTool.writeMetaDataFile(input(INPUT_NAME + ".mtd"), Types.ValueType.FP64,
+ new MatrixCharacteristics(nrows, ncols, 1000, X_mb.getNonZeros()), Types.FileFormat.BINARY);
+
+ runTest(true, false, null, -1);
+
+ // check that the OOC right-indexing instruction was actually executed
+ Assert.assertTrue("OOC wasn't used for right indexing",
+ heavyHittersContainsString(Instruction.OOC_INST_PREFIX + Opcodes.RIGHT_INDEX));
+
+ // compute the reference result: rerun the same script without the -ooc flag
+ programArgs = new String[] {"-explain", "-stats", "-args", input(INPUT_NAME), "" + rs, "" + re, "" + cs, "" + ce, output(OUTPUT_NAME + "_target")};
+ runTest(true, false, null, -1);
+
+ // compare the OOC result against the reference; index bounds are inclusive
+ int outNRows = re-rs+1;
+ int outNCols = ce-cs+1;
+
+ MatrixBlock ret1 = DataConverter.readMatrixFromHDFS(output(OUTPUT_NAME),
+ Types.FileFormat.BINARY, outNRows, outNCols, 1000);
+ MatrixBlock ret2 = DataConverter.readMatrixFromHDFS(output(OUTPUT_NAME + "_target"),
+ Types.FileFormat.BINARY, outNRows, outNCols, 1000);
+ TestUtils.compareMatrices(ret2, ret1, eps);
+ }
+ catch(IOException e) {
+ throw new RuntimeException(e);
+ }
+ finally {
+ resetExecMode(platformOld);
+ }
+ }
+}
diff --git a/src/test/scripts/functions/ooc/RightIndexing.dml b/src/test/scripts/functions/ooc/RightIndexing.dml
new file mode 100644
index 0000000000..9abb5c6d93
--- /dev/null
+++ b/src/test/scripts/functions/ooc/RightIndexing.dml
@@ -0,0 +1,31 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# Read the input matrix ($1) as a stream and select rows $2..$3, columns $4..$5
+X = read($1);
+rl = $2; # first selected row (1-based, inclusive)
+ru = $3; # last selected row (inclusive)
+cl = $4; # first selected column (1-based, inclusive)
+cu = $5; # last selected column (inclusive)
+
+res = X[rl:ru, cl:cu];
+
+write(res, $6, format="binary"); # $6: output path
diff --git a/src/test/scripts/functions/ooc/RightIndexingCoords.dml b/src/test/scripts/functions/ooc/RightIndexingCoords.dml
new file mode 100644
index 0000000000..61a2ddc747
--- /dev/null
+++ b/src/test/scripts/functions/ooc/RightIndexingCoords.dml
@@ -0,0 +1,29 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# Read the input matrix ($1) as a stream and select the single cell ($2, $3)
+X = read($1);
+rs = $2; # row index of the selected cell (1-based)
+cs = $3; # column index of the selected cell (1-based)
+
+res = X[rs, cs]; # 1x1 result
+
+write(res, $4, format="binary"); # $4: output path