[GitHub] [flink-ml] taosiyuan163 commented on a diff in pull request #132: [FLINK-28571] Add AlgoOperator for Chi-squared test

2022-08-01 Thread GitBox


taosiyuan163 commented on code in PR #132:
URL: https://github.com/apache/flink-ml/pull/132#discussion_r934409257


##
flink-ml-lib/src/main/java/org/apache/flink/ml/stats/chisqtest/ChiSqTest.java:
##
@@ -0,0 +1,654 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.stats.chisqtest;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.iteration.operator.OperatorStateUtils;
+import org.apache.flink.ml.api.AlgoOperator;
+import org.apache.flink.ml.common.broadcast.BroadcastUtils;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.math3.distribution.ChiSquaredDistribution;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * An AlgoOperator which implements the Chi-square test algorithm.
+ *
+ * Chi-square Test is an AlgoOperator that computes the statistics of 
independence of variables

Review Comment:
   Thanks for your comments. This statistical test yields three measures: 
the chi-square statistic, the p-value, and the DOF. How about updating the Javadoc as follows:
   
   Chi-square Test computes the statistics of independence of variables in a 
contingency table, i.e., the chi-square statistic, p-value, and DOF (number of 
degrees of freedom) for each input feature. The contingency table is 
constructed from the observed categorical values.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] taosiyuan163 commented on a diff in pull request #132: [FLINK-28571] Add AlgoOperator for Chi-squared test

2022-07-29 Thread GitBox


taosiyuan163 commented on code in PR #132:
URL: https://github.com/apache/flink-ml/pull/132#discussion_r933061036


##
flink-ml-lib/src/main/java/org/apache/flink/ml/stats/chisqtest/ChiSqTest.java:
##
@@ -0,0 +1,655 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.stats.chisqtest;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.iteration.operator.OperatorStateUtils;
+import org.apache.flink.ml.api.AlgoOperator;
+import org.apache.flink.ml.common.broadcast.BroadcastUtils;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.math3.distribution.ChiSquaredDistribution;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * An AlgoOperator which implements the Chi-square test algorithm.
+ *
+ * Chi-square test of independence of variables in a contingency table. 
This function computes
+ * the chi-square statistic and p-value and dof(number of degrees of freedom) 
for every feature in
+ * the contingency table. The contingency table is constructed from the 
observed of categorical
+ * values.

Review Comment:
   Thanks for the suggested documentation; I will update it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] taosiyuan163 commented on a diff in pull request #132: [FLINK-28571] Add AlgoOperator for Chi-squared test

2022-07-29 Thread GitBox


taosiyuan163 commented on code in PR #132:
URL: https://github.com/apache/flink-ml/pull/132#discussion_r933059024


