This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new 37a15d4  [SYSTEMDS-3102] Performance in-memory reblocks for binary 
inputs
37a15d4 is described below

commit 37a15d4620f669724d3927a3b7d71cbc1a6a1a18
Author: Matthias Boehm <[email protected]>
AuthorDate: Wed Aug 25 23:43:24 2021 +0200

    [SYSTEMDS-3102] Performance in-memory reblocks for binary inputs
    
    This patch makes two performance improvements to in-memory reblocks
    inside sp_rblk: (1) preferring the in-memory reblock for binary inputs
    (where the read is much faster than distributed reblocking), and (2)
    leveraging lineage items, similar to rand, to avoid cache pollution
    and unnecessary evictions.
---
 .../org/apache/sysds/hops/recompile/Recompiler.java  | 13 ++++++++++---
 .../instructions/spark/CSVReblockSPInstruction.java  |  3 ++-
 .../instructions/spark/ReblockSPInstruction.java     | 20 ++++++++++++++++++--
 .../sysds/runtime/lineage/LineageRecomputeUtils.java | 17 +++++++++++++++++
 4 files changed, 47 insertions(+), 6 deletions(-)

diff --git a/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java 
b/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java
index 8bb842e..d644bf0 100644
--- a/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java
+++ b/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java
@@ -93,6 +93,7 @@ import org.apache.sysds.runtime.instructions.cp.IntObject;
 import org.apache.sysds.runtime.instructions.cp.ListObject;
 import org.apache.sysds.runtime.instructions.cp.ScalarObject;
 import org.apache.sysds.runtime.io.IOUtilFunctions;
+import org.apache.sysds.runtime.lineage.LineageItem;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.meta.DataCharacteristics;
 import org.apache.sysds.runtime.meta.MatrixCharacteristics;
@@ -1578,7 +1579,7 @@ public class Recompiler {
                                throw new DMLRuntimeException(ex);
                        }
                }
-               
+
                //check valid dimensions and memory requirements
                double sp = OptimizerUtils.getSparsity(rows, cols, nnz);
                double mem = MatrixBlock.estimateSizeInMemory(rows, cols, sp);
