Is any new data arriving on the topic? This looks very similar to this JIRA issue:
https://issues.apache.org/jira/browse/FLINK-15262




On 2020-07-13 at 20:38, Leonard Xu <xbjt...@gmail.com> wrote:
Hi,

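I took a quick look at the code and it seems fine. Is there any data in the Kafka topic alarm_test_g?
You could also check whether it contains dirty data, for example by reading it directly with ./bin/kafka-console-consumer.sh. That is what I suspect.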
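
To make the dirty-data check a bit more concrete, here is a minimal sketch in Python. It assumes that each line printed by the console consumer is one message, and that "dirty" means either invalid JSON or a JSON object missing one of the three columns declared in kafka_source_tab. The helper name find_dirty_records is hypothetical and not part of Flink or Kafka.

```python
import json

# Column names taken from the kafka_source_tab DDL in the quoted message.
EXPECTED_FIELDS = ("id", "alarm_id", "trck_id")


def find_dirty_records(lines):
    """Return (line_number, reason) pairs for lines that do not look like clean rows.

    Hypothetical helper: the definition of "dirty" used here is an assumption.
    """
    problems = []
    for number, line in enumerate(lines, start=1):
        text = line.strip()
        if not text:
            continue  # skip blank lines in the consumer output
        try:
            record = json.loads(text)
        except json.JSONDecodeError:
            problems.append((number, "not valid JSON"))
            continue
        if not isinstance(record, dict):
            problems.append((number, "not a JSON object"))
            continue
        missing = [name for name in EXPECTED_FIELDS if name not in record]
        if missing:
            problems.append((number, "missing fields: " + ", ".join(missing)))
    return problems


# Made-up sample messages, for illustration only.
sample = [
    '{"id": "1", "alarm_id": "a1", "trck_id": "t1"}',
    '{"id": "2", "alarm_id": "a2"}',
    'not json',
]
for number, reason in find_dirty_records(sample):
    print(f"line {number}: {reason}")
```

One way to use it is to save the output of ./bin/kafka-console-consumer.sh --bootstrap-server 10.2.2.73:9092 --topic alarm_test_g --from-beginning to a file and pass the opened file to find_dirty_records. If the list comes back empty, the data is probably not the cause.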

Best,
Leonard Xu

On 2020-07-13 at 20:06, 小学生 <201782...@qq.com> wrote:

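Hello everyone, I have a question. My Flink job reads from Kafka and writes to MySQL. The program reports no errors, but nothing is written to MySQL. I ran it on Linux directly with python *.py. The complete code is below.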


from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, CheckpointingMode
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
source="""
CREATE TABLE kafka_source_tab (
    id VARCHAR,
    alarm_id VARCHAR,
    trck_id VARCHAR
) WITH (
    'connector' = 'kafka',
    'topic' = 'alarm_test_g',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = '10.2.2.73:9092',
    'format' = 'json'
)
"""

sink="""
CREATE TABLE g_source_tab (
    id VARCHAR,
    alarm_id VARCHAR,
    trck_id VARCHAR
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://10.2.2.77:3306/bdt?useSSL=false',
    'table-name' = 'g',
&nbsp;'username' = 'root',
&nbsp;'password' = '123456t',
&nbsp;'sink.buffer-flush.interval' = '1s'
)
"""
env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env.set_parallelism(1)
env_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)



t_env.execute_sql(source)
t_env.execute_sql(sink)


source = t_env.from_path("kafka_source_tab")\
        .select("id,alarm_id,trck_id")
source.execute_insert("g_source_tab")
