????????????????????mysql??????????MySQL????????????linux??????????????????????
py4j.protocol.Py4JJavaError: An error occurred while calling o6.execute.
: java.util.concurrent.ExecutionException: org.apache.flink.runtime.rest.util.RestClientException: [org.apache.flink.runtime.rest.handler.RestHandlerException: org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find Flink job (4d6b1273229e0e16fa433c652b5cb74d)





??????????


from pyflink.datastream import (
    CheckpointingMode,
    StreamExecutionEnvironment,
    TimeCharacteristic,
)
from pyflink.table import DataTypes, EnvironmentSettings, StreamTableEnvironment
# DDL registering the JDBC (MySQL) table `mh_source_tab`, backed by the
# database table `nj_mh_test`.
# NOTE(review): 'sink.buffer-flush.interval' is a sink-side option name;
# presumably a leftover from copying a sink definition -- confirm whether it
# is needed on this table.
batch_source_ddl = """
CREATE TABLE mh_source_tab (
 lid INT,
 dir INT,
 posid BIGINT,
 km BIGINT,
 poleId BIGINT,
 ts BIGINT,
 rt Decimal(6,2),
 time1 VARCHAR 
) WITH (
 'connector' = 'jdbc',
 'url' = 'jdbc:mysql://**',   
 'table-name' = 'nj_mh_test',   --??????????
 'driver' = 'com.mysql.cj.jdbc.Driver',
 'username' = 'root',
 'password' = 'root',
 --'sink.buffer-flush.max-rows' = '100',
 'sink.buffer-flush.interval' = '1s'


)
"""


# DDL registering the JDBC (MySQL) table `warn_alarm_mh_sink`, backed by the
# database table `warn_mh_alarm_result`.
# The primary key (lid, dir, posid, poleId) is declared NOT ENFORCED.
# NOTE(review): presumably the declared key makes the JDBC connector write in
# upsert mode -- confirm against the connector documentation.
warn_alarm_mh_ddl = """
CREATE TABLE warn_alarm_mh_sink (
 lid INT,
 dir INT,
 posid BIGINT,
 km BIGINT,
 poleId BIGINT,
 extremum Decimal(6,2),
 PRIMARY KEY (lid,dir,posid,poleId) NOT ENFORCED
) WITH (
 'connector' = 'jdbc',
 'url' = 'jdbc:mysql://**',
 'table-name' = 'warn_mh_alarm_result',   --??????????
 'driver' = 'com.mysql.cj.jdbc.Driver',
 'username' = 'root',
 'password' = 'root',
 --'sink.buffer-flush.max-rows' = '100',
 'sink.buffer-flush.interval' = '1s'
)
"""
# Create the stream execution environment with parallelism 1, build a
# streaming-mode table environment on top of it, and register both tables by
# executing their DDL statements.
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
# Parenthesized so the builder chain can span several lines; the original
# assignment was split from its right-hand side and did not parse.
env_settings = (
    EnvironmentSettings.new_instance()
    .use_blink_planner()
    .in_streaming_mode()
    .build()
)
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)
t_env.execute_sql(batch_source_ddl)
t_env.execute_sql(warn_alarm_mh_ddl)


def threshold_alarm(delta_thres):
    """Insert groups whose ``rt`` spread exceeds ``delta_thres`` into the sink.

    Reads ``mh_source_tab``, drops rows whose ``rt`` equals -10000, groups by
    (lid, dir, posid, km, poleId), computes ``max(rt) - min(rt)`` as
    ``extremum``, keeps the groups with ``extremum > delta_thres``, and
    inserts them into ``warn_alarm_mh_sink``. The call then waits on the
    submitted job's execution result.

    Args:
        delta_thres: Threshold for ``extremum``. It is converted with
            ``str()`` and concatenated into the filter expression.
    """
    # NOTE(review): -10000 looks like a "no reading" sentinel -- confirm.
    source = (
        t_env.from_path("mh_source_tab")
        .where("rt <> -10000")
        .group_by("lid, dir, posid, km, poleId")
        .select(
            "lid, dir, posid, km, poleId, max(rt) as max_rt, "
            "min(rt) as min_rt, max(rt)-min(rt) as extremum"
        )
        .where("extremum >" + str(delta_thres))
        .select("lid, dir, posid, km, poleId, extremum")
    )
    # Submit the insert job and wait for its execution result.
    source.execute_insert("warn_alarm_mh_sink") \
        .get_job_client() \
        .get_job_execution_result() \
        .result()
# Script entry point: run the alarm job with a threshold of 0.5.
if __name__ == '__main__':
    threshold_alarm(delta_thres=0.5)

回复