##
flink-ml-lib/src/main/java/org/apache/flink/ml/stats/chisqtest/ChiSqTest.java:
##
@@ -0,0 +1,655 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.stats.chisqtest;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.iteration.operator.OperatorStateUtils;
+import org.apache.flink.ml.api.AlgoOperator;
+import org.apache.flink.ml.common.broadcast.BroadcastUtils;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.math3.distribution.ChiSquaredDistribution;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * An AlgoOperator which implements the Chi-square test algorithm.
+ *
+ * Chi-square test of independence of variables in a contingency table. 
This function computes
+ * the chi-square statistic and p-value and dof(number of degrees of freedom) 
for every feature in
+ * the contingency table. The contingency table is constructed from the 
observed of categorical
+ * values.
+ *
+ * See: http://en.wikipedia.org/wiki/Chi-squared_test.
+ */
+public class ChiSqTest implements AlgoOperator, 
ChiSqTestParams {
+
+private final Map, Object> paramMap = new HashMap<>();
+
+public ChiSqTest() {
+ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+}
+
+@Override
+public Table[] transform(Table... inputs) {
+Preconditions.checkArgument(inputs.length == 1);
+
+final String bcCategoricalMarginsKey = "bcCategoricalMarginsKey";
+final String bcLabelMarginsKey = "bcLabelMarginsKey";
+
+final String[] inputCols = getInputCols();
+String labelCol = getLabelCol();
+
+StreamTableEnvironment tEnv =
+(StreamTableEnvironment) ((TableImpl) 
inputs[0]).getTableEnvironment();
+
+SingleOutputStreamOperator> 
colAndFeatureAndLabel =
+tEnv.toDataStream(inputs[0])
+.flatMap(new ExtractColAndFeatureAndLabel(inputCols, 
labelCol));
+
+// compute the observed frequencies

Review Comment:
   Thanks for the comments.
   
   Part of the wording in the docs was adapted from the Python documentation; I'm going to 
reformat it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above 

[GitHub] [flink-ml] taosiyuan163 commented on a diff in pull request #132: [FLINK-28571] Add AlgoOperator for Chi-squared test

2022-07-29 Thread GitBox


taosiyuan163 commented on code in PR #132:
URL: https://github.com/apache/flink-ml/pull/132#discussion_r933040819


##
flink-ml-lib/src/main/java/org/apache/flink/ml/stats/chisqtest/ChiSqTest.java:
##
@@ -0,0 +1,492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.stats.chisqtest;
+
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.AlgoOperator;
+import org.apache.flink.ml.common.broadcast.BroadcastUtils;
+import org.apache.flink.ml.common.datastream.DataStreamUtils;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.math3.distribution.ChiSquaredDistribution;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Chi-square test of independence of variables in a contingency table. This 
Transformer computes
+ * the chi-square statistic,p-value,and dof(number of degrees of freedom) for 
every feature in the
+ * contingency table, which constructed from the `observed` for each 
categorical values. All label
+ * and feature values must be categorical.
+ *
+ * See: http://en.wikipedia.org/wiki/Chi-squared_test.
+ */
+public class ChiSqTest implements AlgoOperator, 
ChiSqTestParams {
+
+final String bcCategoricalMarginsKey = "bcCategoricalMarginsKey";
+final String bcLabelMarginsKey = "bcLabelMarginsKey";
+
+private final Map, Object> paramMap = new HashMap<>();
+
+public ChiSqTest() {
+ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+}
+
+@Override
+public Table[] transform(Table... inputs) {
+
+final String[] inputCols = getInputCols();
+String labelCol = getLabelCol();
+
+StreamTableEnvironment tEnv =
+(StreamTableEnvironment) ((TableImpl) 
inputs[0]).getTableEnvironment();
+
+SingleOutputStreamOperator> 
colAndFeatureAndLabel =
+tEnv.toDataStream(inputs[0])
+.flatMap(new ExtractColAndFeatureAndLabel(inputCols, 
labelCol));
+
+// compute the observed frequencies
+DataStream> observedFreq =
+DataStreamUtils.mapPartition(
+colAndFeatureAndLabel.keyBy(Tuple3::hashCode),
+new GenerateObservedFrequencies());
+
+SingleOutputStreamOperator> 
filledObservedFreq =

Review Comment:
   I think it is hard to avoid `parallelism=1` here, because the cardinality of 
the label is unpredictable; the data must eventually be aggregated on a single node to distinguish 
the labels. However, we can try to reduce the amount of data on the `Map` side.



-- 

[GitHub] [flink-ml] taosiyuan163 commented on a diff in pull request #132: [FLINK-28571] Add AlgoOperator for Chi-squared test

2022-07-27 Thread GitBox


taosiyuan163 commented on code in PR #132:
URL: https://github.com/apache/flink-ml/pull/132#discussion_r930911825


##
flink-ml-lib/src/main/java/org/apache/flink/ml/stats/chisqtest/ChiSqTest.java:
##
@@ -0,0 +1,492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.stats.chisqtest;
+
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.AlgoOperator;
+import org.apache.flink.ml.common.broadcast.BroadcastUtils;
+import org.apache.flink.ml.common.datastream.DataStreamUtils;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.math3.distribution.ChiSquaredDistribution;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Chi-square test of independence of variables in a contingency table. This 
Transformer computes
+ * the chi-square statistic,p-value,and dof(number of degrees of freedom) for 
every feature in the
+ * contingency table, which constructed from the `observed` for each 
categorical values. All label
+ * and feature values must be categorical.
+ *
+ * See: http://en.wikipedia.org/wiki/Chi-squared_test.
+ */
+public class ChiSqTest implements AlgoOperator, 
ChiSqTestParams {
+
+final String bcCategoricalMarginsKey = "bcCategoricalMarginsKey";
+final String bcLabelMarginsKey = "bcLabelMarginsKey";
+
+private final Map, Object> paramMap = new HashMap<>();
+
+public ChiSqTest() {
+ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+}
+
+@Override
+public Table[] transform(Table... inputs) {
+
+final String[] inputCols = getInputCols();
+String labelCol = getLabelCol();
+
+StreamTableEnvironment tEnv =
+(StreamTableEnvironment) ((TableImpl) 
inputs[0]).getTableEnvironment();
+
+SingleOutputStreamOperator> 
colAndFeatureAndLabel =
+tEnv.toDataStream(inputs[0])
+.flatMap(new ExtractColAndFeatureAndLabel(inputCols, 
labelCol));
+
+// compute the observed frequencies
+DataStream> observedFreq =
+DataStreamUtils.mapPartition(
+colAndFeatureAndLabel.keyBy(Tuple3::hashCode),
+new GenerateObservedFrequencies());
+
+SingleOutputStreamOperator> 
filledObservedFreq =
+observedFreq
+.transform(
+"filledObservedFreq",
+Types.TUPLE(
+Types.STRING,
+  

[GitHub] [flink-ml] taosiyuan163 commented on a diff in pull request #132: [FLINK-28571] Add AlgoOperator for Chi-squared test

2022-07-27 Thread GitBox


taosiyuan163 commented on code in PR #132:
URL: https://github.com/apache/flink-ml/pull/132#discussion_r930906258


##
flink-ml-lib/src/main/java/org/apache/flink/ml/stats/chisqtest/ChiSqTest.java:
##
@@ -0,0 +1,492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.stats.chisqtest;
+
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.AlgoOperator;
+import org.apache.flink.ml.common.broadcast.BroadcastUtils;
+import org.apache.flink.ml.common.datastream.DataStreamUtils;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.math3.distribution.ChiSquaredDistribution;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Chi-square test of independence of variables in a contingency table. This 
Transformer computes
+ * the chi-square statistic,p-value,and dof(number of degrees of freedom) for 
every feature in the
+ * contingency table, which constructed from the `observed` for each 
categorical values. All label
+ * and feature values must be categorical.
+ *
+ * See: http://en.wikipedia.org/wiki/Chi-squared_test.
+ */
+public class ChiSqTest implements AlgoOperator, 
ChiSqTestParams {
+
+final String bcCategoricalMarginsKey = "bcCategoricalMarginsKey";
+final String bcLabelMarginsKey = "bcLabelMarginsKey";
+
+private final Map, Object> paramMap = new HashMap<>();
+
+public ChiSqTest() {
+ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+}
+
+@Override
+public Table[] transform(Table... inputs) {
+
+final String[] inputCols = getInputCols();

Review Comment:
   > Could we check the size of the `inputs` here?
   
   I added a check on the size of the inputs.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] taosiyuan163 commented on a diff in pull request #132: [FLINK-28571] Add AlgoOperator for Chi-squared test

2022-07-27 Thread GitBox


taosiyuan163 commented on code in PR #132:
URL: https://github.com/apache/flink-ml/pull/132#discussion_r930920964


##
flink-ml-lib/src/main/java/org/apache/flink/ml/stats/chisqtest/ChiSqTest.java:
##
@@ -0,0 +1,492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.stats.chisqtest;
+
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.AlgoOperator;
+import org.apache.flink.ml.common.broadcast.BroadcastUtils;
+import org.apache.flink.ml.common.datastream.DataStreamUtils;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.math3.distribution.ChiSquaredDistribution;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Chi-square test of independence of variables in a contingency table. This 
Transformer computes
+ * the chi-square statistic,p-value,and dof(number of degrees of freedom) for 
every feature in the
+ * contingency table, which constructed from the `observed` for each 
categorical values. All label
+ * and feature values must be categorical.
+ *
+ * See: http://en.wikipedia.org/wiki/Chi-squared_test.
+ */
+public class ChiSqTest implements AlgoOperator, 
ChiSqTestParams {
+
+final String bcCategoricalMarginsKey = "bcCategoricalMarginsKey";
+final String bcLabelMarginsKey = "bcLabelMarginsKey";
+
+private final Map, Object> paramMap = new HashMap<>();
+
+public ChiSqTest() {
+ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+}
+
+@Override
+public Table[] transform(Table... inputs) {
+
+final String[] inputCols = getInputCols();
+String labelCol = getLabelCol();
+
+StreamTableEnvironment tEnv =
+(StreamTableEnvironment) ((TableImpl) 
inputs[0]).getTableEnvironment();
+
+SingleOutputStreamOperator> 
colAndFeatureAndLabel =
+tEnv.toDataStream(inputs[0])
+.flatMap(new ExtractColAndFeatureAndLabel(inputCols, 
labelCol));
+
+// compute the observed frequencies
+DataStream> observedFreq =
+DataStreamUtils.mapPartition(
+colAndFeatureAndLabel.keyBy(Tuple3::hashCode),
+new GenerateObservedFrequencies());
+
+SingleOutputStreamOperator> 
filledObservedFreq =
+observedFreq
+.transform(
+"filledObservedFreq",
+Types.TUPLE(
+Types.STRING,
+  

[GitHub] [flink-ml] taosiyuan163 commented on a diff in pull request #132: [FLINK-28571] Add AlgoOperator for Chi-squared test

2022-07-27 Thread GitBox


taosiyuan163 commented on code in PR #132:
URL: https://github.com/apache/flink-ml/pull/132#discussion_r930906015


##
flink-ml-lib/src/main/java/org/apache/flink/ml/stats/chisqtest/ChiSqTest.java:
##
@@ -0,0 +1,492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.stats.chisqtest;
+
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.AlgoOperator;
+import org.apache.flink.ml.common.broadcast.BroadcastUtils;
+import org.apache.flink.ml.common.datastream.DataStreamUtils;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.math3.distribution.ChiSquaredDistribution;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Chi-square test of independence of variables in a contingency table. This 
Transformer computes
+ * the chi-square statistic,p-value,and dof(number of degrees of freedom) for 
every feature in the
+ * contingency table, which constructed from the `observed` for each 
categorical values. All label
+ * and feature values must be categorical.
+ *
+ * See: http://en.wikipedia.org/wiki/Chi-squared_test.
+ */
+public class ChiSqTest implements AlgoOperator, 
ChiSqTestParams {
+
+final String bcCategoricalMarginsKey = "bcCategoricalMarginsKey";

Review Comment:
   > Could `bcCategoricalMarginsKey` and `bcLabelMarginsKey` be two local 
variables rather than class variables?
   
   I have put them in `transform`. 
   
   



##
flink-ml-lib/src/main/java/org/apache/flink/ml/stats/chisqtest/ChiSqTest.java:
##
@@ -0,0 +1,492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.stats.chisqtest;
+
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import 

[GitHub] [flink-ml] taosiyuan163 commented on a diff in pull request #132: [FLINK-28571] Add AlgoOperator for Chi-squared test

2022-07-27 Thread GitBox


taosiyuan163 commented on code in PR #132:
URL: https://github.com/apache/flink-ml/pull/132#discussion_r930911825


##
flink-ml-lib/src/main/java/org/apache/flink/ml/stats/chisqtest/ChiSqTest.java:
##
@@ -0,0 +1,492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.stats.chisqtest;
+
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.AlgoOperator;
+import org.apache.flink.ml.common.broadcast.BroadcastUtils;
+import org.apache.flink.ml.common.datastream.DataStreamUtils;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.math3.distribution.ChiSquaredDistribution;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Chi-square test of independence of variables in a contingency table. This 
Transformer computes
+ * the chi-square statistic,p-value,and dof(number of degrees of freedom) for 
every feature in the
+ * contingency table, which constructed from the `observed` for each 
categorical values. All label
+ * and feature values must be categorical.
+ *
+ * See: http://en.wikipedia.org/wiki/Chi-squared_test.
+ */
+public class ChiSqTest implements AlgoOperator, 
ChiSqTestParams {
+
+final String bcCategoricalMarginsKey = "bcCategoricalMarginsKey";
+final String bcLabelMarginsKey = "bcLabelMarginsKey";
+
+private final Map, Object> paramMap = new HashMap<>();
+
+public ChiSqTest() {
+ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+}
+
+@Override
+public Table[] transform(Table... inputs) {
+
+final String[] inputCols = getInputCols();
+String labelCol = getLabelCol();
+
+StreamTableEnvironment tEnv =
+(StreamTableEnvironment) ((TableImpl) 
inputs[0]).getTableEnvironment();
+
+SingleOutputStreamOperator> 
colAndFeatureAndLabel =
+tEnv.toDataStream(inputs[0])
+.flatMap(new ExtractColAndFeatureAndLabel(inputCols, 
labelCol));
+
+// compute the observed frequencies
+DataStream> observedFreq =
+DataStreamUtils.mapPartition(
+colAndFeatureAndLabel.keyBy(Tuple3::hashCode),
+new GenerateObservedFrequencies());
+
+SingleOutputStreamOperator> 
filledObservedFreq =
+observedFreq
+.transform(
+"filledObservedFreq",
+Types.TUPLE(
+Types.STRING,
+  

[GitHub] [flink-ml] taosiyuan163 commented on a diff in pull request #132: [FLINK-28571] Add AlgoOperator for Chi-squared test

2022-07-27 Thread GitBox


taosiyuan163 commented on code in PR #132:
URL: https://github.com/apache/flink-ml/pull/132#discussion_r930911825


##
flink-ml-lib/src/main/java/org/apache/flink/ml/stats/chisqtest/ChiSqTest.java:
##
@@ -0,0 +1,492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.stats.chisqtest;
+
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.AlgoOperator;
+import org.apache.flink.ml.common.broadcast.BroadcastUtils;
+import org.apache.flink.ml.common.datastream.DataStreamUtils;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.math3.distribution.ChiSquaredDistribution;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Chi-square test of independence of variables in a contingency table. This 
Transformer computes
+ * the chi-square statistic,p-value,and dof(number of degrees of freedom) for 
every feature in the
+ * contingency table, which constructed from the `observed` for each 
categorical values. All label
+ * and feature values must be categorical.
+ *
+ * See: http://en.wikipedia.org/wiki/Chi-squared_test.
+ */
+public class ChiSqTest implements AlgoOperator, 
ChiSqTestParams {
+
+final String bcCategoricalMarginsKey = "bcCategoricalMarginsKey";
+final String bcLabelMarginsKey = "bcLabelMarginsKey";
+
+private final Map, Object> paramMap = new HashMap<>();
+
+public ChiSqTest() {
+ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+}
+
+@Override
+public Table[] transform(Table... inputs) {
+
+final String[] inputCols = getInputCols();
+String labelCol = getLabelCol();
+
+StreamTableEnvironment tEnv =
+(StreamTableEnvironment) ((TableImpl) 
inputs[0]).getTableEnvironment();
+
+SingleOutputStreamOperator> 
colAndFeatureAndLabel =
+tEnv.toDataStream(inputs[0])
+.flatMap(new ExtractColAndFeatureAndLabel(inputCols, 
labelCol));
+
+// compute the observed frequencies
+DataStream> observedFreq =
+DataStreamUtils.mapPartition(
+colAndFeatureAndLabel.keyBy(Tuple3::hashCode),
+new GenerateObservedFrequencies());
+
+SingleOutputStreamOperator> 
filledObservedFreq =
+observedFreq
+.transform(
+"filledObservedFreq",
+Types.TUPLE(
+Types.STRING,
+  

[GitHub] [flink-ml] taosiyuan163 commented on a diff in pull request #132: [FLINK-28571] Add AlgoOperator for Chi-squared test

2022-07-27 Thread GitBox


taosiyuan163 commented on code in PR #132:
URL: https://github.com/apache/flink-ml/pull/132#discussion_r930911037


##
flink-ml-lib/src/main/java/org/apache/flink/ml/stats/chisqtest/ChiSqTest.java:
##
@@ -0,0 +1,492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.stats.chisqtest;
+
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.AlgoOperator;
+import org.apache.flink.ml.common.broadcast.BroadcastUtils;
+import org.apache.flink.ml.common.datastream.DataStreamUtils;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.math3.distribution.ChiSquaredDistribution;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Chi-square test of independence of variables in a contingency table. This 
Transformer computes
+ * the chi-square statistic,p-value,and dof(number of degrees of freedom) for 
every feature in the
+ * contingency table, which constructed from the `observed` for each 
categorical values. All label
+ * and feature values must be categorical.
+ *
+ * See: http://en.wikipedia.org/wiki/Chi-squared_test.
+ */
+public class ChiSqTest implements AlgoOperator, 
ChiSqTestParams {
+
+final String bcCategoricalMarginsKey = "bcCategoricalMarginsKey";
+final String bcLabelMarginsKey = "bcLabelMarginsKey";
+
+private final Map, Object> paramMap = new HashMap<>();
+
+public ChiSqTest() {
+ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+}
+
+@Override
+public Table[] transform(Table... inputs) {
+
+final String[] inputCols = getInputCols();
+String labelCol = getLabelCol();
+
+StreamTableEnvironment tEnv =
+(StreamTableEnvironment) ((TableImpl) 
inputs[0]).getTableEnvironment();
+
+SingleOutputStreamOperator> 
colAndFeatureAndLabel =
+tEnv.toDataStream(inputs[0])
+.flatMap(new ExtractColAndFeatureAndLabel(inputCols, 
labelCol));
+
+// compute the observed frequencies
+DataStream> observedFreq =
+DataStreamUtils.mapPartition(
+colAndFeatureAndLabel.keyBy(Tuple3::hashCode),
+new GenerateObservedFrequencies());
+
+SingleOutputStreamOperator> 
filledObservedFreq =
+observedFreq
+.transform(
+"filledObservedFreq",
+Types.TUPLE(
+Types.STRING,
+  

[GitHub] [flink-ml] taosiyuan163 commented on a diff in pull request #132: [FLINK-28571] Add AlgoOperator for Chi-squared test

2022-07-27 Thread GitBox


taosiyuan163 commented on code in PR #132:
URL: https://github.com/apache/flink-ml/pull/132#discussion_r930906258


##
flink-ml-lib/src/main/java/org/apache/flink/ml/stats/chisqtest/ChiSqTest.java:
##
@@ -0,0 +1,492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.stats.chisqtest;
+
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.AlgoOperator;
+import org.apache.flink.ml.common.broadcast.BroadcastUtils;
+import org.apache.flink.ml.common.datastream.DataStreamUtils;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.math3.distribution.ChiSquaredDistribution;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Chi-square test of independence of variables in a contingency table. This 
Transformer computes
+ * the chi-square statistic,p-value,and dof(number of degrees of freedom) for 
every feature in the
+ * contingency table, which constructed from the `observed` for each 
categorical values. All label
+ * and feature values must be categorical.
+ *
+ * See: http://en.wikipedia.org/wiki/Chi-squared_test.
+ */
+public class ChiSqTest implements AlgoOperator, 
ChiSqTestParams {
+
+final String bcCategoricalMarginsKey = "bcCategoricalMarginsKey";
+final String bcLabelMarginsKey = "bcLabelMarginsKey";
+
+private final Map, Object> paramMap = new HashMap<>();
+
+public ChiSqTest() {
+ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+}
+
+@Override
+public Table[] transform(Table... inputs) {
+
+final String[] inputCols = getInputCols();

Review Comment:
   > Could we check the size of the `inputs` here?
   
   I added a method to check the size of the inputs.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] taosiyuan163 commented on a diff in pull request #132: [FLINK-28571] Add AlgoOperator for Chi-squared test

2022-07-27 Thread GitBox


taosiyuan163 commented on code in PR #132:
URL: https://github.com/apache/flink-ml/pull/132#discussion_r930906015


##
flink-ml-lib/src/main/java/org/apache/flink/ml/stats/chisqtest/ChiSqTest.java:
##
@@ -0,0 +1,492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.stats.chisqtest;
+
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.AlgoOperator;
+import org.apache.flink.ml.common.broadcast.BroadcastUtils;
+import org.apache.flink.ml.common.datastream.DataStreamUtils;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.math3.distribution.ChiSquaredDistribution;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Chi-square test of independence of variables in a contingency table. This 
Transformer computes
+ * the chi-square statistic,p-value,and dof(number of degrees of freedom) for 
every feature in the
+ * contingency table, which constructed from the `observed` for each 
categorical values. All label
+ * and feature values must be categorical.
+ *
+ * See: http://en.wikipedia.org/wiki/Chi-squared_test.
+ */
+public class ChiSqTest implements AlgoOperator, 
ChiSqTestParams {
+
+final String bcCategoricalMarginsKey = "bcCategoricalMarginsKey";

Review Comment:
   > Could `bcCategoricalMarginsKey` and `bcLabelMarginsKey` be two local 
variables rather than class variables?
   
   I have put them in `transform`. 
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] taosiyuan163 commented on a diff in pull request #132: [FLINK-28571] Add AlgoOperator for Chi-squared test

2022-07-27 Thread GitBox


taosiyuan163 commented on code in PR #132:
URL: https://github.com/apache/flink-ml/pull/132#discussion_r930902953


##
flink-ml-lib/src/main/java/org/apache/flink/ml/stats/chisqtest/ChiSqTest.java:
##
@@ -0,0 +1,492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.stats.chisqtest;
+
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.AlgoOperator;
+import org.apache.flink.ml.common.broadcast.BroadcastUtils;
+import org.apache.flink.ml.common.datastream.DataStreamUtils;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.math3.distribution.ChiSquaredDistribution;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Chi-square test of independence of variables in a contingency table. This 
Transformer computes
+ * the chi-square statistic,p-value,and dof(number of degrees of freedom) for 
every feature in the
+ * contingency table, which constructed from the `observed` for each 
categorical values. All label
+ * and feature values must be categorical.
+ *
+ * See: http://en.wikipedia.org/wiki/Chi-squared_test.
+ */
+public class ChiSqTest implements AlgoOperator, 
ChiSqTestParams {
+
+final String bcCategoricalMarginsKey = "bcCategoricalMarginsKey";

Review Comment:
   > Could `bcCategoricalMarginsKey` and `bcLabelMarginsKey` be two local 
variables rather than class variables?
   
   I have put them in `transform`.
   
   



##
flink-ml-lib/src/main/java/org/apache/flink/ml/stats/chisqtest/ChiSqTest.java:
##
@@ -0,0 +1,492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.stats.chisqtest;
+
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import 

[GitHub] [flink-ml] taosiyuan163 commented on a diff in pull request #132: [FLINK-28571] Add AlgoOperator for Chi-squared test

2022-07-27 Thread GitBox


taosiyuan163 commented on code in PR #132:
URL: https://github.com/apache/flink-ml/pull/132#discussion_r930897436


##
flink-ml-lib/src/main/java/org/apache/flink/ml/stats/chisqtest/ChiSqTest.java:
##
@@ -0,0 +1,492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.stats.chisqtest;
+
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.AlgoOperator;
+import org.apache.flink.ml.common.broadcast.BroadcastUtils;
+import org.apache.flink.ml.common.datastream.DataStreamUtils;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.math3.distribution.ChiSquaredDistribution;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Chi-square test of independence of variables in a contingency table. This 
Transformer computes

Review Comment:
   > Could you reformat the Javadoc as `An AlgoOperator that...` so that the Javadoc is consistent with existing implementations?
   > 
   > Also `This Transformer` -> `This AlgoOperator`
   
   I have updated the Javadoc header accordingly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] taosiyuan163 commented on a diff in pull request #132: [FLINK-28571] Add AlgoOperator for Chi-squared test

2022-07-26 Thread GitBox


taosiyuan163 commented on code in PR #132:
URL: https://github.com/apache/flink-ml/pull/132#discussion_r930018268


##
flink-ml-lib/src/main/java/org/apache/flink/ml/stats/chisqtest/ChiSqTest.java:
##
@@ -0,0 +1,492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.stats.chisqtest;
+
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.AlgoOperator;
+import org.apache.flink.ml.common.broadcast.BroadcastUtils;
+import org.apache.flink.ml.common.datastream.DataStreamUtils;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.math3.distribution.ChiSquaredDistribution;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Chi-square test of independence of variables in a contingency table. This 
Transformer computes
+ * the chi-square statistic,p-value,and dof(number of degrees of freedom) for 
every feature in the
+ * contingency table, which constructed from the `observed` for each 
categorical values. All label
+ * and feature values must be categorical.
+ *
+ * See: http://en.wikipedia.org/wiki/Chi-squared_test.
+ */
+public class ChiSqTest implements AlgoOperator, 
ChiSqTestParams {
+
+final String bcCategoricalMarginsKey = "bcCategoricalMarginsKey";
+final String bcLabelMarginsKey = "bcLabelMarginsKey";
+
+private final Map, Object> paramMap = new HashMap<>();
+
+public ChiSqTest() {
+ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+}
+
+@Override
+public Table[] transform(Table... inputs) {
+
+final String[] inputCols = getInputCols();
+String labelCol = getLabelCol();
+
+StreamTableEnvironment tEnv =
+(StreamTableEnvironment) ((TableImpl) 
inputs[0]).getTableEnvironment();
+
+SingleOutputStreamOperator> 
colAndFeatureAndLabel =
+tEnv.toDataStream(inputs[0])
+.flatMap(new ExtractColAndFeatureAndLabel(inputCols, 
labelCol));
+
+// compute the observed frequencies
+DataStream> observedFreq =
+DataStreamUtils.mapPartition(
+colAndFeatureAndLabel.keyBy(Tuple3::hashCode),
+new GenerateObservedFrequencies());
+
+SingleOutputStreamOperator> 
filledObservedFreq =

Review Comment:
   > How about we just compute the `distinct labels` and postpone the `fill` 
operation to Line#199, e.g., `DataStream> 
categoricalStatistics =...`?
   > 
   > Using `parallelism=1` to compute all the data is usually not efficient.