This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 5333fbf  [FLINK-19740][python] Fix to_pandas to Support EventTime in Blink Planner (#13899)
5333fbf is described below

commit 5333fbfe48542aceda3b6d21e76b48f7bde002ef
Author: HuangXingBo <hxbks...@gmail.com>
AuthorDate: Tue Nov 3 19:20:25 2020 +0800

    [FLINK-19740][python] Fix to_pandas to Support EventTime in Blink Planner (#13899)
---
 .../pyflink/table/tests/test_pandas_conversion.py  | 49 +++++++++++++++++++++-
 .../flink/table/runtime/arrow/ArrowUtils.java      |  4 +-
 .../sinks/SelectTableSinkSchemaConverter.java      |  2 +-
 3 files changed, 51 insertions(+), 4 deletions(-)
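
For context, here is a minimal sketch of the user-facing behavior this commit fixes, assuming a PyFlink 1.11 Blink streaming setup; the table name, file path, and connector properties are illustrative only (they mirror the added test) and are not part of the commit itself:

    # Minimal sketch (assumptions: PyFlink 1.11, old filesystem/csv connector as
    # used in the test below; "events" and '/tmp/events.csv' are made-up names).
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.time_characteristic import TimeCharacteristic
    from pyflink.table import StreamTableEnvironment, EnvironmentSettings

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
    t_env = StreamTableEnvironment.create(
        env,
        environment_settings=EnvironmentSettings.new_instance()
            .in_streaming_mode().use_blink_planner().build())

    # A source whose rowtime column is an event-time attribute with a watermark.
    t_env.execute_sql("""
        CREATE TABLE events (
            rowtime TIMESTAMP(3),
            WATERMARK FOR rowtime AS rowtime - INTERVAL '60' MINUTE
        ) WITH (
            'connector.type' = 'filesystem',
            'format.type' = 'csv',
            'connector.path' = '/tmp/events.csv'
        )
    """)

    # Before this fix, to_pandas() failed on event-time attributes in the Blink
    # planner; with the fix the time attribute is converted to a regular
    # TIMESTAMP before the rows are turned into a pandas DataFrame.
    pdf = t_env.from_path("events").to_pandas()
    print(pdf)

The test added to test_pandas_conversion.py below exercises the same path end to end.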

diff --git a/flink-python/pyflink/table/tests/test_pandas_conversion.py b/flink-python/pyflink/table/tests/test_pandas_conversion.py
index b35aaef..e1ca824 100644
--- a/flink-python/pyflink/table/tests/test_pandas_conversion.py
+++ b/flink-python/pyflink/table/tests/test_pandas_conversion.py
@@ -159,4 +159,51 @@ class BlinkBatchPandasConversionTests(PandasConversionTests,
 
 class BlinkStreamPandasConversionTests(PandasConversionITTests,
                                        PyFlinkBlinkStreamTableTestCase):
-    pass
+    def test_to_pandas_with_event_time(self):
+        self.env.set_parallelism(1)
+        # create source file path
+        import tempfile
+        from pyflink.datastream.time_characteristic import TimeCharacteristic
+        import os
+        tmp_dir = tempfile.gettempdir()
+        data = [
+            '2018-03-11 03:10:00',
+            '2018-03-11 03:10:00',
+            '2018-03-11 03:10:00',
+            '2018-03-11 03:40:00',
+            '2018-03-11 04:20:00',
+            '2018-03-11 03:30:00'
+        ]
+        source_path = tmp_dir + '/test_to_pandas_with_event_time.csv'
+        with open(source_path, 'w') as fd:
+            for ele in data:
+                fd.write(ele + '\n')
+
+        self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
+
+        source_table = """
+            create table source_table(
+                rowtime TIMESTAMP(3),
+                WATERMARK FOR rowtime AS rowtime - INTERVAL '60' MINUTE
+            ) with(
+                'connector.type' = 'filesystem',
+                'format.type' = 'csv',
+                'connector.path' = '%s',
+                'format.ignore-first-line' = 'false',
+                'format.field-delimiter' = ','
+            )
+        """ % source_path
+        self.t_env.execute_sql(source_table)
+        t = self.t_env.from_path("source_table")
+        result_pdf = t.to_pandas()
+        import pandas as pd
+        os.remove(source_path)
+        assert_frame_equal(result_pdf, pd.DataFrame(
+            data={"rowtime": [
+                datetime.datetime(2018, 3, 11, 3, 10),
+                datetime.datetime(2018, 3, 11, 3, 10),
+                datetime.datetime(2018, 3, 11, 3, 10),
+                datetime.datetime(2018, 3, 11, 3, 40),
+                datetime.datetime(2018, 3, 11, 4, 20),
+                datetime.datetime(2018, 3, 11, 3, 30),
+            ]}))
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java
index aa2fcb3..bf23408 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java
@@ -633,8 +633,8 @@ public final class ArrowUtils {
                                public RowData next() {
                                        // The SelectTableSink of blink planner will convert the table schema and we
                                        // need to keep the table schema used here be consistent with the converted table schema
-                                       TableSchema convertedTableSchema =
-                                               SelectTableSinkSchemaConverter.changeDefaultConversionClass(table.getSchema());
+                                       TableSchema convertedTableSchema = SelectTableSinkSchemaConverter.convertTimeAttributeToRegularTimestamp(
+                                               SelectTableSinkSchemaConverter.changeDefaultConversionClass(table.getSchema()));
                                        DataFormatConverters.DataFormatConverter converter =
                                                DataFormatConverters.getConverterForDataType(convertedTableSchema.toRowDataType());
                                        return (RowData) converter.toInternal(results.next());
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/sinks/SelectTableSinkSchemaConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/sinks/SelectTableSinkSchemaConverter.java
index 6615b57..97db017 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/sinks/SelectTableSinkSchemaConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/sinks/SelectTableSinkSchemaConverter.java
@@ -50,7 +50,7 @@ public class SelectTableSinkSchemaConverter {
         * Convert time attributes (proc time / event time) to regular timestamp
         * and build a new {@link TableSchema}.
         */
-       static TableSchema convertTimeAttributeToRegularTimestamp(TableSchema tableSchema) {
+       public static TableSchema convertTimeAttributeToRegularTimestamp(TableSchema tableSchema) {
                DataType[] dataTypes = tableSchema.getFieldDataTypes();
                String[] oldNames = tableSchema.getFieldNames();
 
