[GitHub] [flink-ml] Fanoid commented on a diff in pull request #191: [FLINK-30401] Add Estimator and Transformer for MinHashLSH

2023-01-05 Thread GitBox


Fanoid commented on code in PR #191:
URL: https://github.com/apache/flink-ml/pull/191#discussion_r1063193679


##
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/MinHashLSHTest.java:
##
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.lsh.MinHashLSH;
+import org.apache.flink.ml.feature.lsh.MinHashLSHModel;
+import org.apache.flink.ml.feature.lsh.MinHashLSHModelData;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.api.Expressions.$;
+
+/** Tests {@link MinHashLSH} and {@link MinHashLSHModel}. */
+public class MinHashLSHTest extends AbstractTestBase {
+@Rule public final TemporaryFolder tempFolder = new TemporaryFolder();
+private StreamExecutionEnvironment env;
+private StreamTableEnvironment tEnv;
+
+/**
+ * Default case for most tests.
+ *
+ * @return a tuple including the estimator, input data table, and output 
rows.
+ */
+private Tuple3> getDefaultCase() {

Review Comment:
   I have updated this part in both Java and Python codes to make them 
consistent with other tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] Fanoid commented on a diff in pull request #191: [FLINK-30401] Add Estimator and Transformer for MinHashLSH

2022-12-29 Thread GitBox


Fanoid commented on code in PR #191:
URL: https://github.com/apache/flink-ml/pull/191#discussion_r1059263315


##
docs/content/docs/operators/feature/minhashlsh.md:
##
@@ -0,0 +1,276 @@
+---
+title: "MinHash LSH"
+weight: 1
+type: docs
+aliases:
+- /operators/feature/minhashlsh.html
+---
+
+
+
+## MinHash LSH
+
+MinHash LSH is a Locality Sensitive Hashing (LSH) scheme for Jaccard distance 
metric.
+The input features are sets of natural numbers represented as non-zero indices 
of vectors,
+either dense vectors or sparse vectors. Typically, sparse vectors are more 
efficient.
+
+### Input Columns
+
+| Param name | Type   | Default   | Description|
+|:---|:---|:--|:---|
+| inputCol   | Vector | `"input"` | Features to be mapped. |
+
+### Output Columns
+
+| Param name | Type  | Default| Description  |
+|:---|:--|:---|:-|
+| outputCol  | DenseVector[] | `"output"` | Hash values. |
+
+### Parameters
+
+| Key | Default
   | Type| Required | Description   
 |
+|-|---|-|--||
+| inputCol| `"input"`  
   | String  | no   | Input column name.
 |
+| outputCol   | `"output"` 
   | String  | no   | Output column name.   
 |
+| seed| 
`"org.apache.flink.ml.feature.lsh.MinHashLSH".hashCode()` | Long| no   
| The random seed.   |

Review Comment:
   Alright, I'll following existing example.



##
docs/content/docs/operators/feature/minhashlsh.md:
##
@@ -0,0 +1,276 @@
+---
+title: "MinHash LSH"
+weight: 1
+type: docs
+aliases:
+- /operators/feature/minhashlsh.html
+---
+
+
+
+## MinHash LSH
+
+MinHash LSH is a Locality Sensitive Hashing (LSH) scheme for Jaccard distance 
metric.
+The input features are sets of natural numbers represented as non-zero indices 
of vectors,
+either dense vectors or sparse vectors. Typically, sparse vectors are more 
efficient.
+
+### Input Columns
+
+| Param name | Type   | Default   | Description|
+|:---|:---|:--|:---|
+| inputCol   | Vector | `"input"` | Features to be mapped. |
+
+### Output Columns
+
+| Param name | Type  | Default| Description  |
+|:---|:--|:---|:-|
+| outputCol  | DenseVector[] | `"output"` | Hash values. |
+
+### Parameters
+
+| Key | Default
   | Type| Required | Description   
 |
+|-|---|-|--||
+| inputCol| `"input"`  
   | String  | no   | Input column name.
 |
+| outputCol   | `"output"` 
   | String  | no   | Output column name.   
 |
+| seed| 
`"org.apache.flink.ml.feature.lsh.MinHashLSH".hashCode()` | Long| no   
| The random seed.   |

Review Comment:
   Alright, I'll follow existing example.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] Fanoid commented on a diff in pull request #191: [FLINK-30401] Add Estimator and Transformer for MinHashLSH

2022-12-29 Thread GitBox


Fanoid commented on code in PR #191:
URL: https://github.com/apache/flink-ml/pull/191#discussion_r1059263123


##
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/lsh/LSHParams.java:
##
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.lsh;
+
+import org.apache.flink.ml.common.param.HasInputCol;
+import org.apache.flink.ml.common.param.HasOutputCol;
+import org.apache.flink.ml.common.param.HasSeed;
+import org.apache.flink.ml.param.IntParam;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+
+/**
+ * Params for {@link LSH}.
+ *
+ * @param  The class type of this instance.
+ */
+public interface LSHParams extends HasInputCol, HasOutputCol, 
HasSeed {
+Param NUM_HASH_TABLES =

Review Comment:
   I have updated the the descriptions of these two parameters.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] Fanoid commented on a diff in pull request #191: [FLINK-30401] Add Estimator and Transformer for MinHashLSH

2022-12-29 Thread GitBox


Fanoid commented on code in PR #191:
URL: https://github.com/apache/flink-ml/pull/191#discussion_r1059262975


##
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/lsh/LSHModel.java:
##
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.lsh;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Model;
+import org.apache.flink.ml.common.broadcast.BroadcastUtils;
+import org.apache.flink.ml.common.datastream.DataStreamUtils;
+import org.apache.flink.ml.common.datastream.EndOfStreamWindows;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.PriorityQueue;
+
+/**
+ * Base class for LSH model.
+ *
+ * In addition to transforming input feature vectors to multiple hash 
values, it also supports
+ * approximate nearest neighbors search within a dataset regarding a key 
vector and approximate
+ * similarity join between two datasets.
+ *
+ * @param  class type of the LSHModel implementation itself.
+ */
+abstract class LSHModel> implements Model, 
LSHModelParams {
+private static final String MODEL_DATA_BC_KEY = "modelData";
+
+private final Map, Object> paramMap = new HashMap<>();
+
+/** Stores the corresponding model data class of T. */
+private final Class modelDataClass;
+
+protected Table modelDataTable;
+
+public LSHModel(Class modelDataClass) {
+this.modelDataClass = modelDataClass;
+ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+}
+
+@Override
+public T setModelData(Table... inputs) {
+modelDataTable = inputs[0];
+return (T) this;
+}
+
+@Override
+public Table[] getModelData() {
+return new Table[] {modelDataTable};
+}
+
+@Override
+public Map, Object> getParamMap() {
+return paramMap;
+}
+
+@Override
+public Table[] transform(Table... inputs) {
+Preconditions.checkArgument(inputs.length == 1);
+StreamTableEnvironment tEnv =
+(StreamTableEnvironment) ((TableImpl) 
inputs[0]).getTableEnvironment();
+DataStream modelData =
+tEnv.toDataStream(modelDataTable, modelDataClass);
+
+RowTypeInfo inputTypeInfo = 
TableUtils.getRowTypeInfo(inputs[0].getResolvedSchema());
+TypeInformation outputType = 
TypeInformation.of(DenseVector[].class);
+RowTypeInfo outputTypeInfo =
+new RowTypeInfo(
+ArrayUtils.addAll(inputTypeInfo.getFieldTypes(), 
outputType),
+ArrayUtils.addAll(inputTypeInfo.getFieldNames(), 
getOutputCol()));
+
+DataStream output =
+BroadcastUtils.withBroadcastStream(
+
Collections.singletonList(tEnv.toDataStream(inputs[0])),
+Collections.singletonMap(MODEL_DATA_BC_KEY, modelData),
+inputList -> {
+//noinspection unchecked
+DataStream data = 

[GitHub] [flink-ml] Fanoid commented on a diff in pull request #191: [FLINK-30401] Add Estimator and Transformer for MinHashLSH

2022-12-29 Thread GitBox


Fanoid commented on code in PR #191:
URL: https://github.com/apache/flink-ml/pull/191#discussion_r1059261844


##
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/lsh/LSHParams.java:
##
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.lsh;
+
+import org.apache.flink.ml.common.param.HasInputCol;
+import org.apache.flink.ml.common.param.HasOutputCol;
+import org.apache.flink.ml.common.param.HasSeed;
+import org.apache.flink.ml.param.IntParam;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+
+/**
+ * Params for {@link LSH}.
+ *
+ * @param  The class type of this instance.
+ */
+public interface LSHParams extends HasInputCol, HasOutputCol, 
HasSeed {

Review Comment:
   I totally agree with you. I've updated the parameters and also sync changes 
to Python side.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] Fanoid commented on a diff in pull request #191: [FLINK-30401] Add Estimator and Transformer for MinHashLSH

2022-12-29 Thread GitBox


Fanoid commented on code in PR #191:
URL: https://github.com/apache/flink-ml/pull/191#discussion_r1059261634


##
docs/content/docs/operators/feature/minhashlsh.md:
##
@@ -0,0 +1,276 @@
+---
+title: "MinHash LSH"
+weight: 1
+type: docs
+aliases:
+- /operators/feature/minhashlsh.html
+---
+
+
+
+## MinHash LSH
+
+MinHash LSH is a Locality Sensitive Hashing (LSH) scheme for Jaccard distance 
metric.
+The input features are sets of natural numbers represented as non-zero indices 
of vectors,
+either dense vectors or sparse vectors. Typically, sparse vectors are more 
efficient.

Review Comment:
   Some descriptions about these two APIs are updated. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] Fanoid commented on a diff in pull request #191: [FLINK-30401] Add Estimator and Transformer for MinHashLSH

2022-12-29 Thread GitBox


Fanoid commented on code in PR #191:
URL: https://github.com/apache/flink-ml/pull/191#discussion_r1059261237


##
flink-ml-python/pyflink/ml/lib/feature/lsh.py:
##
@@ -0,0 +1,191 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+
+import typing
+from abc import ABC
+from pyflink.java_gateway import get_gateway
+from pyflink.table import Table
+from pyflink.util.java_utils import to_jarray
+
+from pyflink.ml.core.linalg import Vector, DenseVector, SparseVector
+from pyflink.ml.core.param import Param, IntParam, ParamValidators
+from pyflink.ml.core.wrapper import JavaWithParams
+from pyflink.ml.lib.feature.common import JavaFeatureEstimator, 
JavaFeatureModel
+from pyflink.ml.lib.param import HasInputCol, HasOutputCol, HasSeed
+
+
+class _LSHParams(
+JavaWithParams,
+HasInputCol,
+HasOutputCol,
+HasSeed
+):
+"""
+Params for :class:`LSH`
+"""
+
+NUM_HASH_TABLES: Param[int] = IntParam(
+"num_hash_tables", "Number of hash tables.", 1, 
ParamValidators.gt_eq(1)
+)
+
+NUM_HASH_FUNCTIONS_PER_TABLE: Param[int] = IntParam(
+"num_hash_functions_per_table",
+"Number of hash functions per table.",
+1,
+ParamValidators.gt_eq(1.))
+
+def __init__(self, java_params):
+super(_LSHParams, self).__init__(java_params)
+
+def set_num_hash_tables(self, value: int):
+return typing.cast(_LSHParams, self.set(self.NUM_HASH_TABLES, value))
+
+def get_num_hash_tables(self):
+return self.get(self.NUM_HASH_TABLES)
+
+@property
+def num_hash_tables(self):
+return self.get_num_hash_tables()
+
+def set_num_hash_functions_per_table(self, value: int):
+return typing.cast(_LSHParams, 
self.set(self.NUM_HASH_FUNCTIONS_PER_TABLE, value))
+
+def get_num_hash_functions_per_table(self):
+return self.get(self.NUM_HASH_FUNCTIONS_PER_TABLE)
+
+@property
+def num_hash_functions_per_table(self):
+return self.get_num_hash_functions_per_table()
+
+
+class _LSHModelParams(JavaWithParams,
+  HasInputCol,
+  HasOutputCol):
+...

Review Comment:
   Thanks for your reminder. I've added the constructor.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] Fanoid commented on a diff in pull request #191: [FLINK-30401] Add Estimator and Transformer for MinHashLSH

2022-12-21 Thread GitBox


Fanoid commented on code in PR #191:
URL: https://github.com/apache/flink-ml/pull/191#discussion_r105458


##
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/MinHashLSHTest.java:
##
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.lsh.MinHashLSH;
+import org.apache.flink.ml.feature.lsh.MinHashLSHModel;
+import org.apache.flink.ml.feature.lsh.MinHashLSHModelData;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.api.Expressions.$;
+
+/** Tests {@link MinHashLSH} and {@link MinHashLSHModel}. */
+public class MinHashLSHTest extends AbstractTestBase {
+@Rule public final TemporaryFolder tempFolder = new TemporaryFolder();
+private StreamExecutionEnvironment env;
+private StreamTableEnvironment tEnv;
+
+/**
+ * Default case for most tests.
+ *
+ * @return a tuple including the estimator, input data table, and output 
rows.
+ */
+private Tuple3> getDefaultCase() {

Review Comment:
   Just use this method to wrap the model, input data, and expected results to 
indicates their correspondence.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] Fanoid commented on a diff in pull request #191: [FLINK-30401] Add Estimator and Transformer for MinHashLSH

2022-12-20 Thread GitBox


Fanoid commented on code in PR #191:
URL: https://github.com/apache/flink-ml/pull/191#discussion_r1054052585


##
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/lsh/LSH.java:
##
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.lsh;
+
+import org.apache.flink.ml.api.Estimator;
+import org.apache.flink.ml.common.datastream.DataStreamUtils;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Base class for estimators which implement LSH (Locality-sensitive hashing) 
algorithms.
+ *
+ * The basic idea of LSH algorithms is to use to a family of hash functions 
to map data samples

Review Comment:
   `NUM_HASH_TABLES` and `NUM_HASH_FUNCTIONS_PER_TABLE` are for OR- and 
AND-amplification.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] Fanoid commented on a diff in pull request #191: [FLINK-30401] Add Estimator and Transformer for MinHashLSH

2022-12-16 Thread GitBox


Fanoid commented on code in PR #191:
URL: https://github.com/apache/flink-ml/pull/191#discussion_r1050566402


##
flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/DataStreamUtils.java:
##
@@ -133,6 +135,15 @@ public static  DataStream reduce(DataStream 
input, ReduceFunction fu
 }
 }
 
+public static  DataStream reduce(KeyedStream input, 
ReduceFunction func) {

Review Comment:
   I've confirmed `BatchGroupedReduceOperator` works well in streaming mode. 
However, in case of unexpected changes of its implementation, I made a copy of 
this class in `DataStreamUtils`, named `KeyedReduceOperator`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] Fanoid commented on a diff in pull request #191: [FLINK-30401] Add Estimator and Transformer for MinHashLSH

2022-12-16 Thread GitBox


Fanoid commented on code in PR #191:
URL: https://github.com/apache/flink-ml/pull/191#discussion_r1050339527


##
flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/DataStreamUtils.java:
##
@@ -133,6 +135,15 @@ public static  DataStream reduce(DataStream 
input, ReduceFunction fu
 }
 }
 
+public static  DataStream reduce(KeyedStream input, 
ReduceFunction func) {

Review Comment:
   Actually, `KeyedStream.reduce(...)` cannot produce the expected results. It 
outputs a reduced result for **every** input element, see 
[StreamGroupedReduceOperator](https://github.com/apache/flink/blob/99c2a415e9eeefafacf70762b6f54070f7911ceb/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceOperator.java#L57).
 
   
   I'll check whether `BatchGroupedReduceOperator` works in streaming mode.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] Fanoid commented on a diff in pull request #191: [FLINK-30401] Add Estimator and Transformer for MinHashLSH

2022-12-15 Thread GitBox


Fanoid commented on code in PR #191:
URL: https://github.com/apache/flink-ml/pull/191#discussion_r1050354918


##
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/lsh/MinHashLSHModelData.java:
##
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.lsh;
+
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import 
org.apache.flink.api.common.typeutils.base.array.IntPrimitiveArraySerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.reader.SimpleStreamFormat;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.util.Preconditions;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Random;
+
+/**
+ * Model data of {@link MinHashLSHModel}.
+ *
+ * This class also provides classes to save/load model data.
+ */
+public class MinHashLSHModelData implements LSHScheme {
+
+// A large prime smaller than sqrt(2^63 − 1)
+private static final int HASH_PRIME = 2038074743;
+
+public int numHashTables;
+public int numHashFunctionsPerTable;
+public int[] randCoeffA;
+public int[] randCoeffB;
+
+public MinHashLSHModelData() {}
+
+public MinHashLSHModelData(
+int numHashTables, int numHashFunctionsPerTable, int[] randCoeffA, 
int[] randCoeffB) {
+this.numHashTables = numHashTables;
+this.numHashFunctionsPerTable = numHashFunctionsPerTable;
+this.randCoeffA = randCoeffA;
+this.randCoeffB = randCoeffB;
+}
+
+public static MinHashLSHModelData generateModelData(

Review Comment:
   If the random seed is unchanged, the coefficients are deterministic. 
   
   Yet, there are some reasons, I think, making storing coefficients more 
suitable:
   1. In concept, coefficients are part of MinHash LSH model, while the seed is 
not.
   2. It might happen that users set a different random seed for the model, 
which creates a totally different model without fitting.
   3. There is a chance that the model data is used outside Java ecosystem. In 
this situation, it is hard to reproduce the same model.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] Fanoid commented on a diff in pull request #191: [FLINK-30401] Add Estimator and Transformer for MinHashLSH

2022-12-15 Thread GitBox


Fanoid commented on code in PR #191:
URL: https://github.com/apache/flink-ml/pull/191#discussion_r1050339527


##
flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/DataStreamUtils.java:
##
@@ -133,6 +135,15 @@ public static  DataStream reduce(DataStream 
input, ReduceFunction fu
 }
 }
 
+public static  DataStream reduce(KeyedStream input, 
ReduceFunction func) {

Review Comment:
   Actually, `KeyedStream.reduce(...)` cannot produce the expected results. It 
outputs a reduced result for **every** input element, see 
[StreamGroupedReduceOperator](https://github.com/apache/flink/blob/99c2a415e9eeefafacf70762b6f54070f7911ceb/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceOperator.java#L57).
 
   
   I'll check whether `StreamGroupedReduceOperator` works in streaming mode.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] Fanoid commented on a diff in pull request #191: [FLINK-30401] Add Estimator and Transformer for MinHashLSH

2022-12-15 Thread GitBox


Fanoid commented on code in PR #191:
URL: https://github.com/apache/flink-ml/pull/191#discussion_r1050303584


##
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/lsh/LSHModel.java:
##
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.lsh;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Model;
+import org.apache.flink.ml.common.broadcast.BroadcastUtils;
+import org.apache.flink.ml.common.datastream.DataStreamUtils;
+import org.apache.flink.ml.common.datastream.EndOfStreamWindows;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.PriorityQueue;
+
+/**
+ * Base class for LSH model.
+ *
+ * @param  class type of the LSHModel implementation itself.
+ */
+abstract class LSHModel> implements Model, 
LSHParams {
+private static final String MODEL_DATA_BC_KEY = "modelData";
+
+private final Map, Object> paramMap = new HashMap<>();
+
+/** Stores the corresponding model data class of T. */
+private final Class modelDataClass;
+
+protected Table modelDataTable;
+
+public LSHModel(Class modelDataClass) {
+this.modelDataClass = modelDataClass;
+ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+}
+
+@Override
+public T setModelData(Table... inputs) {
+modelDataTable = inputs[0];
+return (T) this;
+}
+
+@Override
+public Table[] getModelData() {
+return new Table[] {modelDataTable};
+}
+
+@Override
+public Map, Object> getParamMap() {
+return paramMap;
+}
+
+@Override
+public Table[] transform(Table... inputs) {
+Preconditions.checkArgument(inputs.length == 1);
+StreamTableEnvironment tEnv =
+(StreamTableEnvironment) ((TableImpl) 
inputs[0]).getTableEnvironment();
+DataStream modelData = tEnv.toDataStream(modelDataTable, 
modelDataClass);
+
+RowTypeInfo inputTypeInfo = 
TableUtils.getRowTypeInfo(inputs[0].getResolvedSchema());
+TypeInformation outputType = 
TypeInformation.of(DenseVector[].class);
+RowTypeInfo outputTypeInfo =
+new RowTypeInfo(
+ArrayUtils.addAll(inputTypeInfo.getFieldTypes(), 
outputType),
+ArrayUtils.addAll(inputTypeInfo.getFieldNames(), 
getOutputCol()));
+
+DataStream output =
+BroadcastUtils.withBroadcastStream(
+
Collections.singletonList(tEnv.toDataStream(inputs[0])),
+Collections.singletonMap(MODEL_DATA_BC_KEY, modelData),
+inputList -> {
+//noinspection unchecked
+DataStream data = (DataStream) 
inputList.get(0);
+return data.map(
+new 
PredictOutputMapFunction(getInputCol()), outputTypeInfo);
+});
+return new Table[]