????????????????????source????????????????????sink1????????????1??2????????????add_insert_sql??????????????source??????????????sink2????????????????1??2??
??????????????1??2??1??2



------------------ ???????? ------------------
??????:                                                                         
                                               "Dian Fu"                        
                                                            
<dian0511...@gmail.com&gt;;
????????:&nbsp;2021??3??12??(??????) ????10:24
??????:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;"??????"<wushang...@foxmail.com&gt;;

????:&nbsp;Re: ????statement????????????



????????????????????????????????????1??2??1??2????

??????????????????????????????????????????1????????????????sink1??????????sink2????????????2??1????????????????????????????2??2

On Mon, Mar 8, 2021 at 9:42 AM ?????? <wushang...@foxmail.com&gt; wrote:

from pyflink.table import EnvironmentSettings, StreamTableEnvironment
 
 env_settings = 
EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
 table_env = StreamTableEnvironment.create(environment_settings=env_settings)
 
 table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
 table_env.create_temporary_view("simple_source", table)
 table_env.execute_sql("""
 &nbsp; &nbsp; CREATE TABLE first_sink_table (
 &nbsp; &nbsp; &nbsp; &nbsp; id BIGINT, 
 &nbsp; &nbsp; &nbsp; &nbsp; data VARCHAR 
 &nbsp; &nbsp; ) WITH (
 &nbsp; &nbsp; &nbsp; &nbsp; 'connector' = 'print'
 &nbsp; &nbsp; )
 """)
 table_env.execute_sql("""
 &nbsp; &nbsp; CREATE TABLE second_sink_table (
 &nbsp; &nbsp; &nbsp; &nbsp; id BIGINT, 
 &nbsp; &nbsp; &nbsp; &nbsp; data VARCHAR
 &nbsp; &nbsp; ) WITH (
 &nbsp; &nbsp; &nbsp; &nbsp; 'connector' = 'print'
 &nbsp; &nbsp; )
 """)
 # ????????statement????
 statement_set = table_env.create_statement_set()
 # ????TABLE API ??table??????first_sink_table??????
 statement_set.add_insert("first_sink_table", table)
 # ????SQL??table??????second_sink_table??????
 statement_set.add_insert_sql("INSERT INTO second_sink_table SELECT * FROM 
simple_source")
 # ????????
 
statement_set.execute().wait()????????????????????????????????????????????????????????????1??2??1??2????????1??1??2??2??4&amp;gt;
 +I(1,Hi)
 4&amp;gt; +I(1,Hi)
 4&amp;gt; +I(2,Hello)
 4&amp;gt; +I(2,Hello)

回复