Dian Fu created FLINK-24003: ------------------------------- Summary: Loopback mode doesn't work when mixing use of Python Table API and Python DataStream API Key: FLINK-24003 URL: https://issues.apache.org/jira/browse/FLINK-24003 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.14.0 Reporter: Dian Fu Assignee: Huang Xingbo Fix For: 1.14.0
For the following program: {code} import logging import time from pyflink.common.typeinfo import Types from pyflink.datastream import StreamExecutionEnvironment, CoMapFunction from pyflink.table import StreamTableEnvironment, DataTypes, Schema def test_chaining(): env = StreamExecutionEnvironment.get_execution_environment() t_env = StreamTableEnvironment.create(stream_execution_environment=env) t_env.get_config().get_configuration().set_boolean("python.operator-chaining.enabled", False) # 1. create source Table t_env.execute_sql(""" CREATE TABLE datagen ( id INT, data STRING ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1000000', 'fields.id.kind' = 'sequence', 'fields.id.start' = '1', 'fields.id.end' = '1000' ) """) # 2. create sink Table t_env.execute_sql(""" CREATE TABLE print ( id BIGINT, data STRING, flag STRING ) WITH ( 'connector' = 'blackhole' ) """) t_env.execute_sql(""" CREATE TABLE print_2 ( id BIGINT, data STRING, flag STRING ) WITH ( 'connector' = 'blackhole' ) """) # 3. 
query from source table and perform calculations # create a Table from a Table API query: source_table = t_env.from_path("datagen") ds = t_env.to_append_stream( source_table, Types.ROW([Types.INT(), Types.STRING()])) ds1 = ds.map(lambda i: (i[0] * i[0], i[1])) ds2 = ds.map(lambda i: (i[0], i[1][2:])) class MyCoMapFunction(CoMapFunction): def map1(self, value): print('hahah') return value def map2(self, value): print('hahah') return value ds3 = ds1.connect(ds2).map(MyCoMapFunction(), output_type=Types.TUPLE([Types.LONG(), Types.STRING()])) ds4 = ds3.map(lambda i: (i[0], i[1], "left"), output_type=Types.TUPLE([Types.LONG(), Types.STRING(), Types.STRING()])) ds5 = ds3.map(lambda i: (i[0], i[1], "right"))\ .map(lambda i: i, output_type=Types.TUPLE([Types.LONG(), Types.STRING(), Types.STRING()])) schema = Schema.new_builder() \ .column("f0", DataTypes.BIGINT()) \ .column("f1", DataTypes.STRING()) \ .column("f2", DataTypes.STRING()) \ .build() result_table_3 = t_env.from_data_stream(ds4, schema) statement_set = t_env.create_statement_set() statement_set.add_insert("print", result_table_3) result_table_4 = t_env.from_data_stream(ds5, schema) statement_set.add_insert("print_2", result_table_4) statement_set.execute().wait() if __name__ == "__main__": start_ts = time.time() test_chaining() end_ts = time.time() print("--- %s seconds ---" % (end_ts - start_ts)) {code} Loopback mode doesn't work. -- This message was sent by Atlassian Jira (v8.3.4#803005)