HuangXingBo commented on code in PR #19140: URL: https://github.com/apache/flink/pull/19140#discussion_r907055031
########## flink-python/pyflink/table/types.py: ########## @@ -2007,6 +1870,15 @@ def _to_java_data_type(data_type: DataType): fields = [JDataTypes.FIELD(f.name, _to_java_data_type(f.data_type)) Review Comment: In `_to_java_data_type`, when converting Python's DataType to Java's DataType, the nullability and length information also need to be passed over. ########## flink-python/src/main/java/org/apache/flink/table/utils/python/PythonInputFormatTableSource.java: ########## @@ -18,51 +18,56 @@ package org.apache.flink.table.utils.python; -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.io.InputFormat; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.RowTypeInfo; -import org.apache.flink.core.io.InputSplit; -import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.sources.InputFormatTableSource; -import org.apache.flink.types.Row; +import org.apache.flink.api.common.python.PythonBridgeUtils; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.InputFormatProvider; +import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; -import java.util.List; +import java.io.IOException; -/** An {@link InputFormatTableSource} created by python 'from_element' method. */ -@Internal -public class PythonInputFormatTableSource extends InputFormatTableSource<Row> { - - /** - * The input format which contains the python data collection, usually created by {@link - * PythonTableUtils#getInputFormat(List, TypeInformation, ExecutionConfig)} method. - */ - private final InputFormat<Row, ? extends InputSplit> inputFormat; - - /** - * The row type info of the python data. It is generated by the python 'from_element' method. 
- */ - private final RowTypeInfo rowTypeInfo; +/** Implementation of {@link ScanTableSource} for python elements table. */ +public class PythonInputFormatTableSource implements ScanTableSource { Review Comment: Change name to `PythonDynamicTableSource`? ########## flink-python/src/main/java/org/apache/flink/table/utils/python/PythonInputFormatTableSource.java: ########## @@ -18,51 +18,56 @@ package org.apache.flink.table.utils.python; -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.io.InputFormat; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.RowTypeInfo; -import org.apache.flink.core.io.InputSplit; -import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.sources.InputFormatTableSource; -import org.apache.flink.types.Row; +import org.apache.flink.api.common.python.PythonBridgeUtils; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.InputFormatProvider; +import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; -import java.util.List; +import java.io.IOException; -/** An {@link InputFormatTableSource} created by python 'from_element' method. */ -@Internal -public class PythonInputFormatTableSource extends InputFormatTableSource<Row> { - - /** - * The input format which contains the python data collection, usually created by {@link - * PythonTableUtils#getInputFormat(List, TypeInformation, ExecutionConfig)} method. - */ - private final InputFormat<Row, ? extends InputSplit> inputFormat; - - /** - * The row type info of the python data. It is generated by the python 'from_element' method. 
- */ - private final RowTypeInfo rowTypeInfo; +/** Implementation of {@link ScanTableSource} for python elements table. */ +public class PythonInputFormatTableSource implements ScanTableSource { + private final String filePath; + private final boolean batched; + private final DataType producedDataType; public PythonInputFormatTableSource( - InputFormat<Row, ? extends InputSplit> inputFormat, RowTypeInfo rowTypeInfo) { - this.inputFormat = inputFormat; - this.rowTypeInfo = rowTypeInfo; + String filePath, boolean batched, DataType producedDataType) { + this.filePath = filePath; + this.batched = batched; + this.producedDataType = producedDataType; + } + + @Override + public DynamicTableSource copy() { + return new PythonInputFormatTableSource(filePath, batched, producedDataType); } @Override - public InputFormat<Row, ?> getInputFormat() { - return inputFormat; + public String asSummaryString() { + return "PythonInputFormatTableSource"; } @Override - public TableSchema getTableSchema() { - return TableSchema.fromTypeInfo(rowTypeInfo); + public ChangelogMode getChangelogMode() { + return ChangelogMode.insertOnly(); } @Override - public TypeInformation<Row> getReturnType() { - return rowTypeInfo; + public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) { + try { + InputFormat<RowData, ?> inputFormat = + PythonTableUtils.getInputFormat( + PythonBridgeUtils.readPythonObjects(filePath, batched), Review Comment: Is there any caller that passes the `batched` parameter as `false`? ########## flink-python/src/test/java/org/apache/flink/table/utils/TestingSinks.java: ########## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.utils; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.sink.DataStreamSinkProvider; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** Testing Sinks that collects test output data for validation. */ +public class TestingSinks { + + private static final Logger LOGGER = LoggerFactory.getLogger(TestingSinks.class); + + /** TestAppendingSink for testing. 
*/ + public static class TestAppendingSink implements DynamicTableSink { + private final DataType rowDataType; + + public TestAppendingSink(DataType rowDataType) { + this.rowDataType = rowDataType; + LOGGER.info("rowDataType: " + rowDataType); + } + + @Override + public ChangelogMode getChangelogMode(ChangelogMode requestedMode) { + return requestedMode; + } + + @Override + public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { + final DataStructureConverter converter = + context.createDataStructureConverter(rowDataType); + return (DataStreamSinkProvider) + (providerContext, dataStream) -> dataStream.addSink(new RowSink(converter)); + } + + @Override + public DynamicTableSink copy() { + return new TestAppendingSink(rowDataType); + } + + @Override + public String asSummaryString() { + return String.format("TestingAppendSink(%s)", DataType.getFields(rowDataType)); + } + } + + /** RowSink for testing. */ + static class RowSink implements SinkFunction<RowData> { + private final DynamicTableSink.DataStructureConverter converter; + + public RowSink(DynamicTableSink.DataStructureConverter converter) { + this.converter = converter; + } + + @Override + public void invoke(RowData value, Context context) { + RowKind rowKind = value.getRowKind(); + Row data = (Row) converter.toExternal(value); + LOGGER.info("row data: " + data.toString()); Review Comment: remove ? ########## flink-python/src/test/java/org/apache/flink/table/utils/TestCollectionTableFactory.java: ########## @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.utils; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.io.CollectionInputFormat; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.table.catalog.ResolvedCatalogTable; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.ProviderContext; +import org.apache.flink.table.connector.sink.DataStreamSinkProvider; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.DataStreamScanProvider; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.LookupTableSource; +import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.connector.source.TableFunctionProvider; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.DynamicTableSinkFactory; +import org.apache.flink.table.factories.DynamicTableSourceFactory; +import 
org.apache.flink.table.functions.TableFunction; +import org.apache.flink.table.types.DataType; +import org.apache.flink.types.Row; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Testing CollectionTableFactory that creates collection DynamicTableSource and DynamicTableSink. + */ +public class TestCollectionTableFactory Review Comment: Do we need to remove `org.apache.flink.table.legacyutils.TestCollectionTableFactory` and other deprecated classes in `flink/flink-python/src/test/java/org/apache/flink/table/legacyutils`? ########## flink-python/src/main/java/org/apache/flink/table/utils/python/PythonInputFormatTableSource.java: ########## @@ -18,51 +18,56 @@ package org.apache.flink.table.utils.python; -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.io.InputFormat; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.RowTypeInfo; -import org.apache.flink.core.io.InputSplit; -import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.sources.InputFormatTableSource; -import org.apache.flink.types.Row; +import org.apache.flink.api.common.python.PythonBridgeUtils; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.InputFormatProvider; +import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; -import java.util.List; +import java.io.IOException; -/** An {@link InputFormatTableSource} created by python 'from_element' method. 
*/ -@Internal -public class PythonInputFormatTableSource extends InputFormatTableSource<Row> { - - /** - * The input format which contains the python data collection, usually created by {@link - * PythonTableUtils#getInputFormat(List, TypeInformation, ExecutionConfig)} method. - */ - private final InputFormat<Row, ? extends InputSplit> inputFormat; - - /** - * The row type info of the python data. It is generated by the python 'from_element' method. - */ - private final RowTypeInfo rowTypeInfo; +/** Implementation of {@link ScanTableSource} for python elements table. */ +public class PythonInputFormatTableSource implements ScanTableSource { + private final String filePath; + private final boolean batched; + private final DataType producedDataType; public PythonInputFormatTableSource( - InputFormat<Row, ? extends InputSplit> inputFormat, RowTypeInfo rowTypeInfo) { - this.inputFormat = inputFormat; - this.rowTypeInfo = rowTypeInfo; + String filePath, boolean batched, DataType producedDataType) { + this.filePath = filePath; + this.batched = batched; + this.producedDataType = producedDataType; + } + + @Override + public DynamicTableSource copy() { + return new PythonInputFormatTableSource(filePath, batched, producedDataType); } @Override - public InputFormat<Row, ?> getInputFormat() { - return inputFormat; + public String asSummaryString() { + return "PythonInputFormatTableSource"; Review Comment: ```suggestion return "Python Table Source"; ``` ########## flink-python/src/main/java/org/apache/flink/table/utils/python/PythonInputFormatFactory.java: ########## @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.utils.python; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.factories.DynamicTableSourceFactory; +import org.apache.flink.table.factories.FactoryUtil; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +import static org.apache.flink.table.utils.python.PythonInputFormatTableOptions.BATCH_MODE; +import static org.apache.flink.table.utils.python.PythonInputFormatTableOptions.INPUT_FILE_PATH; + +/** Table source factory for PythonInputFormatTableSource. */ +public class PythonInputFormatFactory implements DynamicTableSourceFactory { Review Comment: change name to `PythonDynamicTableFactory`? ########## flink-python/src/test/java/org/apache/flink/table/utils/TestingSinks.java: ########## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.utils; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.sink.DataStreamSinkProvider; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** Testing Sinks that collects test output data for validation. */ +public class TestingSinks { + + private static final Logger LOGGER = LoggerFactory.getLogger(TestingSinks.class); + + /** TestAppendingSink for testing. */ + public static class TestAppendingSink implements DynamicTableSink { + private final DataType rowDataType; + + public TestAppendingSink(DataType rowDataType) { + this.rowDataType = rowDataType; + LOGGER.info("rowDataType: " + rowDataType); Review Comment: remove? ########## flink-python/src/test/java/org/apache/flink/table/utils/TestingDescriptors.java: ########## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.utils; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.expressions.ApiExpressionUtils; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.FieldReferenceExpression; +import org.apache.flink.table.expressions.ResolvedFieldReference; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.sources.tsextractors.TimestampExtractor; +import org.apache.flink.table.sources.wmstrategies.PunctuatedWatermarkAssigner; +import org.apache.flink.table.types.utils.TypeConversions; +import org.apache.flink.types.Row; + +import java.util.Objects; + +import static org.apache.flink.util.Preconditions.checkState; + +/** Testing Descriptors for python tests. */ +public class TestingDescriptors { + + /** CustomAssigner for testing. */ + public static class CustomAssigner extends PunctuatedWatermarkAssigner { + @Override + public Watermark getWatermark(Row row, long timestamp) { + throw new UnsupportedOperationException(); + } + } + + /** CustomExtractor for testing. 
*/ + public static class CustomExtractor extends TimestampExtractor { Review Comment: `TimestampExtractor` is marked as Deprecated. ########## flink-python/src/test/java/org/apache/flink/table/utils/TestCollectionTableFactory.java: ########## @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.utils; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.io.CollectionInputFormat; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.table.catalog.ResolvedCatalogTable; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.ProviderContext; +import org.apache.flink.table.connector.sink.DataStreamSinkProvider; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.DataStreamScanProvider; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.LookupTableSource; +import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.connector.source.TableFunctionProvider; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.DynamicTableSinkFactory; +import org.apache.flink.table.factories.DynamicTableSourceFactory; +import org.apache.flink.table.functions.TableFunction; +import org.apache.flink.table.types.DataType; +import org.apache.flink.types.Row; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Testing 
CollectionTableFactory that creates collection DynamicTableSource and DynamicTableSink. + */ +public class TestCollectionTableFactory + implements DynamicTableSourceFactory, DynamicTableSinkFactory { + + public static boolean isStreaming = true; + private static final LinkedList<Row> SOURCE_DATA = new LinkedList<>(); + private static final LinkedList<Row> DIM_DATA = new LinkedList<>(); + private static final LinkedList<Row> RESULT = new LinkedList<>(); + + private static long emitIntervalMS = -1L; + + public static void initData(List<Row> sourceData, List<Row> dimData, Long emitInterval) { Review Comment: When will `initData` be called? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org