# Demo: a PyFlink streaming job that consumes JSON messages from Kafka and
# writes them into MySQL. Because the MySQL sink table declares a primary key,
# the JDBC connector upserts on duplicate keys (INSERT ... ON DUPLICATE KEY
# UPDATE under the hood). Run on the Linux host with: python <this_script>.py
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, CheckpointingMode
from pyflink.table import StreamTableEnvironment, EnvironmentSettings

# Kafka source table: topic 'alarm_test_g', JSON records {trck_id, score}.
# NOTE: the broker address must be the Kafka port (9092), not ZooKeeper (2181).
# The original DDL listed 'properties.bootstrap.servers' twice (once with the
# ZooKeeper port) — duplicate keys in a WITH clause are rejected by Flink, so
# only the correct broker entry is kept.
source = """
CREATE TABLE kafka_source_tab (
    trck_id VARCHAR,
    score   INT
) WITH (
    'connector' = 'kafka',
    'topic' = 'alarm_test_g',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = '10.2.2.73:9092',
    'format' = 'json'
)
"""

# MySQL sink table: declaring a PRIMARY KEY makes the JDBC connector operate
# in upsert mode, so duplicate trck_id values update the existing row.
sink = """
CREATE TABLE g_source_tab (
    trck_id VARCHAR,
    score   INT,
    PRIMARY KEY (trck_id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://10.2.2.77:3306/bdt?useSSL=false',
    'table-name' = 'g',
    'username' = 'root',
    'password' = '123456t',
    'sink.buffer-flush.interval' = '1s'
)
"""

env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env.set_parallelism(1)
env_settings = (
    EnvironmentSettings.new_instance()
    .use_blink_planner()
    .in_streaming_mode()
    .build()
)
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)

t_env.execute_sql(source)
t_env.execute_sql(sink)

# Plain INSERT ... SELECT: the original statement used
# 'INSERT ... VALUES (SELECT ...) ON DUPLICATE KEY UPDATE', which is not valid
# Flink SQL and fails at parse time. Upsert-on-duplicate is handled by the
# JDBC sink via the PRIMARY KEY declared above.
# NOTE(review): the original intended 'score = score + 1' on duplicates; the
# upsert sink overwrites the row instead — expressing an increment would need
# an aggregation in the SELECT. Confirm the desired semantics.
table_result1 = t_env.execute_sql(
    "INSERT INTO g_source_tab SELECT trck_id, score FROM kafka_source_tab"
)
# Block until the (unbounded) streaming job terminates so the Python process
# does not exit and cancel the job. TableResult.wait() is the supported call;
# get_job_client() can return None in some deployment modes, which would make
# the original chained call raise AttributeError.
table_result1.wait()