[ https://issues.apache.org/jira/browse/FLINK-20128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17231243#comment-17231243 ]
Shengkai Fang commented on FLINK-20128:
---------------------------------------

I can't reproduce the bug with the SQL Client or the Java Table API. I think it may be a PyFlink bug?

> Data loss for over windows with rows unbounded preceding
> --------------------------------------------------------
>
>                 Key: FLINK-20128
>                 URL: https://issues.apache.org/jira/browse/FLINK-20128
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python, Table SQL / Planner
>    Affects Versions: 1.12.0, 1.11.2
>            Reporter: Thilo Schneider
>            Priority: Major
>
> When using partitioned, unbounded over windows, all partitions but one are
> dropped from the output dataset:
> {code:python}
> # Setup
> from pyflink.table import EnvironmentSettings, StreamTableEnvironment
> # from biafflink import debug_print_table  # unused below; not part of PyFlink
> env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
> t_env = StreamTableEnvironment.create(environment_settings=env_settings)
> t_env.get_config().get_configuration().set_integer("table.exec.resource.default-parallelism", 1)
> t_env.execute_sql("""
> CREATE TABLE datagen (
>   foo INT,
>   id AS mod(foo, 2),
>   message_time AS to_timestamp(from_unixtime(FLOOR(foo/2))),
>   WATERMARK FOR message_time AS message_time
> ) WITH (
>   'connector' = 'datagen',
>   'rows-per-second'='2',
>   'fields.foo.kind'='sequence',
>   'fields.foo.start'='0',
>   'fields.foo.end'='19'
> )""")
> t_env.execute_sql("CREATE TABLE output (foo INT, id INT, lagfoo INT) WITH ('connector' = 'print')")
> {code}
> Using bounded over windows, everything works as expected:
> {code:python}
> t = t_env.sql_query("""
> SELECT foo, id, avg(foo) OVER w AS lagfoo
> FROM datagen
> WINDOW w AS (PARTITION BY id ORDER BY message_time ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)""")
> t.execute_insert("output")
> {code}
> yields
> {code:python}
> +I(0,0,0)
> +I(1,1,1)
> +I(2,0,1)
> +I(3,1,2)
> +I(4,0,3)
> ...
> {code}
> If we change the window to unbounded preceding:
> {code:python}
> t = t_env.sql_query("""
> SELECT foo, id, avg(foo) OVER w AS lagfoo
> FROM datagen
> WINDOW w AS (PARTITION BY id ORDER BY message_time ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)""")
> t.execute_insert("output")
> {code}
> we lose all rows with id == 1:
> {code:python}
> +I(0,0,0)
> +I(2,0,1)
> +I(4,0,2)
> +I(6,0,3)
> +I(8,0,4)
> ...
> {code}
> I observed this problem with various aggregate functions, under both
> 1.11.2 and 1.12rc1.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
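For reference, here is a minimal pure-Python model (no Flink involved) of what both queries in the quoted report should emit for the 20 generated rows. It assumes rows within a partition are aggregated in message_time order and that AVG over an INT column yields an INT. On this data every average is an exact integer, so integer division loses nothing. The helpers `datagen_rows` and `over_avg` are hypothetical and written only for this illustration.

```python
from collections import defaultdict
from typing import List, Optional, Tuple

# (foo, id, lagfoo) as written to the 'output' table
OutRow = Tuple[int, int, int]


def datagen_rows(start: int = 0, end: int = 19) -> List[Tuple[int, int, int]]:
    """Hypothetical stand-in for the datagen table: foo is a sequence,
    id = foo % 2, message_time = floor(foo / 2) in seconds."""
    return [(foo, foo % 2, foo // 2) for foo in range(start, end + 1)]


def over_avg(rows, preceding: Optional[int]) -> List[OutRow]:
    """Model of AVG(foo) OVER (PARTITION BY id ORDER BY message_time
    ROWS BETWEEN <preceding> PRECEDING AND CURRENT ROW).
    preceding=None models UNBOUNDED PRECEDING."""
    history = defaultdict(list)  # per-partition foo values seen so far
    out = []
    # sorted() is stable, so rows sharing a message_time keep generation order
    for foo, pid, _ in sorted(rows, key=lambda r: r[2]):
        history[pid].append(foo)
        if preceding is None:
            frame = history[pid]                      # whole partition so far
        else:
            frame = history[pid][-(preceding + 1):]   # last N rows + current
        # assumption: INT average truncates; exact for this data anyway
        out.append((foo, pid, sum(frame) // len(frame)))
    return out


bounded = over_avg(datagen_rows(), preceding=1)
unbounded = over_avg(datagen_rows(), preceding=None)
print(bounded[:5])    # [(0, 0, 0), (1, 1, 1), (2, 0, 1), (3, 1, 2), (4, 0, 3)]
print([r for r in unbounded if r[1] == 0][:5])
# [(0, 0, 0), (2, 0, 1), (4, 0, 2), (6, 0, 3), (8, 0, 4)]
print([r for r in unbounded if r[1] == 1][:3])
# [(1, 1, 1), (3, 1, 2), (5, 1, 3)]
```

The bounded result and the id == 0 rows of the unbounded result match the output quoted in the report. The model also emits ten id == 1 rows for the unbounded frame, and those are the rows missing from the PyFlink run.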