yunfengzhou-hub commented on code in PR #155:
URL: https://github.com/apache/flink-ml/pull/155#discussion_r973815348
##
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/PolynomialExpansionTest.java:
##
@@ -20,7 +20,7 @@
Review Comment:
Let's add back the `NormalizerTest`.
##
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/polynomialexpansion/PolynomialExpansion.java:
##
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.polynomialexpansion;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.typeinfo.VectorTypeInfo;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.math3.util.ArithmeticUtils;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A Transformer that expands the input vectors in polynomial space.
+ *
+ * Take a 2-dimension vector as an example: `(x, y)`, if we want to expand
it with degree 2, then
+ * we get `(x, x * x, y, x * y, y * y)`.
+ *
+ * For more information about the polynomial expansion, see
+ * http://en.wikipedia.org/wiki/Polynomial_expansion.
+ */
+public class PolynomialExpansion
+implements Transformer,
+PolynomialExpansionParams {
+private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+public PolynomialExpansion() {
+ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+}
+
+@Override
+public Table[] transform(Table... inputs) {
+Preconditions.checkArgument(inputs.length == 1);
+StreamTableEnvironment tEnv =
+(StreamTableEnvironment) ((TableImpl)
inputs[0]).getTableEnvironment();
+RowTypeInfo inputTypeInfo =
TableUtils.getRowTypeInfo(inputs[0].getResolvedSchema());
+
+RowTypeInfo outputTypeInfo =
+new RowTypeInfo(
+ArrayUtils.addAll(inputTypeInfo.getFieldTypes(),
VectorTypeInfo.INSTANCE),
+ArrayUtils.addAll(inputTypeInfo.getFieldNames(),
getOutputCol()));
+
+DataStream output =
+tEnv.toDataStream(inputs[0])
+.map(
+new PolynomialExpansionFunction(getDegree(),
getInputCol()),
+outputTypeInfo);
+
+Table outputTable = tEnv.fromDataStream(output);
+return new Table[] {outputTable};
+}
+
+@Override
+public void save(String path) throws IOException {
+ReadWriteUtils.saveMetadata(this, path);
+}
+
+public static PolynomialExpansion load(StreamTableEnvironment env, String
path)
+throws IOException {
+return ReadWriteUtils.loadStageParam(path);
+}
+
+@Override
+public Map<Param<?>, Object> getParamMap() {
+return paramMap;
+}
+
+/** Polynomial expansion function that expands a vector in polynomial
space. */
+private static class PolynomialExpansionFunction implements
MapFunction {
Review Comment:
Spark has a detailed comment explaining how this function is implemented, as
follows.
```java
/**
* The expansion is done via recursion. Given n features and degree d, the
size after expansion is
* (n + d choose d) (including 1 and first-order values). For example, let
f([a, b, c], 3) be the
* function that expands [a, b, c] to their