This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new cacee11  [SYSTEMDS-2616] Parallel detect schema
cacee11 is described below

commit cacee1131458c97927a23242e9760c038c0e5ddb
Author: baunsgaard <[email protected]>
AuthorDate: Mon Aug 17 13:30:55 2020 +0200

    [SYSTEMDS-2616] Parallel detect schema
    
    Changes the implementation of detect schema to run in
    parallel across all columns.
    
    Closes #1012
---
 .../cp/BinaryFrameFrameCPInstruction.java          |   2 +-
 .../spark/BinaryFrameFrameSPInstruction.java       |   2 +-
 .../sysds/runtime/matrix/data/FrameBlock.java      | 164 +++++++++++++--------
 .../functions/frame/FrameDropInvalidTypeTest.java  |  30 +++-
 4 files changed, 129 insertions(+), 69 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/BinaryFrameFrameCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/BinaryFrameFrameCPInstruction.java
index 7968b18..8bc8744 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/cp/BinaryFrameFrameCPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/BinaryFrameFrameCPInstruction.java
@@ -39,7 +39,7 @@ public class BinaryFrameFrameCPInstruction extends BinaryCPInstruction
                
                if(getOpcode().equals("dropInvalidType")) {
                        // Perform computation using input frames, and produce 
the result frame
-                       FrameBlock retBlock = inBlock1.dropInvalid(inBlock2);
+                       FrameBlock retBlock = 
inBlock1.dropInvalidType(inBlock2);
                        // Attach result frame with FrameBlock associated with 
output_name
                        ec.setFrameOutput(output.getName(), retBlock);
                }
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/BinaryFrameFrameSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/BinaryFrameFrameSPInstruction.java
index 82ca398..6966178 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/BinaryFrameFrameSPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/BinaryFrameFrameSPInstruction.java
@@ -95,7 +95,7 @@ public class BinaryFrameFrameSPInstruction extends BinarySPInstruction {
 
                @Override
                public FrameBlock call(FrameBlock arg0) throws Exception {
-                       return arg0.dropInvalid(schema_frame);
+                       return arg0.dropInvalidType(schema_frame);
                }
        }
 
diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java b/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
index 9605380..8a094d0 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
@@ -32,10 +32,17 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Writable;
 import org.apache.sysds.api.DMLException;
 import org.apache.sysds.common.Types.ValueType;
@@ -46,12 +53,14 @@ import org.apache.sysds.runtime.instructions.cp.*;
 import org.apache.sysds.runtime.io.IOUtilFunctions;
 import org.apache.sysds.runtime.matrix.operators.BinaryOperator;
 import org.apache.sysds.runtime.transform.encode.EncoderRecode;
+import org.apache.sysds.runtime.util.CommonThreadPool;
 import org.apache.sysds.runtime.util.IndexRange;
 import org.apache.sysds.runtime.util.UtilFunctions;
 
 @SuppressWarnings({"rawtypes","unchecked"}) //allow generic native arrays
