有新数据进来吗,看起来和这个jira很像 https://issues.apache.org/jira/browse/FLINK-15262
在2020年07月13日 20:38,Leonard Xu<xbjt...@gmail.com> 写道: Hi, 简单看了下代码应该没啥问题,alarm_test_g 这个kafka topic里有数据吗?可以检查下是否有脏数据,直接用./bin/kafka-console-consumer.sh 检查下?我有点怀疑这点 Best, Leonard Xu 在 2020年7月13日,20:06,小学生 <201782...@qq.com> 写道: 各位大佬好,请教一个问题flink从Kafka读数,写入mysql,程序没有报错,但是没有写入mysql任何数据。代码如下,是在linux下,直接python *.py执行的。完整代码如下 from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, CheckpointingMode from pyflink.table import StreamTableEnvironment, EnvironmentSettings source=""" CREATE TABLE kafka_source_tab ( id VARCHAR, alarm_id VARCHAR, trck_id VARCHAR ) WITH ( 'connector' = 'kafka', 'topic' = 'alarm_test_g', 'scan.startup.mode' = 'earliest-offset', 'properties.bootstrap.servers' = '10.2.2.73:2181', 'properties.bootstrap.servers' = '10.2.2.73:9092', 'format' = 'json' ) """ sink=""" CREATE TABLE g_source_tab ( id VARCHAR, alarm_id VARCHAR, trck_id VARCHAR ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://10.2.2.77:3306/bdt?useSSL=false', 'table-name' = 'g', 'username' = 'root', 'password' = '123456t', 'sink.buffer-flush.interval' = '1s' ) """ env = StreamExecutionEnvironment.get_execution_environment() env.set_stream_time_characteristic(TimeCharacteristic.EventTime) env.set_parallelism(1) env_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build() t_env = StreamTableEnvironment.create(env, environment_settings=env_settings) t_env.execute_sql(source) t_env.execute_sql(sink) source = t_env.from_path("kafka_source_tab")\ .select("id,alarm_id,trck_id") source.execute_insert("g_source_tab")