xuyangzhong commented on code in PR #23898:
URL: https://github.com/apache/flink/pull/23898#discussion_r1427704155


##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/RowToTuple2.scala:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.utils
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.types.{Row, RowKind}
+
+/** Row to scala Tuple2. */
+class RowToTuple2 extends MapFunction[Row, (Boolean, Row)] {
+
+  /**
+   * The mapping method. Takes an element from the input data set and 
transforms it into exactly one
+   * element.
+   *
+   * @param value
+   *   The input value.
+   * @return
+   *   The transformed value
+   * @throws Exception

Review Comment:
   Nit: this function does not appear to throw an exception, so the `@throws` tag can be removed, right?



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/RowToTuple2.scala:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.utils
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.types.{Row, RowKind}
+
+/** Row to scala Tuple2. */
+class RowToTuple2 extends MapFunction[Row, (Boolean, Row)] {
+
+  /**
+   * The mapping method. Takes an element from the input data set and 
transforms it into exactly one

Review Comment:
   What about copying the Scaladoc comment from `TableConversions#toRetractStream`?
   ```
     /**
      * The first field is a [[Boolean]] flag, the second field holds the original record with type [[Row]]. A
      * true [[Boolean]] flag indicates an add message, a false flag indicates a retract message.
      */
   ```



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/RowToTuple2.scala:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.utils
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.types.{Row, RowKind}
+
+/** Row to scala Tuple2. */
+class RowToTuple2 extends MapFunction[Row, (Boolean, Row)] {

Review Comment:
   Nit: the class name is not clear enough. What about `Row2RowWithRetractType`, or something else if you have a better idea?



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/TableAggregateHarnessTest.scala:
##########
@@ -64,7 +63,9 @@ class TableAggregateHarnessTest(mode: StateBackendMode) 
extends HarnessTestBase(
       .select('a, 'b1, 'b2)
 
     tEnv.getConfig.setIdleStateRetention(Duration.ofSeconds(2))
-    val testHarness = createHarnessTester(resultTable.toRetractStream[Row], 
"GroupTableAggregate")
+    val testHarness = createHarnessTester(
+      resultTable.toChangelogStream.map(new RowToTuple2()).setDescription("Row 
to scala Tuple2"),

Review Comment:
   ditto



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/RowToTuple2.scala:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.utils
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.types.{Row, RowKind}
+
+/** Row to scala Tuple2. */

Review Comment:
   This Scaladoc comment is also too terse; please describe what the Boolean flag and the Row in the resulting tuple represent.



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/TableAggregateHarnessTest.scala:
##########
@@ -170,7 +171,9 @@ class TableAggregateHarnessTest(mode: StateBackendMode) 
extends HarnessTestBase(
       .select('b1, 'b2)
 
     tEnv.getConfig.setIdleStateRetention(Duration.ofSeconds(2))
-    val testHarness = createHarnessTester(resultTable.toRetractStream[Row], 
"GroupTableAggregate")
+    val testHarness = createHarnessTester(
+      resultTable.toChangelogStream.map(new RowToTuple2()).setDescription("Row 
to scala Tuple2"),

Review Comment:
   ditto



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/RankHarnessTest.scala:
##########
@@ -83,7 +84,9 @@ class RankHarnessTest(mode: StateBackendMode) extends 
HarnessTestBase(mode) {
 
     val t1 = tEnv.sqlQuery(sql)
 
-    val testHarness = createHarnessTester(t1.toRetractStream[Row], 
"Rank(strategy=[RetractStrategy")
+    val testHarness = createHarnessTester(
+      t1.toChangelogStream.map(new RowToTuple2()).setDescription("Row to scala 
Tuple2"),

Review Comment:
   Nit: remove the unnecessary `setDescription(...)` call.



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/RankHarnessTest.scala:
##########
@@ -179,7 +182,9 @@ class RankHarnessTest(mode: StateBackendMode) extends 
HarnessTestBase(mode) {
 
     val t1 = tEnv.sqlQuery(sql)
 
-    val testHarness = createHarnessTester(t1.toRetractStream[Row], 
"Rank(strategy=[RetractStrategy")
+    val testHarness = createHarnessTester(
+      t1.toChangelogStream.map(new RowToTuple2()).setDescription("Row to scala 
Tuple2"),

Review Comment:
   ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to