This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new 84f2135  [MINOR] Fix metadata on spark binary write w/ non-default blocksizes
84f2135 is described below

commit 84f21358b6ea677ca2406d800a9995e64077f435
Author: Matthias Boehm <[email protected]>
AuthorDate: Fri Jul 16 17:14:37 2021 +0200

    [MINOR] Fix metadata on spark binary write w/ non-default blocksizes
    
    This patch fixes issues of incorrectly written metadata files (blocksize
    of binary block matrices) when invoked through spark write instructions.
---
 .../instructions/spark/RandSPInstruction.java      |  2 +-
 .../instructions/spark/WriteSPInstruction.java     | 32 +++++++++++-----------
 .../org/apache/sysds/test/AutomatedTestBase.java   |  6 ++++
 .../test/functions/io/binary/BlocksizeTest.java    |  6 ++--
 4 files changed, 26 insertions(+), 20 deletions(-)

diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/spark/RandSPInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/spark/RandSPInstruction.java
index 421b365..565bf88 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/spark/RandSPInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/spark/RandSPInstruction.java
@@ -754,7 +754,7 @@ public class RandSPInstruction extends UnarySPInstruction {
                
                // Construct BinaryBlock representation
                JavaPairRDD<MatrixIndexes, MatrixBlock> mbRDD = 
-                               
RDDConverterUtils.binaryCellToBinaryBlock(sec.getSparkContext(), miRDD, mcOut, 
true);
+                       
RDDConverterUtils.binaryCellToBinaryBlock(sec.getSparkContext(), miRDD, mcOut, 
true);
                
                //step 5: output handling, incl meta data
                sec.getDataCharacteristics(output.getName()).set(mcOut);
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/spark/WriteSPInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/spark/WriteSPInstruction.java
index 3acab8b..bad0d2f 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/spark/WriteSPInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/spark/WriteSPInstruction.java
@@ -178,9 +178,9 @@ public class WriteSPInstruction extends SPInstruction 
implements LineageTraceabl
                //get input rdd
                JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = 
sec.getBinaryMatrixBlockRDDHandleForVariable( input1.getName() );
                DataCharacteristics mc = 
sec.getDataCharacteristics(input1.getName());
-
-               if( fmt == FileFormat.MM || fmt == FileFormat.TEXT )
-               {
+               DataCharacteristics mcOut = mc; //by reference
+               
+               if( fmt == FileFormat.MM || fmt == FileFormat.TEXT ) {
                        //piggyback nnz maintenance on write
                        LongAccumulator aNnz = null;
                        if( !mc.nnzKnown() ) {
@@ -208,16 +208,14 @@ public class WriteSPInstruction extends SPInstruction 
implements LineageTraceabl
                        if( !mc.nnzKnown() )
                                mc.setNonZeros( aNnz.value() );
                }
-               else if( fmt == FileFormat.CSV )
-               {
+               else if( fmt == FileFormat.CSV ) {
                        if( mc.getRows() == 0 || mc.getCols() == 0 ) {
                                throw new IOException("Write of matrices with 
zero rows or columns"
                                        + " not supported 
("+mc.getRows()+"x"+mc.getCols()+").");
                        }
 
-                       LongAccumulator aNnz = null;
-
                        //piggyback nnz computation on actual write
+                       LongAccumulator aNnz = null;
                        if( !mc.nnzKnown() ) {
                                aNnz = 
sec.getSparkContext().sc().longAccumulator("nnz");
                                in1 = in1.mapValues(new 
ComputeBinaryBlockNnzFunction(aNnz));
@@ -234,9 +232,10 @@ public class WriteSPInstruction extends SPInstruction 
implements LineageTraceabl
                else if( fmt == FileFormat.BINARY ) {
                        //reblock output if needed
                        int blen = Integer.parseInt(input4.getName());
-                       DataCharacteristics mcOut = new 
MatrixCharacteristics(mc).setBlocksize(blen);
-                       if( ConfigurationManager.getBlocksize() != blen )
-                               in1 = 
RDDConverterUtils.binaryBlockToBinaryBlock(in1, mc, mcOut);
+                       boolean nonDefaultBlen = 
ConfigurationManager.getBlocksize() != blen;
+                       if( nonDefaultBlen )
+                               in1 = 
RDDConverterUtils.binaryBlockToBinaryBlock(in1, mc,
+                                       new 
MatrixCharacteristics(mc).setBlocksize(blen));
                        
                        //piggyback nnz computation on actual write
                        LongAccumulator aNnz = null;
@@ -248,8 +247,10 @@ public class WriteSPInstruction extends SPInstruction 
implements LineageTraceabl
                        //save binary block rdd on hdfs
                        in1.saveAsHadoopFile(fname, MatrixIndexes.class, 
MatrixBlock.class, SequenceFileOutputFormat.class);
 
-                       if(!mc.nnzKnown())
+                       if( !mc.nnzKnown() ) //update nnz
                                mc.setNonZeros(aNnz.value().longValue());
+                       if( nonDefaultBlen )
+                               mcOut = new 
MatrixCharacteristics(mc).setBlocksize(blen);
                }
                else if(fmt == FileFormat.LIBSVM) {
                        if(mc.getRows() == 0 || mc.getCols() == 0) {
@@ -257,17 +258,16 @@ public class WriteSPInstruction extends SPInstruction 
implements LineageTraceabl
                                        "Write of matrices with zero rows or 
columns" + " not supported (" + mc.getRows() + "x" + mc
                                        .getCols() + ").");
                        }
-
-                       LongAccumulator aNnz = null;
-
+                       
                        //piggyback nnz computation on actual write
+                       LongAccumulator aNnz = null;
                        if(!mc.nnzKnown()) {
                                aNnz = 
sec.getSparkContext().sc().longAccumulator("nnz");
                                in1 = in1.mapValues(new 
ComputeBinaryBlockNnzFunction(aNnz));
                        }
 
                        JavaRDD<String> out = 
RDDConverterUtils.binaryBlockToLibsvm(in1, 
-                                       mc, (FileFormatPropertiesLIBSVM) 
formatProperties, true);
+                               mc, (FileFormatPropertiesLIBSVM) 
formatProperties, true);
 
                        customSaveTextFile(out, fname, false);
 
@@ -280,7 +280,7 @@ public class WriteSPInstruction extends SPInstruction 
implements LineageTraceabl
                }
 
                // write meta data file
-               HDFSTool.writeMetaDataFile (fname + ".mtd", ValueType.FP64, mc, 
fmt, formatProperties);
+               HDFSTool.writeMetaDataFile(fname + ".mtd", ValueType.FP64, 
mcOut, fmt, formatProperties);
        }
 
        protected void processFrameWriteInstruction(SparkExecutionContext sec, 
String fname, FileFormat fmt, ValueType[] schema)
diff --git a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java 
b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
index 51c0d2a..a4da6e3 100644
--- a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
+++ b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
@@ -900,9 +900,15 @@ public abstract class AutomatedTestBase {
        }
 
        public static void checkDMLMetaDataFile(String fileName, 
MatrixCharacteristics mc) {
+               checkDMLMetaDataFile(fileName, mc, false);
+       }
+       
+       public static void checkDMLMetaDataFile(String fileName, 
MatrixCharacteristics mc, boolean checkBlocksize) {
                MatrixCharacteristics rmc = readDMLMetaDataFile(fileName);
                Assert.assertEquals(mc.getRows(), rmc.getRows());
                Assert.assertEquals(mc.getCols(), rmc.getCols());
+               if( checkBlocksize )
+                       Assert.assertEquals(mc.getBlocksize(), 
rmc.getBlocksize());
        }
 
        public static MatrixCharacteristics readDMLMetaDataFile(String 
fileName) {
diff --git 
a/src/test/java/org/apache/sysds/test/functions/io/binary/BlocksizeTest.java 
b/src/test/java/org/apache/sysds/test/functions/io/binary/BlocksizeTest.java
index 571f817..8ba7e48 100644
--- a/src/test/java/org/apache/sysds/test/functions/io/binary/BlocksizeTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/io/binary/BlocksizeTest.java
@@ -129,16 +129,16 @@ public class BlocksizeTest extends AutomatedTestBase
                        //generate actual dataset 
                        double[][] X = getRandomMatrix(rows, cols, -1.0, 1.0, 
sparsity, 7); 
                        MatrixBlock mb = DataConverter.convertToMatrixBlock(X);
-                       MatrixCharacteristics mc = new 
MatrixCharacteristics(rows, cols, inBlksize, inBlksize);
+                       MatrixCharacteristics mc = new 
MatrixCharacteristics(rows, cols, inBlksize);
                        DataConverter.writeMatrixToHDFS(mb, input("X"), 
FileFormat.BINARY, mc);
                        HDFSTool.writeMetaDataFile(input("X.mtd"), 
ValueType.FP64, mc, FileFormat.BINARY);
                        
                        runTest(true, false, null, -1); //mult 7
                        
                        //compare matrices 
-                       checkDMLMetaDataFile("X", new 
MatrixCharacteristics(rows, cols, outBlksize, outBlksize));
+                       checkDMLMetaDataFile("X", new 
MatrixCharacteristics(rows, cols, outBlksize), true);
                        MatrixBlock mb2 = DataConverter.readMatrixFromHDFS(
-                               output("X"), FileFormat.BINARY, rows, cols, 
outBlksize, outBlksize);
+                               output("X"), FileFormat.BINARY, rows, cols, 
outBlksize, -1);
                        for( int i=0; i<mb.getNumRows(); i++ )
                                for( int j=0; j<mb.getNumColumns(); j++ ) {
                                        double val1 = mb.quickGetValue(i, j) * 
7;

Reply via email to the mailing list.