hequn8128 commented on a change in pull request #11668:
URL: https://github.com/apache/flink/pull/11668#discussion_r411152627



##########
File path: flink-python/pyflink/table/tests/test_udtf.py
##########
@@ -41,44 +41,67 @@ def test_table_function(self):
                              DataTypes.BIGINT()))
 
         t = self.t_env.from_elements([(1, 1, 3), (2, 1, 6), (3, 2, 9)], ['a', 
'b', 'c'])
-        t.join_lateral("multi_emit(a, multi_num(b)) as (x, y)") \
+        t = t.join_lateral("multi_emit(a, multi_num(b)) as (x, y)") \
             .left_outer_join_lateral("condition_multi_emit(x, y) as m") \
-            .select("x, y, m") \
-            .insert_into("Results")
-        self.t_env.execute("test")
-        actual = source_sink_utils.results()
+            .select("x, y, m")
+        actual = self._get_output(t)
         self.assert_equals(actual,
                            ["1,0,null", "1,1,null", "2,0,null", "2,1,null", 
"3,0,0", "3,0,1",
                             "3,0,2", "3,1,1", "3,1,2", "3,2,2", "3,3,null"])
 
     def test_table_function_with_sql_query(self):
-        table_sink = source_sink_utils.TestAppendSink(
+        self._register_table_sink(
             ['a', 'b', 'c'],
             [DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.BIGINT()])
-        self.t_env.register_table_sink("Results", table_sink)
+
         self.t_env.register_function(
             "multi_emit", udtf(MultiEmit(), [DataTypes.BIGINT(), 
DataTypes.BIGINT()],
                                [DataTypes.BIGINT(), DataTypes.BIGINT()]))
 
         t = self.t_env.from_elements([(1, 1, 3), (2, 1, 6), (3, 2, 9)], ['a', 
'b', 'c'])
         self.t_env.register_table("MyTable", t)
-        self.t_env.sql_query(
+        t = self.t_env.sql_query(
             "SELECT a, x, y FROM MyTable LEFT JOIN LATERAL TABLE(multi_emit(a, 
b)) as T(x, y)"
-            " ON TRUE") \
-            .insert_into("Results")
-        self.t_env.execute("test")
-        actual = source_sink_utils.results()
+            " ON TRUE")
+        actual = self._get_output(t)
         self.assert_equals(actual, ["1,1,0", "2,2,0", "3,3,0", "3,3,1"])
 
+    def _register_table_sink(self, field_names: list, field_types: list):
+        pass
+
+    def _get_output(self, t):
+        pass
+
 
 class PyFlinkStreamUserDefinedTableFunctionTests(UserDefinedTableFunctionTests,
                                                  PyFlinkStreamTableTestCase):
-    pass
+
+    def _register_table_sink(self, field_names: list, field_types: list):
+        table_sink = source_sink_utils.TestAppendSink(field_names, field_types)
+        self.t_env.register_table_sink("Results", table_sink)
+
+    def _get_output(self, t):
+        t.insert_into("Results")
+        self.t_env.execute("test")
+        return source_sink_utils.results()

Review comment:
       How about making these two the default implementation in the base 
class and only overriding them in `PyFlinkBatchUserDefinedTableFunctionTests`?

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/rules/AbstractPythonCorrelateRuleBase.java
##########
@@ -69,35 +67,29 @@ public boolean matches(RelOptRuleCall call) {
                return false;
        }
 
