zhipeng93 commented on code in PR #192:
URL: https://github.com/apache/flink-ml/pull/192#discussion_r1107984853


##########
docs/content/docs/operators/recommendation/swing.md:
##########
@@ -0,0 +1,194 @@
+---
+title: "Swing"
+type: docs
+aliases:
+- /operators/recommendation/swing.html
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## Swing
+
+An AlgoOperator which implements the Swing algorithm.
+
+Swing is an item recall algorithm. The topology of user-item graph usually can be described as
+user-item-user or item-user-item, which are like 'swing'. For example, if both user <em>u</em> and user <em>v</em>
+have purchased the same commodity $i$, they will form a relationship diagram similar to a swing. If
+<em>u</em> and <em>v</em> have purchased commodity <em>j</em> in addition to <em>i</em>, it is supposed <em>i</em>
+and <em>j</em> are similar.
+
+See "<a href="https://arxiv.org/pdf/2010.05525.pdf">Large Scale Product Graph Construction for Recommendation in
+E-commerce</a>" by Xiaoyong Yang, Yadong Zhu and Yi Zhang.
+
+### Input Columns
+
+| Param name | Type | Default  | Description |
+|:-----------|:-----|:---------|:------------|
+| itemCol    | Long | `"item"` | Item id.    |
+| userCol    | Long | `"user"` | User id     |
+### Output Columns
+
+| Param name | Type   | Default        | Description                                                                  |
+|:-----------|:-------|:---------------|:----------------------------------------------------------------------------|
+| itemCol    | Long   | `"prediction"` | Item id.                                                                     |
+| outputCol  | String | `"output"`     | Recommendations and their score. (e.g. "item_1,0.9;item_2,0.7;item_3,0.35") |

Review Comment:
   What about using the following description?
   
   `Top k similar items and their corresponding scores (e.g. 
"item_1,0.9;item_2,0.7;item_3,0.35").`

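For readers consuming this column, the string format in the suggested description can be unpacked with a few lines of Python. This is only a sketch: `parse_swing_output` is a hypothetical helper, not part of Flink ML, and it assumes the `item,score;item,score` layout shown in the example string above.

```python
from typing import List, Tuple


def parse_swing_output(value: str) -> List[Tuple[str, float]]:
    """Split 'item_1,0.9;item_2,0.7' into [('item_1', 0.9), ('item_2', 0.7)]."""
    pairs = []
    for entry in value.split(";"):
        if not entry:
            continue  # tolerate an empty string or a trailing ';'
        item, score = entry.split(",")
        pairs.append((item, float(score)))
    return pairs


# The example string from the proposed description.
example = parse_swing_output("item_1,0.9;item_2,0.7;item_3,0.35")
```

The item ids are kept as strings here because the documented example uses names such as `item_1`; with `Long` ids a caller could convert them with `int(item)`.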


##########
docs/content/docs/operators/recommendation/swing.md:
##########
@@ -0,0 +1,194 @@
+---
+title: "Swing"
+type: docs
+aliases:
+- /operators/recommendation/swing.html
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## Swing
+
+An AlgoOperator which implements the Swing algorithm.
+
+Swing is an item recall algorithm. The topology of user-item graph usually can be described as
+user-item-user or item-user-item, which are like 'swing'. For example, if both user <em>u</em> and user <em>v</em>
+have purchased the same commodity $i$, they will form a relationship diagram similar to a swing. If
+<em>u</em> and <em>v</em> have purchased commodity <em>j</em> in addition to <em>i</em>, it is supposed <em>i</em>
+and <em>j</em> are similar.
+
+See "<a href="https://arxiv.org/pdf/2010.05525.pdf">Large Scale Product Graph Construction for Recommendation in
+E-commerce</a>" by Xiaoyong Yang, Yadong Zhu and Yi Zhang.
+
+### Input Columns
+
+| Param name | Type | Default  | Description |
+|:-----------|:-----|:---------|:------------|
+| itemCol    | Long | `"item"` | Item id.    |
+| userCol    | Long | `"user"` | User id     |
+### Output Columns
+
+| Param name | Type   | Default        | Description                                                                  |
+|:-----------|:-------|:---------------|:----------------------------------------------------------------------------|
+| itemCol    | Long   | `"prediction"` | Item id.                                                                     |

Review Comment:
   The default value of `itemCol` should be `"item"`.



##########
flink-ml-lib/src/test/java/org/apache/flink/ml/recommendation/SwingTest.java:
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.recommendation;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.recommendation.swing.Swing;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/** Tests {@link Swing}. */
+public class SwingTest {
+    @Rule public final TemporaryFolder tempFolder = new TemporaryFolder();
+    private StreamExecutionEnvironment env;
+    private StreamTableEnvironment tEnv;
+    private Table inputTable;
+
+    @Before
+    public void before() {
+        env = TestUtils.getExecutionEnvironment();
+        tEnv = StreamTableEnvironment.create(env);
+        List<Row> inputRows =
+                new ArrayList<>(
+                        Arrays.asList(
+                                Row.of(0L, 10L),
+                                Row.of(0L, 11L),
+                                Row.of(0L, 12L),
+                                Row.of(1L, 13L),
+                                Row.of(1L, 12L),
+                                Row.of(2L, 10L),
+                                Row.of(2L, 11L),
+                                Row.of(2L, 12L),
+                                Row.of(3L, 13L),
+                                Row.of(3L, 12L),
+                                Row.of(4L, 12L),
+                                Row.of(4L, 10L),
+                                Row.of(4L, 11L),
+                                Row.of(4L, 12L),
+                                Row.of(4L, 13L)));
+        inputTable =
+                tEnv.fromDataStream(
+                        env.fromCollection(
+                                inputRows,
+                                new RowTypeInfo(
+                                        new TypeInformation[] {
+                                            BasicTypeInfo.LONG_TYPE_INFO,
+                                            BasicTypeInfo.LONG_TYPE_INFO
+                                        },
+                                        new String[] {"user_id", "item_id"})));
+    }
+
+    private void compareResultAndExpected(List<Row> results) {
+        List<Row> expectedScoreRows =
+                new ArrayList<>(
+                        Arrays.asList(
+                                Row.of(10L, "11,0.058845768947156235;12,0.058845768947156235"),
+                                Row.of(11L, "10,0.058845768947156235;12,0.058845768947156235"),
+                                Row.of(
+                                        12L,
+                                        "13,0.09134833828228624;10,0.058845768947156235;11,0.058845768947156235"),
+                                Row.of(13L, "12,0.09134833828228624")));
+
+        results.sort(Comparator.comparing(o -> o.getFieldAs(0)));
+
+        for (int i = 0; i < results.size(); i++) {
+            Row result = results.get(i);
+            String itemRankScore = result.getFieldAs(1);
+            Row expect = expectedScoreRows.get(i);
+            assertEquals(expect.getField(0), result.getField(0));
+            assertEquals(expect.getField(1), itemRankScore);
+        }
+    }
+
+    @Test
+    public void testParam() {
+        Swing swing = new Swing();
+
+        assertEquals("item", swing.getItemCol());
+        assertEquals("user", swing.getUserCol());
+        assertEquals(100, swing.getK());
+        assertEquals(1000, swing.getMaxUserNumPerItem());
+        assertEquals(10, swing.getMinUserBehavior());
+        assertEquals(1000, swing.getMaxUserBehavior());
+        assertEquals(15, swing.getAlpha1());
+        assertEquals(0, swing.getAlpha2());
+        assertEquals(0.3, swing.getBeta(), 1e-9);
+
+        swing.setItemCol("item_1")
+                .setUserCol("user_1")
+                .setK(20)
+                .setMaxUserNumPerItem(500)
+                .setMinUserBehavior(10)
+                .setMaxUserBehavior(50)
+                .setAlpha1(5)
+                .setAlpha2(1)
+                .setBeta(0.35);
+
+        assertEquals("item_1", swing.getItemCol());
+        assertEquals("user_1", swing.getUserCol());
+        assertEquals(20, swing.getK());
+        assertEquals(500, swing.getMaxUserNumPerItem());
+        assertEquals(10, swing.getMinUserBehavior());
+        assertEquals(50, swing.getMaxUserBehavior());
+        assertEquals(5, swing.getAlpha1());
+        assertEquals(1, swing.getAlpha2());
+        assertEquals(0.35, swing.getBeta(), 1e-9);
+    }
+
+    @Test
+    public void testInputWithIllegalDataType() {
+        List<Row> rows =
+                new ArrayList<>(Arrays.asList(Row.of(0, "10"), Row.of(1, "11"), Row.of(2, "")));
+
+        DataStream<Row> dataStream =
+                env.fromCollection(
+                        rows,
+                        new RowTypeInfo(
+                                new TypeInformation[] {
+                                    BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
+                                },
+                                new String[] {"user_id", "item_id"}));
+        Table data = tEnv.fromDataStream(dataStream);
+
+        try {
+            Table[] swingResultTables =
+                    new Swing()
+                            .setItemCol("item_id")

Review Comment:
   Let's use the default values where possible, e.g., for `itemCol`, `userCol` and 
`outputCol`. Using the defaults would simplify the test and also show how 
users would use the presented algorithm.
   
   Same for other tests.



##########
flink-ml-python/pyflink/ml/recommendation/swing.py:
##########
@@ -0,0 +1,215 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import typing
+
+from pyflink.ml.common.param import HasOutputCol
+from pyflink.ml.param import Param, StringParam, IntParam, FloatParam, ParamValidators
+from pyflink.ml.recommendation.common import JavaRecommendationAlgoOperator
+from pyflink.ml.wrapper import JavaWithParams
+
+
+class _SwingParams(
+    JavaWithParams,
+    HasOutputCol
+):
+    """
+    Params for :class:`Swing`.
+    """
+
+    USER_COL: Param[str] = StringParam(
+        "user_col",
+        "User column name.",
+        "user",
+        ParamValidators.not_null())
+
+    ITEM_COL: Param[str] = StringParam(
+        "item_col",
+        "Item column name.",
+        "item",
+        ParamValidators.not_null())
+
+    K: Param[int] = IntParam(
+        "k",
+        "The max number of similar items to output for each item.",
+        100,
+        ParamValidators.gt(0))
+
+    MAX_USER_NUM_PER_ITEM: Param[int] = IntParam(
+        "max_user_num_per_item",
+        "The max number of users(purchasers) for each item. If the number of users "
+        + "is greater than this value, then only maxUserNumPerItem users will "
+        + "be sampled and used in the computation of similarity between two items.",
+        1000,
+        ParamValidators.gt(0))
+
+    MIN_USER_BEHAVIOR: Param[int] = IntParam(
+        "min_user_behavior",
+        "The min number of items that a user purchases. If the items purchased by a user is "
+        + "smaller than this value, then this user is filtered out and will not be used in the "
+        + "computation.",
+        10,
+        ParamValidators.gt(0))
+
+    MAX_USER_BEHAVIOR: Param[int] = IntParam(
+        "max_user_behavior",
+        "The max number of items for a user purchases. If the items purchased by a user is "
+        + "greater than this value, then this user is filtered out and will not be used in the "
+        + "computation.",
+        1000,
+        ParamValidators.gt(0))
+
+    ALPHA1: Param[int] = IntParam(
+        "alpha1",
+        "Smooth factor for number of users that have purchased one item. The higher alpha1 is,"
+        + " the less purchasing behavior contributes to the similarity score.",
+        15,
+        ParamValidators.gt_eq(0))
+
+    ALPHA2: Param[int] = IntParam(
+        "alpha2",
+        "Smooth factor for number of users that have purchased the two target items. The higher "
+        + "alpha2 is, the less purchasing behavior contributes to the similarity score.",
+        0,
+        ParamValidators.gt_eq(0))
+
+    BETA: Param[float] = FloatParam(
+        "beta",
+        "Decay factor for number of users that have purchased one item. The higher beta is, the "
+        + "less purchasing behavior contributes to the similarity score.",
+        0.3,
+        ParamValidators.gt_eq(0))
+
+    def __init__(self, java_params):
+        super(_SwingParams, self).__init__(java_params)
+
+    def set_user_col(self, value: str):
+        return typing.cast(_SwingParams, self.set(self.USER_COL, value))
+
+    def get_user_col(self) -> str:
+        return self.get(self.USER_COL)
+
+    def set_item_col(self, value: str):
+        return typing.cast(_SwingParams, self.set(self.ITEM_COL, value))
+
+    def get_item_col(self) -> str:
+        return self.get(self.ITEM_COL)
+
+    def set_k(self, value: int):
+        return typing.cast(_SwingParams, self.set(self.K, value))
+
+    def get_k(self) -> int:
+        return self.get(self.K)
+
+    def set_max_user_num_per_item(self, value: int):
+        return typing.cast(_SwingParams, self.set(self.MAX_USER_NUM_PER_ITEM, value))
+
+    def get_max_user_num_per_item(self) -> int:
+        return self.get(self.MAX_USER_NUM_PER_ITEM)
+
+    def set_min_user_behavior(self, value: int):
+        return typing.cast(_SwingParams, self.set(self.MIN_USER_BEHAVIOR, value))
+
+    def get_min_user_behavior(self) -> int:
+        return self.get(self.MIN_USER_BEHAVIOR)
+
+    def set_max_user_behavior(self, value: int):
+        return typing.cast(_SwingParams, self.set(self.MAX_USER_BEHAVIOR, value))
+
+    def get_max_user_behavior(self) -> int:
+        return self.get(self.MAX_USER_BEHAVIOR)
+
+    def set_alpha1(self, value: int):
+        return typing.cast(_SwingParams, self.set(self.ALPHA1, value))
+
+    def get_alpha1(self) -> int:
+        return self.get(self.ALPHA1)
+
+    def set_alpha2(self, value: int):
+        return typing.cast(_SwingParams, self.set(self.ALPHA2, value))
+
+    def get_alpha2(self) -> int:
+        return self.get(self.ALPHA2)
+
+    def set_beta(self, value: float):
+        return typing.cast(_SwingParams, self.set(self.BETA, value))
+
+    def get_beta(self) -> float:
+        return self.get(self.BETA)
+
+    @property
+    def user_col(self) -> str:
+        return self.get_user_col()
+
+    @property
+    def item_col(self) -> str:
+        return self.get_item_col()
+
+    @property
+    def k(self) -> int:
+        return self.get_k()
+
+    @property
+    def max_user_num_per_item(self) -> int:
+        return self.get_max_user_num_per_item()
+
+    @property
+    def min_user_behavior(self) -> int:
+        return self.get_min_user_behavior()
+
+    @property
+    def max_user_behavior(self) -> int:
+        return self.get_max_user_behavior()
+
+    @property
+    def alpha1(self) -> int:
+        return self.get_alpha1()
+
+    @property
+    def alpha2(self) -> float:
+        return self.get_alpha2()
+
+    @property
+    def beta(self) -> float:
+        return self.get_beta()
+
+
+class Swing(JavaRecommendationAlgoOperator, _SwingParams):
+    """
+    An AlgoOperator which implements the Swing algorithm.
+
+    Swing is an item recall model. The topology of user-item graph usually can be described as
+    user-item-user or item-user-item, which are like 'swing'. For example, if both user u
+    and user v have purchased the same commodity i , they will form a relationship
+    diagram similar to a swing. If u and v have purchased commodity j in
+    addition to i, it is supposed i and j are similar.
+
+    This implementation is based on the algorithm proposed in the paper: "Large Scale Product

Review Comment:
   Let's make the Python doc consistent with the Java doc.
   
   Same for others.
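To make the similarity described in the docstring concrete, here is a plain-Python sketch that does not use Flink. It rests on several assumptions not confirmed by this excerpt: each user gets a weight `1 / (alpha1 + n_items) ** beta`, each pair of users who share items contributes `w_u * w_v / (alpha2 + n_common_items)` to every pair of shared items, and a user's items are counted as distinct items. `swing_scores` is a hypothetical name; the parameter names and defaults mirror the `_SwingParams` definitions quoted above.

```python
from collections import defaultdict
from itertools import combinations


def swing_scores(records, alpha1=15, alpha2=0, beta=0.3,
                 min_user_behavior=10, max_user_behavior=1000):
    """Return {item: {other_item: score}} for an iterable of (user, item) pairs."""
    # Collect the distinct items of each user (assumption: duplicates count once).
    items_of = defaultdict(set)
    for user, item in records:
        items_of[user].add(item)

    # Drop users with fewer than min or more than max items.
    items_of = {u: items for u, items in items_of.items()
                if min_user_behavior <= len(items) <= max_user_behavior}

    # Assumed per-user weight: users with many items contribute less.
    weight = {u: 1.0 / (alpha1 + len(items)) ** beta
              for u, items in items_of.items()}

    scores = defaultdict(lambda: defaultdict(float))
    for u, v in combinations(sorted(items_of), 2):
        common = items_of[u] & items_of[v]
        if len(common) < 2:
            continue  # no item pair is shared by this user pair
        pair_score = weight[u] * weight[v] / (alpha2 + len(common))
        for i, j in combinations(sorted(common), 2):
            scores[i][j] += pair_score
            scores[j][i] += pair_score
    return scores


# The (user, item) rows from SwingTest.before().
records = [(0, 10), (0, 11), (0, 12), (1, 13), (1, 12), (2, 10), (2, 11),
           (2, 12), (3, 13), (3, 12), (4, 12), (4, 10), (4, 11), (4, 12), (4, 13)]
# max_user_behavior=3 is an assumed setting that filters out user 4.
scores = swing_scores(records, min_user_behavior=1, max_user_behavior=3)
```

Under these assumptions, `scores[10][11]` and `scores[12][13]` should agree with the expected values in `compareResultAndExpected` (0.0588... and 0.0913...) up to floating-point rounding. The Java implementation in the PR remains the reference.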



##########
flink-ml-lib/src/test/java/org/apache/flink/ml/recommendation/SwingTest.java:
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.recommendation;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.recommendation.swing.Swing;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/** Tests {@link Swing}. */
+public class SwingTest {
+    @Rule public final TemporaryFolder tempFolder = new TemporaryFolder();
+    private StreamExecutionEnvironment env;
+    private StreamTableEnvironment tEnv;
+    private Table inputTable;
+
+    @Before
+    public void before() {
+        env = TestUtils.getExecutionEnvironment();
+        tEnv = StreamTableEnvironment.create(env);
+        List<Row> inputRows =
+                new ArrayList<>(
+                        Arrays.asList(
+                                Row.of(0L, 10L),
+                                Row.of(0L, 11L),
+                                Row.of(0L, 12L),
+                                Row.of(1L, 13L),
+                                Row.of(1L, 12L),
+                                Row.of(2L, 10L),
+                                Row.of(2L, 11L),
+                                Row.of(2L, 12L),
+                                Row.of(3L, 13L),
+                                Row.of(3L, 12L),
+                                Row.of(4L, 12L),
+                                Row.of(4L, 10L),
+                                Row.of(4L, 11L),
+                                Row.of(4L, 12L),
+                                Row.of(4L, 13L)));
+        inputTable =
+                tEnv.fromDataStream(
+                        env.fromCollection(
+                                inputRows,
+                                new RowTypeInfo(
+                                        new TypeInformation[] {
+                                            BasicTypeInfo.LONG_TYPE_INFO,
+                                            BasicTypeInfo.LONG_TYPE_INFO
+                                        },
+                                        new String[] {"user_id", "item_id"})));
+    }
+
+    private void compareResultAndExpected(List<Row> results) {
+        List<Row> expectedScoreRows =
+                new ArrayList<>(
+                        Arrays.asList(
+                                Row.of(10L, "11,0.058845768947156235;12,0.058845768947156235"),
+                                Row.of(11L, "10,0.058845768947156235;12,0.058845768947156235"),
+                                Row.of(
+                                        12L,
+                                        "13,0.09134833828228624;10,0.058845768947156235;11,0.058845768947156235"),
+                                Row.of(13L, "12,0.09134833828228624")));
+
+        results.sort(Comparator.comparing(o -> o.getFieldAs(0)));
+
+        for (int i = 0; i < results.size(); i++) {
+            Row result = results.get(i);
+            String itemRankScore = result.getFieldAs(1);
+            Row expect = expectedScoreRows.get(i);
+            assertEquals(expect.getField(0), result.getField(0));
+            assertEquals(expect.getField(1), itemRankScore);
+        }
+    }
+
+    @Test
+    public void testParam() {
+        Swing swing = new Swing();
+
+        assertEquals("item", swing.getItemCol());
+        assertEquals("user", swing.getUserCol());
+        assertEquals(100, swing.getK());
+        assertEquals(1000, swing.getMaxUserNumPerItem());
+        assertEquals(10, swing.getMinUserBehavior());
+        assertEquals(1000, swing.getMaxUserBehavior());
+        assertEquals(15, swing.getAlpha1());
+        assertEquals(0, swing.getAlpha2());
+        assertEquals(0.3, swing.getBeta(), 1e-9);
+
+        swing.setItemCol("item_1")
+                .setUserCol("user_1")
+                .setK(20)
+                .setMaxUserNumPerItem(500)
+                .setMinUserBehavior(10)
+                .setMaxUserBehavior(50)
+                .setAlpha1(5)
+                .setAlpha2(1)
+                .setBeta(0.35);
+
+        assertEquals("item_1", swing.getItemCol());
+        assertEquals("user_1", swing.getUserCol());
+        assertEquals(20, swing.getK());
+        assertEquals(500, swing.getMaxUserNumPerItem());
+        assertEquals(10, swing.getMinUserBehavior());
+        assertEquals(50, swing.getMaxUserBehavior());
+        assertEquals(5, swing.getAlpha1());
+        assertEquals(1, swing.getAlpha2());
+        assertEquals(0.35, swing.getBeta(), 1e-9);
+    }
+
+    @Test
+    public void testInputWithIllegalDataType() {
+        List<Row> rows =
+                new ArrayList<>(Arrays.asList(Row.of(0, "10"), Row.of(1, "11"), Row.of(2, "")));
+
+        DataStream<Row> dataStream =
+                env.fromCollection(
+                        rows,
+                        new RowTypeInfo(
+                                new TypeInformation[] {
+                                    BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
+                                },
+                                new String[] {"user_id", "item_id"}));
+        Table data = tEnv.fromDataStream(dataStream);
+
+        try {
+            Table[] swingResultTables =
+                    new Swing()
+                            .setItemCol("item_id")
+                            .setUserCol("user_id")
+                            .setOutputCol("item_score")
+                            .setMinUserBehavior(1)
+                            .transform(data);
+            swingResultTables[0].execute().print();
+            fail();
+        } catch (RuntimeException e) {
+            assertEquals(IllegalArgumentException.class, e.getClass());
+            assertEquals("The types of user and item must be Long.", e.getMessage());
+        }
+    }
+
+    @Test
+    public void testInputWithNull() {
+        List<Row> rows =
+                new ArrayList<>(
+                        Arrays.asList(
+                                Row.of(0L, 10L),
+                                Row.of(null, 12L),
+                                Row.of(1L, 13L),
+                                Row.of(3L, 12L)));
+
+        DataStream<Row> dataStream =
+                env.fromCollection(
+                        rows,
+                        new RowTypeInfo(
+                                new TypeInformation[] {
+                                    BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO
+                                },
+                                new String[] {"user_id", "item_id"}));
+        Table data = tEnv.fromDataStream(dataStream);
+        Swing swing = new Swing().setItemCol("item_id").setUserCol("user_id").setMinUserBehavior(1);
+        Table[] swingResultTables = swing.transform(data);
+
+        try {
+            swingResultTables[0].execute().print();

Review Comment:
   Let's avoid using `print` in unit tests, since printed output cannot be checked.



##########
docs/content/docs/operators/recommendation/swing.md:
##########
@@ -0,0 +1,194 @@
+---
+title: "Swing"
+type: docs
+aliases:
+- /operators/recommendation/swing.html
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## Swing
+
+An AlgoOperator which implements the Swing algorithm.
+
+Swing is an item recall algorithm. The topology of user-item graph usually can be described as
+user-item-user or item-user-item, which are like 'swing'. For example, if both user <em>u</em> and user <em>v</em>
+have purchased the same commodity $i$, they will form a relationship diagram similar to a swing. If
+<em>u</em> and <em>v</em> have purchased commodity <em>j</em> in addition to <em>i</em>, it is supposed <em>i</em>
+and <em>j</em> are similar.
+
+See "<a href="https://arxiv.org/pdf/2010.05525.pdf">Large Scale Product Graph Construction for Recommendation in
+E-commerce</a>" by Xiaoyong Yang, Yadong Zhu and Yi Zhang.
+
+### Input Columns
+
+| Param name | Type | Default  | Description |
+|:-----------|:-----|:---------|:------------|
+| itemCol    | Long | `"item"` | Item id.    |
+| userCol    | Long | `"user"` | User id     |

Review Comment:
   nit: The description usually ends with a `.`.
   
   Same for `outputCol`.



##########
flink-ml-lib/src/test/java/org/apache/flink/ml/recommendation/SwingTest.java:
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.recommendation;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.recommendation.swing.Swing;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/** Tests {@link Swing}. */
+public class SwingTest {
+    @Rule public final TemporaryFolder tempFolder = new TemporaryFolder();
+    private StreamExecutionEnvironment env;
+    private StreamTableEnvironment tEnv;
+    private Table inputTable;
+
+    @Before
+    public void before() {
+        env = TestUtils.getExecutionEnvironment();
+        tEnv = StreamTableEnvironment.create(env);
+        List<Row> inputRows =
+                new ArrayList<>(
+                        Arrays.asList(
+                                Row.of(0L, 10L),
+                                Row.of(0L, 11L),
+                                Row.of(0L, 12L),
+                                Row.of(1L, 13L),
+                                Row.of(1L, 12L),
+                                Row.of(2L, 10L),
+                                Row.of(2L, 11L),
+                                Row.of(2L, 12L),
+                                Row.of(3L, 13L),
+                                Row.of(3L, 12L),
+                                Row.of(4L, 12L),
+                                Row.of(4L, 10L),
+                                Row.of(4L, 11L),
+                                Row.of(4L, 12L),
+                                Row.of(4L, 13L)));
+        inputTable =
+                tEnv.fromDataStream(
+                        env.fromCollection(
+                                inputRows,
+                                new RowTypeInfo(
+                                        new TypeInformation[] {
+                                            BasicTypeInfo.LONG_TYPE_INFO,
+                                            BasicTypeInfo.LONG_TYPE_INFO
+                                        },
+                                        new String[] {"user_id", "item_id"})));
+    }
+
+    private void compareResultAndExpected(List<Row> results) {
+        List<Row> expectedScoreRows =
+                new ArrayList<>(
+                        Arrays.asList(
+                                Row.of(10L, "11,0.058845768947156235;12,0.058845768947156235"),
+                                Row.of(11L, "10,0.058845768947156235;12,0.058845768947156235"),
+                                Row.of(
+                                        12L,
+                                        "13,0.09134833828228624;10,0.058845768947156235;11,0.058845768947156235"),
+                                Row.of(13L, "12,0.09134833828228624")));
+
+        results.sort(Comparator.comparing(o -> o.getFieldAs(0)));
+
+        for (int i = 0; i < results.size(); i++) {
+            Row result = results.get(i);
+            String itemRankScore = result.getFieldAs(1);
+            Row expect = expectedScoreRows.get(i);
+            assertEquals(expect.getField(0), result.getField(0));
+            assertEquals(expect.getField(1), itemRankScore);
+        }
+    }
+
+    @Test
+    public void testParam() {
+        Swing swing = new Swing();
+
+        assertEquals("item", swing.getItemCol());
+        assertEquals("user", swing.getUserCol());
+        assertEquals(100, swing.getK());
+        assertEquals(1000, swing.getMaxUserNumPerItem());
+        assertEquals(10, swing.getMinUserBehavior());
+        assertEquals(1000, swing.getMaxUserBehavior());
+        assertEquals(15, swing.getAlpha1());
+        assertEquals(0, swing.getAlpha2());
+        assertEquals(0.3, swing.getBeta(), 1e-9);
+
+        swing.setItemCol("item_1")
+                .setUserCol("user_1")
+                .setK(20)
+                .setMaxUserNumPerItem(500)
+                .setMinUserBehavior(10)
+                .setMaxUserBehavior(50)
+                .setAlpha1(5)
+                .setAlpha2(1)
+                .setBeta(0.35);
+
+        assertEquals("item_1", swing.getItemCol());
+        assertEquals("user_1", swing.getUserCol());
+        assertEquals(20, swing.getK());
+        assertEquals(500, swing.getMaxUserNumPerItem());
+        assertEquals(10, swing.getMinUserBehavior());
+        assertEquals(50, swing.getMaxUserBehavior());
+        assertEquals(5, swing.getAlpha1());
+        assertEquals(1, swing.getAlpha2());
+        assertEquals(0.35, swing.getBeta(), 1e-9);
+    }
+
+    @Test
+    public void testInputWithIllegalDataType() {
+        List<Row> rows =
+                new ArrayList<>(Arrays.asList(Row.of(0, "10"), Row.of(1, "11"), Row.of(2, "")));
+
+        DataStream<Row> dataStream =
+                env.fromCollection(
+                        rows,
+                        new RowTypeInfo(
+                                new TypeInformation[] {
+                                    BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
+                                },
+                                new String[] {"user_id", "item_id"}));
+        Table data = tEnv.fromDataStream(dataStream);
+
+        try {
+            Table[] swingResultTables =
+                    new Swing()
+                            .setItemCol("item_id")
+                            .setUserCol("user_id")
+                            .setOutputCol("item_score")
+                            .setMinUserBehavior(1)
+                            .transform(data);
+            swingResultTables[0].execute().print();
+            fail();
+        } catch (RuntimeException e) {
+            assertEquals(IllegalArgumentException.class, e.getClass());
+            assertEquals("The types of user and item must be Long.", e.getMessage());
+        }
+    }
+
+    @Test
+    public void testInputWithNull() {
+        List<Row> rows =
+                new ArrayList<>(
+                        Arrays.asList(
+                                Row.of(0L, 10L),
+                                Row.of(null, 12L),
+                                Row.of(1L, 13L),
+                                Row.of(3L, 12L)));
+
+        DataStream<Row> dataStream =
+                env.fromCollection(
+                        rows,
+                        new RowTypeInfo(
+                                new TypeInformation[] {
+                                    BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO
+                                },
+                                new String[] {"user_id", "item_id"}));
+        Table data = tEnv.fromDataStream(dataStream);
+        Swing swing = new Swing().setItemCol("item_id").setUserCol("user_id").setMinUserBehavior(1);
+        Table[] swingResultTables = swing.transform(data);
+
+        try {
+            swingResultTables[0].execute().print();
+            fail();
+        } catch (RuntimeException e) {
+            Throwable exception = ExceptionUtils.getRootCause(e);
+            assertEquals(RuntimeException.class, exception.getClass());
+            assertEquals("Data of user and item column must not be null.", exception.getMessage());
+        }
+    }
+
+    @Test
+    public void testOutputSchema() {
+        Swing swing =
+                new Swing()
+                        .setItemCol("item_id")
+                        .setUserCol("user_id")
+                        .setOutputCol("item_score")
+                        .setMinUserBehavior(1);
+        Table[] swingResultTables = swing.transform(inputTable);
+        Table output = swingResultTables[0];
+
+        assertEquals(
+                Arrays.asList("item_id", "item_score"),
+                output.getResolvedSchema().getColumnNames());
+    }
+
+    @Test
+    public void testTransform() {
+        Swing swing =
+                new Swing()
+                        .setItemCol("item_id")
+                        .setUserCol("user_id")
+                        .setMinUserBehavior(2)
+                        .setMaxUserBehavior(3);
+        Table[] swingResultTables = swing.transform(inputTable);
+        Table outputTable = swingResultTables[0];
+        List<Row> results = IteratorUtils.toList(outputTable.execute().collect());
+        compareResultAndExpected(results);
+    }
+
+    @Test
+    public void testSaveLoadAndTransform() throws Exception {
+        Swing swing = new Swing().setItemCol("item_id").setUserCol("user_id").setMinUserBehavior(1);
+        Swing loadedSwing =
+                TestUtils.saveAndReload(tEnv, swing, tempFolder.newFolder().getAbsolutePath());
+        Table outputTable = loadedSwing.transform(inputTable)[0];
+        List<Row> results = IteratorUtils.toList(outputTable.execute().collect());
+        compareResultAndExpected(results);
+    }
+
+    @Test
+    public void getParams() {

Review Comment:
   The code snippet here seems useless. Could you delete it?
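
As a cross-check on the hard-coded strings in `compareResultAndExpected`, here is a minimal sketch, not the PR's implementation, of a scoring rule that appears to produce them. It assumes each user `u` gets a weight `1 / (alpha1 + |I_u|)^beta`, and that each unordered pair of users `(u, v)` who both purchased items `i` and `j` contributes `w_u * w_v / (alpha2 + |I_u ∩ I_v|)` to the similarity of `i` and `j`. The names `SwingScoreSketch`, `similarity` and `sampleData` are hypothetical. The data is the test input restricted to users 0 to 3, since user 4 has five rows and would be dropped by `setMaxUserBehavior(3)` in `testTransform`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper for sanity-checking Swing scores; not part of Flink ML. */
public class SwingScoreSketch {

    /** Assumed scoring rule: sum over unordered user pairs that purchased both items. */
    static double similarity(
            Map<Long, Set<Long>> userItems,
            long itemI,
            long itemJ,
            int alpha1,
            int alpha2,
            double beta) {
        // Collect the users who purchased both itemI and itemJ.
        List<Long> common = new ArrayList<>();
        for (Map.Entry<Long, Set<Long>> entry : userItems.entrySet()) {
            if (entry.getValue().contains(itemI) && entry.getValue().contains(itemJ)) {
                common.add(entry.getKey());
            }
        }

        double score = 0.0;
        for (int a = 0; a < common.size(); a++) {
            for (int b = a + 1; b < common.size(); b++) {
                Set<Long> itemsU = userItems.get(common.get(a));
                Set<Long> itemsV = userItems.get(common.get(b));

                // Items purchased by both users of the pair.
                Set<Long> shared = new HashSet<>(itemsU);
                shared.retainAll(itemsV);

                // Per-user weights shrink as a user purchases more items.
                double weightU = 1.0 / Math.pow(alpha1 + itemsU.size(), beta);
                double weightV = 1.0 / Math.pow(alpha1 + itemsV.size(), beta);

                score += weightU * weightV / (alpha2 + shared.size());
            }
        }
        return score;
    }

    /** Purchases of users 0 to 3 from the test input (user id -> item ids). */
    static Map<Long, Set<Long>> sampleData() {
        Map<Long, Set<Long>> userItems = new HashMap<>();
        userItems.put(0L, new HashSet<>(Arrays.asList(10L, 11L, 12L)));
        userItems.put(1L, new HashSet<>(Arrays.asList(12L, 13L)));
        userItems.put(2L, new HashSet<>(Arrays.asList(10L, 11L, 12L)));
        userItems.put(3L, new HashSet<>(Arrays.asList(12L, 13L)));
        return userItems;
    }

    public static void main(String[] args) {
        Map<Long, Set<Long>> data = sampleData();
        // Default parameters from testParam: alpha1 = 15, alpha2 = 0, beta = 0.3.
        System.out.println(similarity(data, 10L, 11L, 15, 0, 0.3));
        System.out.println(similarity(data, 12L, 13L, 15, 0, 0.3));
    }
}
```

With the defaults `alpha1 = 15`, `alpha2 = 0` and `beta = 0.3`, this gives roughly 0.0588 for the pair (10, 11) and roughly 0.0913 for (12, 13), in line with the expected rows.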



##########
docs/content/docs/operators/recommendation/swing.md:
##########
@@ -0,0 +1,194 @@
+---
+title: "Swing"
+type: docs
+aliases:
+- /operators/recommendation/swing.html
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## Swing
+
+An AlgoOperator which implements the Swing algorithm.
+
+Swing is an item recall algorithm. The topology of user-item graph usually can be described as
+user-item-user or item-user-item, which are like 'swing'. For example, if both user <em>u</em> and user <em>v</em>
+have purchased the same commodity $i$, they will form a relationship diagram similar to a swing. If
+<em>u</em> and <em>v</em> have purchased commodity <em>j</em> in addition to <em>i</em>, it is supposed <em>i</em>
+and <em>j</em> are similar.
+
+See "<a href="https://arxiv.org/pdf/2010.05525.pdf">Large Scale Product Graph Construction for Recommendation in
+E-commerce</a>" by Xiaoyong Yang, Yadong Zhu and Yi Zhang.
+
+### Input Columns
+
+| Param name | Type | Default  | Description |
+|:-----------|:-----|:---------|:------------|
+| itemCol    | Long | `"item"` | Item id.    |
+| userCol    | Long | `"user"` | User id     |
+### Output Columns
+
+| Param name | Type   | Default        | Description                                                                 |
+|:-----------|:-------|:---------------|:----------------------------------------------------------------------------|
+| itemCol    | Long   | `"prediction"` | Item id.                                                                    |
+| outputCol  | String | `"output"`     | Recommendations and their score. (e.g. "item_1,0.9;item_2,0.7;item_3,0.35") |
+
+### Parameters
+
+Below are the parameters required by `Swing`.
+
+| Key               | Default    | Type    | Required | Description |
+|:------------------|:-----------|:--------|:---------|:------------|
+| userCol           | `"user"`   | String  | no       | User column name. |
+| itemCol           | `"item"`   | String  | no       | Item column name. |
+| maxUserNumPerItem | `1000`     | Integer | no       | The max number of user(purchasers) for each item. If the number of user is larger than this value, then only maxUserNumPerItem users will be sampled and considered in the computation of similarity between two items. |
+| k                 | `100`      | Integer | no       | The max number of similar items to output for each item. |
+| minUserBehavior   | `10`       | Integer | no       | The min number of items for a user purchases. If the items purchased by a user is smaller than this value, then this user is filtered out while gathering data. This can affect the speed of the computation. Set minUserBehavior larger in case the swing recommendation progresses very slowly. |
+| maxUserBehavior   | `1000`     | Integer | no       | The max number of items for a user purchases. If the items purchased by a user is larger than this value, then this user is filtered out while gathering data. This can affect the speed of the computation. Set maxUserBehavior smaller in case the swing recommendation progresses very slowly. The IllegalArgumentException is raised if the value of maxUserBehavior is smaller than minUserBehavior. |
+| alpha1            | `15`       | Integer | no       | Smooth factor for number of users that have purchased one item. The higher alpha1 is, the less purchasing behavior contributes to the similarity score. |
+| alpha2            | `0`        | Integer | no       | Smooth factor for number of users that have purchased the two target items. The higher alpha2 is, the less purchasing behavior contributes to the similarity score. |
+| beta              | `0.3`      | Double  | no       | Decay factor for number of users that have purchased one item. The higher beta is, the less purchasing behavior contributes to the similarity score. |
+| outputCol         | `"output"` | String  | no       | Output column name. |
+
+### Examples
+
+{{< tabs examples >}}
+
+{{< tab "Java">}}
+
+```java
+package org.apache.flink.ml.examples.recommendation;
+
+import org.apache.flink.ml.recommendation.swing.Swing;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+/**
+ * Simple program that creates a Swing instance and uses it to generate recommendations for items.
+ */
+public class SwingExample {
+    public static void main(String[] args) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        // Generates input data.
+        DataStream<Row> inputStream =
+                env.fromElements(
+                        Row.of(0L, 10L),
+                        Row.of(0L, 11L),
+                        Row.of(0L, 12L),
+                        Row.of(1L, 13L),
+                        Row.of(1L, 12L),
+                        Row.of(2L, 10L),
+                        Row.of(2L, 11L),
+                        Row.of(2L, 12L),
+                        Row.of(3L, 13L),
+                        Row.of(3L, 12L));
+
+        Table inputTable = tEnv.fromDataStream(inputStream).as("user", "item");
+
+        // Creates a Swing object and initializes its parameters.
+        Swing swing = new Swing().setUserCol("user").setItemCol("item").setMinUserBehavior(1);
+
+        // Transforms the data.
+        Table[] outputTable = swing.transform(inputTable);
+
+        // Extracts and displays the result of swing algorithm.
+        for (CloseableIterator<Row> it = outputTable[0].execute().collect(); it.hasNext(); ) {
+            Row row = it.next();
+
+            long mainItem = row.getFieldAs(0);
+            String itemRankScore = row.getFieldAs(1);
+
+            System.out.printf("mainItem %d, recommendedItem %s\n", mainItem, itemRankScore);

Review Comment:
   Let's update it as:
   
   `System.out.printf("item: %d, top-k similar items: %s\n", mainItem, itemRankScore);`
   
   Same for python/java examples.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
