This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e85843  [SYSTEMDS-2868] Fix spark transformencode (tokens with spaces)
6e85843 is described below

commit 6e858434fdd6715925999e41c02a248fe988387d
Author: Matthias Boehm <[email protected]>
AuthorDate: Sat Feb 20 22:36:26 2021 +0100

    [SYSTEMDS-2868] Fix spark transformencode (tokens with spaces)
    
    The Spark transformencode reuses the textcell frame reader to read the
    metadata in a consistent form after the parallel pre-pass that determines
    recode maps, bin boundaries, and missing-value (MV) replacements for all
    columns. The textcell readers, however, are derived from Matrix Market,
    which does not support quoting. Therefore, the Spark transformencode fails
    on tokens that contain spaces, because spaces are the field delimiters of
    the textcell format.
    
    We now sanitize the tokens (replacing spaces) before the central
    consolidation, and afterwards desanitize them back into their original
    representations. The temporary replacement for a space is a character
    sequence that is very unlikely to appear in practice.
    
    To facilitate this fix, the patch also adds a general mapInplace frame
    operation that applies an arbitrary string-to-string lambda function in
    place to every non-null cell of an existing frame.
---
 ...ltiReturnParameterizedBuiltinSPInstruction.java |  9 ++++---
 .../sysds/runtime/io/FrameReaderTextCell.java      |  5 ++--
 .../sysds/runtime/matrix/data/FrameBlock.java      | 10 ++++++++
 .../apache/sysds/runtime/transform/TfUtils.java    | 29 +++++++++++++---------
 .../transform/TransformFrameEncodeDecodeTest.java  | 10 ++++----
 .../functions/transform/input/homes3/homes.csv     |  6 ++---
 6 files changed, 44 insertions(+), 25 deletions(-)

diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
index bb290fc..e797eee 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
@@ -50,6 +50,7 @@ import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysds.runtime.matrix.operators.Operator;
 import org.apache.sysds.runtime.meta.DataCharacteristics;
+import org.apache.sysds.runtime.transform.TfUtils;
 import org.apache.sysds.runtime.transform.encode.Encoder;
 import org.apache.sysds.runtime.transform.encode.EncoderBin;
 import org.apache.sysds.runtime.transform.encode.EncoderComposite;
@@ -138,6 +139,7 @@ public class MultiReturnParameterizedBuiltinSPInstruction 
extends ComputationSPI
                        FrameBlock meta = 
reader.readFrameFromHDFS(fometa.getFileName(), accMax.value(), 
fo.getNumColumns());
                        meta.recomputeColumnCardinality(); //recompute num 
distinct items per column
                        
meta.setColumnNames((colnames!=null)?colnames:meta.getColumnNames());
+                       meta.mapInplace(v -> TfUtils.desanitizeSpaces(v)); 
//due to format TEXT
                        
                        //step 2: transform apply (similar to spark 
transformapply)
                        //compute omit offset map for block shifts
