Here is the code:
// tEnv;
tEnv.sqlUpdate("create table dr1(  " +
        "  cid STRING,  " +
        "  server_time BIGINT,  " +
        "  d MAP<STRING, STRING>,  " +
        "  process_time AS PROCTIME(),  " +
        "  event_time AS TO_TIMESTAMP(FROM_UNIXTIME(server_time / 1000)),  " +
        "  WATERMARK FOR event_time AS event_time - INTERVAL '60' SECOND  " +
        ") WITH (  " +
        "  'update-mode' = 'append',  " +
        "  'connector.type' = 'kafka',  " +
        "  'connector.version' = 'universal',  " +
        "  'connector.topic' = 'antibot_dr1',  " +
        "  'connector.startup-mode' = 'latest-offset',  " +
        "  'connector.properties.zookeeper.connect' = 'yq01-sw-xxx03.yq01:8681',  " +
        "  'connector.properties.bootstrap.servers' = 'yq01-sw-xxx03.yq01:8192',  " +
        "  'format.type' = 'json'  " +
        ")");
Table t1 = tEnv.sqlQuery("select * from dr1");
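For background: with these old-style connector properties ('connector.type', 'format.type'), Flink resolves 'format.type' = 'json' to a factory class at planning time via Java SPI, i.e. ServiceLoader scanning META-INF/services entries on the classpath. A minimal sketch of that mechanism, using a hypothetical stand-in interface rather than Flink's real org.apache.flink.table.factories.TableFactory:

```java
import java.util.ServiceLoader;

public class SpiDemo {
    // Hypothetical stand-in for Flink's TableFactory SPI interface.
    public interface TableFactory {}

    // Count implementations discoverable on the current classpath.
    public static int countFactories() {
        int n = 0;
        for (TableFactory f : ServiceLoader.load(TableFactory.class)) {
            n++;
        }
        return n;
    }

    public static void main(String[] args) {
        // No META-INF/services/SpiDemo$TableFactory file exists on the
        // classpath, so nothing is discovered -- the same situation that
        // Flink surfaces as NoMatchingTableFactoryException.
        System.out.println("factories found: " + countFactories());
    }
}
```

If no jar on the searched classpath carries the matching META-INF/services entry, the loader silently yields nothing, which Flink then reports as NoMatchingTableFactoryException.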

My build packages flink-json in; the final artifact is test.jar.

test.jar is a fat jar, so all the relevant dependencies are in there.
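One thing worth checking in a shaded fat jar: merging jars can overwrite the META-INF/services files that different dependencies ship (with maven-shade, the ServicesResourceTransformer exists precisely to concatenate them), which silently drops SPI registrations such as flink-json's. A small diagnostic sketch to count how many of those registration files are actually visible; the resource name below is the one Flink's old-planner factories register under, so adjust it if your version differs:

```java
import java.io.IOException;
import java.net.URL;
import java.util.Enumeration;

public class SpiCheck {
    // Resource that flink-json (and other connectors/formats) register under.
    static final String RES =
        "META-INF/services/org.apache.flink.table.factories.TableFactory";

    public static int countServiceFiles() throws IOException {
        Enumeration<URL> urls = SpiCheck.class.getClassLoader().getResources(RES);
        int n = 0;
        while (urls.hasMoreElements()) {
            System.out.println(urls.nextElement());
            n++;
        }
        return n;
    }

    public static void main(String[] args) throws IOException {
        System.out.println("service files found: " + countServiceFiles());
    }
}
```

Running this with test.jar on the classpath (java -cp test.jar SpiCheck) shows whether the shade step preserved the registrations: zero means they were lost in packaging.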

Then I run flink run -c test.SQLWC1 --detached test.jar and it fails with:

Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
Could not find a suitable table factory for
'org.apache.flink.table.factories.DeserializationSchemaFactory' in
the classpath.

But flink-json.jar is right there in the fat jar, and it still fails...

The fix: the machine where the flink run -c test.SQLWC1 --detached test.jar command is executed must have flink-json in its local Flink environment (flink/lib). But that machine only submits the job; the cluster that actually runs it is on different machines.

I don't understand how Flink SQL resolves dependencies here. Is the flink-json I packed into the fat jar simply never considered?
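A guess at what is going on (the exact lookup path depends on the Flink version): ServiceLoader only sees entries visible to the classloader it is asked to search. If client-side SQL planning runs against the framework classloader built from flink/lib, a registration that exists only inside the submitted fat jar is invisible to it, which would match the observed behavior. A self-contained sketch of that scoping effect, using hypothetical stand-in classes:

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ServiceLoader;

public class LoaderScopeDemo {
    // Hypothetical stand-ins: Format plays the SPI interface,
    // JsonFormat plays flink-json's registered factory.
    public interface Format {}
    public static class JsonFormat implements Format {
        public JsonFormat() {}
    }

    static int count(ClassLoader cl) {
        int n = 0;
        for (Format f : ServiceLoader.load(Format.class, cl)) {
            n++;
        }
        return n;
    }

    // Returns {count seen by a loader that has the SPI entry,
    //          count seen by a loader that does not}.
    public static int[] demo() throws IOException {
        // Write the META-INF/services registration into a temp directory,
        // mimicking the entry that flink-json ships inside its jar.
        Path dir = Files.createTempDirectory("spi");
        Path services = dir.resolve("META-INF/services");
        Files.createDirectories(services);
        Files.write(services.resolve(Format.class.getName()),
                    JsonFormat.class.getName().getBytes());

        ClassLoader base = LoaderScopeDemo.class.getClassLoader();
        ClassLoader withEntry =
            new URLClassLoader(new URL[] { dir.toUri().toURL() }, base);

        // Discovery only succeeds through the loader that can see the entry.
        return new int[] { count(withEntry), count(base) };
    }

    public static void main(String[] args) throws IOException {
        int[] r = demo();
        System.out.println("loader with entry sees:    " + r[0]);
        System.out.println("loader without entry sees: " + r[1]);
    }
}
```

The same factory class is loadable from both loaders; only the one whose classpath contains the META-INF/services file ever discovers it. That is why dropping flink-json into flink/lib on the submitting machine helps, even though the jar is also inside the fat jar.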
