Hello,
    
Regarding the problem you describe: on flink 1.11.0, a job that reads Kafka data in real time and writes it out through the JDBC connector does not emit each record as it arrives; the sink buffers records first.
    
In the DDL of the table you sink to MySQL, you set the parameter sink.buffer-flush.max-rows to 1000.
    That means records are buffered until 1000 of them have accumulated and are only then flushed to MySQL in one batch, which is why the writes look delayed.
    Make this parameter smaller and the data will be written to MySQL much sooner.
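For example, here is a sketch of the sink DDL you posted with both flush thresholds lowered (the values 100 and 1s are just illustrative; tune them to your own latency needs):

source_W_detail_ddl = """
CREATE TABLE source_W_detail (
 id VARCHAR,
 alarm_id VARCHAR,
 trck_id VARCHAR
) WITH (
 'connector' = 'jdbc',
 'url' = 'jdbc:mysql://198.2.2.71:3306/bda?useSSL=false',
 'driver' = 'com.mysql.cj.jdbc.Driver',
 'table-name' = 'detail',
 'username' = 'root',
 'password' = 'root',
 'sink.buffer-flush.max-rows' = '100',  -- flush as soon as 100 rows are buffered
 'sink.buffer-flush.interval' = '1s'    -- and flush at least once per second regardless
)
"""

In the extreme, setting 'sink.buffer-flush.max-rows' = '1' writes every record immediately, at the cost of one INSERT round trip per record.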



chengyanan1...@foxmail.com
 
From: ??????
Sent: 2020-07-24 18:18
To: user-zh
Subject: flink1.11: data is not written to MySQL in real time
Hi all, I have a question. I am using flink 1.11 to consume Kafka data and write it to MySQL in real time. The problem is that the Kafka topic receives roughly 300 records/??, but the data does not reach MySQL in real time: rows only show up about every 6 ??. What could be the cause? My code is below.
 
 
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, CheckpointingMode
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
source_Kafka = """
CREATE TABLE kafka_source (
 id VARCHAR, 
 alarm_id VARCHAR,
 trck_id VARCHAR
) WITH (
 'connector' = 'kafka',
 'topic' = 'test',  
 'properties.bootstrap.servers' = '*',
 'properties.group.id' = 'flink_grouper',
 'scan.startup.mode' = 'earliest-offset',    
 'format' = 'json',
 'json.fail-on-missing-field' = 'false',
 'json.ignore-parse-errors' = 'true'
)
"""
source_W_detail_ddl = """
CREATE TABLE source_W_detail (
 id VARCHAR,    
 alarm_id VARCHAR,     
 trck_id VARCHAR    
) WITH (
 'connector' = 'jdbc',
 'url' = 'jdbc:mysql://198.2.2.71:3306/bda?useSSL=false',
 'driver' = 'com.mysql.cj.jdbc.Driver',
 'table-name' = 'detail',
 'username' = 'root',
 'password' = 'root',
 'sink.buffer-flush.max-rows' = '1000',
 'sink.buffer-flush.interval' = '2s'
"""
env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime)
env.set_parallelism(1)
env_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)
t_env.execute_sql(source_Kafka)
t_env.execute_sql(source_W_detail_ddl)
# Submit the INSERT job, then block so the Python process stays alive while the streaming job runs
table_result1 = t_env.execute_sql(
    "insert into source_W_detail select id, alarm_id, trck_id from kafka_source")
table_result1.get_job_client().get_job_execution_result().result()
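One more observation: CheckpointingMode is imported but never used. If I remember correctly, the JDBC sink also flushes its buffer on every checkpoint, so a minimal sketch (the 5000 ms interval is just an example) would be to enable checkpointing right after creating the environment:

# checkpoint every 5 s; each checkpoint also flushes the JDBC sink's buffer
env.enable_checkpointing(5000, CheckpointingMode.EXACTLY_ONCE)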
