This is an automated email from the ASF dual-hosted git repository. dianfu pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push: new 5333fbf [FLINK-19740][python] Fix to_pandas to Support EventTime in Blink Planner (#13899) 5333fbf is described below commit 5333fbfe48542aceda3b6d21e76b48f7bde002ef Author: HuangXingBo <hxbks...@gmail.com> AuthorDate: Tue Nov 3 19:20:25 2020 +0800 [FLINK-19740][python] Fix to_pandas to Support EventTime in Blink Planner (#13899) --- .../pyflink/table/tests/test_pandas_conversion.py | 49 +++++++++++++++++++++- .../flink/table/runtime/arrow/ArrowUtils.java | 4 +- .../sinks/SelectTableSinkSchemaConverter.java | 2 +- 3 files changed, 51 insertions(+), 4 deletions(-) diff --git a/flink-python/pyflink/table/tests/test_pandas_conversion.py b/flink-python/pyflink/table/tests/test_pandas_conversion.py index b35aaef..e1ca824 100644 --- a/flink-python/pyflink/table/tests/test_pandas_conversion.py +++ b/flink-python/pyflink/table/tests/test_pandas_conversion.py @@ -159,4 +159,51 @@ class BlinkBatchPandasConversionTests(PandasConversionTests, class BlinkStreamPandasConversionTests(PandasConversionITTests, PyFlinkBlinkStreamTableTestCase): - pass + def test_to_pandas_with_event_time(self): + self.env.set_parallelism(1) + # create source file path + import tempfile + from pyflink.datastream.time_characteristic import TimeCharacteristic + import os + tmp_dir = tempfile.gettempdir() + data = [ + '2018-03-11 03:10:00', + '2018-03-11 03:10:00', + '2018-03-11 03:10:00', + '2018-03-11 03:40:00', + '2018-03-11 04:20:00', + '2018-03-11 03:30:00' + ] + source_path = tmp_dir + '/test_to_pandas_with_event_time.csv' + with open(source_path, 'w') as fd: + for ele in data: + fd.write(ele + '\n') + + self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime) + + source_table = """ + create table source_table( + rowtime TIMESTAMP(3), + WATERMARK FOR rowtime AS rowtime - INTERVAL '60' MINUTE + ) with( + 'connector.type' = 'filesystem', + 'format.type' = 'csv', + 'connector.path' = '%s', + 'format.ignore-first-line' = 'false', + 'format.field-delimiter' = ',' + ) + """ % source_path + self.t_env.execute_sql(source_table) + t = self.t_env.from_path("source_table") + result_pdf = t.to_pandas() + import pandas as pd + os.remove(source_path) + assert_frame_equal(result_pdf, pd.DataFrame( + data={"rowtime": [ + datetime.datetime(2018, 3, 11, 3, 10), + datetime.datetime(2018, 3, 11, 3, 10), + datetime.datetime(2018, 3, 11, 3, 10), + datetime.datetime(2018, 3, 11, 3, 40), + datetime.datetime(2018, 3, 11, 4, 20), + datetime.datetime(2018, 3, 11, 3, 30), + ]})) diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java index aa2fcb3..bf23408 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java @@ -633,8 +633,8 @@ public final class ArrowUtils { public RowData next() { // The SelectTableSink of blink planner will convert the table schema and we // need to keep the table schema used here be consistent with the converted table schema - TableSchema convertedTableSchema = - SelectTableSinkSchemaConverter.changeDefaultConversionClass(table.getSchema()); + TableSchema convertedTableSchema = SelectTableSinkSchemaConverter.convertTimeAttributeToRegularTimestamp( + SelectTableSinkSchemaConverter.changeDefaultConversionClass(table.getSchema())); DataFormatConverters.DataFormatConverter converter = DataFormatConverters.getConverterForDataType(convertedTableSchema.toRowDataType()); return (RowData) converter.toInternal(results.next()); diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/sinks/SelectTableSinkSchemaConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/sinks/SelectTableSinkSchemaConverter.java index 6615b57..97db017 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/sinks/SelectTableSinkSchemaConverter.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/sinks/SelectTableSinkSchemaConverter.java @@ -50,7 +50,7 @@ public class SelectTableSinkSchemaConverter { * Convert time attributes (proc time / event time) to regular timestamp * and build a new {@link TableSchema}. */ - static TableSchema convertTimeAttributeToRegularTimestamp(TableSchema tableSchema) { + public static TableSchema convertTimeAttributeToRegularTimestamp(TableSchema tableSchema) { DataType[] dataTypes = tableSchema.getFieldDataTypes(); String[] oldNames = tableSchema.getFieldNames();