[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #166: [FLINK-29598] Add Estimator and Transformer for Imputer

2022-10-27 Thread GitBox


yunfengzhou-hub commented on code in PR #166:
URL: https://github.com/apache/flink-ml/pull/166#discussion_r1007561913


##
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/imputer/ImputerParams.java:
##
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.imputer;
+
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+import org.apache.flink.ml.param.StringParam;
+
+/**
+ * Params of {@link Imputer}.
+ *
+ * @param  The class type of this instance.
+ */
+public interface ImputerParams extends ImputerModelParams {

Review Comment:
   Spark's `Imputer` also has a `relativeError` parameter. Should we also add 
this parameter to Flink ML?



##
flink-ml-core/src/main/java/org/apache/flink/ml/param/DoubleParam.java:
##
@@ -32,4 +34,12 @@ public DoubleParam(
 public DoubleParam(String name, String description, Double defaultValue) {
 this(name, description, defaultValue, ParamValidators.alwaysTrue());
 }
+
+@Override
+public Double jsonDecode(Object json) throws IOException {
+if (json instanceof String && json.equals(String.valueOf(Double.NaN))) 
{
+return Double.NaN;
+}
+return (Double) json;

Review Comment:
   How about the following implementation?
   ```java
   if (json instanceof String) {
   return Double.valueOf((String) json);
   }
   return (Double) json;
   ```
   This applies to `Double.NaN` as well as other special values.



##
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/imputer/Imputer.java:
##
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.imputer;
+
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.ml.api.Estimator;
+import org.apache.flink.ml.common.datastream.DataStreamUtils;
+import org.apache.flink.ml.common.util.QuantileSummary;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The Imputer estimator completes missing values in a dataset. Missing values 
can be imputed using

Review Comment:
   nit: it might be better to use "bounded stream" instead of "dataset", as 
dataset has a specific meaning in flink.



##
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/imputer/ImputerModel.java:
##
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2

[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #166: [FLINK-29598] Add Estimator and Transformer for Imputer

2022-10-31 Thread GitBox


yunfengzhou-hub commented on code in PR #166:
URL: https://github.com/apache/flink-ml/pull/166#discussion_r1009085375


##
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/imputer/Imputer.java:
##
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.imputer;
+
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.ml.api.Estimator;
+import org.apache.flink.ml.common.datastream.DataStreamUtils;
+import org.apache.flink.ml.common.util.QuantileSummary;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The imputer for completing missing values of the input columns.
+ *
+ * Missing values can be imputed using the statistics(mean, median or most 
frequent) of each
+ * column in which the missing values are located. The input columns should be 
of numeric type.

Review Comment:
   Could you please add test cases to verify that this algorithm can work on 
numeric values other than doubles, like integers or floats?



##
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/imputer/ImputerModel.java:
##
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.imputer;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Model;
+import org.apache.flink.ml.common.broadcast.BroadcastUtils;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/** A Model which replace the missing values using the model data computed by 
{@link Imputer}. */
+public class ImputerModel implements Model, 
ImputerModelParams {
+
+private final Map, Object> paramMap = new HashMap<>();
+private Table modelDataTable;
+
+public ImputerModel() {
+ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+}
+
+@Override
+public ImputerModel setModelData(Table... inputs) {
+Preconditions.checkArgument(inputs.length == 1);
+modelDataTable = inputs[0];
+return this;
+}
+
+@Override
+public Tab

[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #166: [FLINK-29598] Add Estimator and Transformer for Imputer

2022-11-02 Thread GitBox


yunfengzhou-hub commented on code in PR #166:
URL: https://github.com/apache/flink-ml/pull/166#discussion_r1012433599


##
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/ImputerTest.java:
##
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.imputer.Imputer;
+import org.apache.flink.ml.feature.imputer.ImputerModel;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.ml.feature.imputer.ImputerParams.MEAN;
+import static org.apache.flink.ml.feature.imputer.ImputerParams.MEDIAN;
+import static org.apache.flink.ml.feature.imputer.ImputerParams.MOST_FREQUENT;
+import static 
org.apache.flink.test.util.TestBaseUtils.compareResultCollections;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/** Tests {@link Imputer} and {@link ImputerModel}. */
+public class ImputerTest {

Review Comment:
   Let's make this class `extends AbstractTestBase`. This could help start one 
mini-cluster and reuse it across all test cases, thus improving test efficiency.



##
docs/content/docs/operators/feature/imputer.md:
##
@@ -0,0 +1,190 @@
+---
+title: "Imputer"
+weight: 1
+type: docs
+aliases:
+- /operators/feature/imputer.html
+---
+
+
+
+## Imputer
+The imputer for completing missing values of the input columns.
+Missing values can be imputed using the statistics(mean, median or most 
frequent) of each column in which the missing values are located. The input 
columns should be of numeric type.

Review Comment:
   Let's add a blank line above this, or markdown might treat these two 
paragraphs as one.



##
docs/content/docs/operators/feature/imputer.md:
##
@@ -0,0 +1,190 @@
+---
+title: "Imputer"
+weight: 1
+type: docs
+aliases:
+- /operators/feature/imputer.html
+---
+
+
+
+## Imputer
+The imputer for completing missing values of the input columns.
+Missing values can be imputed using the statistics(mean, median or most 
frequent) of each column in which the missing values are located. The input 
columns should be of numeric type.
+
+__Note__ The `mean`/`median`/`most frequent` value is computed after filtering 
out missing values and null values, null values are always treated as missing, 
and so are also imputed.

Review Comment:
   Let's reformat the file so that there is no more than 80 characters per line.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #166: [FLINK-29598] Add Estimator and Transformer for Imputer

2022-11-03 Thread GitBox


yunfengzhou-hub commented on code in PR #166:
URL: https://github.com/apache/flink-ml/pull/166#discussion_r1013518990


##
flink-ml-core/src/main/java/org/apache/flink/ml/param/DoubleParam.java:
##
@@ -32,4 +34,12 @@ public DoubleParam(
 public DoubleParam(String name, String description, Double defaultValue) {
 this(name, description, defaultValue, ParamValidators.alwaysTrue());
 }
+
+@Override
+public Double jsonDecode(Object json) throws IOException {
+if (json instanceof String && json.equals(String.valueOf(Double.NaN))) 
{
+return Double.NaN;
+}
+return (Double) json;

Review Comment:
   Other special values include `Double.POSITIVE_INFINITY/NEGATIVE_INFINITY`. 
These special values might be used in algorithms like Bucketizer.
   
   I agree with it that we should also update `FloatParam`, as well as 
`DoubleArrayParam`, `FloatArrayParam` and `DoubleArrayArrayParam`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #166: [FLINK-29598] Add Estimator and Transformer for Imputer

2022-11-03 Thread GitBox


yunfengzhou-hub commented on code in PR #166:
URL: https://github.com/apache/flink-ml/pull/166#discussion_r1013518990


##
flink-ml-core/src/main/java/org/apache/flink/ml/param/DoubleParam.java:
##
@@ -32,4 +34,12 @@ public DoubleParam(
 public DoubleParam(String name, String description, Double defaultValue) {
 this(name, description, defaultValue, ParamValidators.alwaysTrue());
 }
+
+@Override
+public Double jsonDecode(Object json) throws IOException {
+if (json instanceof String && json.equals(String.valueOf(Double.NaN))) 
{
+return Double.NaN;
+}
+return (Double) json;

Review Comment:
   Other special values include `Double.POSITIVE_INFINITY/NEGATIVE_INFINITY`. 
These special values might also be the invalid values that need to be replaced 
with Imputer.
   
   I agree with it that we should also update `FloatParam`. By the way, 
algorithms like Bucketizer have been using these special values in their 
parameters, and Flink ML's implementation of  `DoubleArrayParam` has supported 
such special values, so it might be enough just to update `DoubleParam` and 
`FloatParam`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org