@@ -326,8 +328,9 @@ public class MultiReturnParameterizedBuiltinSPInstruction 
extends ComputationSPI
                        //handle recode maps
                        if( _encoder.isEncoder(colID, EncoderRecode.class) ) {
                                while( iter.hasNext() ) {
+                                       String token = 
TfUtils.sanitizeSpaces(iter.next().toString());
                                        sb.append(rowID).append(' 
').append(scolID).append(' ');
-                                       
sb.append(EncoderRecode.constructRecodeMapEntry(iter.next().toString(), rowID));
+                                       
sb.append(EncoderRecode.constructRecodeMapEntry(token, rowID));
                                        ret.add(sb.toString());
                                        sb.setLength(0); 
                                        rowID++;
@@ -440,7 +443,7 @@ public class MultiReturnParameterizedBuiltinSPInstruction 
extends ComputationSPI
                                                mode = e.getKey();
                                                max = e.getValue();
                                        }
-                               ret.add("-2 " + colix + " " + mode);
+                               ret.add("-2 " + colix + " " + 
TfUtils.sanitizeSpaces(mode));
                        }
                        //compute global mean of categorical feature
                        else if( _encoder.getMethod(colix) == 
MVMethod.GLOBAL_MEAN ) {
@@ -458,7 +461,7 @@ public class MultiReturnParameterizedBuiltinSPInstruction 
extends ComputationSPI
                        //pass-through constant label
                        else if( _encoder.getMethod(colix) == MVMethod.CONSTANT 
) {
                                if( iter.hasNext() )
-                                       ret.add("-2 " + colix + " " + 
iter.next().getMvValue());
+                                       ret.add("-2 " + colix + " " + 
TfUtils.sanitizeSpaces(iter.next().getMvValue()));
                        }
                        
                        return ret.iterator();
diff --git a/src/main/java/org/apache/sysds/runtime/io/FrameReaderTextCell.java 
b/src/main/java/org/apache/sysds/runtime/io/FrameReaderTextCell.java
index 8db958d..bd65220 100644
--- a/src/main/java/org/apache/sysds/runtime/io/FrameReaderTextCell.java
+++ b/src/main/java/org/apache/sysds/runtime/io/FrameReaderTextCell.java
@@ -38,6 +38,7 @@ import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.common.Types.ValueType;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.matrix.data.FrameBlock;
+import org.apache.sysds.runtime.transform.TfUtils;
 import org.apache.sysds.runtime.util.FastStringTokenizer;
 import org.apache.sysds.runtime.util.UtilFunctions;
 
@@ -125,7 +126,7 @@ public class FrameReaderTextCell extends FrameReader
                                row = st.nextInt()-1;
                                col = st.nextInt()-1;
                                if( row == -3 )
-                                       
dest.getColumnMetadata(col).setMvValue(st.nextToken());
+                                       
dest.getColumnMetadata(col).setMvValue(TfUtils.desanitizeSpaces(st.nextToken()));
                                else if( row == -2 )
                                        
dest.getColumnMetadata(col).setNumDistinct(st.nextLong());
                                else
@@ -172,7 +173,7 @@ public class FrameReaderTextCell extends FrameReader
                                row = st.nextInt()-1;
                                col = st.nextInt()-1;
                                if( row == -3 )
-                                       
dest.getColumnMetadata(col).setMvValue(st.nextToken());
+                                       
dest.getColumnMetadata(col).setMvValue(TfUtils.desanitizeSpaces(st.nextToken()));
                                else if (row == -2)
                                        
dest.getColumnMetadata(col).setNumDistinct(st.nextLong());
                                else
diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java 
b/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
index d82d40b..cf17980 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
@@ -39,6 +39,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Function;
 
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.logging.Log;
@@ -2107,6 +2108,15 @@ public class FrameBlock implements CacheBlock, 
Externalizable  {
                mergedFrame.appendRow(rowTemp1);
                return mergedFrame;
        }
+       
+       public void mapInplace(Function<String, String> fun) {
+               for(int j=0; j<getNumColumns(); j++)
+                       for(int i=0; i<getNumRows(); i++) {
+                               Object tmp = get(i, j);
+                               set(i, j, (tmp == null) ? tmp :
+                                       
UtilFunctions.objectToObject(_schema[j], fun.apply(tmp.toString())));
+                       }
+       }
 
        public FrameBlock map (String lambdaExpr){
                if(!lambdaExpr.contains("->")) {
diff --git a/src/main/java/org/apache/sysds/runtime/transform/TfUtils.java 
b/src/main/java/org/apache/sysds/runtime/transform/TfUtils.java
index 635d730..d895fd1 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/TfUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/TfUtils.java
@@ -20,7 +20,6 @@
 package org.apache.sysds.runtime.transform;
 
 import java.io.Serializable;
-import java.util.regex.Pattern;
 
 import org.apache.sysds.lops.Lop;
 
@@ -72,19 +71,12 @@ public class TfUtils implements Serializable
        public static final String JSON_CONSTS = "constants";
        public static final String JSON_NBINS  = "numbins";
 
-       private String _headerLine = null;
-       private boolean _hasHeader;
-       private Pattern _delim = null;
-       private String _delimString = null;
+       public static final String EXT_SPACE = Lop.DATATYPE_PREFIX 
+               + Lop.INSTRUCTION_DELIMITOR +Lop.DATATYPE_PREFIX;
+       
        private String[] _NAstrings = null;
-       private int _numInputCols = -1;
        
-       public String getHeader()               { return _headerLine; }
-       public boolean hasHeader()              { return _hasHeader; }
-       public String getDelimString()  { return _delimString; }
-       public Pattern getDelim()               { return _delim; }
-       public String[] getNAStrings()  { return _NAstrings; }
-       public long getNumCols()                { return _numInputCols; }
+       public String[] getNAStrings() { return _NAstrings; }
        
        /**
         * Function that checks if the given string is one of NA strings.
@@ -103,4 +95,17 @@ public class TfUtils implements Serializable
                }
                return false;
        }
+       
+       public static String sanitizeSpaces(String token) {
+               //due to the use of textcell (derived from matrix market), we 
cannot
+               //simply quote the token as its parsed without support for 
quoting.
+               //Hence, we replace with a very unlikely character sequence
+               return token != null && token.contains(" ") ?
+                       token.replaceAll(" ", EXT_SPACE): token;
+       }
+       
+       public static String desanitizeSpaces(String token) {
+               return token != null && token.contains(EXT_SPACE) ?
+                       token.replaceAll(EXT_SPACE, " ") : token;
+       }
 }
diff --git 
a/src/test/java/org/apache/sysds/test/functions/transform/TransformFrameEncodeDecodeTest.java
 
b/src/test/java/org/apache/sysds/test/functions/transform/TransformFrameEncodeDecodeTest.java
index a6f98f9..6f38ad9 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/transform/TransformFrameEncodeDecodeTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/transform/TransformFrameEncodeDecodeTest.java
@@ -41,11 +41,11 @@ public class TransformFrameEncodeDecodeTest extends 
AutomatedTestBase
        private final static String TEST_CLASS_DIR = TEST_DIR + 
TransformFrameEncodeDecodeTest.class.getSimpleName() + "/";
        
        //dataset and transform tasks without missing values
-       private final static String DATASET1    = "homes3/homes.csv";
-       private final static String SPEC1               = 
"homes3/homes.tfspec_recode.json"; 
-       private final static String SPEC1b              = 
"homes3/homes.tfspec_recode2.json"; 
-       private final static String SPEC2               = 
"homes3/homes.tfspec_dummy.json";
-       private final static String SPEC2b              = 
"homes3/homes.tfspec_dummy2.json";
+       private final static String DATASET1 = "homes3/homes.csv";
+       private final static String SPEC1    = 
"homes3/homes.tfspec_recode.json"; 
+       private final static String SPEC1b   = 
"homes3/homes.tfspec_recode2.json"; 
+       private final static String SPEC2    = "homes3/homes.tfspec_dummy.json";
+       private final static String SPEC2b   = 
"homes3/homes.tfspec_dummy2.json";
        
        public enum TransformType {
                RECODE,
diff --git a/src/test/scripts/functions/transform/input/homes3/homes.csv 
b/src/test/scripts/functions/transform/input/homes3/homes.csv
index aeed601..880f76c 100644
--- a/src/test/scripts/functions/transform/input/homes3/homes.csv
+++ b/src/test/scripts/functions/transform/input/homes3/homes.csv
@@ -68,13 +68,13 @@ 
zipcode,district,sqft,numbedrooms,numbathrooms,floors,view,saleprice,askingprice
 95141,east,3903,1,2.5,2,FALSE,976,981
 91312,south,1076,2,2.5,1,FALSE,597,600
 96334,west,1719,1,1.5,3,FALSE,738,742
-94555,north,1439,4,1.5,1,FALSE,589,592
+94555,north east,1439,4,1.5,1,FALSE,589,592
 91312,east,1961,2,3,1,TRUE,775,778
 94555,north,2471,1,1.5,1,TRUE,753,756
 91312,west,3930,4,2.5,2,FALSE,1004,1009
 95141,south,2833,1,1,1,FALSE,718,721
 96334,south,2580,4,1,2,TRUE,816,820
-94555,south,2169,3,2.5,3,TRUE,904,908
+94555,south east ,2169,3,2.5,3,TRUE,904,908
 95141,east,3329,4,3,3,TRUE,1064,1069
 96334,south,3392,4,2,3,TRUE,1026,1031
 96334,east,3688,6,2.5,3,FALSE,1032,1037
@@ -85,7 +85,7 @@ 
zipcode,district,sqft,numbedrooms,numbathrooms,floors,view,saleprice,askingprice
 96334,east,1732,3,2,1,TRUE,700,703
 96334,south,2188,4,2,1,TRUE,767,771
 96334,south,3750,6,2,2,FALSE,963,967
-98755,north,2331,1,1.5,1,TRUE,740,743
+98755,north east ,2331,1,1.5,1,TRUE,740,743
 94555,north,1512,4,3,3,TRUE,854,858
 98755,north,3352,3,3,3,FALSE,1014,1018
 94555,south,3426,3,2.5,2,FALSE,937,941

Reply via email to