mboehm7 commented on a change in pull request #923:
URL: https://github.com/apache/systemml/pull/923#discussion_r433446389



##########
File path: src/main/java/org/apache/sysds/common/Builtins.java
##########
@@ -86,6 +86,7 @@
        DIAG("diag", false),
        DISCOVER_FD("discoverFD", true),
        DROP_INVALID("dropInvalid", false),
+       DROP_INVALID_LENGTH("dropInvalidLen", false),

Review comment:
       Please make it consistent: `dropInvalidType` and `dropInvalidLength`, with consistent semantics; both drop the entries. Initially, I thought it would be better to return a 0/1 matrix indicating violations (or, even better, just the string length per cell), but right now we could not eliminate those entries via other operations, so we should drop them directly (before feeding the frames into a transformencode).

##########
File path: src/main/java/org/apache/sysds/hops/BinaryOp.java
##########
@@ -895,7 +895,8 @@ public static boolean requiresReplication( Hop left, Hop right )
        private static MMBinaryMethod optFindMMBinaryMethodSpark(Hop left, Hop right) {
                // TODO size information for tensor
                if ((left._dataType == DataType.TENSOR && right._dataType == DataType.TENSOR)
-                       || (left._dataType == DataType.FRAME && right._dataType == DataType.FRAME))
+                       || (left._dataType == DataType.FRAME && right._dataType == DataType.FRAME)
+                       || (left._dataType == DataType.FRAME && right._dataType == DataType.MATRIX))
                        return MMBinaryMethod.MR_BINARY_R;

Review comment:
       For now, I would recommend always broadcasting the vector, so it becomes a binary_M (broadcast-based) operation.
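       Roughly something like this inside `optFindMMBinaryMethodSpark` (a sketch only; I'm assuming `MR_BINARY_M` is the right enum constant for the broadcast-based method):

```java
// sketch: for frame-matrix ops, always broadcast the (small) length vector
if( left._dataType == DataType.FRAME && right._dataType == DataType.MATRIX )
	return MMBinaryMethod.MR_BINARY_M; // broadcast the row vector of lengths
if( (left._dataType == DataType.TENSOR && right._dataType == DataType.TENSOR)
	|| (left._dataType == DataType.FRAME && right._dataType == DataType.FRAME) )
	return MMBinaryMethod.MR_BINARY_R;
```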

##########
File path: src/main/java/org/apache/sysds/common/Types.java
##########
@@ -264,7 +264,7 @@ public static OpOp1 valueOfByOpcode(String opcode) {
        public enum OpOp2 {
                AND(true), BITWAND(true), BITWOR(true), BITWSHIFTL(true), BITWSHIFTR(true),
                BITWXOR(true), CBIND(false), CONCAT(false), COV(false), DIV(true),
-               DROP_INVALID(false), EQUAL(true), GREATER(true), GREATEREQUAL(true),
+               DROP_INVALID(false), DROP_INVALID_LENGTH(false), EQUAL(true), GREATER(true), GREATEREQUAL(true),

Review comment:
       Please use the names proposed above (`dropInvalidType`, `dropInvalidLength`) globally.

##########
File path: src/main/java/org/apache/sysds/runtime/instructions/cp/BinaryCPInstruction.java
##########
@@ -52,8 +53,10 @@ else if (in1.getDataType() == DataType.MATRIX && in2.getDataType() == DataType.M
                        return new BinaryMatrixMatrixCPInstruction(operator, in1, in2, out, opcode, str);
                else if (in1.getDataType() == DataType.TENSOR && in2.getDataType() == DataType.TENSOR)
                        return new BinaryTensorTensorCPInstruction(operator, in1, in2, out, opcode, str);
-               else if (in1.getDataType() == DataType.FRAME && in2.getDataType() == DataType.FRAME)
+               else if (in1.getDataType() == DataType.FRAME && (in2.getDataType() == DataType.FRAME))

Review comment:
       There is no need for these extra parentheses.

##########
File path: src/main/java/org/apache/sysds/parser/BuiltinFunctionExpression.java
##########
@@ -1529,7 +1529,16 @@ else if(this.getOpCode() == Builtins.MAX_POOL || this.getOpCode() == Builtins.AV
                        output.setValueType(ValueType.STRING);
                        break;
 
-               default:
+               case DROP_INVALID_LENGTH:
+                       checkNumParameters(2);
+                       checkMatrixFrameParam(getFirstExpr());
+                       checkMatrixFrameParam(getSecondExpr());
+                       output.setDataType(DataType.FRAME);
+                       output.setDimensions(id.getDim1(), id.getDim2());
+                       output.setBlocksize (id.getBlocksize());
+                       output.setValueType(ValueType.BOOLEAN);

Review comment:
       The value type should now be taken from the input frame instead of being hard-coded to BOOLEAN.
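       For example (sketch only; assuming `id` still refers to the first input's identifier at this point in the validation):

```java
case DROP_INVALID_LENGTH:
	checkNumParameters(2);
	checkMatrixFrameParam(getFirstExpr());
	checkMatrixFrameParam(getSecondExpr());
	output.setDataType(DataType.FRAME);
	output.setDimensions(id.getDim1(), id.getDim2());
	output.setBlocksize(id.getBlocksize());
	output.setValueType(id.getValueType()); // take over the input's value type
	break;
```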

##########
File path: src/main/java/org/apache/sysds/runtime/instructions/spark/BinaryFrameMatrixSPInstruction.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.instructions.spark;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.matrix.data.FrameBlock;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.matrix.operators.Operator;
+
+public class BinaryFrameMatrixSPInstruction extends BinarySPInstruction {
+       protected BinaryFrameMatrixSPInstruction(Operator op, CPOperand in1, CPOperand in2, CPOperand out, String opcode, String istr) {
+               super(SPType.Binary, op, in1, in2, out, opcode, istr);
+       }
+
+       @Override
+       public void processInstruction(ExecutionContext ec) {
+               SparkExecutionContext sec = (SparkExecutionContext) ec;
+               // Get input RDDs
+               JavaPairRDD<Long, FrameBlock> in1 = sec.getFrameBinaryBlockRDDHandleForVariable(input1.getName());
+               // get feature length matrix
+               Broadcast<MatrixBlock> feaLen = sec.getSparkContext().broadcast(sec.getMatrixInput(input2.getName()));

Review comment:
       Please use our PartitionedBroadcast, as all other broadcast-based operations do (e.g., mapmm).
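       A minimal sketch of what I have in mind (assuming the usual `getBroadcastForVariable` helper on `SparkExecutionContext`; `DropInvalidLengths` would then need to accept the partitioned-broadcast handle):

```java
// broadcast the length vector as a partitioned broadcast (instead of a raw
// Spark Broadcast of the pinned MatrixBlock) and pass the handle to the function
PartitionedBroadcast<MatrixBlock> feaLen = sec.getBroadcastForVariable(input2.getName());
JavaPairRDD<Long, FrameBlock> out = in1.mapValues(new DropInvalidLengths(feaLen));
```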

##########
File path: src/main/java/org/apache/sysds/runtime/instructions/spark/BinaryFrameMatrixSPInstruction.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.instructions.spark;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.matrix.data.FrameBlock;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.matrix.operators.Operator;
+
+public class BinaryFrameMatrixSPInstruction extends BinarySPInstruction {
+       protected BinaryFrameMatrixSPInstruction(Operator op, CPOperand in1, CPOperand in2, CPOperand out, String opcode, String istr) {
+               super(SPType.Binary, op, in1, in2, out, opcode, istr);
+       }
+
+       @Override
+       public void processInstruction(ExecutionContext ec) {
+               SparkExecutionContext sec = (SparkExecutionContext) ec;
+               // Get input RDDs
+               JavaPairRDD<Long, FrameBlock> in1 = sec.getFrameBinaryBlockRDDHandleForVariable(input1.getName());
+               // get feature length matrix
+               Broadcast<MatrixBlock> feaLen = sec.getSparkContext().broadcast(sec.getMatrixInput(input2.getName()));
+               JavaPairRDD<Long, FrameBlock> out = in1.mapValues(new DropInvalidLengths(feaLen.getValue()));
+               //release input matrix

Review comment:
       Calling `feaLen.getValue()` at the driver defeats the purpose of a broadcast variable, because you obtain the wrapped data object and serialize it directly via the task closure.
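       The usual pattern is to ship the broadcast handle itself and only dereference it inside the task, roughly like this (sketch only; independent of whether a plain `Broadcast` or our PartitionedBroadcast is used):

```java
private static class DropInvalidLengths implements Function<FrameBlock, FrameBlock> {
	private static final long serialVersionUID = 5850400295183766400L;
	private final Broadcast<MatrixBlock> featureLength;

	public DropInvalidLengths(Broadcast<MatrixBlock> fl) {
		featureLength = fl;
	}

	@Override
	public FrameBlock call(FrameBlock frameBlock) throws Exception {
		// getValue() runs on the executor and reads the local broadcast copy
		return frameBlock.invalidByLength(featureLength.getValue());
	}
}
```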

##########
File path: src/main/java/org/apache/sysds/runtime/instructions/spark/BinaryFrameMatrixSPInstruction.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.instructions.spark;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.matrix.data.FrameBlock;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.matrix.operators.Operator;
+
+public class BinaryFrameMatrixSPInstruction extends BinarySPInstruction {
+       protected BinaryFrameMatrixSPInstruction(Operator op, CPOperand in1, CPOperand in2, CPOperand out, String opcode, String istr) {
+               super(SPType.Binary, op, in1, in2, out, opcode, istr);
+       }
+
+       @Override
+       public void processInstruction(ExecutionContext ec) {
+               SparkExecutionContext sec = (SparkExecutionContext) ec;
+               // Get input RDDs
+               JavaPairRDD<Long, FrameBlock> in1 = sec.getFrameBinaryBlockRDDHandleForVariable(input1.getName());
+               // get feature length matrix
+               Broadcast<MatrixBlock> feaLen = sec.getSparkContext().broadcast(sec.getMatrixInput(input2.getName()));
+               JavaPairRDD<Long, FrameBlock> out = in1.mapValues(new DropInvalidLengths(feaLen.getValue()));
+               //release input matrix
+               sec.releaseMatrixInput(input2.getName());
+               //set output RDD
+               sec.setRDDHandleForVariable(output.getName(), out);
+               sec.addLineageRDD(output.getName(), input1.getName());
+       }
+
+       private static class DropInvalidLengths implements Function<FrameBlock,FrameBlock> {
+               private static final long serialVersionUID = 5850400295183766400L;
+
+               private MatrixBlock featureLength = null;
+
+               public DropInvalidLengths(MatrixBlock fl) {
+                       featureLength = fl;
+               }
+
+               @Override public FrameBlock call(FrameBlock frameBlock) throws Exception {
+                       FrameBlock fb = frameBlock.invalidByLength(featureLength);

Review comment:
       Here you should call `getValue()` on the broadcast variable (i.e., inside the task), not at the driver.

##########
File path: src/main/java/org/apache/sysds/runtime/instructions/spark/BinaryFrameMatrixSPInstruction.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.instructions.spark;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.matrix.data.FrameBlock;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.matrix.operators.Operator;
+
+public class BinaryFrameMatrixSPInstruction extends BinarySPInstruction {
+       protected BinaryFrameMatrixSPInstruction(Operator op, CPOperand in1, CPOperand in2, CPOperand out, String opcode, String istr) {
+               super(SPType.Binary, op, in1, in2, out, opcode, istr);
+       }
+
+       @Override
+       public void processInstruction(ExecutionContext ec) {
+               SparkExecutionContext sec = (SparkExecutionContext) ec;
+               // Get input RDDs
+               JavaPairRDD<Long, FrameBlock> in1 = sec.getFrameBinaryBlockRDDHandleForVariable(input1.getName());
+               // get feature length matrix
+               Broadcast<MatrixBlock> feaLen = sec.getSparkContext().broadcast(sec.getMatrixInput(input2.getName()));

Review comment:
       Unless this creates issues because you need entire rows rather than 1k-by-1k blocks.

##########
File path: src/test/java/org/apache/sysds/test/functions/frame/FrameDropInvalidLengthTest.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.frame;
+
+import org.apache.sysds.api.DMLScript;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.hops.OptimizerUtils;
+import org.apache.sysds.lops.LopProperties;
+import org.apache.sysds.runtime.io.FrameWriter;
+import org.apache.sysds.runtime.io.FrameWriterFactory;
+import org.apache.sysds.runtime.matrix.data.FrameBlock;
+import org.apache.sysds.runtime.matrix.data.MatrixValue;
+import org.apache.sysds.runtime.matrix.data.OutputInfo;
+import org.apache.sysds.runtime.util.UtilFunctions;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+
+public class FrameDropInvalidLengthTest extends AutomatedTestBase {
+       private final static String TEST_NAME = "DropInvalidLength";
+       private final static String TEST_DIR = "functions/frame/";
+       private static final String TEST_CLASS_DIR = TEST_DIR + FrameDropInvalidLengthTest.class.getSimpleName() + "/";
+
+       private final static int rows = 800;
+       private final static int cols = 4;
+       private final static Types.ValueType[] schemaStrings = {Types.ValueType.FP64, Types.ValueType.STRING, Types.ValueType.STRING, Types.ValueType.INT64};
+
+       public static void init() {
+               TestUtils.clearDirectory(TEST_DATA_DIR + TEST_CLASS_DIR);
+       }
+
+       public static void cleanUp() {
+               if (TEST_CACHE_ENABLED) {
+                       TestUtils.clearDirectory(TEST_DATA_DIR + TEST_CLASS_DIR);
+               }
+       }
+
+       @Override
+       public void setUp() {
+               TestUtils.clearAssertionInformation();
+               addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"B"}));
+               if (TEST_CACHE_ENABLED) {
+                       setOutAndExpectedDeletionDisabled(true);
+               }
+       }
+
+       @Test
+       public void testTwoBadColCP() {
+               double[][] invalidLength =  {{-1,30,20,-1}};
+               runDropInvalidLenTest( invalidLength,1, LopProperties.ExecType.CP);
+       }
+
+//     @Test
+//     public void testTwoBadColSP() {

Review comment:
       For me, all the Spark tests pass without problems, except for `testNoneBadColSP`, where the hashmap read from the output is empty, so the call to `next` fails. Please simply make the test more robust.
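       For example, something along these lines would do (a sketch using the same helpers as in the test):

```java
HashMap<MatrixValue.CellIndex, Double> dmlOut = readDMLMatrixFromHDFS("B");
// an empty map means no cell was written, i.e., zero invalid entries
double d = dmlOut.isEmpty() ? 0 : dmlOut.values().iterator().next();
Assert.assertEquals(expected, d, 1e-5);
```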

##########
File path: src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
##########
@@ -1865,6 +1865,39 @@ else if (!dataType.toString().equals(schemaString[i].trim()))
                return this;
        }
 
+       /*
+               This method validate the frame data against a column length constrain
+               if data value in any column is greater than specific threshold
+               output vector will store a 1 for that column position.
+               @param input MatrixBlock of valid lengths
+               @param output a boolean FrameBlock for valid/invalid features
+        */
+       public FrameBlock invalidByLength(MatrixBlock feaLen) {
+               //sanity checks
+               if(this.getNumColumns() != feaLen.getNumColumns())
+                       throw new DMLException("mismatch in number of columns in frame and corresponding feature-length vector");
+
+               ValueType[] outSchema = UtilFunctions.nCopies(this.getNumColumns(), ValueType.BOOLEAN);

Review comment:
       Now you can simply take over the input schema (maybe a deep copy of it).
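       Something like this (sketch only):

```java
// reuse the input schema instead of an all-BOOLEAN schema
ValueType[] outSchema = getSchema().clone();
```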

##########
File path: src/test/java/org/apache/sysds/test/functions/frame/FrameDropInvalidLengthTest.java
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.frame;
+
+import org.apache.sysds.api.DMLScript;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.hops.OptimizerUtils;
+import org.apache.sysds.lops.LopProperties;
+import org.apache.sysds.runtime.io.FrameWriter;
+import org.apache.sysds.runtime.io.FrameWriterFactory;
+import org.apache.sysds.runtime.matrix.data.FrameBlock;
+import org.apache.sysds.runtime.matrix.data.MatrixValue;
+import org.apache.sysds.runtime.util.UtilFunctions;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+
+public class FrameDropInvalidLengthTest extends AutomatedTestBase {
+       private final static String TEST_NAME = "DropInvalidLength";
+       private final static String TEST_DIR = "functions/frame/";
+       private static final String TEST_CLASS_DIR = TEST_DIR + FrameDropInvalidLengthTest.class.getSimpleName() + "/";
+
+       private final static int rows = 800;
+       private final static int cols = 4;
+       private final static Types.ValueType[] schemaStrings = {Types.ValueType.FP64, Types.ValueType.STRING, Types.ValueType.STRING, Types.ValueType.INT64};
+
+       public static void init() {
+               TestUtils.clearDirectory(TEST_DATA_DIR + TEST_CLASS_DIR);
+       }
+
+       public static void cleanUp() {
+               if (TEST_CACHE_ENABLED) {
+                       TestUtils.clearDirectory(TEST_DATA_DIR + TEST_CLASS_DIR);
+               }
+       }
+
+       @Override
+       public void setUp() {
+               TestUtils.clearAssertionInformation();
+               addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"B"}));
+               if (TEST_CACHE_ENABLED) {
+                       setOutAndExpectedDeletionDisabled(true);
+               }
+       }
+
+       @Test
+       public void testTwoBadColCP() {
+               double[][] invalidLength =  {{-1,30,20,-1}};
+               runDropInvalidLenTest( invalidLength,1, LopProperties.ExecType.CP);
+       }
+
+//     @Test
+//     public void testTwoBadColSP() {
+//             double[][] invalidLength =  {{-1,30,20,-1}};
+//             runDropInvalidLenTest( invalidLength,1, LopProperties.ExecType.SPARK);
+//     }
+
+       @Test
+       public void testOneBadColCP() {
+               double[][] invalidLength =  {{-1,-1,20,-1}};
+               runDropInvalidLenTest( invalidLength,2, LopProperties.ExecType.CP);
+       }
+
+//     @Test
+//     public void testOneBadColSP() {
+//             double[][] invalidLength =  {{-1,-1,20,-1}};
+//             runDropInvalidLenTest( invalidLength,2, LopProperties.ExecType.SPARK);
+//     }
+
+       @Test
+       public void testAllBadColCP() {
+               double[][] invalidLength =  {{2,2,2,1}};
+               runDropInvalidLenTest( invalidLength,3, LopProperties.ExecType.CP);
+       }
+
+//     @Test
+//     public void testAllBadColSP() {
+//             double[][] invalidLength =  {{2,2,2,1}};
+//             runDropInvalidLenTest( invalidLength,3, LopProperties.ExecType.SPARK);
+//     }
+
+       @Test
+       public void testNoneBadColCP() {
+               double[][] invalidLength =  {{-1,20,20,-1}};
+               runDropInvalidLenTest( invalidLength,4, LopProperties.ExecType.CP);
+       }
+
+//     @Test
+//     public void testNoneBadColSP() {
+//             double[][] invalidLength =  {{-1,20,20,-1}};
+//             runDropInvalidLenTest( invalidLength,4, LopProperties.ExecType.SPARK);
+//     }
+
+       private void runDropInvalidLenTest(double[][] colInvalidLength, int test, LopProperties.ExecType et)
+       {
+               Types.ExecMode platformOld = setExecMode(et);
+               boolean oldFlag = OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION;
+               boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+               try {
+                       getAndLoadTestConfiguration(TEST_NAME);
+                       String HOME = SCRIPT_DIR + TEST_DIR;
+                       fullDMLScriptName = HOME + TEST_NAME + ".dml";
+                       programArgs = new String[] {"-args", input("A"), input("M"),
+                                       String.valueOf(rows), Integer.toString(cols), output("B")};
+                       FrameBlock frame1 = new FrameBlock(schemaStrings);
+                       double[][] A = getRandomMatrix(rows, cols, 10, 100, 1, 2373);
+                       initFrameDataString(frame1,A, schemaStrings); // initialize a frame with one column
+                       FrameWriter writer = FrameWriterFactory.createFrameWriter(Types.FileFormat.CSV);
+
+                       ArrayList<Integer> badIndex = getBadIndexes(rows/4);
+                       int expected = 0;
+
+                       switch (test) { //Double in String
+                               case 1:
+                                       for (int i = 0; i < badIndex.size(); i++) {
+                                               frame1.set(badIndex.get(i),1,"This is a very long sentence that could" +
+                                                               " count up to multiple characters");
+                                       }
+                                       expected += badIndex.size();
+                               case 2:
+                                       for (int i = 0; i < badIndex.size(); i++) {
+                                               frame1.set(badIndex.get(i), 2, "This is out of length");
+                                       }
+                                       expected += badIndex.size();
+                                       break;
+                               case 3:
+                                       expected += rows*cols;
+                                       break;
+                               case 4:
+                                       expected += 0;
+                                       break;
+                       }
+                       // write data frame
+                       writer.writeFrameToHDFS(
+                                       frame1.slice(0, rows - 1, 0, cols-1, new FrameBlock()),
+                                       input("A"), rows, schemaStrings.length);
+                       // write expected feature length matrix
+                       writeInputMatrixWithMTD("M", colInvalidLength, true);
+
+                       runTest(true, false, null, -1);
+                       // compare output
+                       HashMap<MatrixValue.CellIndex, Double> dmlOut = readDMLMatrixFromHDFS("B");
+                                               MatrixValue.CellIndex index = dmlOut.keySet().iterator().next(); double d = dmlOut.get(index);
+                                               Assert.assertEquals(expected, d, 1e-5);
+               }
+               catch (Exception ex) {
+                       throw new RuntimeException(ex);
+               }
+               finally {
+                       rtplatform = platformOld;
+                       DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
+                       OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION = oldFlag;
+                       OptimizerUtils.ALLOW_AUTO_VECTORIZATION = true;
+                       OptimizerUtils.ALLOW_OPERATOR_FUSION = true;
+               }
+       }
+
+       private ArrayList<Integer> getBadIndexes(int length) {
+               ArrayList<Integer> list = new ArrayList<Integer>();

Review comment:
       In general, you want to use something like this (typed without 
unnecessary redundancy): `ArrayList<Integer> list = new ArrayList<>();`

##########
File path: src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
##########
@@ -1865,6 +1865,39 @@ else if (!dataType.toString().equals(schemaString[i].trim()))
                return this;
        }
 
+       /*
+               This method validate the frame data against a column length constrain
+               if data value in any column is greater than specific threshold
+               output vector will store a 1 for that column position.
+               @param input MatrixBlock of valid lengths
+               @param output a boolean FrameBlock for valid/invalid features
+        */
+       public FrameBlock invalidByLength(MatrixBlock feaLen) {
+               //sanity checks
+               if(this.getNumColumns() != feaLen.getNumColumns())
+                       throw new DMLException("mismatch in number of columns in frame and corresponding feature-length vector");
+
+               ValueType[] outSchema = UtilFunctions.nCopies(this.getNumColumns(), ValueType.BOOLEAN);
+               String[][] outData = new String[this.getNumRows()][this.getNumColumns()];
+               FrameBlock outBlock = new FrameBlock(outSchema, outData);
+               outer: for (int i = 0; i < this.getNumColumns(); i++) {

Review comment:
       The explicit labels are unnecessary here because the referenced `continue` is the default behavior anyway.

##########
File path: src/test/scripts/functions/frame/DropInvalidLength.dml
##########
@@ -0,0 +1,27 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+X = read($1, rows=$3, cols=$4, data_type="frame", format="csv");
+colLength = read($2); # column length vector -1 for exempted features and 
+                      # a valid characater length for features to be processed 

Review comment:
       typo 'characater' -> 'character'

##########
File path: src/test/java/org/apache/sysds/test/functions/frame/FrameDropInvalidLengthTest.java
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.frame;
+
+import org.apache.sysds.api.DMLScript;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.hops.OptimizerUtils;
+import org.apache.sysds.lops.LopProperties;
+import org.apache.sysds.runtime.io.FrameWriter;
+import org.apache.sysds.runtime.io.FrameWriterFactory;
+import org.apache.sysds.runtime.matrix.data.FrameBlock;
+import org.apache.sysds.runtime.matrix.data.MatrixValue;
+import org.apache.sysds.runtime.util.UtilFunctions;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+
+public class FrameDropInvalidLengthTest extends AutomatedTestBase {
+       private final static String TEST_NAME = "DropInvalidLength";
+       private final static String TEST_DIR = "functions/frame/";
+       private static final String TEST_CLASS_DIR = TEST_DIR + FrameDropInvalidLengthTest.class.getSimpleName() + "/";
+
+       private final static int rows = 800;
+       private final static int cols = 4;
+       private final static Types.ValueType[] schemaStrings = {Types.ValueType.FP64, Types.ValueType.STRING, Types.ValueType.STRING, Types.ValueType.INT64};
+
+       public static void init() {
+               TestUtils.clearDirectory(TEST_DATA_DIR + TEST_CLASS_DIR);
+       }
+
+       public static void cleanUp() {
+               if (TEST_CACHE_ENABLED) {
+                       TestUtils.clearDirectory(TEST_DATA_DIR + TEST_CLASS_DIR);
+               }
+       }
+
+       @Override
+       public void setUp() {
+               TestUtils.clearAssertionInformation();
+               addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"B"}));
+               if (TEST_CACHE_ENABLED) {
+                       setOutAndExpectedDeletionDisabled(true);
+               }
+       }
+
+       @Test
+       public void testTwoBadColCP() {
+               double[][] invalidLength =  {{-1,30,20,-1}};
+               runDropInvalidLenTest( invalidLength,1, LopProperties.ExecType.CP);
+       }
+
+//     @Test
+//     public void testTwoBadColSP() {
+//             double[][] invalidLength =  {{-1,30,20,-1}};
+//             runDropInvalidLenTest( invalidLength,1, LopProperties.ExecType.SPARK);
+//     }
+
+       @Test
+       public void testOneBadColCP() {
+               double[][] invalidLength =  {{-1,-1,20,-1}};
+               runDropInvalidLenTest( invalidLength,2, LopProperties.ExecType.CP);
+       }
+
+//     @Test
+//     public void testOneBadColSP() {
+//             double[][] invalidLength =  {{-1,-1,20,-1}};
+//             runDropInvalidLenTest( invalidLength,2, LopProperties.ExecType.SPARK);
+//     }
+
+       @Test
+       public void testAllBadColCP() {
+               double[][] invalidLength =  {{2,2,2,1}};
+               runDropInvalidLenTest( invalidLength,3, LopProperties.ExecType.CP);
+       }
+
+//     @Test
+//     public void testAllBadColSP() {
+//             double[][] invalidLength =  {{2,2,2,1}};
+//             runDropInvalidLenTest( invalidLength,3, LopProperties.ExecType.SPARK);
+//     }
+
+       @Test
+       public void testNoneBadColCP() {
+               double[][] invalidLength =  {{-1,20,20,-1}};
+               runDropInvalidLenTest( invalidLength,4, LopProperties.ExecType.CP);
+       }
+
+//     @Test
+//     public void testNoneBadColSP() {
+//             double[][] invalidLength =  {{-1,20,20,-1}};
+//             runDropInvalidLenTest( invalidLength,4, LopProperties.ExecType.SPARK);
+//     }
+
+       private void runDropInvalidLenTest(double[][] colInvalidLength, int test, LopProperties.ExecType et)
+       {
+               Types.ExecMode platformOld = setExecMode(et);
+               boolean oldFlag = OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION;
+               boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+               try {
+                       getAndLoadTestConfiguration(TEST_NAME);
+                       String HOME = SCRIPT_DIR + TEST_DIR;
+                       fullDMLScriptName = HOME + TEST_NAME + ".dml";
+                       programArgs = new String[] {"-args", input("A"), input("M"),
+                                       String.valueOf(rows), Integer.toString(cols), output("B")};
+                       FrameBlock frame1 = new FrameBlock(schemaStrings);
+                       double[][] A = getRandomMatrix(rows, cols, 10, 100, 1, 2373);
+                       initFrameDataString(frame1,A, schemaStrings); // initialize a frame with one column
+                       FrameWriter writer = FrameWriterFactory.createFrameWriter(Types.FileFormat.CSV);
+
+                       ArrayList<Integer> badIndex = getBadIndexes(rows/4);
+                       int expected = 0;
+
+                       switch (test) { //Double in String
+                               case 1:
+                                       for (int i = 0; i < badIndex.size(); i++) {
+                                               frame1.set(badIndex.get(i),1,"This is a very long sentence that could" +
+                                                               " count up to multiple characters");
+                                       }
+                                       expected += badIndex.size();
+                               case 2:
+                                       for (int i = 0; i < badIndex.size(); i++) {
+                                               frame1.set(badIndex.get(i), 2, "This is out of length");
+                                       }
+                                       expected += badIndex.size();
+                                       break;
+                               case 3:
+                                       expected += rows*cols;
+                                       break;
+                               case 4:
+                                       expected += 0;
+                                       break;
+                       }
+                       // write data frame
+                       writer.writeFrameToHDFS(
+                                       frame1.slice(0, rows - 1, 0, cols-1, new FrameBlock()),
+                                       input("A"), rows, schemaStrings.length);
+                       // write expected feature length matrix
+                       writeInputMatrixWithMTD("M", colInvalidLength, true);
+
+                       runTest(true, false, null, -1);
+                       // compare output
+                       HashMap<MatrixValue.CellIndex, Double> dmlOut = readDMLMatrixFromHDFS("B");
+                                               MatrixValue.CellIndex index = dmlOut.keySet().iterator().next(); double d = dmlOut.get(index);
+                                               Assert.assertEquals(expected, d, 1e-5);

Review comment:
       Formatting issues: inconsistent indentation and multiple statements on one line.
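       I.e., one statement per line with consistent indentation (same logic as before):

```java
HashMap<MatrixValue.CellIndex, Double> dmlOut = readDMLMatrixFromHDFS("B");
MatrixValue.CellIndex index = dmlOut.keySet().iterator().next();
double d = dmlOut.get(index);
Assert.assertEquals(expected, d, 1e-5);
```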

##########
File path: src/test/scripts/functions/frame/DropInvalidLength.dml
##########
@@ -0,0 +1,27 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+X = read($1, rows=$3, cols=$4, data_type="frame", format="csv");
+colLength = read($2); # column length vector -1 for exempted features and 

Review comment:
       It's supposed to be a row vector; otherwise, that might explain why the Spark operations crash.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

