yunfengzhou-hub commented on code in PR #156:
URL: https://github.com/apache/flink-ml/pull/156#discussion_r978291706


##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorassembler/VectorAssemblerParams.java:
##########
@@ -21,11 +21,31 @@
 import org.apache.flink.ml.common.param.HasHandleInvalid;
 import org.apache.flink.ml.common.param.HasInputCols;
 import org.apache.flink.ml.common.param.HasOutputCol;
+import org.apache.flink.ml.param.IntArrayParam;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+
+import java.util.Arrays;
 
 /**
  * Params of {@link VectorAssembler}.
  *
  * @param <T> The class type of this instance.
  */
 public interface VectorAssemblerParams<T>
-        extends HasInputCols<T>, HasOutputCol<T>, HasHandleInvalid<T> {}
+        extends HasInputCols<T>, HasOutputCol<T>, HasHandleInvalid<T> {
+    Param<Integer[]> SIZES =

Review Comment:
   In reference to the `splitsArray` parameter in Bucketizer and 
`VectorSizeHint` in Spark's VectorAssembler, do you think it would be better to 
rename this parameter to `vectorSizeArray` or `elementSizeArray`?



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorassembler/VectorAssembler.java:
##########
@@ -74,38 +74,65 @@ public Table[] transform(Table... inputs) {
         DataStream<Row> output =
                 tEnv.toDataStream(inputs[0])
                         .flatMap(
-                                new AssemblerFunc(getInputCols(), getHandleInvalid()),
+                                new AssemblerFunction(
+                                        getInputCols(), getHandleInvalid(), getSizes()),
                                 outputTypeInfo);
         Table outputTable = tEnv.fromDataStream(output);
         return new Table[] {outputTable};
     }
 
-    private static class AssemblerFunc implements FlatMapFunction<Row, Row> {
+    private static class AssemblerFunction implements FlatMapFunction<Row, Row> {
         private final String[] inputCols;
         private final String handleInvalid;
+        private final int[] sizeArray;
 
-        public AssemblerFunc(String[] inputCols, String handleInvalid) {
+        public AssemblerFunction(String[] inputCols, String handleInvalid, int[] sizeArray) {
             this.inputCols = inputCols;
             this.handleInvalid = handleInvalid;
+            this.sizeArray = sizeArray;
         }
 
         @Override
         public void flatMap(Row value, Collector<Row> out) {
             int nnz = 0;
             int vectorSize = 0;
             try {
-                for (String inputCol : inputCols) {
+                for (int i = 0; i < inputCols.length; ++i) {
+                    String inputCol = inputCols[i];
                     Object object = value.getField(inputCol);
                     Preconditions.checkNotNull(object, "Input column value should not be null.");
                     if (object instanceof Number) {
+                        Preconditions.checkArgument(

Review Comment:
   There might be performance issues if we perform these checks for each 
record. Can we try to avoid this? For example, can the assembling process fail 
when the input data size does not match the expected size, so that no explicit 
check is needed?
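
   The idea of letting assembly itself surface a size mismatch can be sketched 
as follows. This is an illustrative standalone sketch, not code from the PR; 
the class and method names are hypothetical:

```java
import java.util.Arrays;

// Sketch: preallocate the output from the declared sizes and copy blindly,
// so a too-short input fails during assembly with no explicit per-record
// length check. Note the trade-off: an input LONGER than declared would be
// silently truncated, so some validation may still be needed for that case.
public class AssembleSketch {
    static double[] assemble(double[][] inputs, int[] sizeArray) {
        // Total output length is fixed by the declared sizes.
        double[] out = new double[Arrays.stream(sizeArray).sum()];
        int offset = 0;
        for (int i = 0; i < inputs.length; i++) {
            // Throws IndexOutOfBoundsException if inputs[i] is shorter than
            // sizeArray[i], i.e. the mismatch fails the assembly itself.
            System.arraycopy(inputs[i], 0, out, offset, sizeArray[i]);
            offset += sizeArray[i];
        }
        return out;
    }

    public static void main(String[] args) {
        double[] assembled =
                assemble(new double[][] {{1.0, 2.0}, {3.0}}, new int[] {2, 1});
        System.out.println(Arrays.toString(assembled)); // [1.0, 2.0, 3.0]
    }
}
```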



##########
flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/VectorAssemblerExample.java:
##########
@@ -56,7 +56,8 @@ public static void main(String[] args) {
         VectorAssembler vectorAssembler =
                 new VectorAssembler()
                         .setInputCols("vec", "num", "sparseVec")
-                        .setOutputCol("assembledVec");
+                        .setOutputCol("assembledVec")
+                        .setSizes(2, 1, 5);

Review Comment:
   Let's update VectorAssembler's markdown document accordingly, including its 
parameter list, examples and possibly descriptions.



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorassembler/VectorAssemblerParams.java:
##########
@@ -21,11 +21,31 @@
 import org.apache.flink.ml.common.param.HasHandleInvalid;
 import org.apache.flink.ml.common.param.HasInputCols;
 import org.apache.flink.ml.common.param.HasOutputCol;
+import org.apache.flink.ml.param.IntArrayParam;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+
+import java.util.Arrays;
 
 /**

Review Comment:
   Would it be better to add some documentation describing how users should 
handle the sizes parameter? For example, Spark describes the function of 
VectorSizeHint in the JavaDoc of `VectorAssembler.handleInvalid`. Maybe we 
can add similar descriptions to the JavaDoc of `VectorAssembler`.
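
   As a rough illustration of what such a description could look like (the 
wording below is hypothetical, loosely modeled on the Spark JavaDoc mentioned 
above, and would need to match the actual parameter semantics in this PR):

```java
/**
 * A Transformer which combines a given list of input columns into a vector column.
 *
 * <p>The {@code sizes} parameter declares the expected vector size of each
 * input column. Input rows whose columns do not match the declared sizes are
 * treated as invalid and handled according to the {@code handleInvalid}
 * parameter.
 */
```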