@@ -1593,7 +1594,8 @@ public class Recompiler {
                long estFilesize = (long)(3.5 * mem); //conservative estimate
                long cpThreshold = CP_REBLOCK_THRESHOLD_SIZE * 
                        OptimizerUtils.getParallelTextReadParallelism();
-               return (estFilesize < cpThreshold);
+               return (iimd.getFileFormat() == FileFormat.BINARY
+                       || estFilesize < cpThreshold); //for text conservative
        }
        
        public static boolean checkCPCheckpoint(DataCharacteristics dc) {
@@ -1601,9 +1603,13 @@ public class Recompiler {
                        && OptimizerUtils.isValidCPDimensions(dc.getRows(), 
dc.getCols())
                        && 
!OptimizerUtils.exceedsCachingThreshold(dc.getCols(), 
OptimizerUtils.estimateSize(dc));
        }
+
+       public static void executeInMemoryReblock(ExecutionContext ec, String 
varin, String varout) {
+               executeInMemoryReblock(ec, varin, varout, null);
+       }
        
        @SuppressWarnings("unchecked")
-       public static void executeInMemoryReblock(ExecutionContext ec, String 
varin, String varout) {
+       public static void executeInMemoryReblock(ExecutionContext ec, String 
varin, String varout, LineageItem litem) {
                CacheableData<CacheBlock> in = (CacheableData<CacheBlock>) 
ec.getCacheableData(varin);
                CacheableData<CacheBlock> out = (CacheableData<CacheBlock>) 
ec.getCacheableData(varout);
 
@@ -1618,6 +1624,7 @@ public class Recompiler {
                        
                        //set output (incl update matrix characteristics)
                        out.acquireModify(mb);
+                       out.setCacheLineage(litem);
                        out.release();
                        in.release();
                }
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/spark/CSVReblockSPInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/spark/CSVReblockSPInstruction.java
index f0c81f4..cae0fbe 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/spark/CSVReblockSPInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/spark/CSVReblockSPInstruction.java
@@ -119,8 +119,9 @@ public class CSVReblockSPInstruction extends 
UnarySPInstruction {
 
                //check for in-memory reblock (w/ lazy spark context, potential 
for latency reduction)
                if( Recompiler.checkCPReblock(sec, input1.getName()) ) {
-                       if( input1.getDataType().isMatrix() || 
input1.getDataType().isFrame() )
+                       if( input1.getDataType().isMatrix() || 
input1.getDataType().isFrame() ) {
                                Recompiler.executeInMemoryReblock(sec, 
input1.getName(), output.getName());
+                       }
                        Statistics.decrementNoOfExecutedSPInst();
                        return;
                }
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/spark/ReblockSPInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/spark/ReblockSPInstruction.java
index 81b50b8..b8192a9 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/spark/ReblockSPInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/spark/ReblockSPInstruction.java
@@ -21,6 +21,7 @@ package org.apache.sysds.runtime.instructions.spark;
 
 import java.util.Set;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.spark.api.java.JavaPairRDD;
@@ -41,6 +42,7 @@ import org.apache.sysds.runtime.io.FileFormatPropertiesCSV;
 import org.apache.sysds.runtime.io.FileFormatPropertiesLIBSVM;
 import org.apache.sysds.runtime.io.FileFormatPropertiesMM;
 import org.apache.sysds.runtime.io.IOUtilFunctions;
+import org.apache.sysds.runtime.lineage.LineageItem;
 import org.apache.sysds.runtime.matrix.data.FrameBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixCell;
@@ -48,6 +50,7 @@ import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysds.runtime.matrix.operators.Operator;
 import org.apache.sysds.runtime.meta.DataCharacteristics;
 import org.apache.sysds.runtime.meta.MetaDataFormat;
+import org.apache.sysds.runtime.util.ProgramConverter;
 import org.apache.sysds.utils.Statistics;
 
 public class ReblockSPInstruction extends UnarySPInstruction {
@@ -96,8 +99,10 @@ public class ReblockSPInstruction extends UnarySPInstruction 
{
 
                //check for in-memory reblock (w/ lazy spark context, potential 
for latency reduction)
                if( Recompiler.checkCPReblock(sec, input1.getName()) ) {
-                       if( input1.getDataType().isMatrix() || 
input1.getDataType().isFrame() )
-                               Recompiler.executeInMemoryReblock(sec, 
input1.getName(), output.getName());
+                       if( input1.getDataType().isMatrix() || 
input1.getDataType().isFrame() ) {
+                               Recompiler.executeInMemoryReblock(sec, 
input1.getName(), output.getName(),
+                                       iimd.getFileFormat()==FileFormat.BINARY 
? getLineageItem(ec).getValue() : null);
+                       }
                        Statistics.decrementNoOfExecutedSPInst();
                        return;
                }
@@ -256,4 +261,15 @@ public class ReblockSPInstruction extends 
UnarySPInstruction {
                                + "for ReblockSPInstruction: " + 
fmt.toString());
                }
        }
+       
+       @Override
+       public Pair<String, LineageItem> getLineageItem(ExecutionContext ec) {
+               //construct reblock lineage without existing createvar lineage
+               if( ec.getLineage() == null ) {
+                       return Pair.of(output.getName(), new LineageItem(
+                               
ProgramConverter.serializeDataObject(input1.getName(), 
ec.getCacheableData(input1)), "cache_rblk"));
+               }
+               //default reblock w/ active lineage tracing
+               return super.getLineageItem(ec);
+       }
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/lineage/LineageRecomputeUtils.java 
b/src/main/java/org/apache/sysds/runtime/lineage/LineageRecomputeUtils.java
index fe31b38..c51b1c1 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageRecomputeUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageRecomputeUtils.java
@@ -31,6 +31,7 @@ import java.util.stream.Collectors;
 import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.common.Types.DataType;
+import org.apache.sysds.common.Types.FileFormat;
 import org.apache.sysds.common.Types.OpOp1;
 import org.apache.sysds.common.Types.OpOp2;
 import org.apache.sysds.common.Types.OpOp3;
@@ -58,6 +59,7 @@ import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.controlprogram.BasicProgramBlock;
 import org.apache.sysds.runtime.controlprogram.FunctionProgramBlock;
 import org.apache.sysds.runtime.controlprogram.Program;
+import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContextFactory;
 import org.apache.sysds.runtime.instructions.Instruction;
@@ -71,6 +73,7 @@ import 
org.apache.sysds.runtime.instructions.cp.ScalarObjectFactory;
 import org.apache.sysds.runtime.instructions.cp.VariableCPInstruction;
 import org.apache.sysds.runtime.instructions.spark.RandSPInstruction;
 import org.apache.sysds.runtime.instructions.spark.SPInstruction.SPType;
+import org.apache.sysds.runtime.util.ProgramConverter;
 import org.apache.sysds.utils.Explain;
 import org.apache.sysds.utils.Explain.ExplainCounts;
 import org.apache.sysds.utils.Statistics;
@@ -200,6 +203,20 @@ public class LineageRecomputeUtils {
                                        operands.put(item.getId(), input); // 
order preserving
                                        break;
                                }
+                               else if( item.getOpcode().equals("cache_rblk") 
) {
+                                       CacheableData<?> dat = 
(CacheableData<?>)ProgramConverter.parseDataObject(item.getData())[1];
+                                       DataOp hop = new DataOp("tmp", 
dat.getDataType(), dat.getValueType(),
+                                               OpOpData.PERSISTENTREAD, 
dat.getFileName(), dat.getNumRows(),
+                                               dat.getNumColumns(), 
dat.getDataCharacteristics().getNonZeros(), -1);
+                                       hop.setFileFormat(FileFormat.BINARY);
+                                       
hop.setInputBlocksize(dat.getBlocksize());
+                                       
hop.setBlocksize(ConfigurationManager.getBlocksize());
+                                       hop.setRequiresReblock(true);
+                                       operands.put(item.getId(), hop);
+                                       break;
+                               }
+                               
+                               
                                Instruction inst = 
InstructionParser.parseSingleInstruction(item.getData());
                                
                                if (inst instanceof DataGenCPInstruction) {

Reply via email to