yunfengzhou-hub commented on code in PR #156:
URL: https://github.com/apache/flink-ml/pull/156#discussion_r990872536


##########
docs/content/docs/operators/feature/vectorassembler.md:
##########
@@ -156,12 +160,13 @@ input_data_table = t_env.from_data_stream(
             [DenseVectorTypeInfo(), Types.DOUBLE(), SparseVectorTypeInfo()])))
 
 # create a vector assembler object and initialize its parameters
-vector_assembler = VectorAssembler() \
-    .set_input_cols('vec', 'num', 'sparse_vec') \
-    .set_output_col('assembled_vec') \
+vector_assembler = VectorAssembler()
+    .set_input_cols('vec', 'num', 'sparse_vec')
+    .set_output_col('assembled_vec')
+    .set_input_sizes(2, 1, 5)

Review Comment:
   The example code in this document differs from the code in the 
`flink-ml-python/pyflink/examples` folder. For example, this line should be 
`.set_input_sizes(2, 1, 5) \` instead of `.set_input_sizes(2, 1, 5)`. Without 
the trailing backslashes, the chained calls are not valid Python.



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/common/param/HasVectorSize.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.param;
+
+import org.apache.flink.ml.param.IntParam;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+import org.apache.flink.ml.param.WithParams;
+
+/** Interface for the shared vector size. */
+public interface HasVectorSize<T> extends WithParams<T> {

Review Comment:
   nit: This parameter is not used and can be deleted.



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorassembler/VectorAssembler.java:
##########
@@ -47,7 +47,9 @@
 
 /**
  * A Transformer which combines a given list of input columns into a vector column. Types of input
- * columns must be either vector or numerical value.
+ * columns must be either vector or numerical types. The elements assembled in the same column must
+ * have the same size. If the size of the element is not equal to sizes[columnIdx], it will throw an
+ * IllegalArgumentException.

Review Comment:
   Whether an exception is thrown is determined by `HasHandleInvalid`. Let's 
improve the JavaDoc to describe this behavior.
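
   As a rough sketch of what the JavaDoc could describe, the size check might 
branch on the configured strategy as shown below. All names here 
(`HandleInvalidSketch`, `checkSize`) are hypothetical and not taken from the 
PR. Only the three strategy values come from the operator docs, and the 
behavior shown for 'keep' (pass the value through unchanged) is an assumption.

   ```java
   import java.util.Optional;

   /** Hypothetical sketch; class and method names are illustrative only. */
   final class HandleInvalidSketch {
       static final String ERROR_INVALID = "error";
       static final String SKIP_INVALID = "skip";
       static final String KEEP_INVALID = "keep";

       /** Returns the value to emit, or empty if the row should be dropped. */
       static Optional<double[]> checkSize(
               double[] vec, int expectedSize, String handleInvalid) {
           if (vec.length == expectedSize) {
               return Optional.of(vec);
           }
           switch (handleInvalid) {
               case ERROR_INVALID:
                   // 'error': fail fast on the first size mismatch.
                   throw new IllegalArgumentException(
                           String.format(
                                   "Expected size: %d, actual size: %d",
                                   expectedSize, vec.length));
               case SKIP_INVALID:
                   // 'skip': drop the row silently.
                   return Optional.empty();
               case KEEP_INVALID:
                   // 'keep': assumption, pass the value through unchanged.
                   return Optional.of(vec);
               default:
                   throw new UnsupportedOperationException(
                           "Unsupported handleInvalid value: " + handleInvalid);
           }
       }

       public static void main(String[] args) {
           double[] tooShort = {1.0};
           System.out.println(checkSize(tooShort, 2, SKIP_INVALID).isPresent());
           System.out.println(checkSize(tooShort, 2, KEEP_INVALID).isPresent());
       }
   }
   ```

   Whatever the real semantics turn out to be, the JavaDoc should state them 
per strategy rather than only mentioning the exception.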



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorassembler/VectorAssembler.java:
##########
@@ -133,6 +145,17 @@ public void flatMap(Row value, Collector<Row> out) {
                             : assembleSparse(inputCols, value, vectorSize, nnz);
             out.collect(Row.join(value, Row.of(assembledVec)));
         }
+
+        private void sizeHint(int expectedSize, int currentSize) {
+            if (currentSize != expectedSize) {
+                throw new IllegalArgumentException(
+                        "Inconsistent vector size, setSize is "

Review Comment:
   How about the following error message?
   ```java
   throw new IllegalArgumentException(
           String.format(
                   "Input vector/number size does not meet with expected. "
                           + "Expected size: %d, actual size: %d",
                   expectedSize, currentSize));
   ```
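
   For reference, a minimal self-contained sketch of the check with this 
message. The class name `SizeCheckSketch` and the method name `checkSize` are 
placeholders for illustration and are not part of the PR.

   ```java
   /** Hypothetical sketch of the size check using the suggested message. */
   final class SizeCheckSketch {
       static void checkSize(int expectedSize, int currentSize) {
           if (currentSize != expectedSize) {
               throw new IllegalArgumentException(
                       String.format(
                               "Input vector/number size does not meet with expected. "
                                       + "Expected size: %d, actual size: %d",
                               expectedSize, currentSize));
           }
       }

       public static void main(String[] args) {
           checkSize(2, 2); // sizes match, so nothing is thrown
           try {
               checkSize(2, 3);
           } catch (IllegalArgumentException e) {
               // Print the message a user would see on a mismatch.
               System.out.println(e.getMessage());
           }
       }
   }
   ```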



##########
docs/content/docs/operators/feature/vectorassembler.md:
##########
@@ -44,11 +46,12 @@ Types of input columns must be either vector or numerical value.
 
 ### Parameters
 
-| Key           | Default    | Type     | Required | Description                                                                    |
-|---------------|------------|----------|----------|--------------------------------------------------------------------------------|
-| inputCols     | `null`     | String[] | yes      | Input column names.                                                            |
-| outputCol     | `"output"` | String   | no       | Output column name.                                                            |
-| handleInvalid | `"error"`  | String   | no       | Strategy to handle invalid entries. Supported values: 'error', 'skip', 'keep'. |
+| Key             | Default    | Type      | Required | Description                                                                    |
+|-----------------|------------|-----------|----------|--------------------------------------------------------------------------------|
+| inputCols       | `null`     | String[]  | yes      | Input column names.                                                            |
+| outputCol       | `"output"` | String    | no       | Output column name.                                                            |
+| inputSizes      | `null`     | Integer[] | yes      | Sizes of the assembling elements.                                              |

Review Comment:
   The type of this parameter should be `int[]` instead of `Integer[]`.



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorassembler/VectorAssemblerParams.java:
##########
@@ -21,11 +21,31 @@
 import org.apache.flink.ml.common.param.HasHandleInvalid;
 import org.apache.flink.ml.common.param.HasInputCols;
 import org.apache.flink.ml.common.param.HasOutputCol;
+import org.apache.flink.ml.param.IntArrayParam;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+
+import org.apache.commons.lang3.ArrayUtils;
 
 /**
  * Params of {@link VectorAssembler}.
  *
  * @param <T> The class type of this instance.
  */
 public interface VectorAssemblerParams<T>
-        extends HasInputCols<T>, HasOutputCol<T>, HasHandleInvalid<T> {}
+        extends HasInputCols<T>, HasOutputCol<T>, HasHandleInvalid<T> {
+    Param<Integer[]> INPUT_SIZES =
+            new IntArrayParam(
+                    "inputSizes",
+                    "Sizes of the input elements to be assembled.",
+                    null,
+                    ParamValidators.notNull());
+
+    default int[] getInputSizes() {
+        return ArrayUtils.toPrimitive(get(INPUT_SIZES));
+    }
+
+    default T setInputSizes(Integer... value) {

Review Comment:
   nit: `default T setInputSizes(int... value)`
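
   A minimal sketch of how a primitive varargs setter could sit on top of an 
`Integer[]`-typed parameter. The class below is a stand-in written for this 
comment and does not use the real `Param` machinery; the boxing via 
`Arrays.stream` is one option, chosen here only to keep the example free of 
external dependencies.

   ```java
   import java.util.Arrays;

   /** Hypothetical stand-in for a params holder; not the actual Flink ML API. */
   final class InputSizesSketch {
       // Stands in for the value stored under an Integer[]-typed parameter.
       private Integer[] inputSizes;

       /** Accepts primitives so callers can write setInputSizes(2, 1, 5). */
       InputSizesSketch setInputSizes(int... value) {
           this.inputSizes = Arrays.stream(value).boxed().toArray(Integer[]::new);
           return this;
       }

       /** Unboxes the stored value back into an int[]. */
       int[] getInputSizes() {
           return Arrays.stream(inputSizes).mapToInt(Integer::intValue).toArray();
       }

       public static void main(String[] args) {
           int[] sizes = new InputSizesSketch().setInputSizes(2, 1, 5).getInputSizes();
           System.out.println(Arrays.toString(sizes));
       }
   }
   ```

   With this shape, the getter and setter both expose `int`, and the boxed 
array stays an internal detail.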



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorassembler/VectorAssembler.java:
##########
@@ -133,6 +145,17 @@ public void flatMap(Row value, Collector<Row> out) {
                             : assembleSparse(inputCols, value, vectorSize, nnz);
             out.collect(Row.join(value, Row.of(assembledVec)));
         }
+
+        private void sizeHint(int expectedSize, int currentSize) {

Review Comment:
   How about renaming this method to `checkVectorAndNumberSize()`? The current 
name is inherited from Spark's `VectorSizeHint` operator, so users who are not 
familiar with Spark may find it hard to understand.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

