Dian Fu created FLINK-24003:
-------------------------------

             Summary: Loopback mode doesn't work when mixing use of the Python Table API and the Python DataStream API
                 Key: FLINK-24003
                 URL: https://issues.apache.org/jira/browse/FLINK-24003
             Project: Flink
          Issue Type: Bug
          Components: API / Python
    Affects Versions: 1.14.0
            Reporter: Dian Fu
            Assignee: Huang Xingbo
             Fix For: 1.14.0


For the following program:
{code}
import logging
import time

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment, CoMapFunction
from pyflink.table import StreamTableEnvironment, DataTypes, Schema


def test_chaining():
    """Run the job from this report that mixes the Table API and the DataStream API.

    The pipeline reads the ``datagen`` table (``id`` is a sequence from 1 to
    1000), converts it to a DataStream, sends it through several Python
    ``map`` operators and a ``CoMapFunction``, converts the two resulting
    branches back to tables and inserts them into two ``blackhole`` sinks
    through a single statement set.  The function waits on the execution
    result before returning.
    """
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(stream_execution_environment=env)

    # Python operator chaining is explicitly switched off for this job.
    t_env.get_config().get_configuration().set_boolean(
        "python.operator-chaining.enabled", False)

    # 1. create source Table
    t_env.execute_sql("""
        CREATE TABLE datagen (
            id INT,
            data STRING
        ) WITH (
            'connector' = 'datagen',
            'rows-per-second' = '1000000',
            'fields.id.kind' = 'sequence',
            'fields.id.start' = '1',
            'fields.id.end' = '1000'
        )
    """)

    # 2. create sink Tables (both use the blackhole connector)
    t_env.execute_sql("""
        CREATE TABLE print (
            id BIGINT,
            data STRING,
            flag STRING
        ) WITH (
            'connector' = 'blackhole'
        )
    """)

    t_env.execute_sql("""
        CREATE TABLE print_2 (
            id BIGINT,
            data STRING,
            flag STRING
        ) WITH (
            'connector' = 'blackhole'
        )
    """)

    # 3. query from source table and perform calculations
    # create a Table from a Table API query:
    source_table = t_env.from_path("datagen")

    # Table API -> DataStream API
    ds = t_env.to_append_stream(
        source_table,
        Types.ROW([Types.INT(), Types.STRING()]))

    # Two independent Python map operators consuming the same stream.
    ds1 = ds.map(lambda i: (i[0] * i[0], i[1]))
    ds2 = ds.map(lambda i: (i[0], i[1][2:]))

    class MyCoMapFunction(CoMapFunction):
        """Pass records from both connected inputs through unchanged."""

        def map1(self, value):
            print('hahah')
            return value

        def map2(self, value):
            print('hahah')
            return value

    # Join the two branches again with a two-input Python operator.
    ds3 = ds1.connect(ds2).map(
        MyCoMapFunction(),
        output_type=Types.TUPLE([Types.LONG(), Types.STRING()]))

    # Fan out once more: each branch appends a constant flag column.
    ds4 = ds3.map(
        lambda i: (i[0], i[1], "left"),
        output_type=Types.TUPLE(
            [Types.LONG(), Types.STRING(), Types.STRING()]))

    # This branch adds an extra identity map that carries the output type.
    ds5 = ds3.map(lambda i: (i[0], i[1], "right")) \
             .map(lambda i: i,
                  output_type=Types.TUPLE(
                      [Types.LONG(), Types.STRING(), Types.STRING()]))

    schema = Schema.new_builder() \
        .column("f0", DataTypes.BIGINT()) \
        .column("f1", DataTypes.STRING()) \
        .column("f2", DataTypes.STRING()) \
        .build()

    # DataStream API -> Table API; both inserts go into one statement set.
    result_table_3 = t_env.from_data_stream(ds4, schema)
    statement_set = t_env.create_statement_set()
    statement_set.add_insert("print", result_table_3)

    result_table_4 = t_env.from_data_stream(ds5, schema)
    statement_set.add_insert("print_2", result_table_4)

    statement_set.execute().wait()


if __name__ == "__main__":
    # Time the whole reproduction run and report the wall-clock duration.
    began_at = time.time()
    test_chaining()
    elapsed = time.time() - began_at
    print("--- %s seconds ---" % elapsed)
{code}

Loopback mode doesn't work for the program above, which mixes the Python Table API and the Python DataStream API.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to