-       @Override
-       public RelNode convert(RelNode rel) {
-               DataStreamPythonCorrelateFactory factory = new 
DataStreamPythonCorrelateFactory(rel);
-               return factory.convertToCorrelate();
-       }
-
        /**
-        * The factory is responsible to creating {@link 
DataStreamPythonCorrelate}.
+        * The abstract factory is responsible to creating {@link 
DataSetPythonCorrelate} or {@link DataStreamPythonCorrelate}.

Review comment:
       is responsible for creating

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/rules/stream/DataStreamPythonCorrelateRule.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.stream;
+
+import org.apache.flink.table.plan.nodes.FlinkConventions;
+import org.apache.flink.table.plan.nodes.datastream.DataStreamPythonCorrelate;
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalCorrelate;
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableFunctionScan;
+import org.apache.flink.table.plan.rules.AbstractPythonCorrelateRuleBase;
+import org.apache.flink.table.plan.schema.RowSchema;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexNode;
+
+import scala.Option;
+
+/**
+ * The physical rule is responsible for convert {@link FlinkLogicalCorrelate} 
to

Review comment:
       responsible for converting

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/rules/AbstractPythonCorrelateRuleBase.java
##########
@@ -39,16 +40,14 @@
 import scala.Some;
 
 /**
- * The physical rule is responsible for convert {@link FlinkLogicalCorrelate} 
to
- * {@link DataStreamPythonCorrelate}.
+ * The abstract physical rule base is responsible for convert {@link 
FlinkLogicalCorrelate} to physical

Review comment:
       responsible for converting

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/rules/batch/DataSetPythonCorrelateRule.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.batch;
+
+import org.apache.flink.table.plan.nodes.FlinkConventions;
+import org.apache.flink.table.plan.nodes.dataset.DataSetPythonCorrelate;
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalCorrelate;
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableFunctionScan;
+import org.apache.flink.table.plan.rules.AbstractPythonCorrelateRuleBase;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexNode;
+
+import scala.Option;
+
+/**
+ * The physical rule is responsible for convert {@link FlinkLogicalCorrelate} 
to
+ * {@link DataSetPythonCorrelate}.
+ */
+public class DataSetPythonCorrelateRule extends 
AbstractPythonCorrelateRuleBase {
+
+       public static final DataSetPythonCorrelateRule INSTANCE = new 
DataSetPythonCorrelateRule();
+
+       private DataSetPythonCorrelateRule() {
+               super(FlinkConventions.DATASET(), "DataSetPythonCorrelateRule");
+       }
+
+       @Override
+       public RelNode convert(RelNode rel) {
+               DataSetPythonCorrelateFactory factory = new 
DataSetPythonCorrelateFactory(rel);
+               return factory.convertToCorrelate();
+       }
+
+       /**
+        * The factory is responsible to creating {@link 
DataSetPythonCorrelate}.

Review comment:
       responsible for creating

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/rules/batch/DataSetPythonCorrelateRule.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.batch;
+
+import org.apache.flink.table.plan.nodes.FlinkConventions;
+import org.apache.flink.table.plan.nodes.dataset.DataSetPythonCorrelate;
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalCorrelate;
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableFunctionScan;
+import org.apache.flink.table.plan.rules.AbstractPythonCorrelateRuleBase;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexNode;
+
+import scala.Option;
+
+/**
+ * The physical rule is responsible for convert {@link FlinkLogicalCorrelate} 
to

Review comment:
       responsible for converting

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/rules/stream/DataStreamPythonCorrelateRule.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.stream;
+
+import org.apache.flink.table.plan.nodes.FlinkConventions;
+import org.apache.flink.table.plan.nodes.datastream.DataStreamPythonCorrelate;
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalCorrelate;
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableFunctionScan;
+import org.apache.flink.table.plan.rules.AbstractPythonCorrelateRuleBase;
+import org.apache.flink.table.plan.schema.RowSchema;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexNode;
+
+import scala.Option;
+
+/**
+ * The physical rule is responsible for convert {@link FlinkLogicalCorrelate} 
to
+ * {@link DataStreamPythonCorrelate}.
+ */
+public class DataStreamPythonCorrelateRule extends 
AbstractPythonCorrelateRuleBase {
+
+       public static final RelOptRule INSTANCE = new 
DataStreamPythonCorrelateRule();
+
+       private DataStreamPythonCorrelateRule() {
+               super(FlinkConventions.DATASTREAM(), 
"DataStreamPythonCorrelateRule");
+       }
+
+       @Override
+       public RelNode convert(RelNode rel) {
+               DataStreamPythonCorrelateFactory factory = new 
DataStreamPythonCorrelateFactory(rel);
+               return factory.convertToCorrelate();
+       }
+
+       /**
+        * The factory is responsible to creating {@link 
DataStreamPythonCorrelate}.

Review comment:
       responsible for creating

##########
File path: 
flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/PythonTableFunctionFlatMap.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.functions.python.PythonEnv;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import 
org.apache.flink.table.runtime.runners.python.table.PythonTableFunctionRunner;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.calcite.rel.core.JoinRelType;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The {@link RichFlatMapFunction} used to invoke Python {@link TableFunction} 
functions for the
+ * old planner.
+ */
+@Internal
+public final class PythonTableFunctionFlatMap extends 
AbstractPythonStatelessFunctionFlatMap {
+
+       private static final long serialVersionUID = 1L;
+
+       /**
+        * The Python {@link TableFunction} to be executed.
+        */
+       private final PythonFunctionInfo tableFunction;
+
+       /**
+        * The correlate join type.
+        */
+       private final JoinRelType joinType;
+
+       public PythonTableFunctionFlatMap(
+               Configuration config,
+               PythonFunctionInfo tableFunction,
+               RowType inputType,
+               RowType outputType,
+               int[] udtfInputOffsets,
+               JoinRelType joinType) {
+               super(config, inputType, outputType, udtfInputOffsets);
+               this.tableFunction = Preconditions.checkNotNull(tableFunction);
+               Preconditions.checkArgument(
+                       joinType == JoinRelType.INNER || joinType == 
JoinRelType.LEFT,
+                       "The join type should be inner join or left join");
+               this.joinType = joinType;
+       }
+
+       @Override
+       public void open(Configuration parameters) throws Exception {
+               RowTypeInfo forwardedInputTypeInfo = (RowTypeInfo) 
TypeConversions.fromDataTypeToLegacyInfo(
+                       TypeConversions.fromLogicalToDataType(inputType));
+               forwardedInputSerializer = 
forwardedInputTypeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
+
+               List<RowType.RowField> udtfOutputDataFields = new ArrayList<>(
+                       
outputType.getFields().subList(inputType.getFieldCount(), 
outputType.getFieldCount()));
+               userDefinedFunctionOutputType = new 
RowType(udtfOutputDataFields);
+
+               super.open(parameters);
+       }
+
+       @Override
+       public PythonEnv getPythonEnv() {
+               return tableFunction.getPythonFunction().getPythonEnv();
+       }
+
+       @Override
+       public PythonFunctionRunner<Row> createPythonFunctionRunner() throws 
IOException {
+               FnDataReceiver<byte[]> userDefinedFunctionResultReceiver = 
input -> {
+                       // handover to queue, do not block the result receiver 
thread
+                       userDefinedFunctionResultQueue.put(input);
+               };
+
+               return new PythonTableFunctionRunner(
+                       getRuntimeContext().getTaskName(),
+                       userDefinedFunctionResultReceiver,
+                       tableFunction,
+                       createPythonEnvironmentManager(),
+                       userDefinedFunctionInputType,
+                       userDefinedFunctionOutputType,
+                       jobOptions,
+                       getFlinkMetricContainer());
+       }
+
+       @Override
+       public void bufferInput(Row input) {
+               // always copy the input Row

Review comment:
       Maybe add more details about why we always copy the input Row. What do 
you think?

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/rules/stream/DataStreamPythonCorrelateRule.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.stream;
+
+import org.apache.flink.table.plan.nodes.FlinkConventions;
+import org.apache.flink.table.plan.nodes.datastream.DataStreamPythonCorrelate;
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalCorrelate;
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableFunctionScan;
+import org.apache.flink.table.plan.rules.AbstractPythonCorrelateRuleBase;
+import org.apache.flink.table.plan.schema.RowSchema;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexNode;
+
+import scala.Option;
+
+/**
+ * The physical rule is responsible for convert {@link FlinkLogicalCorrelate} 
to
+ * {@link DataStreamPythonCorrelate}.
+ */
+public class DataStreamPythonCorrelateRule extends 
AbstractPythonCorrelateRuleBase {
+
+       public static final RelOptRule INSTANCE = new 
DataStreamPythonCorrelateRule();
+
+       private DataStreamPythonCorrelateRule() {
+               super(FlinkConventions.DATASTREAM(), 
"DataStreamPythonCorrelateRule");
+       }
+
+       @Override
+       public RelNode convert(RelNode rel) {
+               DataStreamPythonCorrelateFactory factory = new 
DataStreamPythonCorrelateFactory(rel);
+               return factory.convertToCorrelate();
+       }
+
+       /**
+        * The factory is responsible to creating {@link 
DataStreamPythonCorrelate}.
+        */
+       private static class DataStreamPythonCorrelateFactory extends 
PythonCorrelateFactoryBase{

Review comment:
       Add a space before the opening brace here, i.e., `PythonCorrelateFactoryBase {`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to