-public class FrameBlock implements CacheBlock, Externalizable  
-{
+public class FrameBlock implements CacheBlock, Externalizable  {
+       private static final Log LOG = 
LogFactory.getLog(FrameBlock.class.getName());
+       
        private static final long serialVersionUID = -3993450030207130665L;
        
        public static final int BUFFER_SIZE = 1 * 1000 * 1000; //1M elements, 
size of default matrix block 
@@ -1869,9 +1878,21 @@ public class FrameBlock implements CacheBlock, Externalizable
                val = val.trim().toLowerCase().replaceAll("\"",  "");
                if (val.matches("(true|false|t|f|0|1)"))
                        return ValueType.BOOLEAN;
-               else if (val.matches("[-+]?\\d+"))
-                       return ValueType.INT64;
-               else if (val.matches("[-+]?[0-9]+\\.?[0-9]*([e]?[-+]?[0-9]+)") 
|| val.equals("infinity") || val.equals("-infinity") || val.equals("nan"))
+               else if (val.matches("[-+]?\\d+")){
+                       long maxValue = Long.parseLong(val);
+                       if ((maxValue >= Integer.MIN_VALUE) && (maxValue <= 
Integer.MAX_VALUE))
+                               return ValueType.INT32;
+                       else
+                               return ValueType.INT64;
+               }
+               else if (val.matches("[-+]?[0-9]+\\.?[0-9]*([e]?[-+]?[0-9]+)")){
+                       double maxValue = Double.parseDouble(val);
+                       if ((maxValue >= (-Float.MAX_VALUE)) && (maxValue <= 
Float.MAX_VALUE))
+                               return ValueType.FP32;
+                       else
+                               return ValueType.FP64;
+               }
+               else if (val.equals("infinity") || val.equals("-infinity") || 
val.equals("nan"))
                        return ValueType.FP64;
                else return ValueType.STRING;
        }
@@ -1880,48 +1901,25 @@ public class FrameBlock implements CacheBlock, Externalizable
                int rows = this.getNumRows();
                int cols = this.getNumColumns();
                String[] schemaInfo = new String[cols];
-               int sample = (int)Math.min(Math.max(sampleFraction*rows, 1024), 
rows);
+               int sample = (int)Math.min(Math.max(sampleFraction*rows, 256), 
rows);
+
+               ExecutorService pool = CommonThreadPool.get(cols);
+               ArrayList<DetectValueTypeTask> tasks = new ArrayList<>();
                for (int i = 0; i < cols; i++) {
-                       ValueType state = ValueType.UNKNOWN;
-                       Array obj = this.getColumn(i);
-                       for (int j = 0; j < sample; j++)
-                       {
-                               String dataValue = null;
-                               //read a not null sample value
-                               while (dataValue == null) {
-                                       int randomIndex = 
ThreadLocalRandom.current().nextInt(0, rows - 1);
-                                       dataValue = ((obj.get(randomIndex) != 
null)?obj.get(randomIndex).toString().trim().replace("\"", 
"").toLowerCase():null);
-                               }
+                       FrameBlock.Array obj = this.getColumn(i);
+                       tasks.add(new DetectValueTypeTask(obj,rows, sample));
+               }
 
-                               if (isType(dataValue) == ValueType.STRING) {
-                                       state = ValueType.STRING;
-                                       break;
-                               }
-                               else if (isType(dataValue) == ValueType.FP64) {
-                                       if (dataValue.equals("infinity") || 
dataValue.equals("-infinity") || dataValue.equals("nan")) {
-                                               state = ValueType.FP64;
-                                       }
-                                       else {
-                                               double maxValue = 
Double.parseDouble(dataValue);
-                                               if ((maxValue >= 
(-Float.MAX_VALUE)) && (maxValue <= Float.MAX_VALUE))
-                                                       state = (state == 
ValueType.FP64 ? state : ValueType.FP32);
-                                               else
-                                                       state = ValueType.FP64;
-                                       }
-                               }
-                               else if (isType(dataValue) == ValueType.INT64) {
-                                       long maxValue = 
Long.parseLong(dataValue);
-                                       if ((maxValue >= Integer.MIN_VALUE) && 
(maxValue <= Integer.MAX_VALUE))
-                                               state = ((state == 
ValueType.FP64 || state == ValueType.FP32 || state == ValueType.INT64) ? state 
: ValueType.INT32);
-                                       else
-                                               state = ((state == 
ValueType.FP64  || state == ValueType.FP32) ? state : ValueType.INT64);
-                               }
-                               else if (isType(dataValue) == ValueType.BOOLEAN)
-                                       state = ((new 
ArrayList<>(Arrays.asList(ValueType.FP64, ValueType.FP32, ValueType.INT64, 
ValueType.INT32)).contains(state)) ? state : ValueType.BOOLEAN);
-                               else if (isType(dataValue) == ValueType.STRING)
-                                       state = ((new 
ArrayList<>(Arrays.asList(ValueType.FP64, ValueType.FP32, ValueType.INT64, 
ValueType.INT32, ValueType.BOOLEAN)).contains(state)) ? state : 
ValueType.STRING);
+               List<Future<String>> ret;
+
+               try {
+                       ret = pool.invokeAll(tasks);
+                       pool.shutdown();
+                       for(int i = 0; i < cols; i++){
+                               schemaInfo[i] = ret.get(i).get();
                        }
-                       schemaInfo[i] = state.name();
+               } catch (ExecutionException | InterruptedException e) {
+                       throw new DMLRuntimeException("Exception Interupted or 
Exception thrown in Detect Schema", e);
                }
 
                //create output block one row representing the schema as strings
@@ -1930,12 +1928,57 @@ public class FrameBlock implements CacheBlock, Externalizable
                return fb;
        }
 
+       private static class DetectValueTypeTask implements Callable<String>
+       {
+               private final Array _obj;
+               private final int _rows;
+               private final int _sampleSize;
+
+
+               protected DetectValueTypeTask(Array obj, int rows, int 
sampleSize ) {
+                       _obj = obj;
+                       _rows = rows;
+                       _sampleSize = sampleSize;
+               }
+
+               @Override
+               public String call() {
+                       ValueType state = ValueType.UNKNOWN;
+                       for (int j = 0; j < _sampleSize; j++) {
+                               int randomIndex = 
ThreadLocalRandom.current().nextInt(0, _rows - 1);
+                               String dataValue = ((_obj.get(randomIndex) != 
null)?_obj.get(randomIndex).toString().trim().replace("\"", 
"").toLowerCase():null);
+                               if(dataValue != null){
+                                       ValueType current = isType(dataValue);
+                                       if (current == ValueType.STRING) {
+                                               state = ValueType.STRING;
+                                               break;
+                                       }
+                                       else if (current== ValueType.FP64) {
+                                               state = ValueType.FP64;
+                                       }
+                                       else if (current== ValueType.FP32) {
+                                               state = (state == 
ValueType.FP64 ? state : ValueType.FP32);
+                                       }
+                                       else if (current == ValueType.INT64) {
+                                               state = ((state == 
ValueType.FP64  || state == ValueType.FP32) ? state : ValueType.INT64);
+                                       }
+                                       else if (current == ValueType.INT32) {
+                                               state = ((state == 
ValueType.FP64 || state == ValueType.FP32 || state == ValueType.INT64) ? state 
: ValueType.INT32);
+                                       }
+                                       else if (current == ValueType.BOOLEAN)
+                                               state = ((state == 
ValueType.FP64 || state == ValueType.FP32 || state == ValueType.INT64 || state 
== ValueType.INT32) ? state : ValueType.BOOLEAN);
+                               }
+                       }
+                       return state.name();
+               }
+       }
+
        /**
         * Drop the cell value which does not confirms to the data type of its 
column
         * @param schema of the frame
         * @return original frame where invalid values are replaced with null
         */
-       public FrameBlock dropInvalid(FrameBlock schema) {
+       public FrameBlock dropInvalidType(FrameBlock schema) {
                //sanity checks
                if(this.getNumColumns() != schema.getNumColumns())
                        throw new DMLException("mismatch in number of columns 
in frame and its schema ");
@@ -1943,6 +1986,19 @@ public class FrameBlock implements CacheBlock, Externalizable
                String[] schemaString = schema.getStringRowIterator().next(); 
// extract the schema in String array
                for (int i = 0; i < this.getNumColumns(); i++) {
                        Array obj = this.getColumn(i);
+                       String schemaCol = schemaString[i];
+                       String type;
+                       if(schemaCol.contains("FP")){
+                               type = "FP";
+                       } else if (schemaCol.contains("INT")){
+                               type = "INT";
+                       } else if (schemaCol.contains("STRING")){
+                               // In case of String columns, don't do any 
verification or replacements.
+                               break;
+                       } else{
+                               type = schemaCol;
+                       }
+
                        for (int j = 0; j < this.getNumRows(); j++)
                        {
                                if(obj.get(j) == null)
@@ -1950,21 +2006,11 @@ public class FrameBlock implements CacheBlock, Externalizable
                                String dataValue = 
obj.get(j).toString().trim().replace("\"", "").toLowerCase() ;
 
                                ValueType dataType = isType(dataValue);
-                                if (dataType== ValueType.FP64 && 
schemaString[i].trim().equals("FP32")) {
-                                        double maxValue = 
Double.parseDouble(dataValue);
-                                        if ((maxValue < (-Float.MAX_VALUE)) || 
(maxValue > Float.MAX_VALUE))
-                                                this.set(j,i,null);
-                               }
-                               else if (dataType== ValueType.INT64 && 
schemaString[i].trim().equals("INT32")) {
-                                        long maxValue = 
Long.parseLong(dataValue);
-                                        if ((maxValue < Integer.MIN_VALUE) || 
(maxValue > Integer.MAX_VALUE))
-                                                this.set(j,i,null);
-                               }
-                               else if(dataType == ValueType.BOOLEAN && 
schemaString[i].trim().equals("INT32")
-                                                && 
((Integer.parseInt(dataValue) == 1 || Integer.parseInt(dataValue) == 0)))
-                                       continue;
-                               else if 
(!dataType.toString().equals(schemaString[i].trim()))
+                               if(!dataType.toString().contains(type) && 
!(dataType == ValueType.BOOLEAN && type == "INT")){
+                                       LOG.warn("Datatype detected: " + 
dataType + " where expected: " + schemaString[i] + " index: " + i + "," +j);
+
                                        this.set(j,i,null);
+                               }
                        }
                }
                return this;
diff --git a/src/test/java/org/apache/sysds/test/functions/frame/FrameDropInvalidTypeTest.java b/src/test/java/org/apache/sysds/test/functions/frame/FrameDropInvalidTypeTest.java
index d70b255..841c61e 100644
--- a/src/test/java/org/apache/sysds/test/functions/frame/FrameDropInvalidTypeTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/frame/FrameDropInvalidTypeTest.java
@@ -68,45 +68,59 @@ public class FrameDropInvalidTypeTest extends AutomatedTestBase
 
        @Test
        public void testDoubleinStringCP() {
-               runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 3, 
1, LopProperties.ExecType.CP);
+               // This test now verifies floating points are okay in string 
columns
+               runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 3, 
1, LopProperties.ExecType.CP, true);
        }
 
        @Test
        public void testDoubleinStringSpark() {
-               runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 3, 
1, LopProperties.ExecType.SPARK);
+               // This test now verifies floating points are okay in string 
columns
+               runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 3, 
1, LopProperties.ExecType.SPARK, true);
        }
 
        @Test
        public void testStringInDouble() {
+               // This test now verifies strings are removed in float columns
                runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 3, 
2, LopProperties.ExecType.CP);
        }
 
        @Test
        public void testStringInDoubleSpark() {
+               // This test now verifies strings are removed in float columns
                runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 3, 
2, LopProperties.ExecType.SPARK);
        }
 
        @Test
        public void testDoubleInFloat() {
-               runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 5, 
3, LopProperties.ExecType.CP);
+               // This test now verifies that changing from FP64 to FP32 is 
okay.
+               runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 5, 
3, LopProperties.ExecType.CP,true);
        }
 
        @Test
        public void testDoubleInFloatSpark() {
-               runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 5, 
3, LopProperties.ExecType.SPARK);
+               // This test now verifies that changing from FP64 to FP32 is 
okay.
+               runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 5, 
3, LopProperties.ExecType.SPARK, true);
        }
 
        @Test
        public void testLongInInt() {
-               runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 5, 
4, LopProperties.ExecType.CP);
+               // This test now verifies that changing from INT32 to INT64 is 
okay.
+               runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 5, 
4, LopProperties.ExecType.CP, true);
        }
 
        @Test
        public void testLongInIntSpark() {
-               runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 5, 
4, LopProperties.ExecType.SPARK);
+               // This test now verifies that changing from INT32 to INT64 is 
okay.
+               runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 5, 
4, LopProperties.ExecType.SPARK, true);
        }
+
+       private void runIsCorrectTest(ValueType[] schema, int rows, int cols,
+                                                                 int 
badValues, int test, LopProperties.ExecType et){
+               runIsCorrectTest(schema, rows, cols, badValues, test, et, 
false);
+       }
+
        private void runIsCorrectTest(ValueType[] schema, int rows, int cols,
-               int badValues, int test, LopProperties.ExecType et)
+               int badValues, int test, LopProperties.ExecType et, boolean 
ignore)
        {
                Types.ExecMode platformOld = setExecMode(et);
                boolean oldFlag = OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION;
@@ -182,7 +196,7 @@ public class FrameDropInvalidTypeTest extends AutomatedTestBase
 
                        int nullNum = Math.toIntExact(data.stream().filter(s -> 
s == null).count());
                        //verify output schema
-                       Assert.assertEquals("Wrong result: " + nullNum + ".", 
badValues, nullNum);
+                       Assert.assertEquals("Wrong result: " + nullNum + ".", 
ignore ? 0 : badValues, nullNum);
                }
                catch (Exception ex) {
                        throw new RuntimeException(ex);

Reply via email to