http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/instructions/spark/QuaternarySPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/QuaternarySPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/QuaternarySPInstruction.java index c021c04..1953e99 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/QuaternarySPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/QuaternarySPInstruction.java @@ -51,7 +51,7 @@ import org.apache.sysml.runtime.instructions.InstructionUtils; import org.apache.sysml.runtime.instructions.cp.CPOperand; import org.apache.sysml.runtime.instructions.cp.DoubleObject; import org.apache.sysml.runtime.instructions.spark.data.LazyIterableIterator; -import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcastMatrix; +import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast; import org.apache.sysml.runtime.instructions.spark.functions.FilterNonEmptyBlocksFunction; import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; @@ -237,8 +237,8 @@ public class QuaternarySPInstruction extends ComputationSPInstruction || WeightedCrossEntropy.OPCODE.equalsIgnoreCase(getOpcode()) || WeightedUnaryMM.OPCODE.equalsIgnoreCase(getOpcode())) { - PartitionedBroadcastMatrix bc1 = sec.getBroadcastForVariable( input2.getName() ); - PartitionedBroadcastMatrix bc2 = sec.getBroadcastForVariable( input3.getName() ); + PartitionedBroadcast<MatrixBlock> bc1 = sec.getBroadcastForVariable( input2.getName() ); + PartitionedBroadcast<MatrixBlock> bc2 = sec.getBroadcastForVariable( input3.getName() ); //partitioning-preserving mappartitions (key access required for broadcast loopkup) boolean noKeyChange = (qop.wtype3 == null || qop.wtype3.isBasic()); //only wdivmm changes keys @@ -251,8 +251,8 @@ public class QuaternarySPInstruction extends ComputationSPInstruction //reduce-side operation (two/three/four rdd inputs, zero/one/two broadcasts) else { - PartitionedBroadcastMatrix bc1 = _cacheU ? sec.getBroadcastForVariable( input2.getName() ) : null; - PartitionedBroadcastMatrix bc2 = _cacheV ? sec.getBroadcastForVariable( input3.getName() ) : null; + PartitionedBroadcast<MatrixBlock> bc1 = _cacheU ? sec.getBroadcastForVariable( input2.getName() ) : null; + PartitionedBroadcast<MatrixBlock> bc2 = _cacheV ? sec.getBroadcastForVariable( input3.getName() ) : null; JavaPairRDD<MatrixIndexes,MatrixBlock> inU = (!_cacheU) ? sec.getBinaryBlockRDDHandleForVariable( input2.getName() ) : null; JavaPairRDD<MatrixIndexes,MatrixBlock> inV = (!_cacheV) ? sec.getBinaryBlockRDDHandleForVariable( input3.getName() ) : null; JavaPairRDD<MatrixIndexes,MatrixBlock> inW = (qop.hasFourInputs() && !_input4.isLiteral()) ? @@ -361,10 +361,10 @@ public class QuaternarySPInstruction extends ComputationSPInstruction private static final long serialVersionUID = -3175397651350954930L; protected QuaternaryOperator _qop = null; - protected PartitionedBroadcastMatrix _pmU = null; - protected PartitionedBroadcastMatrix _pmV = null; + protected PartitionedBroadcast<MatrixBlock> _pmU = null; + protected PartitionedBroadcast<MatrixBlock> _pmV = null; - public RDDQuaternaryBaseFunction( QuaternaryOperator qop, PartitionedBroadcastMatrix bcU, PartitionedBroadcastMatrix bcV ) { + public RDDQuaternaryBaseFunction( QuaternaryOperator qop, PartitionedBroadcast<MatrixBlock> bcU, PartitionedBroadcast<MatrixBlock> bcV ) { _qop = qop; _pmU = bcU; _pmV = bcV; @@ -388,7 +388,7 @@ public class QuaternarySPInstruction extends ComputationSPInstruction { private static final long serialVersionUID = -8209188316939435099L; - public RDDQuaternaryFunction1( QuaternaryOperator qop, PartitionedBroadcastMatrix bcU, PartitionedBroadcastMatrix bcV ) + public RDDQuaternaryFunction1( QuaternaryOperator qop, PartitionedBroadcast<MatrixBlock> bcU, PartitionedBroadcast<MatrixBlock> bcV ) throws DMLRuntimeException { super(qop, bcU, bcV); @@ -415,8 +415,8 @@ public class QuaternarySPInstruction extends ComputationSPInstruction MatrixBlock blkIn = arg._2(); MatrixBlock blkOut = new MatrixBlock(); - MatrixBlock mbU = _pmU.getMatrixBlock((int)ixIn.getRowIndex(), 1); - MatrixBlock mbV = _pmV.getMatrixBlock((int)ixIn.getColumnIndex(), 1); + MatrixBlock mbU = _pmU.getBlock((int)ixIn.getRowIndex(), 1); + MatrixBlock mbV = _pmV.getBlock((int)ixIn.getColumnIndex(), 1); //execute core operation blkIn.quaternaryOperations(_qop, mbU, mbV, null, blkOut); @@ -437,7 +437,7 @@ public class QuaternarySPInstruction extends ComputationSPInstruction { private static final long serialVersionUID = 7493974462943080693L; - public RDDQuaternaryFunction2( QuaternaryOperator qop, PartitionedBroadcastMatrix bcU, PartitionedBroadcastMatrix bcV ) + public RDDQuaternaryFunction2( QuaternaryOperator qop, PartitionedBroadcast<MatrixBlock> bcU, PartitionedBroadcast<MatrixBlock> bcV ) throws DMLRuntimeException { super(qop, bcU, bcV); @@ -452,8 +452,8 @@ public class QuaternarySPInstruction extends ComputationSPInstruction MatrixBlock blkIn2 = arg0._2()._2(); MatrixBlock blkOut = new MatrixBlock(); - MatrixBlock mbU = (_pmU!=null)?_pmU.getMatrixBlock((int)ixIn.getRowIndex(), 1) : blkIn2; - MatrixBlock mbV = (_pmV!=null)?_pmV.getMatrixBlock((int)ixIn.getColumnIndex(), 1) : blkIn2; + MatrixBlock mbU = (_pmU!=null)?_pmU.getBlock((int)ixIn.getRowIndex(), 1) : blkIn2; + MatrixBlock mbV = (_pmV!=null)?_pmV.getBlock((int)ixIn.getColumnIndex(), 1) : blkIn2; MatrixBlock mbW = (_qop.hasFourInputs()) ? blkIn2 : null; //execute core operation @@ -473,7 +473,7 @@ public class QuaternarySPInstruction extends ComputationSPInstruction { private static final long serialVersionUID = -2294086455843773095L; - public RDDQuaternaryFunction3( QuaternaryOperator qop, PartitionedBroadcastMatrix bcU, PartitionedBroadcastMatrix bcV ) + public RDDQuaternaryFunction3( QuaternaryOperator qop, PartitionedBroadcast<MatrixBlock> bcU, PartitionedBroadcast<MatrixBlock> bcV ) throws DMLRuntimeException { super(qop, bcU, bcV); @@ -490,8 +490,8 @@ public class QuaternarySPInstruction extends ComputationSPInstruction MatrixBlock blkOut = new MatrixBlock(); - MatrixBlock mbU = (_pmU!=null)?_pmU.getMatrixBlock((int)ixIn.getRowIndex(), 1) : blkIn2; - MatrixBlock mbV = (_pmV!=null)?_pmV.getMatrixBlock((int)ixIn.getColumnIndex(), 1) : + MatrixBlock mbU = (_pmU!=null)?_pmU.getBlock((int)ixIn.getRowIndex(), 1) : blkIn2; + MatrixBlock mbV = (_pmV!=null)?_pmV.getBlock((int)ixIn.getColumnIndex(), 1) : (_pmU!=null)? blkIn2 : blkIn3; MatrixBlock mbW = (_qop.hasFourInputs())? blkIn3 : null;
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/instructions/spark/UaggOuterChainSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/UaggOuterChainSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/UaggOuterChainSPInstruction.java index e2cb782..8974a7e 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/UaggOuterChainSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/UaggOuterChainSPInstruction.java @@ -42,7 +42,7 @@ import org.apache.sysml.runtime.functionobjects.ReduceRow; import org.apache.sysml.runtime.instructions.InstructionUtils; import org.apache.sysml.runtime.instructions.cp.CPOperand; import org.apache.sysml.runtime.instructions.spark.data.LazyIterableIterator; -import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcastMatrix; +import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast; import org.apache.sysml.runtime.instructions.spark.functions.AggregateDropCorrectionFunction; import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; @@ -158,7 +158,7 @@ public class UaggOuterChainSPInstruction extends BinarySPInstruction } else { - PartitionedBroadcastMatrix bv = sec.getBroadcastForVariable( bcastVar ); + PartitionedBroadcast<MatrixBlock> bv = sec.getBroadcastForVariable( bcastVar ); //partitioning-preserving map-to-pair (under constraints) out = in1.mapPartitionsToPair( new RDDMapGenUAggOuterChainFunction(bv, _uaggOp, _aggOp, _bOp, mcIn), noKeyChange ); @@ -314,7 +314,7 @@ public class UaggOuterChainSPInstruction extends BinarySPInstruction { private static final long serialVersionUID = 8197406787010296291L; - private PartitionedBroadcastMatrix _pbc = null; + private PartitionedBroadcast<MatrixBlock> _pbc = null; // Operators private AggregateUnaryOperator _uaggOp = null; @@ -327,7 +327,7 @@ public class UaggOuterChainSPInstruction extends BinarySPInstruction private MatrixValue _tmpVal1 = null; private MatrixValue _tmpVal2 = null; - public RDDMapGenUAggOuterChainFunction(PartitionedBroadcastMatrix binput, AggregateUnaryOperator uaggOp, AggregateOperator aggOp, BinaryOperator bOp, + public RDDMapGenUAggOuterChainFunction(PartitionedBroadcast<MatrixBlock> binput, AggregateUnaryOperator uaggOp, AggregateOperator aggOp, BinaryOperator bOp, MatrixCharacteristics mc) { _pbc = binput; @@ -374,7 +374,7 @@ public class UaggOuterChainSPInstruction extends BinarySPInstruction for(int bidx=1; bidx <= in2_colBlocks; bidx++) { - MatrixValue in2Val = _pbc.getMatrixBlock(1, bidx); + MatrixValue in2Val = _pbc.getBlock(1, bidx); //outer block operation OperationsOnMatrixValues.performBinaryIgnoreIndexes(in1Val, in2Val, _tmpVal1, _bOp); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java index f42c29e..0cfec66 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java @@ -146,7 +146,7 @@ public class WriteSPInstruction extends SPInstruction //prepare output info according to meta data String outFmt = input3.getName(); OutputInfo oi = OutputInfo.stringToOutputInfo(outFmt); - + //core matrix/frame write if( input1.getDataType()==DataType.MATRIX ) processMatrixWriteInstruction(sec, fname, oi); @@ -311,7 +311,7 @@ public class WriteSPInstruction extends SPInstruction } // write meta data file - MapReduceTool.writeMetaDataFile(fname + ".mtd", input1.getValueType(), DataType.FRAME, mc, oi, formatProperties); + MapReduceTool.writeMetaDataFile(fname + ".mtd", input1.getValueType(), null, DataType.FRAME, mc, oi, formatProperties); } /** http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/instructions/spark/data/BroadcastObject.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/BroadcastObject.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/BroadcastObject.java index 13c68e2..6091edf 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/BroadcastObject.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/BroadcastObject.java @@ -22,15 +22,16 @@ package org.apache.sysml.runtime.instructions.spark.data; import java.lang.ref.SoftReference; import org.apache.spark.broadcast.Broadcast; +import org.apache.sysml.runtime.controlprogram.caching.CacheBlock; -public class BroadcastObject extends LineageObject +public class BroadcastObject<T extends CacheBlock> extends LineageObject { //soft reference storage for graceful cleanup in case of memory pressure - private SoftReference<PartitionedBroadcastMatrix> _bcHandle = null; + protected SoftReference<PartitionedBroadcast<T>> _bcHandle = null; - public BroadcastObject( PartitionedBroadcastMatrix bvar, String varName ) + public BroadcastObject( PartitionedBroadcast<T> bvar, String varName ) { - _bcHandle = new SoftReference<PartitionedBroadcastMatrix>(bvar); + _bcHandle = new SoftReference<PartitionedBroadcast<T>>(bvar); _varName = varName; } @@ -38,7 +39,8 @@ public class BroadcastObject extends LineageObject * * @return */ - public PartitionedBroadcastMatrix getBroadcast() + @SuppressWarnings("rawtypes") + public PartitionedBroadcast getBroadcast() { return _bcHandle.get(); } @@ -50,13 +52,13 @@ public class BroadcastObject extends LineageObject public boolean isValid() { //check for evicted soft reference - PartitionedBroadcastMatrix pbm = _bcHandle.get(); + PartitionedBroadcast<T> pbm = _bcHandle.get(); if( pbm == null ) return false; //check for validity of individual broadcasts - Broadcast<PartitionedMatrixBlock>[] tmp = pbm.getBroadcasts(); - for( Broadcast<PartitionedMatrixBlock> bc : tmp ) + Broadcast<PartitionedBlock<T>>[] tmp = pbm.getBroadcasts(); + for( Broadcast<PartitionedBlock<T>> bc : tmp ) if( !bc.isValid() ) return false; return true; http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java new file mode 100644 index 0000000..20fcd0b --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java @@ -0,0 +1,399 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.runtime.instructions.spark.data; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectInputStream; +import java.io.ObjectOutput; +import java.io.ObjectOutputStream; +import java.lang.reflect.Array; +import java.lang.reflect.Constructor; +import java.util.ArrayList; + +import org.apache.sysml.runtime.DMLRuntimeException; +import org.apache.sysml.runtime.controlprogram.caching.CacheBlock; +import org.apache.sysml.runtime.matrix.data.Pair; +import org.apache.sysml.runtime.util.FastBufferedDataInputStream; +import org.apache.sysml.runtime.util.FastBufferedDataOutputStream; +import org.apache.sysml.runtime.util.IndexRange; + +/** + * This class is for partitioned matrix/frame blocks, to be used + * as broadcasts. Distributed tasks require block-partitioned broadcasts but a lazy partitioning per + * task would create instance-local copies and hence replicate broadcast variables which are shared + * by all tasks within an executor. + * + */ +public class PartitionedBlock<T extends CacheBlock> implements Externalizable +{ + + protected T[] _partBlocks = null; + protected long _rlen = -1; + protected long _clen = -1; + protected int _brlen = -1; + protected int _bclen = -1; + protected int _offset = 0; + + public PartitionedBlock() { + //do nothing (required for Externalizable) + } + + + public long getNumRows() { + return _rlen; + } + + public long getNumCols() { + return _clen; + } + + public long getNumRowsPerBlock() { + return _brlen; + } + + public long getNumColumnsPerBlock() { + return _bclen; + } + + /** + * + * @return + */ + public int getNumRowBlocks() + { + return (int)Math.ceil((double)_rlen/_brlen); + } + + /** + * + * @return + */ + public int getNumColumnBlocks() + { + return (int)Math.ceil((double)_clen/_bclen); + } + + @SuppressWarnings("unchecked") + public PartitionedBlock(T block, int brlen, int bclen) + { + //get the input frame block + int rlen = block.getNumRows(); + int clen = block.getNumColumns(); + + //partitioning input broadcast + _rlen = rlen; + _clen = clen; + _brlen = brlen; + _bclen = bclen; + + int nrblks = getNumRowBlocks(); + int ncblks = getNumColumnBlocks(); + + try + { + _partBlocks = (T[])Array.newInstance((block.getClass()), nrblks * ncblks); + for( int i=0, ix=0; i<nrblks; i++ ) + for( int j=0; j<ncblks; j++, ix++ ) + { + T tmp = (T) block.getClass().newInstance(); + block.sliceOperations(i*_brlen, Math.min((i+1)*_brlen, rlen)-1, + j*_bclen, Math.min((j+1)*_bclen, clen)-1, tmp); + _partBlocks[ix] = tmp; + } + } + catch(Exception ex) { + throw new RuntimeException("Failed partitioning of broadcast variable input.", ex); + } + + _offset = 0; + } + + @SuppressWarnings("unchecked") + public PartitionedBlock(int rlen, int clen, int brlen, int bclen, T block) + { + //partitioning input broadcast + _rlen = rlen; + _clen = clen; + _brlen = brlen; + _bclen = bclen; + + int nrblks = getNumRowBlocks(); + int ncblks = getNumColumnBlocks(); + _partBlocks = (T[])Array.newInstance((block.getClass()), nrblks * ncblks); + + } + + /** + * + * @param rowIndex + * @param colIndex + * @return + * @throws DMLRuntimeException + */ + public T getBlock(int rowIndex, int colIndex) + throws DMLRuntimeException + { + //check for valid block index + int nrblks = getNumRowBlocks(); + int ncblks = getNumColumnBlocks(); + if( rowIndex <= 0 || rowIndex > nrblks || colIndex <= 0 || colIndex > ncblks ) { + throw new DMLRuntimeException("Block indexes ["+rowIndex+","+colIndex+"] out of range ["+nrblks+","+ncblks+"]"); + } + + //get the requested frame/matrix block + int rix = rowIndex - 1; + int cix = colIndex - 1; + int ix = rix*ncblks+cix - _offset; + return _partBlocks[ix]; + } + + /** + * + * @param rowIndex + * @param colIndex + * @param mb + * @throws DMLRuntimeException + */ + public void setBlock(int rowIndex, int colIndex, T block) + throws DMLRuntimeException + { + //check for valid block index + int nrblks = getNumRowBlocks(); + int ncblks = getNumColumnBlocks(); + if( rowIndex <= 0 || rowIndex > nrblks || colIndex <= 0 || colIndex > ncblks ) { + throw new DMLRuntimeException("Block indexes ["+rowIndex+","+colIndex+"] out of range ["+nrblks+","+ncblks+"]"); + } + + //get the requested matrix block + int rix = rowIndex - 1; + int cix = colIndex - 1; + int ix = rix*ncblks+cix - _offset; + _partBlocks[ ix ] = block; + + } + + /** + * + * @param offset + * @param numBlks + * @return + */ + @SuppressWarnings("unchecked") + public PartitionedBlock<T> createPartition( int offset, int numBlks, T block ) + { + PartitionedBlock<T> ret = new PartitionedBlock<T>(); + ret._rlen = _rlen; + ret._clen = _clen; + ret._brlen = _brlen; + ret._bclen = _bclen; + + _partBlocks = (T[])Array.newInstance(block.getClass(), numBlks); + ret._offset = offset; + System.arraycopy(_partBlocks, offset, ret._partBlocks, 0, numBlks); + + return ret; + } + + /** + * + * @return + */ + public long getInMemorySize() + { + long ret = 24; //header + ret += 32; //block array + + if( _partBlocks != null ) + for( T block : _partBlocks ) + ret += block.getInMemorySize(); + + return ret; + } + + /** + * + * @return + */ + + public long getExactSerializedSize() + { + long ret = 24; //header + + if( _partBlocks != null ) + for( T block : _partBlocks ) + ret += block.getExactSerializedSize(); + + return ret; + } + + /** + * Utility for slice operations over partitioned matrices, where the index range can cover + * multiple blocks. The result is always a single result matrix block. All semantics are + * equivalent to the core matrix block slice operations. + * + * @param rl + * @param ru + * @param cl + * @param cu + * @param matrixBlock + * @return + * @throws DMLRuntimeException + */ + @SuppressWarnings("unchecked") + public T sliceOperations(long rl, long ru, long cl, long cu, T block) + throws DMLRuntimeException + { + int lrl = (int) rl; + int lru = (int) ru; + int lcl = (int) cl; + int lcu = (int) cu; + + ArrayList<Pair<?, ?>> allBlks = block.getPairList(); + int start_iix = (lrl-1)/_brlen+1; + int end_iix = (lru-1)/_brlen+1; + int start_jix = (lcl-1)/_bclen+1; + int end_jix = (lcu-1)/_bclen+1; + + for( int iix = start_iix; iix <= end_iix; iix++ ) + for(int jix = start_jix; jix <= end_jix; jix++) + { + T in = getBlock(iix, jix); + IndexRange ixrange = new IndexRange(rl, ru, cl, cu); + ArrayList<Pair<?, ?>> outlist = block.performSlice(ixrange, _brlen, _bclen, iix, jix, in); + allBlks.addAll(outlist); + } + + if(allBlks.size() == 1) { + return (T) allBlks.get(0).getValue(); + } + else { + //allocate output matrix + Constructor<?> constr; + try { + constr = block.getClass().getConstructor(int.class, int.class, boolean.class); + T ret = (T) constr.newInstance(lru-lrl+1, lcu-lcl+1, false); + for(Pair<?, ?> kv : allBlks) { + ret.merge((T)kv.getValue(), false); + } + return ret; + } catch (Exception e) { + throw new DMLRuntimeException(e); + } + } + } + + /** + * Redirects the default java serialization via externalizable to our default + * hadoop writable serialization for efficient broadcast deserialization. + * + * @param is + * @throws IOException + */ + public void readExternal(ObjectInput is) + throws IOException + { + DataInput dis = is; + + if( is instanceof ObjectInputStream ) { + //fast deserialize of dense/sparse blocks + ObjectInputStream ois = (ObjectInputStream)is; + dis = new FastBufferedDataInputStream(ois); + } + + readHeaderAndPayload(dis); + } + + /** + * Redirects the default java serialization via externalizable to our default + * hadoop writable serialization for efficient broadcast serialization. + * + * @param is + * @throws IOException + */ + public void writeExternal(ObjectOutput os) + throws IOException + { + if( os instanceof ObjectOutputStream ) { + //fast serialize of dense/sparse blocks + ObjectOutputStream oos = (ObjectOutputStream)os; + FastBufferedDataOutputStream fos = new FastBufferedDataOutputStream(oos); + writeHeaderAndPayload(fos); + fos.flush(); + } + else { + //default serialize (general case) + writeHeaderAndPayload(os); + } + } + + /** + * + * @param dos + * @throws IOException + */ + private void writeHeaderAndPayload(DataOutput dos) + throws IOException + { + dos.writeLong(_rlen); + dos.writeLong(_clen); + dos.writeInt(_brlen); + dos.writeInt(_bclen); + dos.writeInt(_offset); + dos.writeInt(_partBlocks.length); + + for( T block : _partBlocks ) + block.write(dos); + } + + /** + * + * @param din + * @throws IOException + */ + @SuppressWarnings("unchecked") + private void readHeaderAndPayload(DataInput dis) + throws IOException + { + _rlen = dis.readInt(); + _clen = dis.readInt(); + _brlen = dis.readInt(); + _bclen = dis.readInt(); + _offset = dis.readInt(); + + int len = dis.readInt(); + + try + { + _partBlocks = (T[])Array.newInstance(getClass(), len); + for( int i=0; i<len; i++ ) { + _partBlocks[i].readFields(dis); + } + } + catch(Exception ex) { + throw new RuntimeException("Failed partitioning of broadcast variable input.", ex); + } + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcast.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcast.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcast.java new file mode 100644 index 0000000..d924cc0 --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcast.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.runtime.instructions.spark.data; + +import java.io.Serializable; + +import org.apache.spark.broadcast.Broadcast; +import org.apache.sysml.runtime.DMLRuntimeException; +import org.apache.sysml.runtime.controlprogram.caching.CacheBlock; + +/** + * This class is a wrapper around an array of broadcasts of partitioned matrix/frame blocks, + * which is required due to 2GB limitations of Spark's broadcast handling. Without this + * partitioning of Broadcast<PartitionedBlock> into Broadcast<PartitionedBlock>[], + * we got java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE issue. + * Despite various jiras, this issue still showed up in Spark 1.4/1.5. + * + */ +public class PartitionedBroadcast<T extends CacheBlock> implements Serializable +{ + private static final long serialVersionUID = 7041959166079438401L; + + protected static final long BROADCAST_PARTSIZE = 200L*1024*1024; //200M cells ~ 1.6GB + + private Broadcast<PartitionedBlock<T>>[] _pbc = null; + + public PartitionedBroadcast() { + //do nothing (required for Externalizable) + } + + public PartitionedBroadcast(Broadcast<PartitionedBlock<T>>[] broadcasts) + { + _pbc = broadcasts; + } + + public Broadcast<PartitionedBlock<T>>[] getBroadcasts() { + return _pbc; + } + + /** + * + * @return + */ + public int getNumRowBlocks() { + return _pbc[0].value().getNumRowBlocks(); + } + + public int getNumColumnBlocks() { + return _pbc[0].value().getNumColumnBlocks(); + } + + /** + * + * @param rlen + * @param clen + * @param brlen + * @param bclen + * @return + */ + public static int computeBlocksPerPartition(long rlen, long clen, long brlen, long bclen) { + return (int) Math.floor( BROADCAST_PARTSIZE / + Math.min(rlen, brlen) / Math.min(clen, bclen)); + } + /** + * + * @param rowIndex + * @param colIndex + * @return + * @throws DMLRuntimeException + */ + public T getBlock(int rowIndex, int colIndex) + throws DMLRuntimeException + { + int pix = 0; + + if( _pbc.length > 1 ) { + //compute partition index + PartitionedBlock<T> tmp = _pbc[0].value(); + int numPerPart = computeBlocksPerPartition(tmp.getNumRows(), tmp.getNumCols(), + tmp.getNumRowsPerBlock(), tmp.getNumColumnsPerBlock()); + int ix = (rowIndex-1)*tmp.getNumColumnBlocks()+(colIndex-1); + pix = ix / numPerPart; + } + + return _pbc[pix].value().getBlock(rowIndex, colIndex); + } + + public T sliceOperations(long rl, long ru, long cl, long cu, T block) + throws DMLRuntimeException + { + T ret = null; + + for( Broadcast<PartitionedBlock<T>> bc : _pbc ) { + PartitionedBlock<T> pm = bc.value(); + T tmp = pm.sliceOperations(rl, ru, cl, cu, block); + if( ret != null ) + ret.merge(tmp, false); + else + ret = tmp; + } + + return ret; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcastMatrix.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcastMatrix.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcastMatrix.java deleted file mode 100644 index f79d8c1..0000000 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcastMatrix.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.sysml.runtime.instructions.spark.data; - -import java.io.Serializable; - -import org.apache.spark.broadcast.Broadcast; - -import org.apache.sysml.runtime.DMLRuntimeException; -import org.apache.sysml.runtime.matrix.data.MatrixBlock; - -/** - * This class is a wrapper around an array of broadcasts of partitioned matrix blocks, - * which is required due to 2GB limitations of Spark's broadcast handling. Without this - * partitioning of Broadcast<PartitionedMatrixBlock> into Broadcast<PartitionedMatrixBlock>[], - * we got java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE issue. - * Despite various jiras, this issue still showed up in Spark 1.4/1.5. - * - */ -public class PartitionedBroadcastMatrix implements Serializable -{ - private static final long serialVersionUID = 1225135967889810877L; - private static final long BROADCAST_PARTSIZE = 200L*1024*1024; //200M cells ~ 1.6GB - - private Broadcast<PartitionedMatrixBlock>[] _pbc = null; - - public PartitionedBroadcastMatrix(Broadcast<PartitionedMatrixBlock>[] broadcasts) - { - _pbc = broadcasts; - } - - public Broadcast<PartitionedMatrixBlock>[] getBroadcasts() { - return _pbc; - } - - /** - * - * @return - */ - public int getNumRowBlocks() { - return _pbc[0].value().getNumRowBlocks(); - } - - public int getNumColumnBlocks() { - return _pbc[0].value().getNumColumnBlocks(); - } - - /** - * - * @param rowIndex - * @param colIndex - * @return - * @throws DMLRuntimeException - */ - public MatrixBlock getMatrixBlock(int rowIndex, int colIndex) - throws DMLRuntimeException - { - if( _pbc.length > 1 ) { - //compute partition index - PartitionedMatrixBlock tmp = _pbc[0].value(); - int numPerPart = computeBlocksPerPartition(tmp.getNumRows(), tmp.getNumCols(), - tmp.getNumRowsPerBlock(), tmp.getNumColumnsPerBlock()); - int ix = (rowIndex-1)*tmp.getNumColumnBlocks()+(colIndex-1); - int pix = ix / numPerPart; - - //get matrix block from partition - return _pbc[pix].value().getMatrixBlock(rowIndex, colIndex); - } - else { //single partition - return _pbc[0].value().getMatrixBlock(rowIndex, colIndex); - } - - } - - public MatrixBlock sliceOperations(long rl, long ru, long cl, long cu, MatrixBlock matrixBlock) - throws DMLRuntimeException - { - MatrixBlock ret = null; - - for( Broadcast<PartitionedMatrixBlock> bc : _pbc ) { - PartitionedMatrixBlock pm = bc.value(); - MatrixBlock tmp = pm.sliceOperations(rl, ru, cl, cu, new MatrixBlock()); - if( ret != null ) - ret.merge(tmp, false); - else - ret = tmp; - } - - return ret; - } - - /** - * - * @param rlen - * @param clen - * @param brlen - * @param bclen - * @return - */ - public static int computeBlocksPerPartition(long rlen, long clen, long brlen, long bclen) { - return (int) Math.floor( BROADCAST_PARTSIZE / - Math.min(rlen, brlen) / Math.min(clen, bclen)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedMatrixBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedMatrixBlock.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedMatrixBlock.java deleted file mode 100644 index 901b130..0000000 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedMatrixBlock.java +++ /dev/null @@ -1,385 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.sysml.runtime.instructions.spark.data; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectInputStream; -import java.io.ObjectOutput; -import java.io.ObjectOutputStream; -import java.util.ArrayList; - -import scala.Tuple2; - -import org.apache.sysml.runtime.DMLRuntimeException; -import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils; -import org.apache.sysml.runtime.matrix.data.MatrixBlock; -import org.apache.sysml.runtime.matrix.data.MatrixIndexes; -import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues; -import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue; -import org.apache.sysml.runtime.util.FastBufferedDataInputStream; -import org.apache.sysml.runtime.util.FastBufferedDataOutputStream; -import org.apache.sysml.runtime.util.IndexRange; - -/** - * The main purpose of this class is to provide a handle for partitioned matrix blocks, to be used - * as broadcasts. Distributed tasks require block-partitioned broadcasts but a lazy partitioning per - * task would create instance-local copies and hence replicate broadcast variables which are shared - * by all tasks within an executor. - * - */ -public class PartitionedMatrixBlock implements Externalizable -{ - - private static final long serialVersionUID = -5706923809800365593L; - - private MatrixBlock[] _partBlocks = null; - private int _rlen = -1; - private int _clen = -1; - private int _brlen = -1; - private int _bclen = -1; - private int _offset = 0; - - public PartitionedMatrixBlock() { - //do nothing (required for Externalizable) - } - - public PartitionedMatrixBlock(MatrixBlock mb, int brlen, int bclen) - { - //get the input matrix block - int rlen = mb.getNumRows(); - int clen = mb.getNumColumns(); - - //partitioning input broadcast - _rlen = rlen; - _clen = clen; - _brlen = brlen; - _bclen = bclen; - - int nrblks = getNumRowBlocks(); - int ncblks = getNumColumnBlocks(); - _partBlocks = new MatrixBlock[nrblks * ncblks]; - - try - { - for( int i=0, ix=0; i<nrblks; i++ ) - for( int j=0; j<ncblks; j++, ix++ ) - { - MatrixBlock tmp = new MatrixBlock(); - mb.sliceOperations(i*brlen, Math.min((i+1)*brlen, rlen)-1, - j*bclen, Math.min((j+1)*bclen, clen)-1, tmp); - _partBlocks[ix] = tmp; - } - } - catch(Exception ex) { - throw new RuntimeException("Failed partitioning of broadcast variable input.", ex); - } - - _offset = 0; - } - - public PartitionedMatrixBlock(int rlen, int clen, int brlen, int bclen) - { - //partitioning input broadcast - _rlen = rlen; - _clen = clen; - _brlen = brlen; - _bclen = bclen; - - int nrblks = getNumRowBlocks(); - int ncblks = getNumColumnBlocks(); - _partBlocks = new MatrixBlock[nrblks * ncblks]; - } - - public long getNumRows() { - return _rlen; - } - - public long getNumCols() { - return _clen; - } - - public long getNumRowsPerBlock() { - return _brlen; - } - - public long getNumColumnsPerBlock() { - return _bclen; - } - - /** - * - * @return - */ - public int getNumRowBlocks() - { - return (int)Math.ceil((double)_rlen/_brlen); - } - - /** - * - * @return - */ - public int getNumColumnBlocks() - { - return (int)Math.ceil((double)_clen/_bclen); - } - - /** - * - * @param rowIndex - * @param colIndex - * @return - * @throws DMLRuntimeException - */ - public MatrixBlock getMatrixBlock(int rowIndex, int colIndex) - throws DMLRuntimeException - { - //check for valid block index - int nrblks = getNumRowBlocks(); - int ncblks = getNumColumnBlocks(); - if( rowIndex <= 0 || rowIndex > nrblks || colIndex <= 0 || colIndex > ncblks ) { - throw new DMLRuntimeException("Block indexes ["+rowIndex+","+colIndex+"] out of range ["+nrblks+","+ncblks+"]"); - } - - //get the requested matrix block - int rix = rowIndex - 1; - int cix = colIndex - 1; - int ix = rix*ncblks+cix - _offset; - return _partBlocks[ ix ]; - } - - /** - * - * @param rowIndex - * @param colIndex - * @param mb - * @throws DMLRuntimeException - */ - public void setMatrixBlock(int rowIndex, int colIndex, MatrixBlock mb) - throws DMLRuntimeException - { - //check for valid block index - int nrblks = getNumRowBlocks(); - int ncblks = getNumColumnBlocks(); - if( rowIndex <= 0 || rowIndex > nrblks || colIndex <= 0 || colIndex > ncblks ) { - throw new DMLRuntimeException("Block indexes ["+rowIndex+","+colIndex+"] out of range ["+nrblks+","+ncblks+"]"); - } - - //get the requested matrix block - int rix = rowIndex - 1; - int cix = colIndex - 1; - int ix = rix*ncblks+cix - _offset; - _partBlocks[ ix ] = mb; - - } - - /** - * - * @return - */ - public long estimateSizeInMemory() - { - long ret = 24; //header - ret += 32; //block array - - if( _partBlocks != null ) - for( MatrixBlock mb : _partBlocks ) - ret += mb.estimateSizeInMemory(); - - return ret; - } - - /** - * - * @return - */ - public long estimateSizeOnDisk() - { - long ret = 24; //header - - if( _partBlocks != null ) - for( MatrixBlock mb : _partBlocks ) - ret += mb.estimateSizeOnDisk(); - - return ret; - } - - /** - * - * @param offset - * @param numBlks - * @return - */ - public PartitionedMatrixBlock createPartition( int offset, int numBlks ) - { - PartitionedMatrixBlock ret = new PartitionedMatrixBlock(); - ret._rlen = _rlen; - ret._clen = _clen; - ret._brlen = _brlen; - ret._bclen = _bclen; - ret._partBlocks = new MatrixBlock[numBlks]; - ret._offset = offset; - - System.arraycopy(_partBlocks, offset, ret._partBlocks, 0, numBlks); - - return ret; - } - - /** - * Utility for slice operations over partitioned matrices, where the index range can cover - * multiple blocks. The result is always a single result matrix block. All semantics are - * equivalent to the core matrix block slice operations. - * - * @param rl - * @param ru - * @param cl - * @param cu - * @param matrixBlock - * @return - * @throws DMLRuntimeException - */ - public MatrixBlock sliceOperations(long rl, long ru, long cl, long cu, MatrixBlock matrixBlock) - throws DMLRuntimeException - { - int lrl = (int) rl; - int lru = (int) ru; - int lcl = (int) cl; - int lcu = (int) cu; - - ArrayList<Tuple2<MatrixIndexes, MatrixBlock>> allBlks = new ArrayList<Tuple2<MatrixIndexes,MatrixBlock>>(); - int start_iix = (lrl-1)/_brlen+1; - int end_iix = (lru-1)/_brlen+1; - int start_jix = (lcl-1)/_bclen+1; - int end_jix = (lcu-1)/_bclen+1; - - for( int iix = start_iix; iix <= end_iix; iix++ ) - for(int jix = start_jix; jix <= end_jix; jix++) - { - MatrixBlock in = getMatrixBlock(iix, jix); - IndexedMatrixValue imv = new IndexedMatrixValue(new MatrixIndexes(iix, jix), in); - - ArrayList<IndexedMatrixValue> outlist = new ArrayList<IndexedMatrixValue>(); - IndexRange ixrange = new IndexRange(rl, ru, cl, cu); - OperationsOnMatrixValues.performSlice(imv, ixrange, _brlen, _bclen, outlist); - allBlks.addAll(SparkUtils.fromIndexedMatrixBlock(outlist)); - } - - if(allBlks.size() == 1) { - return allBlks.get(0)._2; - } - else { - //allocate output matrix - MatrixBlock ret = new MatrixBlock(lru-lrl+1, lcu-lcl+1, false); - for(Tuple2<MatrixIndexes, MatrixBlock> kv : allBlks) { - ret.merge(kv._2, false); - } - return ret; - } - } - - /** - * Redirects the default java serialization via externalizable to our default - * hadoop writable serialization for efficient broadcast deserialization. - * - * @param is - * @throws IOException - */ - public void readExternal(ObjectInput is) - throws IOException - { - DataInput dis = is; - - if( is instanceof ObjectInputStream ) { - //fast deserialize of dense/sparse blocks - ObjectInputStream ois = (ObjectInputStream)is; - dis = new FastBufferedDataInputStream(ois); - } - - readHeaderAndPayload(dis); - } - - /** - * Redirects the default java serialization via externalizable to our default - * hadoop writable serialization for efficient broadcast serialization. - * - * @param is - * @throws IOException - */ - public void writeExternal(ObjectOutput os) - throws IOException - { - if( os instanceof ObjectOutputStream ) { - //fast serialize of dense/sparse blocks - ObjectOutputStream oos = (ObjectOutputStream)os; - FastBufferedDataOutputStream fos = new FastBufferedDataOutputStream(oos); - writeHeaderAndPayload(fos); - fos.flush(); - } - else { - //default serialize (general case) - writeHeaderAndPayload(os); - } - } - - /** - * - * @param dos - * @throws IOException - */ - private void writeHeaderAndPayload(DataOutput dos) - throws IOException - { - dos.writeInt(_rlen); - dos.writeInt(_clen); - dos.writeInt(_brlen); - dos.writeInt(_bclen); - dos.writeInt(_offset); - dos.writeInt(_partBlocks.length); - for( MatrixBlock mb : _partBlocks ) - mb.write(dos); - } - - /** - * - * @param din - * @throws IOException - */ - private void readHeaderAndPayload(DataInput dis) - throws IOException - { - _rlen = dis.readInt(); - _clen = dis.readInt(); - _brlen = dis.readInt(); - _bclen = dis.readInt(); - _offset = dis.readInt(); - - int len = dis.readInt(); - _partBlocks = new MatrixBlock[len]; - - for( int i=0; i<len; i++ ) { - _partBlocks[i] = new MatrixBlock(); - _partBlocks[i].readFields(dis); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/CopyFrameBlockPairFunction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/CopyFrameBlockPairFunction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/CopyFrameBlockPairFunction.java index 9e31878..df891f7 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/CopyFrameBlockPairFunction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/CopyFrameBlockPairFunction.java @@ -57,4 +57,4 @@ public class CopyFrameBlockPairFunction implements PairFunction<Tuple2<LongWrita return new Tuple2<Long,FrameBlock>(arg0._1().get(), arg0._2()); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroup.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroup.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroup.java index e8c6dbe..2a3aedf 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroup.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroup.java @@ -27,7 +27,7 @@ import org.apache.spark.api.java.function.PairFlatMapFunction; import scala.Tuple2; import org.apache.sysml.hops.OptimizerUtils; -import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcastMatrix; +import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.MatrixIndexes; import org.apache.sysml.runtime.matrix.data.WeightedCell; @@ -139,9 +139,9 @@ public abstract class ExtractGroup implements Serializable { private static final long serialVersionUID = 5709955602290131093L; - private PartitionedBroadcastMatrix _pbm = null; + private PartitionedBroadcast<MatrixBlock> _pbm = null; - public ExtractGroupBroadcast( PartitionedBroadcastMatrix pbm, long bclen, long ngroups, Operator op ) { + public ExtractGroupBroadcast( PartitionedBroadcast<MatrixBlock> pbm, long bclen, long ngroups, Operator op ) { super(bclen, ngroups, op); _pbm = pbm; } @@ -152,7 +152,7 @@ public abstract class ExtractGroup implements Serializable throws Exception { MatrixIndexes ix = arg._1; - MatrixBlock group = _pbm.getMatrixBlock((int)ix.getRowIndex(), 1); + MatrixBlock group = _pbm.getBlock((int)ix.getRowIndex(), 1); MatrixBlock target = arg._2; return execute(ix, group, target); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/MatrixVectorBinaryOpFunction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/MatrixVectorBinaryOpFunction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/MatrixVectorBinaryOpFunction.java index b110861..bc6035d 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/MatrixVectorBinaryOpFunction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/MatrixVectorBinaryOpFunction.java @@ -25,7 +25,7 @@ import org.apache.spark.broadcast.Broadcast; import scala.Tuple2; import org.apache.sysml.lops.BinaryM.VectorType; -import org.apache.sysml.runtime.instructions.spark.data.PartitionedMatrixBlock; +import org.apache.sysml.runtime.instructions.spark.data.PartitionedBlock; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.MatrixIndexes; import org.apache.sysml.runtime.matrix.operators.BinaryOperator; @@ -36,10 +36,10 @@ public class MatrixVectorBinaryOpFunction implements PairFunction<Tuple2<MatrixI private static final long serialVersionUID = -7695883019452417300L; private BinaryOperator _op = null; - private Broadcast<PartitionedMatrixBlock> _pmV = null; + private Broadcast<PartitionedBlock<MatrixBlock>> _pmV = null; private VectorType _vtype = null; - public MatrixVectorBinaryOpFunction( BinaryOperator op, Broadcast<PartitionedMatrixBlock> binput, VectorType vtype ) + public MatrixVectorBinaryOpFunction( BinaryOperator op, Broadcast<PartitionedBlock<MatrixBlock>> binput, VectorType vtype ) { _op = op; _pmV = binput; @@ -56,7 +56,7 @@ public class MatrixVectorBinaryOpFunction implements PairFunction<Tuple2<MatrixI //get the rhs block int rix= (int)((_vtype==VectorType.COL_VECTOR) ? ix.getRowIndex() : 1); int cix= (int)((_vtype==VectorType.COL_VECTOR) ? 1 : ix.getColumnIndex()); - MatrixBlock in2 = _pmV.value().getMatrixBlock(rix, cix); + MatrixBlock in2 = _pmV.value().getBlock(rix, cix); //execute the binary operation MatrixBlock ret = (MatrixBlock) (in1.binaryOperations (_op, in2, new MatrixBlock())); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/MatrixVectorBinaryOpPartitionFunction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/MatrixVectorBinaryOpPartitionFunction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/MatrixVectorBinaryOpPartitionFunction.java index b298693..597c028 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/MatrixVectorBinaryOpPartitionFunction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/MatrixVectorBinaryOpPartitionFunction.java @@ -27,7 +27,7 @@ import scala.Tuple2; import org.apache.sysml.lops.BinaryM.VectorType; import org.apache.sysml.runtime.instructions.spark.data.LazyIterableIterator; -import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcastMatrix; +import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.MatrixIndexes; import org.apache.sysml.runtime.matrix.operators.BinaryOperator; @@ -37,10 +37,10 @@ public class MatrixVectorBinaryOpPartitionFunction implements PairFlatMapFunctio private static final long serialVersionUID = 9096091404578628534L; private BinaryOperator _op = null; - private PartitionedBroadcastMatrix _pmV = null; + private PartitionedBroadcast<MatrixBlock> _pmV = null; private VectorType _vtype = null; - public MatrixVectorBinaryOpPartitionFunction( BinaryOperator op, PartitionedBroadcastMatrix binput, VectorType vtype ) + public MatrixVectorBinaryOpPartitionFunction( BinaryOperator op, PartitionedBroadcast<MatrixBlock> binput, VectorType vtype ) { _op = op; _pmV = binput; @@ -76,7 +76,7 @@ public class MatrixVectorBinaryOpPartitionFunction implements PairFlatMapFunctio //get the rhs block int rix= (int)((_vtype==VectorType.COL_VECTOR) ? ix.getRowIndex() : 1); int cix= (int)((_vtype==VectorType.COL_VECTOR) ? 1 : ix.getColumnIndex()); - MatrixBlock in2 = _pmV.getMatrixBlock(rix, cix); + MatrixBlock in2 = _pmV.getBlock(rix, cix); //execute the binary operation MatrixBlock ret = (MatrixBlock) (in1.binaryOperations (_op, in2, new MatrixBlock())); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/OuterVectorBinaryOpFunction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/OuterVectorBinaryOpFunction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/OuterVectorBinaryOpFunction.java index 2947c72..3b90ff3 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/OuterVectorBinaryOpFunction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/OuterVectorBinaryOpFunction.java @@ -25,7 +25,7 @@ import org.apache.spark.api.java.function.PairFlatMapFunction; import scala.Tuple2; -import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcastMatrix; +import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.MatrixIndexes; import org.apache.sysml.runtime.matrix.operators.BinaryOperator; @@ -35,9 +35,9 @@ public class OuterVectorBinaryOpFunction implements PairFlatMapFunction<Tuple2<M private static final long serialVersionUID = 1730704346934726826L; private BinaryOperator _op; - private PartitionedBroadcastMatrix _pmV; + private PartitionedBroadcast<MatrixBlock> _pmV; - public OuterVectorBinaryOpFunction( BinaryOperator op, PartitionedBroadcastMatrix binput ) + public OuterVectorBinaryOpFunction( BinaryOperator op, PartitionedBroadcast<MatrixBlock> binput ) { _op = op; _pmV = binput; @@ -84,7 +84,7 @@ public class OuterVectorBinaryOpFunction implements PairFlatMapFunction<Tuple2<M MatrixIndexes ix = _currBlk._1(); MatrixBlock in1 = _currBlk._2(); - MatrixBlock in2 = _pmV.getMatrixBlock(1, _currPos); + MatrixBlock in2 = _pmV.getBlock(1, _currPos); MatrixBlock resultBlk = (MatrixBlock)in1.binaryOperations (_op, in2, new MatrixBlock()); resultBlk.examSparsity(); ret = new Tuple2<MatrixIndexes,MatrixBlock>( http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java index 274efe0..b63724a 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java @@ -178,7 +178,6 @@ public class FrameRDDConverterUtils return textCellToBinaryBlockLongIndex(sc, input, mcOut, lschema); } - /** * * @param sc @@ -198,7 +197,7 @@ public class FrameRDDConverterUtils //aggregate partial matrix blocks JavaPairRDD<Long,FrameBlock> out = - RDDAggregateUtils.mergeByFrameKey( output ); + (JavaPairRDD<Long, FrameBlock>) RDDAggregateUtils.mergeByFrameKey( output ); return out; } @@ -264,7 +263,7 @@ public class FrameRDDConverterUtils //aggregate partial frame blocks if(mcIn.getCols() > mcIn.getColsPerBlock()) - out = RDDAggregateUtils.mergeByFrameKey( out ); //TODO: Will need better merger + out = (JavaPairRDD<Long, FrameBlock>) RDDAggregateUtils.mergeByFrameKey( out ); } else out = input.mapToPair(new MatrixToBinaryBlockOneColumnBlockFunction(mcIn)); @@ -396,8 +395,6 @@ public class FrameRDDConverterUtils private boolean _fill = false; private int _maxRowsPerBlock = -1; - protected static final int BUFFER_SIZE = 1 * 1000 * 1000; //1M elements, size of default matrix block - public CSVToBinaryBlockFunction(MatrixCharacteristics mc, boolean hasHeader, String delim, boolean fill) { @@ -405,7 +402,7 @@ public class FrameRDDConverterUtils _hasHeader = hasHeader; _delim = delim; _fill = fill; - _maxRowsPerBlock = Math.max((int) (BUFFER_SIZE/_clen), 1); + _maxRowsPerBlock = Math.max((int) (FrameBlock.BUFFER_SIZE/_clen), 1); } @Override @@ -529,10 +526,7 @@ public class FrameRDDConverterUtils { private static final long serialVersionUID = -729614449626680946L; - //internal buffer size (aligned w/ default matrix block size) - protected static final int BUFFER_SIZE = 1 * 1000 * 1000; //1M elements (8MB), size of default matrix block protected int _bufflen = -1; - protected long _rlen = -1; protected long _clen = -1; @@ -542,7 +536,7 @@ public class FrameRDDConverterUtils _clen = mc.getCols(); //determine upper bounded buffer len - _bufflen = (int) Math.min(_rlen*_clen, BUFFER_SIZE); + _bufflen = (int) Math.min(_rlen*_clen, FrameBlock.BUFFER_SIZE); } @@ -624,15 +618,12 @@ public class FrameRDDConverterUtils private int _maxRowsPerBlock = -1; - protected static final int BUFFER_SIZE = 1 * 1000 * 1000; //1M elements (Default matrix block size) - - public MatrixToBinaryBlockFunction(MatrixCharacteristics mc) { _brlen = mc.getRowsPerBlock(); _bclen = mc.getColsPerBlock(); _clen = mc.getCols(); - _maxRowsPerBlock = Math.max((int) (BUFFER_SIZE/_clen), 1); + _maxRowsPerBlock = Math.max((int) (FrameBlock.BUFFER_SIZE/_clen), 1); } @Override http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDSortUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDSortUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDSortUtils.java index 48a8496..288a8b5 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDSortUtils.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDSortUtils.java @@ -38,7 +38,7 @@ import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; import org.apache.sysml.runtime.functionobjects.SortIndex; -import org.apache.sysml.runtime.instructions.spark.data.PartitionedMatrixBlock; +import org.apache.sysml.runtime.instructions.spark.data.PartitionedBlock; import org.apache.sysml.runtime.instructions.spark.data.RowMatrixBlock; import org.apache.sysml.runtime.instructions.spark.functions.ReplicateVectorFunction; import org.apache.sysml.runtime.matrix.data.MatrixBlock; @@ -218,8 +218,8 @@ public class RDDSortUtils sortedIxSrc.quickSetValue((int)sortedIx.quickGetValue(i,0)-1, 0, i+1); //broadcast index vector - PartitionedMatrixBlock pmb = new PartitionedMatrixBlock(sortedIxSrc, brlen, bclen); - Broadcast<PartitionedMatrixBlock> _pmb = sec.getSparkContext().broadcast(pmb); + PartitionedBlock<MatrixBlock> pmb = new PartitionedBlock<MatrixBlock>(sortedIxSrc, brlen, bclen); + Broadcast<PartitionedBlock<MatrixBlock>> _pmb = sec.getSparkContext().broadcast(pmb); //sort data with broadcast index vector JavaPairRDD<MatrixIndexes, RowMatrixBlock> ret = data @@ -641,9 +641,9 @@ public class RDDSortUtils private long _rlen = -1; private int _brlen = -1; - private Broadcast<PartitionedMatrixBlock> _pmb = null; + private Broadcast<PartitionedBlock<MatrixBlock>> _pmb = null; - public ShuffleMatrixBlockRowsInMemFunction(long rlen, int brlen, Broadcast<PartitionedMatrixBlock> pmb) + public ShuffleMatrixBlockRowsInMemFunction(long rlen, int brlen, Broadcast<PartitionedBlock<MatrixBlock>> pmb) { _rlen = rlen; _brlen = brlen; @@ -694,7 +694,7 @@ public class RDDSortUtils //produce next output tuple MatrixIndexes ixmap = _currBlk._1(); MatrixBlock data = _currBlk._2(); - MatrixBlock mbTargetIndex = _pmb.value().getMatrixBlock((int)ixmap.getRowIndex(), 1); + MatrixBlock mbTargetIndex = _pmb.value().getBlock((int)ixmap.getRowIndex(), 1); long valix = (long) mbTargetIndex.getValue(_currPos, 0); long rix = UtilFunctions.computeBlockIndex(valix, _brlen); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java index 98975da..90d0148 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java @@ -92,6 +92,29 @@ public class SparkUtils return ret; } + /** + * + * @param in + * @return + */ + public static Pair<MatrixIndexes,MatrixBlock> fromIndexedMatrixBlockToPair( IndexedMatrixValue in ){ + return new Pair<MatrixIndexes,MatrixBlock>(in.getIndexes(), (MatrixBlock)in.getValue()); + } + + /** + * + * @param in + * @return + */ + public static ArrayList<Pair<MatrixIndexes,MatrixBlock>> fromIndexedMatrixBlockToPair( ArrayList<IndexedMatrixValue> in ) + { + ArrayList<Pair<MatrixIndexes,MatrixBlock>> ret = new ArrayList<Pair<MatrixIndexes,MatrixBlock>>(); + for( IndexedMatrixValue imv : in ) + ret.add(fromIndexedMatrixBlockToPair(imv)); + + return ret; + } + /** * @@ -102,7 +125,6 @@ public class SparkUtils return new Tuple2<Long, FrameBlock>(in.getKey(), in.getValue()); } - /** * * @param in @@ -132,6 +154,25 @@ public class SparkUtils /** * + * @param in + * @return + */ + public static Pair<Long,FrameBlock> toIndexedFrameBlock( Tuple2<Long,FrameBlock> in ) { + return new Pair<Long,FrameBlock>(in._1(), in._2()); + } + + /** + * + * @param ix + * @param mb + * @return + */ + public static Pair<Long,FrameBlock> toIndexedFrameBlock( Long ix, FrameBlock fb ) { + return new Pair<Long,FrameBlock>(ix, fb); + } + + /** + * * @param mb * @param blen * @return http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java index 0065fe8..e51bd9d 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java @@ -46,10 +46,12 @@ import org.apache.sysml.runtime.util.UtilFunctions; * */ @SuppressWarnings({"rawtypes","unchecked"}) //allow generic native arrays -public class FrameBlock implements Writable, CacheBlock, Externalizable +public class FrameBlock implements Writable, CacheBlock, Externalizable { private static final long serialVersionUID = -3993450030207130665L; + public static final int BUFFER_SIZE = 1 * 1000 * 1000; //1M elements, size of default matrix block + //internal configuration private static final boolean REUSE_RECODE_MAPS = true; @@ -146,6 +148,16 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable } /** + * Sets the schema of the frame block. + * + * @return + */ + public void setSchema(List<ValueType> schema) { + _schema = schema; + _colnames = createColNames(schema.size()); + } + + /** * Returns the column names of the frame block. * * @return @@ -288,13 +300,14 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable * @param val */ public void set(int r, int c, Object val) { - _coldata.get(c).set(r, val); + _coldata.get(c).set(r, UtilFunctions.objectToObject(_schema.get(c), val)); } - + public void reset(int nrow) { getSchema().clear(); getColumnNames().clear(); + if(_coldata != null) for(int i=0; i < _coldata.size(); ++i) _coldata.get(i)._size = nrow; @@ -567,6 +580,7 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable throw new DMLRuntimeException("Invalid values for frame indexing: ["+(rl+1)+":"+(ru+1)+"," + (cl+1)+":"+(cu+1)+"] " + "must be within frame dimensions ["+getNumRows()+","+getNumColumns()+"]."); } + if ( (ru-rl+1) < rhsFrame.getNumRows() || (cu-cl+1) < rhsFrame.getNumColumns()) { throw new DMLRuntimeException("Invalid values for frame indexing: " + "dimensions of the source frame ["+rhsFrame.getNumRows()+"x" + rhsFrame.getNumColumns() + "] " + @@ -574,12 +588,14 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable (rl+1) +":" + (ru+1) + ", " + (cl+1) + ":" + (cu+1) + "]."); } + //allocate output frame (incl deep copy schema) if( ret == null ) ret = new FrameBlock(); - ret._numRows = _numRows; + ret._numRows = _numRows; ret._schema = new ArrayList<ValueType>(_schema); ret._colnames = new ArrayList<String>(_colnames); + ret._colmeta = new ArrayList<ColumnMetadata>(_colmeta); //copy data to output and partial overwrite w/ rhs for( int j=0; j<getNumColumns(); j++ ) { @@ -618,9 +634,10 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable * @param ret * @return */ - public FrameBlock sliceOperations(int rl, int ru, int cl, int cu, FrameBlock ret) + public FrameBlock sliceOperations(int rl, int ru, int cl, int cu, CacheBlock retCache) throws DMLRuntimeException { + FrameBlock ret = (FrameBlock)retCache; // check the validity of bounds if ( rl < 0 || rl >= getNumRows() || ru < rl || ru >= getNumRows() || cl < 0 || cu >= getNumColumns() || cu < cl || cu >= getNumColumns() ) { @@ -638,6 +655,7 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable for( int j=cl; j<=cu; j++ ) { ret._schema.add(_schema.get(j)); ret._colnames.add(_colnames.get(j)); + ret._colmeta.add(_colmeta.get(j)); } ret._numRows = ru-rl+1; @@ -652,6 +670,41 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable return ret; } + + public void sliceOperations(ArrayList<Pair<Long,FrameBlock>> outlist, IndexRange range, int rowCut) + { + FrameBlock top=null, bottom=null; + Iterator<Pair<Long,FrameBlock>> p=outlist.iterator(); + + if(range.rowStart<rowCut) + top=(FrameBlock) p.next().getValue(); + + if(range.rowEnd>=rowCut) + bottom=(FrameBlock) p.next().getValue(); + + if(getNumRows() > 0) + { + int r=(int) range.rowStart; + + for(; r<Math.min(rowCut, range.rowEnd+1); r++) + { + Object[] row = new Object[(int) (range.colEnd-range.colStart+1)]; + for(int c=(int) range.colStart; c<range.colEnd+1; c++) + row[(int) (c-range.colStart)] = get(r,c); + top.appendRow(row); + } + + for(; r<=range.rowEnd; r++) + { + Object[] row = new Object[(int) (range.colEnd-range.colStart+1)]; + for(int c=(int) range.colStart; c<range.colEnd+1; c++) + row[(int) (c-range.colStart)] = get(r,c); + bottom.appendRow(row); + } + } + + } + /** * Appends the given argument frameblock 'that' to this frameblock by * creating a deep copy to prevent side effects. For cbind, the frames @@ -789,12 +842,18 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable return map; } + public void merge(CacheBlock that, boolean bDummy) + throws DMLRuntimeException + { + merge((FrameBlock)that); + } + /** * * @param that * @throws DMLRuntimeException */ - public void merge(FrameBlock that ) + public void merge(FrameBlock that) throws DMLRuntimeException { //check for empty input source (nothing to merge) @@ -802,35 +861,77 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable return; //check dimensions (before potentially copy to prevent implicit dimension change) - if( getNumRows() != that.getNumRows() || getNumColumns() != that.getNumColumns() ) + if ( getNumRows() != that.getNumRows() || getNumColumns() != that.getNumColumns() ) throw new DMLRuntimeException("Dimension mismatch on merge disjoint (target="+getNumRows()+"x"+getNumColumns()+", source="+that.getNumRows()+"x"+that.getNumColumns()+")"); //core frame block merge through cell copy - for( int i=0; i<getNumRows(); i++ ) { + for( int i=0; i<that.getNumRows(); i++ ) { for( int j=0; j<getNumColumns(); j++ ) { - switch( _schema.get(j) ) { - case STRING: - if (that.get(i,j) != null) - set(i,j,that.get(i, j)); - break; - case BOOLEAN: - if ((Boolean)that.get(i,j) != Boolean.getBoolean("false")) - set(i,j,that.get(i, j)); - break; - case INT: - if ((Long)that.get(i,j) != 0) - set(i,j,that.get(i, j)); - break; - case DOUBLE: - if ((Double)that.get(i,j) != 0.0) - set(i,j,that.get(i, j)); - break; - default: throw new RuntimeException("Unsupported value type: "+_schema.get(j)); - } + Object obj = UtilFunctions.objectToObject(getSchema().get(j), that.get(i,j), true); + if (obj != null) // Do not update with "null" data + set(i, j,obj); } } } + + /** + * This function ZERO OUT the data in the slicing window applicable for this block. + * + * + * @param result + * @param range + * @param complementary + * @param iRowStartSrc + * @param iRowStartDest + * @param brlen + * @param iMaxRowsToCopy + * + */ + public FrameBlock zeroOutOperations(FrameBlock result, IndexRange range, boolean complementary, int iRowStartSrc, int iRowStartDest, int brlen, int iMaxRowsToCopy) + throws DMLRuntimeException + { + int clen = getNumColumns(); + + if(result==null) + result=new FrameBlock(getSchema()); + else + { + result.reset(0); + result.setSchema(getSchema()); + } + result.ensureAllocatedColumns(brlen); + + if(complementary) + { + for(int r=(int) range.rowStart; r<=range.rowEnd&&r+iRowStartDest<brlen; r++) + { + for(int c=(int) range.colStart; c<=range.colEnd; c++) + result.set(r+iRowStartDest, c, get(r+iRowStartSrc,c)); + } + }else + { + int r=iRowStartDest; + for(; r<(int)range.rowStart && r-iRowStartDest<iMaxRowsToCopy ; r++) + for(int c=0; c<clen; c++/*, offset++*/) + result.set(r, c, get(r+iRowStartSrc-iRowStartDest,c)); + + for(; r<=(int)range.rowEnd && r-iRowStartDest<iMaxRowsToCopy ; r++) + { + for(int c=0; c<(int)range.colStart; c++) + result.set(r, c, get(r+iRowStartSrc-iRowStartDest,c)); + + for(int c=(int)range.colEnd+1; c<clen; c++) + result.set(r, c, get(r+iRowStartSrc-iRowStartDest,c)); + } + + for(; r-iRowStartDest<iMaxRowsToCopy ; r++) + for(int c=0; c<clen; c++) + result.set(r, c, get(r+iRowStartSrc-iRowStartDest,c)); + } + + return result; + } /////// @@ -887,6 +988,7 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable } } + /** * */ @@ -949,7 +1051,7 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable _data[index] = value; } public void set(int rl, int ru, Array value) { - System.arraycopy(((StringArray)value)._data, 0, _data, rl, ru-rl+1); + set(rl, ru, value, 0); } public void set(int rl, int ru, Array value, int rlSrc) { System.arraycopy(((StringArray)value)._data, rlSrc, _data, rl, ru-rl+1); @@ -995,7 +1097,7 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable _data[index] = (value!=null) ? value : false; } public void set(int rl, int ru, Array value) { - System.arraycopy(((BooleanArray)value)._data, 0, _data, rl, ru-rl+1); + set(rl, ru, value, 0); } public void set(int rl, int ru, Array value, int rlSrc) { System.arraycopy(((BooleanArray)value)._data, rlSrc, _data, rl, ru-rl+1); @@ -1042,7 +1144,7 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable _data[index] = (value!=null) ? value : 0L; } public void set(int rl, int ru, Array value) { - System.arraycopy(((LongArray)value)._data, 0, _data, rl, ru-rl+1); + set(rl, ru, value, 0); } public void set(int rl, int ru, Array value, int rlSrc) { System.arraycopy(((LongArray)value)._data, rlSrc, _data, rl, ru-rl+1); @@ -1089,7 +1191,7 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable _data[index] = (value!=null) ? value : 0d; } public void set(int rl, int ru, Array value) { - System.arraycopy(((DoubleArray)value)._data, 0, _data, rl, ru-rl+1); + set(rl,ru, value, 0); } public void set(int rl, int ru, Array value, int rlSrc) { System.arraycopy(((DoubleArray)value)._data, rlSrc, _data, rl, ru-rl+1); @@ -1147,4 +1249,15 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable _mvValue = mvVal; } } + + @Override + public ArrayList getPairList() { + return new ArrayList<Pair<Long, FrameBlock>>(); + } + + public ArrayList<Pair<?, ?>> performSlice(IndexRange ixrange, int brlen, int bclen, int iix, int jix, CacheBlock in) throws DMLRuntimeException + { + return OperationsOnMatrixValues.performSlice(ixrange, brlen, bclen, iix, jix, (FrameBlock)in); + } + } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java index 54cba0f..9b40cbd 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java @@ -86,6 +86,7 @@ import org.apache.sysml.runtime.util.IndexRange; import org.apache.sysml.runtime.util.UtilFunctions; + public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizable { private static final long serialVersionUID = 7319972089143154056L; @@ -1734,6 +1735,12 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab Arrays.fill(denseBlock, ix2, ix2+rowLen, 0); } + public void merge(CacheBlock that, boolean appendOnly) + throws DMLRuntimeException + { + merge((MatrixBlock)that, appendOnly); + } + /** * Merge disjoint: merges all non-zero values of the given input into the current @@ -4034,7 +4041,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab * * @throws DMLRuntimeException */ - public MatrixBlock sliceOperations(int rl, int ru, int cl, int cu, MatrixBlock ret) + public MatrixBlock sliceOperations(int rl, int ru, int cl, int cu, CacheBlock ret) throws DMLRuntimeException { // check the validity of bounds @@ -4046,7 +4053,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab // Output matrix will have the same sparsity as that of the input matrix. // (assuming a uniform distribution of non-zeros in the input) - MatrixBlock result=checkType(ret); + MatrixBlock result=checkType((MatrixBlock)ret); long estnnz= (long) ((double)this.nonZeros/rlen/clen*(ru-rl+1)*(cu-cl+1)); boolean result_sparsity = this.sparse && MatrixBlock.evalSparseFormatInMemory(ru-rl+1, cu-cl+1, estnnz); if(result==null) @@ -6213,4 +6220,15 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab } public SparsityEstimate(){} } + + public ArrayList<Pair<MatrixIndexes, MatrixBlock>> getPairList() + { + return new ArrayList<Pair<MatrixIndexes,MatrixBlock>>(); + } + + @SuppressWarnings("unchecked") + public ArrayList<Pair<?, ?>> performSlice(IndexRange ixrange, int brlen, int bclen, int iix, int jix, CacheBlock in) throws DMLRuntimeException + { + return OperationsOnMatrixValues.performSlice(ixrange, brlen, bclen, iix, jix, (MatrixBlock)in); + } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java b/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java index 03a36fe..be12228 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java @@ -21,10 +21,14 @@ package org.apache.sysml.runtime.matrix.data; import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.apache.sysml.parser.Expression.ValueType; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.caching.MatrixObject.UpdateType; import org.apache.sysml.runtime.functionobjects.Builtin; +import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils; import org.apache.sysml.lops.PartialAggregate.CorrectionLocationType; import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue; import org.apache.sysml.runtime.matrix.operators.AggregateBinaryOperator; @@ -292,6 +296,17 @@ public class OperationsOnMatrixValues value1.aggregateBinaryOperations(value1, value2, valueOut, op); } + @SuppressWarnings("rawtypes") + public static ArrayList performSlice(IndexRange ixrange, int brlen, int bclen, int iix, int jix, MatrixBlock in) + throws DMLRuntimeException + { + IndexedMatrixValue imv = new IndexedMatrixValue(new MatrixIndexes(iix, jix), (MatrixBlock)in); + ArrayList<IndexedMatrixValue> outlist = new ArrayList<IndexedMatrixValue>(); + performSlice(imv, ixrange, brlen, bclen, outlist); + + return SparkUtils.fromIndexedMatrixBlockToPair(outlist); + } + /** * * @param val @@ -463,4 +478,144 @@ public class OperationsOnMatrixValues } } } + + @SuppressWarnings("rawtypes") + public static ArrayList performSlice(IndexRange ixrange, int brlen, int bclen, int iix, int jix, FrameBlock in) + throws DMLRuntimeException + { + Pair<Long, FrameBlock> lfp = new Pair<Long, FrameBlock>(new Long(((iix-1)*brlen)+1), in); + ArrayList<Pair<Long, FrameBlock>> outlist = new ArrayList<Pair<Long, FrameBlock>>(); + performSlice(lfp, ixrange, brlen, bclen, outlist); + + return outlist; + } + + + /** + * This function will get slice of the input frame block overlapping in overall slice(Range), slice has requested for. + * + * @param val + * @param range + * @param brlen + * @param bclen + * @param outlist + * @throws DMLRuntimeException + */ + public static void performSlice(Pair<Long,FrameBlock> in, IndexRange ixrange, int brlen, int bclen, ArrayList<Pair<Long,FrameBlock>> outlist) + throws DMLRuntimeException + { + long index = in.getKey(); + FrameBlock block = in.getValue(); + + // Get Block indexes (rows and columns boundaries) + long cellIndexTopRow = index; + long cellIndexBottomRow = index+block.getNumRows()-1; + long cellIndexLeftCol = 1; + long cellIndexRightCol = block.getNumColumns(); + + // Calculate block boundaries with range of slice to be performed (Global index) + long cellIndexOverlapTop = Math.max(cellIndexTopRow, ixrange.rowStart); + long cellIndexOverlapBottom = Math.min(cellIndexBottomRow, ixrange.rowEnd); + long cellIndexOverlapLeft = Math.max(cellIndexLeftCol, ixrange.colStart); + long cellIndexOverlapRight = Math.min(cellIndexRightCol, ixrange.colEnd); + + //check if block is outside the indexing range + if(cellIndexOverlapTop>cellIndexOverlapBottom || cellIndexOverlapLeft>cellIndexOverlapRight) { + return; + } + + // Create IndexRange for the slice to be performed on this block. + IndexRange tmpRange = new IndexRange( + cellIndexOverlapTop - index, + cellIndexOverlapBottom - index, + cellIndexOverlapLeft -1, + cellIndexOverlapRight - 1); + + // Get Top Row and Left column cutting point. + int rowCut=(int)(ixrange.rowStart-index); + + // Get indices for result block + long resultBlockIndexTop=UtilFunctions.computeBlockIndex(cellIndexOverlapTop, brlen); + long resultBlockIndexBottom=UtilFunctions.computeBlockIndex(cellIndexOverlapBottom, brlen); + + //allocate space for the output value + for(long r=resultBlockIndexTop; r<=resultBlockIndexBottom; r++) + { + List<ValueType> schema = UtilFunctions.getSubSchema(block.getSchema(), tmpRange.colStart, tmpRange.colEnd); + long iResultIndex = (r-1)*brlen+tmpRange.rowStart; + Pair<Long,FrameBlock> out=new Pair<Long,FrameBlock>(new Long(iResultIndex+1), new FrameBlock(schema)); + outlist.add(out); + } + + //execute actual slice operation + block.sliceOperations(outlist, tmpRange, rowCut); + } + + /** + * + * @param in + * @param ixrange + * @param brlen + * @param bclen + * @param rlen + * @param clen + * @param outlist + * @throws DMLRuntimeException + */ + public static void performShift(Pair<Long,FrameBlock> in, IndexRange ixrange, int brlenLeft, int clenLeft/*, int bclen*/, long rlen, long clen, ArrayList<Pair<Long,FrameBlock>> outlist) + throws DMLRuntimeException + { + Long ix = in.getKey(); + FrameBlock fb = in.getValue(); + long start_lhs_globalRowIndex = ixrange.rowStart + (ix-1); + long start_lhs_globalColIndex = ixrange.colStart; + long end_lhs_globalRowIndex = start_lhs_globalRowIndex + fb.getNumRows() - 1; + long end_lhs_globalColIndex = ixrange.colEnd; + + long start_lhs_rowIndex = UtilFunctions.computeBlockIndex(start_lhs_globalRowIndex, brlenLeft); + long end_lhs_rowIndex = UtilFunctions.computeBlockIndex(end_lhs_globalRowIndex, brlenLeft); + + for(long leftRowIndex = start_lhs_rowIndex; leftRowIndex <= end_lhs_rowIndex; leftRowIndex++) { + + // Calculate global index of right hand side block + long lhs_rl = Math.max((leftRowIndex-1)*brlenLeft+1, start_lhs_globalRowIndex); + long lhs_ru = Math.min(leftRowIndex*brlenLeft, end_lhs_globalRowIndex); + long lhs_cl = start_lhs_globalColIndex; + long lhs_cu = end_lhs_globalColIndex; + + int lhs_lrl = UtilFunctions.computeCellInBlock(lhs_rl, brlenLeft); + int lhs_lru = UtilFunctions.computeCellInBlock(lhs_ru, brlenLeft); + int lhs_lcl = (int)lhs_cl-1; + int lhs_lcu = (int)lhs_cu-1; + + long rhs_rl = lhs_rl - (ixrange.rowStart-1) - (ix-1); + long rhs_ru = rhs_rl + (lhs_ru - lhs_rl); + long rhs_cl = lhs_cl - ixrange.colStart + 1; + long rhs_cu = rhs_cl + (lhs_cu - lhs_cl); + + // local indices are 0 (zero) based. + int rhs_lrl = (int) (UtilFunctions.computeCellInBlock(rhs_rl, fb.getNumRows())); + int rhs_lru = (int) (UtilFunctions.computeCellInBlock(rhs_ru, fb.getNumRows())); + int rhs_lcl = (int)rhs_cl-1; + int rhs_lcu = (int)rhs_cu-1; + + FrameBlock slicedRHSBlk = fb.sliceOperations(rhs_lrl, rhs_lru, rhs_lcl, rhs_lcu, new FrameBlock()); + + int lbclen = clenLeft; + + List<ValueType> schemaPartialLeft = Collections.nCopies(lhs_lcl, ValueType.STRING); + List<ValueType> schemaRHS = UtilFunctions.getSubSchema(fb.getSchema(), rhs_lcl, rhs_lcl-lhs_lcl+lhs_lcu); + List<ValueType> schema = new ArrayList<ValueType>(schemaPartialLeft); + schema.addAll(schemaRHS); + List<ValueType> schemaPartialRight = Collections.nCopies(lbclen-schema.size(), ValueType.STRING); + schema.addAll(schemaPartialRight); + FrameBlock resultBlock = new FrameBlock(schema); + int iRHSRows = (int)(leftRowIndex<=rlen/brlenLeft?brlenLeft:rlen-(rlen/brlenLeft)*brlenLeft); + resultBlock.ensureAllocatedColumns(iRHSRows); + + resultBlock = resultBlock.leftIndexingOperations(slicedRHSBlk, lhs_lrl, lhs_lru, lhs_lcl, lhs_lcu, new FrameBlock()); + outlist.add(new Pair<Long, FrameBlock>((leftRowIndex-1)*brlenLeft+1, resultBlock)); + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/matrix/data/OutputInfo.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/OutputInfo.java b/src/main/java/org/apache/sysml/runtime/matrix/data/OutputInfo.java index 423607c..4d4a975 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/OutputInfo.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/OutputInfo.java @@ -92,7 +92,7 @@ public class OutputInfo implements Serializable else throw new DMLRuntimeException("Unrecognized output info: " + oi); } - + public static OutputInfo stringToOutputInfo (String str) { if ( str.equalsIgnoreCase("textcell")) { return TextCellOutputInfo; http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/266d4c8c/src/main/java/org/apache/sysml/runtime/util/FastBufferedDataOutputStream.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/util/FastBufferedDataOutputStream.java b/src/main/java/org/apache/sysml/runtime/util/FastBufferedDataOutputStream.java index e7aebb4..b059a06 100644 --- a/src/main/java/org/apache/sysml/runtime/util/FastBufferedDataOutputStream.java +++ b/src/main/java/org/apache/sysml/runtime/util/FastBufferedDataOutputStream.java @@ -201,7 +201,8 @@ public class FastBufferedDataOutputStream extends FilterOutputStream implements @Override public void writeUTF(String s) throws IOException { - throw new IOException("Not supported."); + byte[] strBytes = s.getBytes("UTF-8"); + write(strBytes, 0, strBytes.length); }