????????????????????source????????????????????sink1????????????1??2????????????add_insert_sql??????????????source??????????????sink2????????????????1??2?? ??????????????1??2??1??2
------------------ Original Message ------------------
From: "Dian Fu" <dian0511...@gmail.com>;
Sent: March 12, 2021 (Friday) ????10:24
To: "user-zh"<user-zh@flink.apache.org>;"??????"<wushang...@foxmail.com>;
Subject: Re: ????statement????????????

????????????????????????????????????1??2??1??2????
??????????????????????????????????????????1????????????????sink1??????????sink2????????????2??1????????????????????????????2??2

On Mon, Mar 8, 2021 at 9:42 AM ?????? <wushang...@foxmail.com> wrote:

from pyflink.table import EnvironmentSettings, StreamTableEnvironment

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = StreamTableEnvironment.create(environment_settings=env_settings)

table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
table_env.create_temporary_view("simple_source", table)

table_env.execute_sql("""
    CREATE TABLE first_sink_table (
        id BIGINT,
        data VARCHAR
    ) WITH (
        'connector' = 'print'
    )
""")
table_env.execute_sql("""
    CREATE TABLE second_sink_table (
        id BIGINT,
        data VARCHAR
    ) WITH (
        'connector' = 'print'
    )
""")

# Create a statement set
statement_set = table_env.create_statement_set()

# Use the Table API to insert table into first_sink_table
statement_set.add_insert("first_sink_table", table)

# Use SQL to insert the rows of simple_source into second_sink_table
statement_set.add_insert_sql("INSERT INTO second_sink_table SELECT * FROM simple_source")

# Execute the statement set and wait for the job to finish
statement_set.execute().wait()

????????????????????????????????????????????????????????????1??2??1??2????????1??1??2??2??

4> +I(1,Hi)
4> +I(1,Hi)
4> +I(2,Hello)
4> +I(2,Hello)
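
The 1, 1, 2, 2 order in the printed output is consistent with both INSERT statements being compiled into a single job that reads simple_source once and hands each row to both sinks before moving on to the next row. Below is a minimal plain-Python sketch of that fan-out, assuming this is how the statement set is planned. The names run_fan_out, run_sequential and collect are hypothetical helpers for illustration only and are not PyFlink APIs.

```python
from typing import Callable, Iterable, List, Tuple

Row = Tuple[int, str]
Sink = Callable[[Row], None]


def run_fan_out(source: Iterable[Row], sinks: List[Sink]) -> None:
    """Single pass over the source: each row reaches every sink before the next row is read."""
    for row in source:
        for sink in sinks:
            sink(row)


def run_sequential(source: List[Row], sinks: List[Sink]) -> None:
    """One full pass over the source per sink, one sink after another."""
    for sink in sinks:
        for row in source:
            sink(row)


def collect(run: Callable[[List[Row], List[Sink]], None]) -> List[int]:
    """Run a strategy with two sinks that append to one shared log; return the ids in arrival order."""
    seen: List[int] = []
    rows: List[Row] = [(1, "Hi"), (2, "Hello")]

    def sink(row: Row) -> None:
        seen.append(row[0])

    run(rows, [sink, sink])
    return seen


fan_out_order = collect(run_fan_out)        # assumed model of the statement set
sequential_order = collect(run_sequential)  # model of two independent passes
print(fan_out_order)
print(sequential_order)
```

Under the same assumption, submitting each INSERT as its own job and waiting for it to finish before submitting the next would correspond to run_sequential.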