[MINOR] Throw an exception when an incorrect long-to-int conversion occurs - Without this fix, an overflowing conversion is silently ignored. The truncated value then propagates further down in the engine, where it either surfaces as an unrelated error such as "Invalid block dimensions" or an incorrect result is returned.
Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/95cbbd65 Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/95cbbd65 Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/95cbbd65 Branch: refs/heads/master Commit: 95cbbd656b9c2c85b79536dd5175ce49ff0c1d22 Parents: bc6e941 Author: Niketan Pansare <npan...@us.ibm.com> Authored: Fri Nov 30 13:58:46 2018 -0800 Committer: Niketan Pansare <npan...@us.ibm.com> Committed: Fri Nov 30 13:59:26 2018 -0800 ---------------------------------------------------------------------- .../api/mlcontext/MLContextConversionUtil.java | 5 +- src/main/java/org/apache/sysml/hops/DnnOp.java | 17 ++-- .../java/org/apache/sysml/hops/IndexingOp.java | 5 +- .../org/apache/sysml/hops/OptimizerUtils.java | 13 +-- .../sysml/hops/codegen/cplan/CNodeNary.java | 5 +- .../hops/recompile/LiteralReplacement.java | 9 +- .../java/org/apache/sysml/lops/compile/Dag.java | 15 ++-- .../apache/sysml/lops/runtime/RunMRJobs.java | 3 +- .../controlprogram/ParForProgramBlock.java | 19 ++-- .../controlprogram/caching/ByteBuffer.java | 5 +- .../controlprogram/caching/CacheStatistics.java | 14 +-- .../controlprogram/caching/FrameObject.java | 15 ++-- .../controlprogram/caching/MatrixObject.java | 19 ++-- .../context/ExecutionContext.java | 3 +- .../context/SparkExecutionContext.java | 31 +++---- .../controlprogram/paramserv/LocalPSWorker.java | 3 +- .../paramserv/ParamservUtils.java | 5 +- .../paramserv/dp/DRLocalScheme.java | 3 +- .../paramserv/dp/DRSparkScheme.java | 5 +- .../paramserv/dp/DataPartitionSparkScheme.java | 3 +- .../dp/DataPartitionerSparkAggregator.java | 9 +- .../paramserv/dp/SparkDataPartitioner.java | 7 +- .../paramserv/rpc/PSRpcObject.java | 3 +- .../controlprogram/parfor/DataPartitioner.java | 7 +- .../parfor/DataPartitionerLocal.java | 25 +++--- .../parfor/DataPartitionerRemoteMR.java | 3 +- .../parfor/DataPartitionerRemoteMapper.java | 13 +-- 
.../parfor/DataPartitionerRemoteSpark.java | 3 +- .../DataPartitionerRemoteSparkMapper.java | 5 +- .../controlprogram/parfor/RemoteDPParForMR.java | 23 ++--- .../parfor/RemoteDPParForSpark.java | 5 +- .../parfor/RemoteDPParForSparkWorker.java | 19 ++-- .../parfor/RemoteDPParWorkerReducer.java | 21 ++--- .../RemoteParForColocatedNLineInputFormat.java | 3 +- .../controlprogram/parfor/RemoteParForMR.java | 19 ++-- .../parfor/RemoteParForSparkWorker.java | 5 +- .../parfor/RemoteParWorkerMapper.java | 3 +- .../parfor/ResultMergeLocalFile.java | 5 +- .../parfor/ResultMergeLocalMemory.java | 5 +- .../parfor/ResultMergeRemoteMR.java | 3 +- .../parfor/ResultMergeRemotePartitioning.java | 5 +- .../parfor/ResultMergeRemoteSpark.java | 5 +- .../parfor/TaskPartitionerStatic.java | 3 +- .../cp/CentralMomentCPInstruction.java | 3 +- .../instructions/cp/CtableCPInstruction.java | 5 +- .../instructions/cp/DataGenCPInstruction.java | 5 +- .../cp/DataPartitionCPInstruction.java | 5 +- .../instructions/cp/DnnCPInstruction.java | 11 +-- .../cp/FrameIndexingCPInstruction.java | 3 +- .../instructions/cp/IndexingCPInstruction.java | 9 +- .../cp/ListIndexingCPInstruction.java | 9 +- .../runtime/instructions/cp/ListObject.java | 5 +- .../cp/MatrixIndexingCPInstruction.java | 7 +- .../cp/MatrixReshapeCPInstruction.java | 5 +- .../instructions/cp/PMMJCPInstruction.java | 3 +- .../cp/ParameterizedBuiltinCPInstruction.java | 3 +- .../cp/ParamservBuiltinCPInstruction.java | 3 +- .../instructions/cp/ReorgCPInstruction.java | 3 +- .../cp/StringInitCPInstruction.java | 7 +- .../gpu/AggregateBinaryGPUInstruction.java | 5 +- .../gpu/AggregateUnaryGPUInstruction.java | 5 +- .../instructions/gpu/DnnGPUInstruction.java | 58 ++++++------ .../instructions/gpu/MMTSJGPUInstruction.java | 3 +- .../gpu/MatrixIndexingGPUInstruction.java | 9 +- .../MatrixMatrixArithmeticGPUInstruction.java | 3 +- .../gpu/MatrixMatrixAxpyGPUInstruction.java | 3 +- ...rixMatrixRelationalBinaryGPUInstruction.java | 3 +- 
.../gpu/MatrixReshapeGPUInstruction.java | 5 +- .../instructions/gpu/ReorgGPUInstruction.java | 5 +- .../ScalarMatrixArithmeticGPUInstruction.java | 5 +- ...larMatrixRelationalBinaryGPUInstruction.java | 5 +- .../mr/AggregateBinaryInstruction.java | 5 +- .../spark/AppendGSPInstruction.java | 5 +- .../instructions/spark/CastSPInstruction.java | 3 +- .../spark/CentralMomentSPInstruction.java | 3 +- .../instructions/spark/CpmmSPInstruction.java | 7 +- .../spark/CumulativeAggregateSPInstruction.java | 7 +- .../spark/CumulativeOffsetSPInstruction.java | 3 +- .../instructions/spark/DnnSPInstruction.java | 13 +-- .../spark/FrameIndexingSPInstruction.java | 7 +- .../spark/MapmmChainSPInstruction.java | 3 +- .../instructions/spark/MapmmSPInstruction.java | 19 ++-- .../spark/MatrixAppendMSPInstruction.java | 13 +-- .../spark/MatrixIndexingSPInstruction.java | 19 ++-- ...ReturnParameterizedBuiltinSPInstruction.java | 5 +- .../instructions/spark/PMapmmSPInstruction.java | 7 +- .../ParameterizedBuiltinSPInstruction.java | 31 +++---- .../instructions/spark/PmmSPInstruction.java | 3 +- .../spark/QuantilePickSPInstruction.java | 5 +- .../spark/QuaternarySPInstruction.java | 13 +-- .../instructions/spark/RandSPInstruction.java | 15 ++-- .../instructions/spark/ReorgSPInstruction.java | 5 +- .../instructions/spark/RmmSPInstruction.java | 3 +- .../instructions/spark/SpoofSPInstruction.java | 17 ++-- .../instructions/spark/Tsmm2SPInstruction.java | 27 +++--- .../spark/data/BlockPartitioner.java | 7 +- .../spark/data/PartitionedBlock.java | 5 +- .../spark/data/PartitionedBroadcast.java | 5 +- .../functions/ExtractBlockForBinaryReblock.java | 5 +- .../spark/functions/ExtractGroup.java | 5 +- .../MatrixVectorBinaryOpPartitionFunction.java | 5 +- .../spark/utils/FrameRDDConverterUtils.java | 45 +++++----- .../spark/utils/RDDConverterUtils.java | 51 +++++------ .../spark/utils/RDDConverterUtilsExt.java | 15 ++-- .../instructions/spark/utils/RDDSortUtils.java | 27 +++--- 
.../instructions/spark/utils/SparkUtils.java | 7 +- .../runtime/matrix/data/LibMatrixCUDA.java | 6 +- .../sysml/runtime/matrix/data/MatrixBlock.java | 93 ++++++++++---------- .../java/org/apache/sysml/utils/IntUtils.java | 34 +++++++ 109 files changed, 635 insertions(+), 504 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java b/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java index 5d2478f..1a06fa0 100644 --- a/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java +++ b/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java @@ -65,6 +65,7 @@ import org.apache.sysml.runtime.matrix.data.OutputInfo; import org.apache.sysml.runtime.matrix.data.Pair; import org.apache.sysml.runtime.util.DataConverter; import org.apache.sysml.runtime.util.UtilFunctions; +import org.apache.sysml.utils.IntUtils; import scala.collection.JavaConversions; import scala.reflect.ClassTag; @@ -299,7 +300,7 @@ public class MLContextConversionUtil { frameMetadata.asMatrixCharacteristics() : new MatrixCharacteristics(); ValueType[] schema = (frameMetadata != null) ? 
frameMetadata.getFrameSchema().getSchema().toArray(new ValueType[0]) : - UtilFunctions.nCopies((int)mc.getCols(), ValueType.STRING); + UtilFunctions.nCopies(IntUtils.toInt(mc.getCols()), ValueType.STRING); FrameObject frameObject = new FrameObject(OptimizerUtils.getUniqueTempFileName(), new MetaDataFormat(mc, OutputInfo.BinaryBlockOutputInfo, InputInfo.BinaryBlockInputInfo), schema); @@ -689,7 +690,7 @@ public class MLContextConversionUtil { try { ValueType[] lschema = null; if (lschema == null) - lschema = UtilFunctions.nCopies((int) mc.getCols(), ValueType.STRING); + lschema = UtilFunctions.nCopies(IntUtils.toInt(mc.getCols()), ValueType.STRING); rdd = FrameRDDConverterUtils.textCellToBinaryBlock(jsc(), javaPairRDDText, mc, lschema); } catch (DMLRuntimeException e) { e.printStackTrace(); http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/hops/DnnOp.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/DnnOp.java b/src/main/java/org/apache/sysml/hops/DnnOp.java index a948eed..275d0f0 100644 --- a/src/main/java/org/apache/sysml/hops/DnnOp.java +++ b/src/main/java/org/apache/sysml/hops/DnnOp.java @@ -32,6 +32,7 @@ import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.instructions.gpu.context.GPUContextPool; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; import org.apache.sysml.runtime.matrix.data.DnnParameters; +import org.apache.sysml.utils.IntUtils; import java.util.ArrayList; @@ -379,18 +380,18 @@ public class DnnOp extends MultiThreadedHop private static class IntermediateDimensions { int dim1; int dim2; double sp; public IntermediateDimensions(DnnOp h, String dim1Str, String dim2Str, double sp) { - dim1 = (int) h.getDim(dim1Str); - dim2 = (int) h.getDim(dim2Str); + dim1 = IntUtils.toInt(h.getDim(dim1Str)); + dim2 = IntUtils.toInt(h.getDim(dim2Str)); this.sp = sp; } public IntermediateDimensions(DnnOp h, 
String dim1Str, String dim2Str) { - dim1 = (int) h.getDim(dim1Str); - dim2 = (int) h.getDim(dim2Str); + dim1 = IntUtils.toInt(h.getDim(dim1Str)); + dim2 = IntUtils.toInt(h.getDim(dim2Str)); sp = 1; } public IntermediateDimensions(DnnOp h, int dim1, String dim2Str) { this.dim1 = dim1; - dim2 = (int) h.getDim(dim2Str); + dim2 = IntUtils.toInt(h.getDim(dim2Str)); sp = 1; } @@ -449,7 +450,7 @@ public class DnnOp extends MultiThreadedHop ArrayList<IntermediateDimensions> gpuIntermediates, ArrayList<IntermediateDimensions> cpIntermediates) { // Since CP operators use row-level parallelism by default - int numWorkers = (int) Math.min(OptimizerUtils.getConstrainedNumThreads(_maxNumThreads), Math.max(getDim("N"), 1)); + int numWorkers = IntUtils.toInt(Math.min(OptimizerUtils.getConstrainedNumThreads(_maxNumThreads), Math.max(getDim("N"), 1))); if(ConfigurationManager.isGPU()) { // Account for potential sparse-to-dense conversion double gpuMemBudget = IntermediateDimensions.addEstimateSizes(gpuIntermediates, 1); @@ -677,10 +678,10 @@ public class DnnOp extends MultiThreadedHop // P = as.integer(floor((H + 2*pad_h - R)/stride_h + 1)) // Q = as.integer(floor((W + 2*pad_w - S)/stride_w + 1)) if(_cachedParams.P < 0 && _cachedParams.H >= 0 && _cachedParams.R >= 0 && _cachedParams.stride_h >= 0 && _cachedParams.pad_h >= 0) { - _cachedParams.P = (int) org.apache.sysml.runtime.util.DnnUtils.getP(_cachedParams.H, _cachedParams.R, _cachedParams.stride_h, _cachedParams.pad_h); + _cachedParams.P = IntUtils.toInt(org.apache.sysml.runtime.util.DnnUtils.getP(_cachedParams.H, _cachedParams.R, _cachedParams.stride_h, _cachedParams.pad_h)); } if(_cachedParams.Q < 0 && _cachedParams.W >= 0 && _cachedParams.S >= 0 && _cachedParams.stride_w >= 0 && _cachedParams.pad_w >= 0) { - _cachedParams.Q = (int) org.apache.sysml.runtime.util.DnnUtils.getQ(_cachedParams.W, _cachedParams.S, _cachedParams.stride_w, _cachedParams.pad_w); + _cachedParams.Q = 
IntUtils.toInt(org.apache.sysml.runtime.util.DnnUtils.getQ(_cachedParams.W, _cachedParams.S, _cachedParams.stride_w, _cachedParams.pad_w)); } return _cachedParams; http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/hops/IndexingOp.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/IndexingOp.java b/src/main/java/org/apache/sysml/hops/IndexingOp.java index 1091027..8d12da7 100644 --- a/src/main/java/org/apache/sysml/hops/IndexingOp.java +++ b/src/main/java/org/apache/sysml/hops/IndexingOp.java @@ -31,6 +31,7 @@ import org.apache.sysml.lops.LopProperties.ExecType; import org.apache.sysml.parser.Expression.DataType; import org.apache.sysml.parser.Expression.ValueType; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; +import org.apache.sysml.utils.IntUtils; //for now only works for range based indexing op public class IndexingOp extends Hop @@ -347,8 +348,8 @@ public class IndexingOp extends Hop long ru = (input3 instanceof LiteralOp) ? (HopRewriteUtils.getIntValueSafe((LiteralOp)input3)) : -1; long cl = (input4 instanceof LiteralOp) ? (HopRewriteUtils.getIntValueSafe((LiteralOp)input4)) : -1; long cu = (input5 instanceof LiteralOp) ? 
(HopRewriteUtils.getIntValueSafe((LiteralOp)input5)) : -1; - int brlen = (int)input1.getRowsInBlock(); - int bclen = (int)input1.getColsInBlock(); + int brlen = input1.getRowsInBlock(); + int bclen = input1.getColsInBlock(); return OptimizerUtils.isIndexingRangeBlockAligned(rl, ru, cl, cu, brlen, bclen); } http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/hops/OptimizerUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java index a43abb3..432319f 100644 --- a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java +++ b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java @@ -57,6 +57,7 @@ import org.apache.sysml.runtime.matrix.data.OutputInfo; import org.apache.sysml.runtime.matrix.data.SparseBlock; import org.apache.sysml.runtime.util.IndexRange; import org.apache.sysml.runtime.util.UtilFunctions; +import org.apache.sysml.utils.IntUtils; import org.apache.sysml.yarn.ropt.YarnClusterAnalyzer; public class OptimizerUtils @@ -519,7 +520,7 @@ public class OptimizerUtils //correction max number of reducers on yarn clusters if( InfrastructureAnalyzer.isYarnEnabled() ) - ret = (int)Math.max( ret, YarnClusterAnalyzer.getNumCores()/2 ); + ret = IntUtils.toInt(Math.max( ret, YarnClusterAnalyzer.getNumCores()/2 )); } return ret; @@ -534,7 +535,7 @@ public class OptimizerUtils //correction max number of reducers on yarn clusters if( InfrastructureAnalyzer.isYarnEnabled() ) - ret = (int)Math.max( ret, YarnClusterAnalyzer.getNumCores() ); + ret = IntUtils.toInt(Math.max( ret, YarnClusterAnalyzer.getNumCores() )); return ret; } @@ -582,7 +583,7 @@ public class OptimizerUtils //compute degree of parallelism for parallel text read double dop = InfrastructureAnalyzer.getLocalParallelism() * PARALLEL_CP_READ_PARALLELISM_MULTIPLIER; - return (int) Math.round(dop); + return 
IntUtils.toInt( Math.round(dop) ); } public static int getParallelBinaryReadParallelism() @@ -593,7 +594,7 @@ public class OptimizerUtils //compute degree of parallelism for parallel text read double dop = InfrastructureAnalyzer.getLocalParallelism() * PARALLEL_CP_READ_PARALLELISM_MULTIPLIER; - return (int) Math.round(dop); + return IntUtils.toInt( Math.round(dop) ); } /** @@ -612,7 +613,7 @@ public class OptimizerUtils //compute degree of parallelism for parallel text read double dop = InfrastructureAnalyzer.getLocalParallelism() * PARALLEL_CP_WRITE_PARALLELISM_MULTIPLIER; - return (int) Math.round(dop); + return IntUtils.toInt( Math.round(dop) ); } public static int getParallelBinaryWriteParallelism() @@ -623,7 +624,7 @@ public class OptimizerUtils //compute degree of parallelism for parallel text read double dop = InfrastructureAnalyzer.getLocalParallelism() * PARALLEL_CP_WRITE_PARALLELISM_MULTIPLIER; - return (int) Math.round(dop); + return IntUtils.toInt( Math.round(dop) ); } //////////////////////// http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/hops/codegen/cplan/CNodeNary.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/codegen/cplan/CNodeNary.java b/src/main/java/org/apache/sysml/hops/codegen/cplan/CNodeNary.java index 1a717d3..b7d0d26 100644 --- a/src/main/java/org/apache/sysml/hops/codegen/cplan/CNodeNary.java +++ b/src/main/java/org/apache/sysml/hops/codegen/cplan/CNodeNary.java @@ -27,6 +27,7 @@ import org.apache.sysml.hops.codegen.template.TemplateUtils; import org.apache.sysml.parser.Expression.DataType; import org.apache.sysml.runtime.util.DnnUtils; import org.apache.sysml.runtime.util.UtilFunctions; +import org.apache.sysml.utils.IntUtils; public class CNodeNary extends CNode { @@ -230,8 +231,8 @@ public class CNodeNary extends CNode int K = Integer.parseInt(inputs.get(off+9).getVarname()); int R = 
Integer.parseInt(inputs.get(off+11).getVarname()); int S = Integer.parseInt(inputs.get(off+12).getVarname()); - int P = (int) DnnUtils.getP(H, R, 1, 0); - int Q = (int) DnnUtils.getQ(W, S, 1, 0); + int P = IntUtils.toInt(DnnUtils.getP(H, R, 1, 0)); + int Q = IntUtils.toInt(DnnUtils.getQ(W, S, 1, 0)); //construct parameter string return "rix, " + StringUtils.join( http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/hops/recompile/LiteralReplacement.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/recompile/LiteralReplacement.java b/src/main/java/org/apache/sysml/hops/recompile/LiteralReplacement.java index c81848a..c0c8acd 100644 --- a/src/main/java/org/apache/sysml/hops/recompile/LiteralReplacement.java +++ b/src/main/java/org/apache/sysml/hops/recompile/LiteralReplacement.java @@ -44,6 +44,7 @@ import org.apache.sysml.runtime.instructions.cp.ListObject; import org.apache.sysml.runtime.instructions.cp.ScalarObject; import org.apache.sysml.runtime.instructions.cp.ScalarObjectFactory; import org.apache.sysml.runtime.matrix.data.MatrixBlock; +import org.apache.sysml.utils.IntUtils; import org.apache.sysml.utils.Statistics; public class LiteralReplacement @@ -247,7 +248,7 @@ public class LiteralReplacement if( mo.getNumRows()*mo.getNumColumns() < REPLACE_LITERALS_MAX_MATRIX_SIZE ) { MatrixBlock mBlock = mo.acquireRead(); - double value = mBlock.getValue((int)rlval-1,(int)clval-1); + double value = mBlock.getValue(IntUtils.toInt(rlval-1),IntUtils.toInt(clval-1)); mo.release(); //literal substitution (always double) @@ -321,7 +322,7 @@ public class LiteralReplacement if( mo.getNumRows()*mo.getNumColumns() < REPLACE_LITERALS_MAX_MATRIX_SIZE ) { MatrixBlock mBlock = mo.acquireRead(); - MatrixBlock mBlock2 = mBlock.slice((int)(rlval-1), (int)(ruval-1), (int)(clval-1), (int)(cuval-1), new MatrixBlock()); + MatrixBlock mBlock2 = 
mBlock.slice(IntUtils.toInt(rlval-1), IntUtils.toInt(ruval-1), IntUtils.toInt(clval-1), IntUtils.toInt(cuval-1), new MatrixBlock()); double value = replaceUnaryAggregate((AggUnaryOp)c, mBlock2); mo.release(); @@ -369,7 +370,7 @@ public class LiteralReplacement String varname = Dag.getNextUniqueVarname(DataType.MATRIX); LiteralOp lit = (LiteralOp) ix.getInput().get(1); MatrixObject mo = (MatrixObject) (!lit.getValueType().isNumeric() ? - list.slice(lit.getName()) : list.slice((int)lit.getLongValue()-1)); + list.slice(lit.getName()) : list.slice(IntUtils.toInt(lit.getLongValue()-1))); vars.put(varname, mo); ret = HopRewriteUtils.createTransientRead(varname, c); } @@ -391,7 +392,7 @@ public class LiteralReplacement ListObject list = (ListObject)vars.get(ixIn.getName()); LiteralOp lit = (LiteralOp) ix.getInput().get(1); ScalarObject so = (ScalarObject) (!lit.getValueType().isNumeric() ? - list.slice(lit.getName()) : list.slice((int)lit.getLongValue()-1)); + list.slice(lit.getName()) : list.slice(IntUtils.toInt(lit.getLongValue()-1))); return ScalarObjectFactory.createLiteralOp(so); } } http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/lops/compile/Dag.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/lops/compile/Dag.java b/src/main/java/org/apache/sysml/lops/compile/Dag.java index 47b497c..ce7288e 100644 --- a/src/main/java/org/apache/sysml/lops/compile/Dag.java +++ b/src/main/java/org/apache/sysml/lops/compile/Dag.java @@ -83,6 +83,7 @@ import org.apache.sysml.runtime.matrix.MatrixCharacteristics; import org.apache.sysml.runtime.matrix.data.InputInfo; import org.apache.sysml.runtime.matrix.data.OutputInfo; import org.apache.sysml.runtime.matrix.sort.PickFromCompactInputFormat; +import org.apache.sysml.utils.IntUtils; /** @@ -2338,8 +2339,8 @@ public class Dag<N extends Lop> // generate an instruction that creates a symbol table entry for the new variable 
//String createInst = prepareVariableInstruction("createvar", node); //out.addPreInstruction(CPInstructionParser.parseSingleInstruction(createInst)); - int rpb = (int) oparams.getRowsInBlock(); - int cpb = (int) oparams.getColsInBlock(); + int rpb = IntUtils.toInt(oparams.getRowsInBlock()); + int cpb = IntUtils.toInt(oparams.getColsInBlock()); Instruction createvarInst = VariableCPInstruction.prepareCreateVariableInstruction( oparams.getLabel(), oparams.getFile_name(), @@ -2377,7 +2378,7 @@ public class Dag<N extends Lop> getFilePath() + fnOutParams.getLabel(), true, fnOut.getDataType(), OutputInfo.outputInfoToString(getOutputInfo(fnOut, false)), - new MatrixCharacteristics(fnOutParams.getNumRows(), fnOutParams.getNumCols(), (int)fnOutParams.getRowsInBlock(), (int)fnOutParams.getColsInBlock(), fnOutParams.getNnz()), + new MatrixCharacteristics(fnOutParams.getNumRows(), fnOutParams.getNumCols(), IntUtils.toInt(fnOutParams.getRowsInBlock()), IntUtils.toInt(fnOutParams.getColsInBlock()), fnOutParams.getNnz()), oparams.getUpdateType() ); @@ -2483,8 +2484,8 @@ public class Dag<N extends Lop> String tempVarName = oparams.getLabel() + "temp"; String tempFileName = getNextUniqueFilename(); - int rpb = (int) oparams.getRowsInBlock(); - int cpb = (int) oparams.getColsInBlock(); + int rpb = IntUtils.toInt( oparams.getRowsInBlock() ); + int cpb = IntUtils.toInt( oparams.getColsInBlock() ); Instruction createvarInst = VariableCPInstruction.prepareCreateVariableInstruction( tempVarName, tempFileName, @@ -2539,8 +2540,8 @@ public class Dag<N extends Lop> // create a variable to hold the result produced by this "rootNode" oparams.setLabel("pVar" + var_index.getNextID() ); - int rpb = (int) oparams.getRowsInBlock(); - int cpb = (int) oparams.getColsInBlock(); + int rpb = IntUtils.toInt( oparams.getRowsInBlock() ); + int cpb = IntUtils.toInt( oparams.getColsInBlock() ); Lop fnameLop = ((Data)node).getNamedInputLop(DataExpression.IO_FILENAME); String fnameStr = (fnameLop instanceof 
Data && ((Data)fnameLop).isLiteral()) ? fnameLop.getOutputParameters().getLabel() http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/lops/runtime/RunMRJobs.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/lops/runtime/RunMRJobs.java b/src/main/java/org/apache/sysml/lops/runtime/RunMRJobs.java index e8145ac..ea0cb24 100644 --- a/src/main/java/org/apache/sysml/lops/runtime/RunMRJobs.java +++ b/src/main/java/org/apache/sysml/lops/runtime/RunMRJobs.java @@ -67,6 +67,7 @@ import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.OutputInfo; import org.apache.sysml.runtime.matrix.data.RandomMatrixGenerator; import org.apache.sysml.runtime.util.MapReduceTool; +import org.apache.sysml.utils.IntUtils; import org.apache.sysml.utils.Statistics; @@ -492,7 +493,7 @@ public class RunMRJobs RandInstruction lrand = (RandInstruction)ldgInst; RandomMatrixGenerator rgen = LibMatrixDatagen.createRandomMatrixGenerator( lrand.getProbabilityDensityFunction(), - (int)lrand.getRows(), (int)lrand.getCols(), + IntUtils.toInt(lrand.getRows()), IntUtils.toInt(lrand.getCols()), lrand.getRowsInBlock(), lrand.getColsInBlock(), lrand.getSparsity(), lrand.getMinValue(), lrand.getMaxValue(), lrand.getPdfParams()); http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java index a851a4d..c131215 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java @@ -111,6 +111,7 @@ import 
org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; import org.apache.sysml.runtime.matrix.data.OutputInfo; import org.apache.sysml.runtime.util.UtilFunctions; +import org.apache.sysml.utils.IntUtils; import org.apache.sysml.utils.Statistics; import org.apache.sysml.yarn.ropt.YarnClusterAnalyzer; @@ -888,7 +889,7 @@ public class ParForProgramBlock extends ForProgramBlock String resultFile = constructResultFileName(); long numIterations = partitioner.getNumIterations(); - int maxDigits = (int)Math.log10(to.getLongValue()) + 1; + int maxDigits = IntUtils.toInt(Math.log10(to.getLongValue()) + 1); long numCreatedTasks = -1; if( USE_STREAMING_TASK_CREATION ) { LocalTaskQueue<Task> queue = new LocalTaskQueue<>(); @@ -1453,7 +1454,7 @@ public class ParForProgramBlock extends ForProgramBlock int maxNumRed = InfrastructureAnalyzer.getRemoteParallelReduceTasks(); //correction max number of reducers on yarn clusters if( InfrastructureAnalyzer.isYarnEnabled() ) - maxNumRed = (int)Math.max( maxNumRed, YarnClusterAnalyzer.getNumCores()/2 ); + maxNumRed = IntUtils.toInt(Math.max( maxNumRed, YarnClusterAnalyzer.getNumCores()/2 )); int numRed = Math.min(numReducers,maxNumRed); //create data partitioner @@ -1484,7 +1485,7 @@ public class ParForProgramBlock extends ForProgramBlock //determine degree of parallelism int maxMap = -1, maxRed = -1; if( OptimizerUtils.isSparkExecutionMode() ) { - maxMap = (int) SparkExecutionContext.getDefaultParallelism(true); + maxMap = IntUtils.toInt( SparkExecutionContext.getDefaultParallelism(true)); maxRed = maxMap; //equal map/reduce } else { @@ -1494,8 +1495,8 @@ public class ParForProgramBlock extends ForProgramBlock InfrastructureAnalyzer.getRemoteParallelReduceTasks()); //correction max number of reducers on yarn clusters if( InfrastructureAnalyzer.isYarnEnabled() ) { - maxMap = (int)Math.max( maxMap, YarnClusterAnalyzer.getNumCores() ); - maxRed = (int)Math.max( maxRed, 
YarnClusterAnalyzer.getNumCores()/2 ); + maxMap = IntUtils.toInt(Math.max( maxMap, YarnClusterAnalyzer.getNumCores() )); + maxRed = IntUtils.toInt(Math.max( maxRed, YarnClusterAnalyzer.getNumCores()/2 )); } } int numMap = Math.max(_numThreads, maxMap); @@ -1632,8 +1633,8 @@ public class ParForProgramBlock extends ForProgramBlock int par = Math.min( _resultVars.size(), InfrastructureAnalyzer.getLocalParallelism() ); if( InfrastructureAnalyzer.isLocalMode() ) { - int parmem = (int)Math.floor(OptimizerUtils.getLocalMemBudget() / - InfrastructureAnalyzer.getRemoteMaxMemorySortBuffer()); + int parmem = IntUtils.toInt(Math.floor(OptimizerUtils.getLocalMemBudget() / + InfrastructureAnalyzer.getRemoteMaxMemorySortBuffer())); par = Math.min(par, Math.max(parmem, 1)); //reduce k if necessary } @@ -1749,7 +1750,7 @@ public class ParForProgramBlock extends ForProgramBlock if( _IDPrefix == -1 ) //not specified _ID = _pfIDSeq.getNextID(); //generated new ID else //remote case (further nested parfors are all in one JVM) - _ID = IDHandler.concatIntIDsToLong(_IDPrefix, (int)_pfIDSeq.getNextID()); + _ID = IDHandler.concatIntIDsToLong(_IDPrefix, IntUtils.toInt(_pfIDSeq.getNextID())); } /** @@ -1769,7 +1770,7 @@ public class ParForProgramBlock extends ForProgramBlock if(_IDPrefix == -1) _pwIDs[i] = _pwIDSeq.getNextID(); else - _pwIDs[i] = IDHandler.concatIntIDsToLong(_IDPrefix,(int)_pwIDSeq.getNextID()); + _pwIDs[i] = IDHandler.concatIntIDsToLong(_IDPrefix,IntUtils.toInt(_pwIDSeq.getNextID())); if( _monitor ) StatisticMonitor.putPfPwMapping(_ID, _pwIDs[i]); http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/controlprogram/caching/ByteBuffer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/ByteBuffer.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/ByteBuffer.java index a87e4b4..db3b073 100644 --- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/ByteBuffer.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/ByteBuffer.java @@ -28,6 +28,7 @@ import java.io.IOException; import org.apache.sysml.runtime.matrix.data.FrameBlock; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.util.LocalFileUtils; +import org.apache.sysml.utils.IntUtils; /** * Wrapper for WriteBuffer byte array per matrix/frame in order to @@ -61,9 +62,9 @@ public class ByteBuffer { //deep serialize (for compression) if( CacheableData.CACHING_BUFFER_PAGECACHE ) - _bdata = PageCache.getPage((int)_size); + _bdata = PageCache.getPage(IntUtils.toInt(_size)); if( _bdata==null ) - _bdata = new byte[(int)_size]; + _bdata = new byte[IntUtils.toInt(_size)]; DataOutput dout = new CacheDataOutput(_bdata); cb.write(dout); } http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheStatistics.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheStatistics.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheStatistics.java index c569787..b8fde2f 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheStatistics.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheStatistics.java @@ -86,7 +86,7 @@ public class CacheStatistics _numHitsMem.increment(); } - public static void incrementMemHits(int delta) { + public static void incrementMemHits(long delta) { _numHitsMem.add(delta); } @@ -98,7 +98,7 @@ public class CacheStatistics _numHitsFSBuff.increment(); } - public static void incrementFSBuffHits( int delta ) { + public static void incrementFSBuffHits( long delta ) { _numHitsFSBuff.add(delta); } @@ -110,7 +110,7 @@ public class CacheStatistics _numHitsFS.increment(); } - public static void 
incrementFSHits(int delta) { + public static void incrementFSHits(long delta) { _numHitsFS.add(delta); } @@ -122,7 +122,7 @@ public class CacheStatistics _numHitsHDFS.increment(); } - public static void incrementHDFSHits(int delta) { + public static void incrementHDFSHits(long delta) { _numHitsHDFS.add(delta); } @@ -134,7 +134,7 @@ public class CacheStatistics _numWritesFSBuff.increment(); } - public static void incrementFSBuffWrites(int delta) { + public static void incrementFSBuffWrites(long delta) { _numWritesFSBuff.add(delta); } @@ -146,7 +146,7 @@ public class CacheStatistics _numWritesFS.increment(); } - public static void incrementFSWrites(int delta) { + public static void incrementFSWrites(long delta) { _numWritesFS.add(delta); } @@ -158,7 +158,7 @@ public class CacheStatistics _numWritesHDFS.increment(); } - public static void incrementHDFSWrites(int delta) { + public static void incrementHDFSWrites(long delta) { _numWritesHDFS.add(delta); } http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java index b953906..dca58a6 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java @@ -43,6 +43,7 @@ import org.apache.sysml.runtime.matrix.data.FrameBlock; import org.apache.sysml.runtime.matrix.data.InputInfo; import org.apache.sysml.runtime.matrix.data.OutputInfo; import org.apache.sysml.runtime.util.UtilFunctions; +import org.apache.sysml.utils.IntUtils; public class FrameObject extends CacheableData<FrameBlock> { @@ -107,14 +108,14 @@ public class FrameObject extends CacheableData<FrameBlock> */ public ValueType[] 
mergeSchemas(FrameObject fo) { return (ValueType[]) ArrayUtils.addAll( - (_schema!=null) ? _schema : UtilFunctions.nCopies((int)getNumColumns(), ValueType.STRING), - (fo._schema!=null) ? fo._schema : UtilFunctions.nCopies((int)fo.getNumColumns(), ValueType.STRING)); + (_schema!=null) ? _schema : UtilFunctions.nCopies(IntUtils.toInt(getNumColumns()), ValueType.STRING), + (fo._schema!=null) ? fo._schema : UtilFunctions.nCopies(IntUtils.toInt(fo.getNumColumns()), ValueType.STRING)); } public void setSchema(String schema) { if( schema.equals("*") ) { //populate default schema - int clen = (int) getNumColumns(); + int clen = IntUtils.toInt( getNumColumns() ); if( clen >= 0 ) //known number of cols _schema = UtilFunctions.nCopies(clen, ValueType.STRING); } @@ -169,7 +170,7 @@ public class FrameObject extends CacheableData<FrameBlock> //handle missing schema if necessary ValueType[] lschema = (_schema!=null) ? _schema : - UtilFunctions.nCopies(clen>=1 ? (int)clen : 1, ValueType.STRING); + UtilFunctions.nCopies(clen>=1 ? IntUtils.toInt(clen) : 1, ValueType.STRING); //read the frame block FrameBlock data = null; @@ -201,12 +202,12 @@ public class FrameObject extends CacheableData<FrameBlock> MetaDataFormat iimd = (MetaDataFormat) _metaData; MatrixCharacteristics mc = iimd.getMatrixCharacteristics(); - int rlen = (int)mc.getRows(); - int clen = (int)mc.getCols(); + int rlen = IntUtils.toInt(mc.getRows()); + int clen = IntUtils.toInt(mc.getCols()); //handle missing schema if necessary ValueType[] lschema = (_schema!=null) ? _schema : - UtilFunctions.nCopies(clen>=1 ? (int)clen : 1, ValueType.STRING); + UtilFunctions.nCopies(clen>=1 ? 
IntUtils.toInt(clen) : 1, ValueType.STRING); FrameBlock fb = null; try { http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java index 4ab0f34..66bd0e3 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java @@ -43,6 +43,7 @@ import org.apache.sysml.runtime.matrix.data.OutputInfo; import org.apache.sysml.runtime.util.DataConverter; import org.apache.sysml.runtime.util.IndexRange; import org.apache.sysml.runtime.util.MapReduceTool; +import org.apache.sysml.utils.IntUtils; /** @@ -313,7 +314,7 @@ public class MatrixObject extends CacheableData<MatrixBlock> mb = readBlobFromHDFS( fname, rows, cols ); else { - mb = new MatrixBlock((int)rows, (int)cols, true); + mb = new MatrixBlock(IntUtils.toInt(rows), IntUtils.toInt(cols), true); LOG.warn("Reading empty matrix partition "+fname); } } @@ -327,13 +328,13 @@ public class MatrixObject extends CacheableData<MatrixBlock> if( _partitionFormat == PDataPartitionFormat.ROW_BLOCK_WISE ) { - int rix = (int)((pred.rowStart-1)%brlen); - mb = mb.slice(rix, rix, (int)(pred.colStart-1), (int)(pred.colEnd-1), new MatrixBlock()); + int rix = IntUtils.toInt((pred.rowStart-1)%brlen); + mb = mb.slice(rix, rix, IntUtils.toInt(pred.colStart-1), IntUtils.toInt(pred.colEnd-1), new MatrixBlock()); } if( _partitionFormat == PDataPartitionFormat.COLUMN_BLOCK_WISE ) { - int cix = (int)((pred.colStart-1)%bclen); - mb = mb.slice((int)(pred.rowStart-1), (int)(pred.rowEnd-1), cix, cix, new MatrixBlock()); + int cix = IntUtils.toInt((pred.colStart-1)%bclen); + mb = 
mb.slice(IntUtils.toInt(pred.rowStart-1), IntUtils.toInt(pred.rowEnd-1), cix, cix, new MatrixBlock()); } } @@ -467,10 +468,10 @@ public class MatrixObject extends CacheableData<MatrixBlock> } //obtain matrix block from RDD - int rlen = (int)mc.getRows(); - int clen = (int)mc.getCols(); - int brlen = (int)mc.getRowsPerBlock(); - int bclen = (int)mc.getColsPerBlock(); + int rlen = IntUtils.toInt(mc.getRows()); + int clen = IntUtils.toInt(mc.getCols()); + int brlen = mc.getRowsPerBlock(); + int bclen = mc.getColsPerBlock(); long nnz = mc.getNonZerosBound(); //guarded rdd collect http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/controlprogram/context/ExecutionContext.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/ExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/ExecutionContext.java index 3e8636b..c0c949f 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/ExecutionContext.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/ExecutionContext.java @@ -60,6 +60,7 @@ import org.apache.sysml.runtime.matrix.data.OutputInfo; import org.apache.sysml.runtime.matrix.data.Pair; import org.apache.sysml.runtime.util.MapReduceTool; import org.apache.sysml.utils.GPUStatistics; +import org.apache.sysml.utils.IntUtils; import org.apache.sysml.utils.Statistics; @@ -288,7 +289,7 @@ public class ExecutionContext { if( oldMetaData == null || !(oldMetaData instanceof MetaDataFormat) ) throw new DMLRuntimeException("Metadata not available"); MatrixCharacteristics mc = new MatrixCharacteristics(nrows, ncols, - (int) mo.getNumRowsPerBlock(), (int)mo.getNumColumnsPerBlock()); + IntUtils.toInt(mo.getNumRowsPerBlock()), IntUtils.toInt(mo.getNumColumnsPerBlock())); mo.setMetaData(new MetaDataFormat(mc, ((MetaDataFormat)oldMetaData).getOutputInfo(), 
((MetaDataFormat)oldMetaData).getInputInfo())); http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java index 33a61db..8981c87 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java @@ -83,6 +83,7 @@ import org.apache.sysml.runtime.matrix.data.SparseBlock; import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration; import org.apache.sysml.runtime.util.MapReduceTool; import org.apache.sysml.runtime.util.UtilFunctions; +import org.apache.sysml.utils.IntUtils; import org.apache.sysml.utils.MLContextProxy; import org.apache.sysml.utils.Statistics; @@ -375,7 +376,7 @@ public class SparkExecutionContext extends ExecutionContext } else { //default case MatrixBlock mb = mo.acquireRead(); //pin matrix in memory - rdd = toMatrixJavaPairRDD(sc, mb, (int)mo.getNumRowsPerBlock(), (int)mo.getNumColumnsPerBlock(), numParts, inclEmpty); + rdd = toMatrixJavaPairRDD(sc, mb, IntUtils.toInt(mo.getNumRowsPerBlock()), IntUtils.toInt(mo.getNumColumnsPerBlock()), numParts, inclEmpty); mo.release(); //unpin matrix _parRDDs.registerRDD(rdd.id(), OptimizerUtils.estimatePartitionedSizeExactSparsity(mc), true); } @@ -572,8 +573,8 @@ public class SparkExecutionContext extends ExecutionContext CacheableData.addBroadcastSize(-mo.getBroadcastHandle().getPartitionedBroadcastSize()); //obtain meta data for matrix - int brlen = (int) mo.getNumRowsPerBlock(); - int bclen = (int) mo.getNumColumnsPerBlock(); + int brlen = IntUtils.toInt( mo.getNumRowsPerBlock() ); + int bclen = IntUtils.toInt( 
mo.getNumColumnsPerBlock() ); //create partitioned matrix block and release memory consumed by input MatrixBlock mb = mo.acquireRead(); @@ -582,7 +583,7 @@ public class SparkExecutionContext extends ExecutionContext //determine coarse-grained partitioning int numPerPart = PartitionedBroadcast.computeBlocksPerPartition(mo.getNumRows(), mo.getNumColumns(), brlen, bclen); - int numParts = (int) Math.ceil((double) pmb.getNumRowBlocks() * pmb.getNumColumnBlocks() / numPerPart); + int numParts = IntUtils.toInt( Math.ceil((double) pmb.getNumRowBlocks() * pmb.getNumColumnBlocks() / numPerPart) ); Broadcast<PartitionedBlock<MatrixBlock>>[] ret = new Broadcast[numParts]; //create coarse-grained partitioned broadcasts @@ -637,7 +638,7 @@ public class SparkExecutionContext extends ExecutionContext CacheableData.addBroadcastSize(-fo.getBroadcastHandle().getPartitionedBroadcastSize()); //obtain meta data for frame - int bclen = (int) fo.getNumColumns(); + int bclen = IntUtils.toInt( fo.getNumColumns() ); int brlen = OptimizerUtils.getDefaultFrameSize(); //create partitioned frame block and release memory consumed by input @@ -647,7 +648,7 @@ public class SparkExecutionContext extends ExecutionContext //determine coarse-grained partitioning int numPerPart = PartitionedBroadcast.computeBlocksPerPartition(fo.getNumRows(), fo.getNumColumns(), brlen, bclen); - int numParts = (int) Math.ceil((double) pmb.getNumRowBlocks() * pmb.getNumColumnBlocks() / numPerPart); + int numParts = IntUtils.toInt( Math.ceil((double) pmb.getNumRowBlocks() * pmb.getNumColumnBlocks() / numPerPart) ); Broadcast<PartitionedBlock<FrameBlock>>[] ret = new Broadcast[numParts]; //create coarse-grained partitioned broadcasts @@ -743,8 +744,8 @@ public class SparkExecutionContext extends ExecutionContext int maxCol = UtilFunctions.computeBlockSize(mc.getCols(), blockCol+1, mc.getColsPerBlock()); //copy sub-matrix to block MatrixBlock block = new MatrixBlock(maxRow, maxCol, mb.isInSparseFormat()); - int row_offset 
= (int)blockRow*mc.getRowsPerBlock(); - int col_offset = (int)blockCol*mc.getColsPerBlock(); + int row_offset = IntUtils.toInt(blockRow*mc.getRowsPerBlock()); + int col_offset = IntUtils.toInt(blockCol*mc.getColsPerBlock()); block = mb.slice( row_offset, row_offset+maxRow-1, col_offset, col_offset+maxCol-1, block ); //create key-value pair @@ -761,7 +762,7 @@ public class SparkExecutionContext extends ExecutionContext //create and write subblocks of matrix int blksize = ConfigurationManager.getBlocksize(); - for(int blockRow = 0; blockRow < (int)Math.ceil(src.getNumRows()/(double)blksize); blockRow++) + for(int blockRow = 0; blockRow < IntUtils.toInt(Math.ceil(src.getNumRows()/(double)blksize)); blockRow++) { int maxRow = (blockRow*blksize + blksize < src.getNumRows()) ? blksize : src.getNumRows() - blockRow*blksize; int roffset = blockRow*blksize; @@ -857,8 +858,8 @@ public class SparkExecutionContext extends ExecutionContext MatrixBlock block = keyval._2(); //compute row/column block offsets - int row_offset = (int)(ix.getRowIndex()-1)*brlen; - int col_offset = (int)(ix.getColumnIndex()-1)*bclen; + int row_offset = IntUtils.toInt(ix.getRowIndex()-1)*brlen; + int col_offset = IntUtils.toInt(ix.getColumnIndex()-1)*bclen; int rows = block.getNumRows(); int cols = block.getNumColumns(); @@ -938,7 +939,7 @@ public class SparkExecutionContext extends ExecutionContext //append cell to dense/sparse target in order to avoid shifting for sparse //note: this append requires a final sort of sparse rows - out.appendValue((int)ix.getRowIndex()-1, (int)ix.getColumnIndex()-1, cell.getValue()); + out.appendValue(IntUtils.toInt(ix.getRowIndex()-1), IntUtils.toInt(ix.getColumnIndex()-1), cell.getValue()); } //post-processing output matrix @@ -969,7 +970,7 @@ public class SparkExecutionContext extends ExecutionContext //unpack index-block pair MatrixIndexes ix = keyval._1(); MatrixBlock block = keyval._2(); - out.setBlock((int)ix.getRowIndex(), (int)ix.getColumnIndex(), block); + 
out.setBlock(IntUtils.toInt(ix.getRowIndex()), IntUtils.toInt(ix.getColumnIndex()), block); } if (ConfigurationManager.isStatistics()) { @@ -1002,7 +1003,7 @@ public class SparkExecutionContext extends ExecutionContext for( Tuple2<Long,FrameBlock> keyval : list ) { //unpack index-block pair - int ix = (int)(keyval._1() - 1); + int ix = IntUtils.toInt(keyval._1() - 1); FrameBlock block = keyval._2(); //copy into output frame @@ -1308,7 +1309,7 @@ public class SparkExecutionContext extends ExecutionContext if( pool < 0 ) { pool = _poolBuff.length; _poolBuff = Arrays.copyOf(_poolBuff, - (int)Math.min(2L*pool, Integer.MAX_VALUE)); + IntUtils.toInt(Math.min(2L*pool, Integer.MAX_VALUE))); } //mark pool name for in use _poolBuff[pool] = true; http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/LocalPSWorker.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/LocalPSWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/LocalPSWorker.java index df9c925..7659245 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/LocalPSWorker.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/LocalPSWorker.java @@ -30,6 +30,7 @@ import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; import org.apache.sysml.runtime.controlprogram.context.ExecutionContext; import org.apache.sysml.runtime.controlprogram.parfor.stat.Timing; import org.apache.sysml.runtime.instructions.cp.ListObject; +import org.apache.sysml.utils.IntUtils; import org.apache.sysml.utils.Statistics; public class LocalPSWorker extends PSWorker implements Callable<Void> { @@ -53,7 +54,7 @@ public class LocalPSWorker extends PSWorker implements Callable<Void> { incWorkerNumber(); try { long dataSize = _features.getNumRows(); - int batchIter = (int) Math.ceil((double) dataSize 
/ _batchSize); + int batchIter = IntUtils.toInt( Math.ceil((double) dataSize / _batchSize) ); switch (_freq) { case BATCH: http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/ParamservUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/ParamservUtils.java b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/ParamservUtils.java index 58bf311..b93604f 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/ParamservUtils.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/ParamservUtils.java @@ -71,6 +71,7 @@ import org.apache.sysml.runtime.matrix.data.MatrixIndexes; import org.apache.sysml.runtime.matrix.data.OutputInfo; import org.apache.sysml.runtime.matrix.operators.BinaryOperator; import org.apache.sysml.runtime.util.ProgramConverter; +import org.apache.sysml.utils.IntUtils; import org.apache.sysml.utils.Statistics; import scala.Tuple2; @@ -194,7 +195,7 @@ public class ParamservUtils { * @return new sliced matrix block */ public static MatrixBlock sliceMatrixBlock(MatrixBlock mb, long rl, long rh) { - return mb.slice((int) rl - 1, (int) rh - 1); + return mb.slice(IntUtils.toInt( rl - 1 ), IntUtils.toInt( rh - 1 )); } /** @@ -389,7 +390,7 @@ public class ParamservUtils { JavaPairRDD<MatrixIndexes, MatrixBlock> labelsRDD = (JavaPairRDD<MatrixIndexes, MatrixBlock>) sec.getRDDHandleForMatrixObject(labels, InputInfo.BinaryBlockInputInfo); - DataPartitionerSparkMapper mapper = new DataPartitionerSparkMapper(scheme, workerNum, sec, (int) features.getNumRows()); + DataPartitionerSparkMapper mapper = new DataPartitionerSparkMapper(scheme, workerNum, sec, IntUtils.toInt( features.getNumRows())); JavaPairRDD<Integer, Tuple2<MatrixBlock, MatrixBlock>> result = ParamservUtils .assembleTrainingData(featuresRDD, labelsRDD) // Combine features 
and labels into a pair (rowBlockID => (features, labels)) .flatMapToPair(mapper) // Do the data partitioning on spark (workerID => (rowBlockID, (single row features, single row labels)) http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/dp/DRLocalScheme.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/dp/DRLocalScheme.java b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/dp/DRLocalScheme.java index 464be99..b9f37d9 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/dp/DRLocalScheme.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/dp/DRLocalScheme.java @@ -29,6 +29,7 @@ import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; import org.apache.sysml.runtime.controlprogram.paramserv.ParamservUtils; import org.apache.sysml.runtime.instructions.InstructionUtils; import org.apache.sysml.runtime.matrix.data.MatrixBlock; +import org.apache.sysml.utils.IntUtils; /** * Data partitioner Disjoint_Random: @@ -39,7 +40,7 @@ import org.apache.sysml.runtime.matrix.data.MatrixBlock; public class DRLocalScheme extends DataPartitionLocalScheme { private List<MatrixBlock> partition(int k, MatrixBlock mb, MatrixBlock permutation) { - int batchSize = (int) Math.ceil((double) mb.getNumRows() / k); + int batchSize = IntUtils.toInt(Math.ceil((double) mb.getNumRows() / k)); return IntStream.range(0, k).mapToObj(i -> { int begin = i * batchSize; int end = Math.min((i + 1) * batchSize, mb.getNumRows()); http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/dp/DRSparkScheme.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/dp/DRSparkScheme.java 
b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/dp/DRSparkScheme.java index df61af9..3f6deb3 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/dp/DRSparkScheme.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/dp/DRSparkScheme.java @@ -26,6 +26,7 @@ import java.util.stream.IntStream; import org.apache.sysml.hops.OptimizerUtils; import org.apache.sysml.runtime.controlprogram.paramserv.ParamservUtils; import org.apache.sysml.runtime.matrix.data.MatrixBlock; +import org.apache.sysml.utils.IntUtils; import scala.Tuple2; @@ -58,10 +59,10 @@ public class DRSparkScheme extends DataPartitionSparkScheme { long shiftedPosition = (long) partialPerm.getValue(r, 0); // Get the shifted block and position - int shiftedBlkID = (int) (shiftedPosition / OptimizerUtils.DEFAULT_BLOCKSIZE + 1); + int shiftedBlkID = IntUtils.toInt(shiftedPosition / OptimizerUtils.DEFAULT_BLOCKSIZE + 1); MatrixBlock indicator = _workerIndicator.getBlock(shiftedBlkID, 1); - int workerID = (int) indicator.getValue((int) shiftedPosition / OptimizerUtils.DEFAULT_BLOCKSIZE, 0); + int workerID = IntUtils.toInt( indicator.getValue(IntUtils.toInt( shiftedPosition / OptimizerUtils.DEFAULT_BLOCKSIZE), 0)); return new Tuple2<>(workerID, new Tuple2<>(shiftedPosition, rowMB)); }).collect(Collectors.toList()); } http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/dp/DataPartitionSparkScheme.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/dp/DataPartitionSparkScheme.java b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/dp/DataPartitionSparkScheme.java index 7992ac8..77d252c 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/dp/DataPartitionSparkScheme.java +++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/dp/DataPartitionSparkScheme.java @@ -28,6 +28,7 @@ import org.apache.sysml.hops.OptimizerUtils; import org.apache.sysml.runtime.controlprogram.paramserv.ParamservUtils; import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast; import org.apache.sysml.runtime.matrix.data.MatrixBlock; +import org.apache.sysml.utils.IntUtils; import scala.Tuple2; @@ -65,7 +66,7 @@ public abstract class DataPartitionSparkScheme implements Serializable { protected List<Tuple2<Integer, Tuple2<Long, MatrixBlock>>> nonShuffledPartition(int rblkID, MatrixBlock mb) { MatrixBlock indicator = _workerIndicator.getBlock(rblkID, 1); return LongStream.range(0, mb.getNumRows()).mapToObj(r -> { - int workerID = (int) indicator.getValue((int) r, 0); + int workerID = IntUtils.toInt( indicator.getValue(IntUtils.toInt( r ), 0) ); MatrixBlock rowMB = ParamservUtils.sliceMatrixBlock(mb, r + 1, r + 1); long shiftedPosition = r + (rblkID - 1) * OptimizerUtils.DEFAULT_BLOCKSIZE; return new Tuple2<>(workerID, new Tuple2<>(shiftedPosition, rowMB)); http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/dp/DataPartitionerSparkAggregator.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/dp/DataPartitionerSparkAggregator.java b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/dp/DataPartitionerSparkAggregator.java index 0314ccf..6381324 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/dp/DataPartitionerSparkAggregator.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/dp/DataPartitionerSparkAggregator.java @@ -25,6 +25,7 @@ import java.util.LinkedList; import org.apache.spark.api.java.function.PairFunction; import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; import 
org.apache.sysml.runtime.matrix.data.MatrixBlock; +import org.apache.sysml.utils.IntUtils; import scala.Tuple2; @@ -51,15 +52,15 @@ public class DataPartitionerSparkAggregator implements PairFunction<Tuple2<Integ */ @Override public Tuple2<Integer, Tuple2<MatrixBlock, MatrixBlock>> call(Tuple2<Integer, LinkedList<Tuple2<Long, Tuple2<MatrixBlock, MatrixBlock>>>> input) throws Exception { - MatrixBlock fmb = new MatrixBlock(input._2.size(), (int) _fcol, false); - MatrixBlock lmb = new MatrixBlock(input._2.size(), (int) _lcol, false); + MatrixBlock fmb = new MatrixBlock(input._2.size(), IntUtils.toInt( _fcol ), false); + MatrixBlock lmb = new MatrixBlock(input._2.size(), IntUtils.toInt( _lcol ), false); for (int i = 0; i < input._2.size(); i++) { MatrixBlock tmpFMB = input._2.get(i)._2._1; MatrixBlock tmpLMB = input._2.get(i)._2._2; // Row-wise aggregation - fmb = fmb.leftIndexingOperations(tmpFMB, i, i, 0, (int) _fcol - 1, fmb, MatrixObject.UpdateType.INPLACE_PINNED); - lmb = lmb.leftIndexingOperations(tmpLMB, i, i, 0, (int) _lcol - 1, lmb, MatrixObject.UpdateType.INPLACE_PINNED); + fmb = fmb.leftIndexingOperations(tmpFMB, i, i, 0, IntUtils.toInt( _fcol - 1 ), fmb, MatrixObject.UpdateType.INPLACE_PINNED); + lmb = lmb.leftIndexingOperations(tmpLMB, i, i, 0, IntUtils.toInt( _lcol - 1 ), lmb, MatrixObject.UpdateType.INPLACE_PINNED); } return new Tuple2<>(input._1, new Tuple2<>(fmb, lmb)); } http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/dp/SparkDataPartitioner.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/dp/SparkDataPartitioner.java b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/dp/SparkDataPartitioner.java index 031150b..5ba1630 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/dp/SparkDataPartitioner.java +++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/dp/SparkDataPartitioner.java @@ -33,6 +33,7 @@ import org.apache.sysml.runtime.controlprogram.paramserv.ParamservUtils; import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.util.DataConverter; +import org.apache.sysml.utils.IntUtils; public class SparkDataPartitioner implements Serializable { @@ -74,7 +75,7 @@ public class SparkDataPartitioner implements Serializable { private void createDCIndicator(SparkExecutionContext sec, int numWorkers, int numEntries) { double[] vector = new double[numEntries]; - int batchSize = (int) Math.ceil((double) numEntries / numWorkers); + int batchSize = IntUtils.toInt( Math.ceil((double) numEntries / numWorkers) ); for (int i = 1; i < numWorkers; i++) { int begin = batchSize * i; int end = Math.min(begin + batchSize, numEntries); @@ -90,7 +91,7 @@ public class SparkDataPartitioner implements Serializable { // Create the source-target id vector from the permutation ranging from 1 to number of entries double[] vector = new double[numEntries]; for (int j = 0; j < perm.getDenseBlockValues().length; j++) { - vector[(int) perm.getDenseBlockValues()[j] - 1] = j; + vector[IntUtils.toInt( perm.getDenseBlockValues()[j] - 1)] = j; } MatrixBlock vectorMB = DataConverter.convertToMatrixBlock(vector, true); return sec.getBroadcastForMatrixObject(ParamservUtils.newMatrixObject(vectorMB)); @@ -101,6 +102,6 @@ public class SparkDataPartitioner implements Serializable { public DataPartitionSparkScheme.Result doPartitioning(int numWorkers, MatrixBlock features, MatrixBlock labels, long rowID) { // Set the rowID in order to get the according permutation - return _scheme.doPartitioning(numWorkers, (int) rowID, features, labels); + return _scheme.doPartitioning(numWorkers, IntUtils.toInt( rowID ), features, labels); } } 
http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/rpc/PSRpcObject.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/rpc/PSRpcObject.java b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/rpc/PSRpcObject.java index 38d80a2..e2dfc92 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/rpc/PSRpcObject.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/rpc/PSRpcObject.java @@ -33,6 +33,7 @@ import org.apache.sysml.runtime.instructions.cp.Data; import org.apache.sysml.runtime.instructions.cp.ListObject; import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.runtime.matrix.data.MatrixBlock; +import org.apache.sysml.utils.IntUtils; public abstract class PSRpcObject { @@ -93,7 +94,7 @@ public abstract class PSRpcObject { ((MatrixObject)d).acquireReadAndRelease().getExactSizeOnDisk()).sum(); if( result > Integer.MAX_VALUE ) throw new DMLRuntimeException("Serialized size ("+result+") larger than Integer.MAX_VALUE."); - return (int) result; + return IntUtils.toInt( result ); } private void validateListObject(ListObject lo) { http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitioner.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitioner.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitioner.java index 3d12949..8007851 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitioner.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitioner.java @@ -32,6 +32,7 @@ import org.apache.sysml.runtime.matrix.data.InputInfo; import 
org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.OutputInfo; import org.apache.sysml.runtime.util.MapReduceTool; +import org.apache.sysml.utils.IntUtils; /** @@ -149,7 +150,7 @@ public abstract class DataPartitioner //create output matrix object out.setPartitioned( _format, _n ); - MatrixCharacteristics mcNew = new MatrixCharacteristics( rows, cols, (int)brlen, (int)bclen ); + MatrixCharacteristics mcNew = new MatrixCharacteristics( rows, cols, IntUtils.toInt(brlen), IntUtils.toInt(bclen) ); mcNew.setNonZeros( nonZeros ); if( convertBlock2Cell ) ii = InputInfo.BinaryCellInputInfo; @@ -176,11 +177,11 @@ public abstract class DataPartitioner { case ROW_WISE: //default assumption sparse, but reset per input block anyway - tmp = new MatrixBlock( 1, (int)cols, true, (int)(cols*0.1) ); + tmp = new MatrixBlock( 1, IntUtils.toInt(cols), true, IntUtils.toInt(cols*0.1) ); break; case COLUMN_WISE: //default dense because single column alwyas below SKINNY_MATRIX_TURN_POINT - tmp = new MatrixBlock( (int)rows, 1, false ); + tmp = new MatrixBlock( IntUtils.toInt(rows), 1, false ); break; default: //do nothing http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerLocal.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerLocal.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerLocal.java index ecb5ea0..d4c78aa 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerLocal.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerLocal.java @@ -56,6 +56,7 @@ import org.apache.sysml.runtime.matrix.data.MatrixIndexes; import org.apache.sysml.runtime.matrix.data.OutputInfo; import org.apache.sysml.runtime.util.FastStringTokenizer; import 
org.apache.sysml.runtime.util.LocalFileUtils; +import org.apache.sysml.utils.IntUtils; /** * Partitions a given matrix into row or column partitions with a two pass-approach. @@ -189,8 +190,8 @@ public class DataPartitionerLocal extends DataPartitioner Thread[] threads = new Thread[len]; for( int i=0;i<len;i++ ) { - int start = i*(int)Math.ceil(((double)fnamesPartitions.length)/len); - int end = (i+1)*(int)Math.ceil(((double)fnamesPartitions.length)/len)-1; + int start = i*IntUtils.toInt(Math.ceil(((double)fnamesPartitions.length)/len)); + int end = (i+1)*IntUtils.toInt(Math.ceil(((double)fnamesPartitions.length)/len)-1); end = Math.min(end, fnamesPartitions.length-1); threads[i] = new Thread(new DataPartitionerWorkerTextCell(job, fnameNew, fnameStaging, fnamesPartitions, start, end)); threads[i].start(); @@ -276,8 +277,8 @@ public class DataPartitionerLocal extends DataPartitioner Thread[] threads = new Thread[len]; for( int i=0;i<len;i++ ) { - int start = i*(int)Math.ceil(((double)fnamesPartitions.length)/len); - int end = (i+1)*(int)Math.ceil(((double)fnamesPartitions.length)/len)-1; + int start = i*IntUtils.toInt(Math.ceil(((double)fnamesPartitions.length)/len)); + int end = (i+1)*IntUtils.toInt(Math.ceil(((double)fnamesPartitions.length)/len)-1); end = Math.min(end, fnamesPartitions.length-1); threads[i] = new Thread(new DataPartitionerWorkerBinaryCell(job, fnameNew, fnameStaging, fnamesPartitions, start, end)); threads[i].start(); @@ -358,8 +359,8 @@ public class DataPartitionerLocal extends DataPartitioner Thread[] threads = new Thread[len]; for( int i=0;i<len;i++ ) { - int start = i*(int)Math.ceil(((double)fnamesPartitions.length)/len); - int end = (i+1)*(int)Math.ceil(((double)fnamesPartitions.length)/len)-1; + int start = i*IntUtils.toInt(Math.ceil(((double)fnamesPartitions.length)/len)); + int end = (i+1)*IntUtils.toInt(Math.ceil(((double)fnamesPartitions.length)/len)-1); end = Math.min(end, fnamesPartitions.length-1); threads[i] = new Thread(new 
DataPartitionerWorkerBinaryBlock(job, fnameNew, fnameStaging, fnamesPartitions, start, end)); threads[i].start(); @@ -462,8 +463,8 @@ public class DataPartitionerLocal extends DataPartitioner Thread[] threads = new Thread[len]; for( int i=0;i<len;i++ ) { - int start = i*(int)Math.ceil(((double)fnamesPartitions.length)/len); - int end = (i+1)*(int)Math.ceil(((double)fnamesPartitions.length)/len)-1; + int start = i*IntUtils.toInt(Math.ceil(((double)fnamesPartitions.length)/len)); + int end = (i+1)*IntUtils.toInt(Math.ceil(((double)fnamesPartitions.length)/len)-1); end = Math.min(end, fnamesPartitions.length-1); threads[i] = new Thread(new DataPartitionerWorkerBinaryCell(job, fnameNew, fnameStaging, fnamesPartitions, start, end)); threads[i].start(); @@ -496,12 +497,12 @@ public class DataPartitionerLocal extends DataPartitioner if( _format == PDataPartitionFormat.ROW_WISE ) { - _reuseBlk.reset( 1, (int)cols, sparse, (int) (cols*sparsity) ); + _reuseBlk.reset( 1, IntUtils.toInt(cols), sparse, IntUtils.toInt(cols*sparsity) ); for( int i=0; i<rows; i++ ) { String pdir = LocalFileUtils.checkAndCreateStagingDir(dir+"/"+(row_offset+1+i)); String pfname = pdir+"/"+"block_"+(col_offset/bclen+1); - mb.slice(i, i, 0, (int)(cols-1), _reuseBlk); + mb.slice(i, i, 0, IntUtils.toInt(cols-1), _reuseBlk); LocalFileUtils.writeMatrixBlockToLocal(pfname, _reuseBlk); _reuseBlk.reset(); } @@ -515,13 +516,13 @@ public class DataPartitionerLocal extends DataPartitioner else if( _format == PDataPartitionFormat.COLUMN_WISE ) { //create object for reuse - _reuseBlk.reset( (int)rows, 1, false ); + _reuseBlk.reset( IntUtils.toInt(rows), 1, false ); for( int i=0; i<cols; i++ ) { String pdir = LocalFileUtils.checkAndCreateStagingDir(dir+"/"+(col_offset+1+i)); String pfname = pdir+"/"+"block_"+(row_offset/brlen+1); - mb.slice(0, (int)(rows-1), i, i, _reuseBlk); + mb.slice(0, IntUtils.toInt(rows-1), i, i, _reuseBlk); LocalFileUtils.writeMatrixBlockToLocal(pfname, _reuseBlk); _reuseBlk.reset(); } 
http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java index 8195d91..fff563f 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java @@ -38,6 +38,7 @@ import org.apache.sysml.runtime.matrix.data.OutputInfo; import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames; import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration; import org.apache.sysml.runtime.util.MapReduceTool; +import org.apache.sysml.utils.IntUtils; import org.apache.sysml.utils.Statistics; import org.apache.sysml.yarn.DMLAppMasterUtils; @@ -149,7 +150,7 @@ public class DataPartitionerRemoteMR extends DataPartitioner default: //do nothing } - job.setNumReduceTasks( (int)Math.min( _numReducers, reducerGroups) ); + job.setNumReduceTasks( IntUtils.toInt(Math.min( _numReducers, reducerGroups)) ); //disable automatic tasks timeouts and speculative task exec http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMapper.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMapper.java index f535b32..84ea0f8 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMapper.java +++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMapper.java @@ -41,6 +41,7 @@ import org.apache.sysml.runtime.matrix.data.OutputInfo; import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration; import org.apache.sysml.runtime.util.FastStringTokenizer; import org.apache.sysml.runtime.util.MapReduceTool; +import org.apache.sysml.utils.IntUtils; /** * Remote data partitioner mapper implementation that does the actual @@ -335,12 +336,12 @@ public class DataPartitionerRemoteMapper switch( _pdf ) { case ROW_WISE: - _reuseBlk.reset(1, (int)cols, sparse, (int)(cols*sparsity)); + _reuseBlk.reset(1, IntUtils.toInt(cols), sparse, IntUtils.toInt(cols*sparsity)); for( int i=0; i<rows; i++ ) { _longKey.set(row_offset+1+i); _pairKey.setIndexes(1, (col_offset/_bclen+1) ); - value2.slice(i, i, 0, (int)(cols-1), _reuseBlk); + value2.slice(i, i, 0, IntUtils.toInt(cols-1), _reuseBlk); out.collect(_longKey, _pair); _reuseBlk.reset(); } @@ -361,12 +362,12 @@ public class DataPartitionerRemoteMapper out.collect(_longKey, _pair); break; case COLUMN_WISE: - _reuseBlk.reset((int)rows, 1, false); + _reuseBlk.reset(IntUtils.toInt(rows), 1, false); for( int i=0; i<cols; i++ ) { _longKey.set(col_offset+1+i); _pairKey.setIndexes(row_offset/_brlen+1, 1); - value2.slice(0, (int)(rows-1), i, i, _reuseBlk); + value2.slice(0, IntUtils.toInt(rows-1), i, i, _reuseBlk); out.collect(_longKey, _pair ); _reuseBlk.reset(); } @@ -671,9 +672,9 @@ public class DataPartitionerRemoteMapper long numPartitions = -1; switch( _pdf ){ case ROW_WISE: numPartitions = _rlen; break; - case ROW_BLOCK_WISE: numPartitions = (int)Math.ceil(_rlen/(double)_brlen); break; + case ROW_BLOCK_WISE: numPartitions = IntUtils.toInt(Math.ceil(_rlen/(double)_brlen)); break; case COLUMN_WISE: numPartitions = _clen; break; - case COLUMN_BLOCK_WISE: numPartitions = (int)Math.ceil(_clen/(double)_bclen); break; + case COLUMN_BLOCK_WISE: numPartitions = IntUtils.toInt(Math.ceil(_clen/(double)_bclen)); 
break; default: //do nothing } http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSpark.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSpark.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSpark.java index daab642..165bdd7 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSpark.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSpark.java @@ -34,6 +34,7 @@ import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.MatrixIndexes; import org.apache.sysml.runtime.matrix.data.OutputInfo; import org.apache.sysml.runtime.util.MapReduceTool; +import org.apache.sysml.utils.IntUtils; import org.apache.sysml.utils.Statistics; /** @@ -74,7 +75,7 @@ public class DataPartitionerRemoteSpark extends DataPartitioner //determine degree of parallelism MatrixCharacteristics mc = in.getMatrixCharacteristics(); - int numRed = (int)determineNumReducers(inRdd, mc, _numRed); + int numRed = IntUtils.toInt(determineNumReducers(inRdd, mc, _numRed)); //run spark remote data partition job DataPartitionerRemoteSparkMapper dpfun = new DataPartitionerRemoteSparkMapper(mc, ii, oi, _format, _n); http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkMapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkMapper.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkMapper.java index b5f0956..0baf24b 100644 --- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkMapper.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkMapper.java @@ -35,6 +35,7 @@ import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.MatrixIndexes; import org.apache.sysml.runtime.matrix.data.OutputInfo; import org.apache.sysml.runtime.util.DataConverter; +import org.apache.sysml.utils.IntUtils; import scala.Tuple2; @@ -117,7 +118,7 @@ public class DataPartitionerRemoteSparkMapper extends ParWorker implements PairF for( int i=0; i<rows; i+=_n ) { PairWritableBlock tmp = new PairWritableBlock(); tmp.indexes = new MatrixIndexes(1, col_offset/_bclen+1); - tmp.block = value2.slice(i, Math.min(i+(int)_n-1, value2.getNumRows()-1)); + tmp.block = value2.slice(i, Math.min(i+IntUtils.toInt(_n)-1, value2.getNumRows()-1)); ret.add(new Tuple2<Long,Writable>(new Long((row_offset+i)/_n+1),tmp)); } } @@ -152,7 +153,7 @@ public class DataPartitionerRemoteSparkMapper extends ParWorker implements PairF PairWritableBlock tmp = new PairWritableBlock(); tmp.indexes = new MatrixIndexes(row_offset/_brlen+1, 1); tmp.block = value2.slice(0, value2.getNumRows()-1, - i, Math.min(i+(int)_n-1, value2.getNumColumns()-1), new MatrixBlock()); + i, Math.min(i+IntUtils.toInt(_n)-1, value2.getNumColumns()-1), new MatrixBlock()); ret.add(new Tuple2<Long,Writable>(new Long((col_offset+i)/_n+1),tmp)); } } http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java index 5623827..607291e 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java 
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java @@ -57,6 +57,7 @@ import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames; import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration; import org.apache.sysml.runtime.util.MapReduceTool; import org.apache.sysml.runtime.util.ProgramConverter; +import org.apache.sysml.utils.IntUtils; import org.apache.sysml.utils.Statistics; import org.apache.sysml.yarn.DMLAppMasterUtils; @@ -99,8 +100,8 @@ public class RemoteDPParForMR Path path = new Path( input.getFileName() ); long rlen = input.getNumRows(); long clen = input.getNumColumns(); - int brlen = (int) input.getNumRowsPerBlock(); - int bclen = (int) input.getNumColumnsPerBlock(); + int brlen = IntUtils.toInt( input.getNumRowsPerBlock()); + int bclen = IntUtils.toInt( input.getNumColumnsPerBlock()); MRJobConfiguration.setPartitioningInfo(job, rlen, clen, brlen, bclen, InputInfo.BinaryBlockInputInfo, oi, dpf._dpf, dpf._N, input.getFileName(), itervar, matrixvar, tSparseCol); job.setInputFormat(InputInfo.BinaryBlockInputInfo.inputFormatClass); @@ -172,20 +173,20 @@ public class RemoteDPParForMR // Process different counters Statistics.incrementNoOfExecutedMRJobs(); Group pgroup = runjob.getCounters().getGroup(ParForProgramBlock.PARFOR_COUNTER_GROUP_NAME); - int numTasks = (int)pgroup.getCounter( Stat.PARFOR_NUMTASKS.toString() ); - int numIters = (int)pgroup.getCounter( Stat.PARFOR_NUMITERS.toString() ); + int numTasks = IntUtils.toInt(pgroup.getCounter( Stat.PARFOR_NUMTASKS.toString() )); + int numIters = IntUtils.toInt(pgroup.getCounter( Stat.PARFOR_NUMITERS.toString() )); if( ConfigurationManager.isStatistics() && !InfrastructureAnalyzer.isLocalMode() ) { Statistics.incrementJITCompileTime( pgroup.getCounter( Stat.PARFOR_JITCOMPILE.toString() ) ); Statistics.incrementJVMgcCount( pgroup.getCounter( Stat.PARFOR_JVMGC_COUNT.toString() ) ); Statistics.incrementJVMgcTime( pgroup.getCounter( 
Stat.PARFOR_JVMGC_TIME.toString() ) ); Group cgroup = runjob.getCounters().getGroup(CacheableData.CACHING_COUNTER_GROUP_NAME.toString()); - CacheStatistics.incrementMemHits((int)cgroup.getCounter( CacheStatistics.Stat.CACHE_HITS_MEM.toString() )); - CacheStatistics.incrementFSBuffHits((int)cgroup.getCounter( CacheStatistics.Stat.CACHE_HITS_FSBUFF.toString() )); - CacheStatistics.incrementFSHits((int)cgroup.getCounter( CacheStatistics.Stat.CACHE_HITS_FS.toString() )); - CacheStatistics.incrementHDFSHits((int)cgroup.getCounter( CacheStatistics.Stat.CACHE_HITS_HDFS.toString() )); - CacheStatistics.incrementFSBuffWrites((int)cgroup.getCounter( CacheStatistics.Stat.CACHE_WRITES_FSBUFF.toString() )); - CacheStatistics.incrementFSWrites((int)cgroup.getCounter( CacheStatistics.Stat.CACHE_WRITES_FS.toString() )); - CacheStatistics.incrementHDFSWrites((int)cgroup.getCounter( CacheStatistics.Stat.CACHE_WRITES_HDFS.toString() )); + CacheStatistics.incrementMemHits(cgroup.getCounter( CacheStatistics.Stat.CACHE_HITS_MEM.toString() )); + CacheStatistics.incrementFSBuffHits(cgroup.getCounter( CacheStatistics.Stat.CACHE_HITS_FSBUFF.toString() )); + CacheStatistics.incrementFSHits(cgroup.getCounter( CacheStatistics.Stat.CACHE_HITS_FS.toString() )); + CacheStatistics.incrementHDFSHits(cgroup.getCounter( CacheStatistics.Stat.CACHE_HITS_HDFS.toString() )); + CacheStatistics.incrementFSBuffWrites(cgroup.getCounter( CacheStatistics.Stat.CACHE_WRITES_FSBUFF.toString() )); + CacheStatistics.incrementFSWrites(cgroup.getCounter( CacheStatistics.Stat.CACHE_WRITES_FS.toString() )); + CacheStatistics.incrementHDFSWrites(cgroup.getCounter( CacheStatistics.Stat.CACHE_WRITES_HDFS.toString() )); CacheStatistics.incrementAcquireRTime(cgroup.getCounter( CacheStatistics.Stat.CACHE_TIME_ACQR.toString() )); CacheStatistics.incrementAcquireMTime(cgroup.getCounter( CacheStatistics.Stat.CACHE_TIME_ACQM.toString() )); CacheStatistics.incrementReleaseTime(cgroup.getCounter( 
CacheStatistics.Stat.CACHE_TIME_RLS.toString() )); http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java index 866e456..68eb19a 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java @@ -57,6 +57,7 @@ import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.MatrixIndexes; import org.apache.sysml.runtime.matrix.data.OutputInfo; import org.apache.sysml.runtime.util.UtilFunctions; +import org.apache.sysml.utils.IntUtils; import org.apache.sysml.utils.Statistics; /** @@ -90,7 +91,7 @@ public class RemoteDPParForSpark //compute number of reducers (to avoid OOMs and reduce memory pressure) int numParts = SparkUtils.getNumPreferredPartitions(mc, in); - int numReducers2 = Math.max(numReducers, Math.min(numParts, (int)dpf.getNumParts(mc))); + int numReducers2 = Math.max(numReducers, Math.min(numParts, IntUtils.toInt(dpf.getNumParts(mc)))); //core parfor datapartition-execute (w/ or w/o shuffle, depending on data characteristics) RemoteDPParForSparkWorker efun = new RemoteDPParForSparkWorker(program, clsMap, @@ -223,7 +224,7 @@ public class RemoteDPParForSpark int off = _containsID ? 1: 0; Object obj = _isVector ? 
arg0._1().get(off) : arg0._1(); boolean sparse = (obj instanceof SparseVector); - MatrixBlock mb = new MatrixBlock(1, (int)_clen, sparse); + MatrixBlock mb = new MatrixBlock(1, IntUtils.toInt(_clen), sparse); if( _isVector ) { Vector vect = (Vector) obj; http://git-wip-us.apache.org/repos/asf/systemml/blob/95cbbd65/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java index 66b4283..aa3b2a0 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java @@ -44,6 +44,7 @@ import org.apache.sysml.runtime.matrix.MatrixCharacteristics; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.OutputInfo; import org.apache.sysml.runtime.util.ProgramConverter; +import org.apache.sysml.utils.IntUtils; import scala.Tuple2; @@ -84,8 +85,8 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF _aIters = aiters; //setup matrix block partition meta data - _rlen = (int)dpf.getNumRows(mc); - _clen = (int)dpf.getNumColumns(mc); + _rlen = IntUtils.toInt(dpf.getNumRows(mc)); + _clen = IntUtils.toInt(dpf.getNumColumns(mc)); _brlen = mc.getRowsPerBlock(); _bclen = mc.getColsPerBlock(); _tSparseCol = tSparseCol; @@ -125,7 +126,7 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF //maintain accumulators _aTasks.add( 1 ); - _aIters.add( (int)(getExecutedIterations()-numIter) ); + _aIters.add( IntUtils.toInt(getExecutedIterations()-numIter) ); } //write output if required (matrix indexed write) @@ -143,7 +144,7 @@ public class 
RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF CodegenUtils.getClassSync(e.getKey(), e.getValue()); //parse and setup parfor body program - ParForBody body = ProgramConverter.parseParForBody(_prog, (int)_workerID, true); + ParForBody body = ProgramConverter.parseParForBody(_prog, IntUtils.toInt(_workerID), true); _childBlocks = body.getChildBlocks(); _ec = body.getEc(); _resultVars = body.getResultVariables(); @@ -199,8 +200,8 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF long lnnz = 0; for( Writable val : valueList ) { PairWritableBlock pval = (PairWritableBlock) val; - int row_offset = (int)(pval.indexes.getRowIndex()-1)*_brlen; - int col_offset = (int)(pval.indexes.getColumnIndex()-1)*_bclen; + int row_offset = IntUtils.toInt(pval.indexes.getRowIndex()-1)*_brlen; + int col_offset = IntUtils.toInt(pval.indexes.getColumnIndex()-1)*_bclen; if( !partition.isInSparseFormat() ) //DENSE partition.copy( row_offset, row_offset+pval.block.getNumRows()-1, col_offset, col_offset+pval.block.getNumColumns()-1, @@ -254,7 +255,7 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF PairWritableCell pairValue = (PairWritableCell)valueList.iterator().next(); if( pairValue.indexes.getColumnIndex()<0 ) continue; //cells used to ensure empty partitions - partition.quickSetValue(0, (int)pairValue.indexes.getColumnIndex()-1, pairValue.cell.getValue()); + partition.quickSetValue(0, IntUtils.toInt(pairValue.indexes.getColumnIndex()-1), pairValue.cell.getValue()); } break; case COLUMN_WISE: @@ -264,9 +265,9 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF if( pairValue.indexes.getRowIndex()<0 ) continue; //cells used to ensure empty partitions if( _tSparseCol ) - partition.appendValue(0,(int)pairValue.indexes.getRowIndex()-1, pairValue.cell.getValue()); + partition.appendValue(0,IntUtils.toInt(pairValue.indexes.getRowIndex()-1), pairValue.cell.getValue()); else - 
partition.quickSetValue((int)pairValue.indexes.getRowIndex()-1, 0, pairValue.cell.getValue()); + partition.quickSetValue(IntUtils.toInt(pairValue.indexes.getRowIndex()-1), 0, pairValue.cell.getValue()); } break; default: