[jira] [Updated] (FLINK-32040) The WatermarkStrategy defined with the Function(with_idleness) report an error
[ https://issues.apache.org/jira/browse/FLINK-32040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joekwal updated FLINK-32040:

Description:

*Version:* upgraded from pyflink 1.15.2 to pyflink 1.16.1

*Error reported:*
Record has Java Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'data_stream.assign_timestamps_and_watermarks(...)'?

The same application never reported this error on 1.15.2.

*Example:*
{code:python}
class MyTimestampAssigner(TimestampAssigner):
    def extract_timestamp(self, value, record_timestamp: int) -> int:
        return value['version']

sql = """
select columns, version(milliseconds) from kafka_source
"""
table = st_env.sql_query(sql)
stream = st_env.to_changelog_stream(table)
stream = stream.assign_timestamps_and_watermarks(
    WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_minutes(1))
        .with_timestamp_assigner(MyTimestampAssigner())
        .with_idleness(Duration.of_seconds(10)))
stream = stream.key_by(CommonKeySelector()) \
    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))) \
    .process(WindowFunction(), typeInfo)
{code}

Stepping through ??pyflink.datastream.data_stream.DataStream.assign_timestamps_and_watermarks?? in a debugger shows that ??watermark_strategy._timestamp_assigner?? is None.

*Solution:* remove the ??with_idleness(Duration.of_seconds(10))?? call:
{code:python}
stream = stream.assign_timestamps_and_watermarks(
    WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_minutes(1))
        .with_timestamp_assigner(MyTimestampAssigner()))
{code}

Is this a bug?

> The WatermarkStrategy defined with the Function(with_idleness) reports an error
> -------------------------------------------------------------------------------
>
> Key: FLINK-32040
> URL: https://issues.apache.org/jira/browse/FLINK-32040
> Project: Flink
> Issue Type: Bug
> Components: API / Python
> Reporter: Joekwal
> Priority: Blocker
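The symptom above (a chained builder call silently discarding the timestamp assigner set by the previous call) can be reproduced with a plain-Python sketch. The class and method names below are illustrative stand-ins, not PyFlink's actual implementation:

```python
# Illustrative sketch only: a fluent builder whose wrapping method
# forgets to carry a previously-set field into the new instance.
class FakeWatermarkStrategy:
    def __init__(self, timestamp_assigner=None):
        self._timestamp_assigner = timestamp_assigner

    def with_timestamp_assigner(self, assigner):
        # Returns a new strategy that remembers the assigner.
        return FakeWatermarkStrategy(timestamp_assigner=assigner)

    def with_idleness_buggy(self, seconds):
        # Bug pattern: builds a fresh strategy and drops the assigner,
        # so downstream code sees _timestamp_assigner is None.
        return FakeWatermarkStrategy()

    def with_idleness_fixed(self, seconds):
        # Fixed pattern: propagates the assigner to the new instance.
        return FakeWatermarkStrategy(timestamp_assigner=self._timestamp_assigner)


assigner = object()
buggy = FakeWatermarkStrategy().with_timestamp_assigner(assigner).with_idleness_buggy(10)
fixed = FakeWatermarkStrategy().with_timestamp_assigner(assigner).with_idleness_fixed(10)
assert buggy._timestamp_assigner is None      # matches the debugger observation above
assert fixed._timestamp_assigner is assigner  # idleness no longer loses the assigner
```

This mirrors the debugger observation in the report: after the ??with_idleness?? call, ??watermark_strategy._timestamp_assigner?? is None, so every record falls back to the Long.MIN_VALUE "no timestamp" marker.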
[jira] [Created] (FLINK-32040) The WatermarkStrategy defined with the Function(with_idleness) report an error
Joekwal created FLINK-32040:
-------------------------------

Summary: The WatermarkStrategy defined with the Function(with_idleness) reports an error
Key: FLINK-32040
URL: https://issues.apache.org/jira/browse/FLINK-32040
Project: Flink
Issue Type: Bug
Components: API / Python
Reporter: Joekwal

*Version:* upgraded from pyflink 1.15.2 to pyflink 1.16.1

*Error reported:*
Record has Java Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'data_stream.assign_timestamps_and_watermarks(...)'?

The same application never reported this error on 1.15.2.

*Example 1 (reports the error):*
{code:python}
class MyTimestampAssigner(TimestampAssigner):
    def extract_timestamp(self, value, record_timestamp: int) -> int:
        return value['version']

sql = """
select columns, version(milliseconds) from kafka_source
"""
table = st_env.sql_query(sql)
stream = st_env.to_changelog_stream(table)
stream = stream.assign_timestamps_and_watermarks(
    WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_minutes(1))
        .with_timestamp_assigner(MyTimestampAssigner())
        .with_idleness(Duration.of_seconds(10)))
stream = stream.key_by(CommonKeySelector()) \
    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))) \
    .process(WindowFunction(), typeInfo)
{code}

Stepping through pyflink.datastream.data_stream.DataStream.assign_timestamps_and_watermarks in a debugger shows that watermark_strategy._timestamp_assigner is None.

*Solution:* remove the with_idleness(Duration.of_seconds(10)) call:
{code:python}
stream = stream.assign_timestamps_and_watermarks(
    WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_minutes(1))
        .with_timestamp_assigner(MyTimestampAssigner()))
{code}

Is this a bug?

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
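A side note on the window definition used in the example: `TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))` uses the offset to align the one-day windows to midnight in UTC+8 rather than midnight UTC. The alignment reduces to modular arithmetic; the following is a hedged, PyFlink-free sketch (the function name is illustrative, not Flink API):

```python
DAY_MS = 24 * 3600 * 1000
OFFSET_MS = -8 * 3600 * 1000  # Time.hours(-8): shift day boundaries to UTC+8 midnight

def day_window_start(ts_ms: int, size_ms: int = DAY_MS, offset_ms: int = OFFSET_MS) -> int:
    # Start of the tumbling window containing ts_ms, aligned by offset_ms.
    # Python's % is non-negative for a positive modulus, so this also stays
    # correct for timestamps before the epoch.
    return ts_ms - ((ts_ms - offset_ms) % size_ms)

# The epoch (1970-01-01T00:00Z) falls in the window starting 1969-12-31T16:00Z,
# i.e. 1970-01-01 00:00 in UTC+8.
assert day_window_start(0) == -8 * 3600 * 1000
```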
[jira] [Closed] (FLINK-30977) flink tumbling window stream converting to pandas dataframe not work
[ https://issues.apache.org/jira/browse/FLINK-30977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joekwal closed FLINK-30977.
---
Resolution: Fixed

> flink tumbling window stream converting to pandas dataframe not work
> --------------------------------------------------------------------
>
> Key: FLINK-30977
> URL: https://issues.apache.org/jira/browse/FLINK-30977
> Project: Flink
> Issue Type: Bug
> Components: API / Python
> Environment: pyflink1.15.2
> Reporter: Joekwal
> Priority: Major
>
> Is a tumbling-window stream supported for conversion to pandas?
> {code:python}
> code...  # create env
> kafka_src = """
> CREATE TABLE if not exists `kafka_src` (
>     ...
>     `event_time` as CAST(`end_time` as TIMESTAMP(3)),
>     WATERMARK FOR event_time as event_time - INTERVAL '5' SECOND
> )
> with (
>     'connector' = 'kafka',
>     'topic' = 'topic',
>     'properties.bootstrap.servers' = '***',
>     'properties.group.id' = '***',
>     'scan.startup.mode' = 'earliest-offset',
>     'value.format' = 'debezium-json'
> );
> """
>
> t_env.execute_sql(kafka_src)
> table = st_env.sql_query("SELECT columns, `event_time` \
>     FROM TABLE(TUMBLE(TABLE table_name, DESCRIPTOR(event_time), INTERVAL '1' MINUTES))")
> table.execute().print()  # prints the result
>
> df = table.to_pandas()
>
> # schema is correct
> schema = DataTypes.ROW([DataTypes.FIELD("column1", DataTypes.STRING()),
>     ...
> ])
> table = st_env.from_pandas(df, schema=schema)
> st_env.create_temporary_view("view_table", table)
> st_env.sql_query("select * from view_table").execute().print()  # Does not work: prints no result
> {code}
> A tumbling-window stream from a Kafka source is converted to a pandas dataframe, but the final query prints no result. The schema is correct. I have tested another job that reads a bounded stream from a JDBC source, and it prints the result; the only difference is the input stream. Is a tumbling-window stream supported for conversion to pandas?

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
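For reference, the `TUMBLE(..., INTERVAL '1' MINUTES)` table function in the query above assigns each row to a fixed one-minute bucket based on its event time. The bucketing itself is plain integer arithmetic; the following is a PyFlink-free sketch (names are illustrative, not Flink API):

```python
WINDOW_MS = 60_000  # INTERVAL '1' MINUTES, in milliseconds

def tumble_window(ts_ms: int, size_ms: int = WINDOW_MS) -> tuple:
    # Half-open [start, end) tumbling window containing the event timestamp.
    start = ts_ms - (ts_ms % size_ms)
    return (start, start + size_ms)

# Two events 10 s apart land in the same minute bucket; 70 s apart do not.
a = tumble_window(1_675_000_000_000)
b = tumble_window(1_675_000_010_000)
c = tumble_window(1_675_000_070_000)
assert a == b
assert a != c
```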
[jira] [Updated] (FLINK-30977) flink tumbling window stream converting to pandas dataframe not work
[ https://issues.apache.org/jira/browse/FLINK-30977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joekwal updated FLINK-30977:

Description:

Is a tumbling-window stream supported for conversion to pandas?
{code:python}
code...  # create env
kafka_src = """
CREATE TABLE if not exists `kafka_src` (
    ...
    `event_time` as CAST(`end_time` as TIMESTAMP(3)),
    WATERMARK FOR event_time as event_time - INTERVAL '5' SECOND
)
with (
    'connector' = 'kafka',
    'topic' = 'topic',
    'properties.bootstrap.servers' = '***',
    'properties.group.id' = '***',
    'scan.startup.mode' = 'earliest-offset',
    'value.format' = 'debezium-json'
);
"""

t_env.execute_sql(kafka_src)
table = st_env.sql_query("SELECT columns, `event_time` \
    FROM TABLE(TUMBLE(TABLE table_name, DESCRIPTOR(event_time), INTERVAL '1' MINUTES))")
table.execute().print()  # prints the result

df = table.to_pandas()

# schema is correct
schema = DataTypes.ROW([DataTypes.FIELD("column1", DataTypes.STRING()),
    ...
])
table = st_env.from_pandas(df, schema=schema)
st_env.create_temporary_view("view_table", table)
st_env.sql_query("select * from view_table").execute().print()  # Does not work: prints no result
{code}

A tumbling-window stream from a Kafka source is converted to a pandas dataframe, but the final query prints no result. The schema is correct. I have tested another job that reads a bounded stream from a JDBC source, and it prints the result; the only difference is the input stream. Is a tumbling-window stream supported for conversion to pandas?

> flink tumbling window stream converting to pandas dataframe not work
> --------------------------------------------------------------------
>
> Key: FLINK-30977
> URL: https://issues.apache.org/jira/browse/FLINK-30977
> Project: Flink
> Issue Type: Bug
> Components: API / Python
> Environment: pyflink1.15.2
> Reporter: Joekwal
> Priority: Major
[jira] [Created] (FLINK-30977) flink tumbling window stream converting to pandas dataframe not work
Joekwal created FLINK-30977:
-------------------------------

Summary: flink tumbling window stream converting to pandas dataframe not work
Key: FLINK-30977
URL: https://issues.apache.org/jira/browse/FLINK-30977
Project: Flink
Issue Type: Bug
Environment: pyflink1.15.2
Reporter: Joekwal

Is a tumbling-window stream supported for conversion to pandas?
{code:python}
code...  # create env
kafka_src = """
CREATE TABLE if not exists `kafka_src` (
    ...
    `event_time` as CAST(`end_time` as TIMESTAMP(3)),
    WATERMARK FOR event_time as event_time - INTERVAL '5' SECOND
)
with (
    'connector' = 'kafka',
    'topic' = 'topic',
    'properties.bootstrap.servers' = '***',
    'properties.group.id' = '***',
    'scan.startup.mode' = 'earliest-offset',
    'value.format' = 'debezium-json'
);
"""

t_env.execute_sql(kafka_src)
table = st_env.sql_query("SELECT columns, `event_time` \
    FROM TABLE(TUMBLE(TABLE table_name, DESCRIPTOR(event_time), INTERVAL '1' MINUTES))")
# table.execute().print()  # prints the result

df = table.to_pandas()

# schema is correct
schema = DataTypes.ROW([DataTypes.FIELD("column1", DataTypes.STRING()),
    ...
])
table = st_env.from_pandas(df, schema=schema)
st_env.create_temporary_view("view_table", table)
st_env.sql_query("select * from view_table").execute().print()  # Does not work: prints no result
{code}

A tumbling-window stream from a Kafka source is converted to a pandas dataframe, but the final query prints no result. The schema is correct. I have tested another job that reads a bounded stream from a JDBC source, and it prints the result; the only difference is the input stream. As the documentation mentions, a bounded stream is supported for conversion to pandas, so what could have gone wrong?

--
This message was sent by Atlassian Jira
(v8.20.10#820010)