Regarding "check whether your packaged uber jar contains the following":

1. The following file does exist:
META-INF/services/org.apache.flink.table.factories.TableFactory
and it lists:
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory

2. The Flink version is 1.10. The cluster is started in standalone mode
(start-cluster.sh), and the job is submitted with flink run
(/software/flink-1.10.0/bin/flink run -c com.data.main.StreamMain
./flink_1.10_test-1.0-jar-with-dependencies.jar).

3. About confirming the ThreadClassLoader when "TableEnvironmentImpl.sqlQuery"
is called: does that mean printing the current thread's classloader, i.e.
Thread.currentThread().getContextClassLoader()?
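To check points 1 and 3 together, something like the following sketch could be run from the job's main method. It is only an illustration: `FactoryCheck` is a hypothetical class name, and the jar path is the one from the flink run command above.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

public class FactoryCheck {

    static final String SERVICE_FILE =
            "META-INF/services/org.apache.flink.table.factories.TableFactory";

    /** Reads the TableFactory service file from a jar; empty list if the entry is missing. */
    public static List<String> serviceFileLines(String jarPath) throws IOException {
        List<String> lines = new ArrayList<>();
        try (JarFile jar = new JarFile(jarPath)) {
            JarEntry entry = jar.getJarEntry(SERVICE_FILE);
            if (entry == null) {
                return lines; // service file was not packaged into the uber jar
            }
            try (BufferedReader r = new BufferedReader(
                    new InputStreamReader(jar.getInputStream(entry), StandardCharsets.UTF_8))) {
                String line;
                while ((line = r.readLine()) != null) {
                    lines.add(line.trim());
                }
            }
        }
        return lines;
    }

    public static void main(String[] args) throws Exception {
        // Point 3: the classloader the factory lookup consults by default.
        System.out.println(Thread.currentThread().getContextClassLoader());
        // Point 1: the factories declared in the uber jar's service file.
        for (String line : serviceFileLines("flink_1.10_test-1.0-jar-with-dependencies.jar")) {
            System.out.println(line);
        }
    }
}
```

If KafkaTableSourceSinkFactory is missing from the printed list, the shade/assembly build likely overwrote the service file instead of merging it.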



On Wed, Apr 22, 2020 at 6:00 PM Jingsong Li <jingsongl...@gmail.com> wrote:

> Hi,
>
> First, check whether your jar contains the META-INF services file, and
> whether Kafka is listed in it.
>
> If it is, then check the ThreadClassLoader at the moment
> "TableEnvironmentImpl.sqlQuery" is called, because by default the Factory
> is now discovered through the ThreadClassLoader.
>
> Best,
> Jingsong Lee
>
> On Wed, Apr 22, 2020 at 5:30 PM 宇张 <zhan...@akulaku.com> wrote:
>
> > I am running the Flink job in standalone mode, but the TableSourceFactory
> > inside the uber jar cannot be loaded, even with
> > classloader.resolve-order: child-first set. The factory is only found
> > when the jar is placed in the lib directory. I saw in the release notes
> > that the classloading policy was changed, but I don't understand why the
> > Factory inside the uber jar cannot be loaded.
> > Flink Client respects Classloading Policy (FLINK-13749
> > <https://issues.apache.org/jira/browse/FLINK-13749>)
> > <https://ci.apache.org/projects/flink/flink-docs-release-1.10/release-notes/flink-1.10.html#flink-client-respects-classloading-policy-flink-13749>
> >
> > The Flink client now also respects the configured classloading policy,
> > i.e., parent-first or child-first classloading. Previously, only cluster
> > components such as the job manager or task manager supported this setting.
> > This does mean that users might get different behaviour in their programs,
> > in which case they should configure the classloading policy explicitly to
> > use parent-first classloading, which was the previous (hard-coded)
> > behaviour.
> >
> > The exception:
> >
> > org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: findAndCreateTableSource failed.
> > at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> > at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> > at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
> > at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
> > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> > at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
> > at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
> > at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> > Caused by: org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
> > at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)
> > at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:92)
> > at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.findAndCreateTableSource(CatalogSourceTable.scala:156)
> > at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource$lzycompute(CatalogSourceTable.scala:65)
> > at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource(CatalogSourceTable.scala:65)
> > at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:76)
> > at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
> > at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
> > at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
> > at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
> > at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2083)
> > at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
> > at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
> > at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
> > at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
> > at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
> > at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
> > at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
> > at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
> > at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
> > at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
> > at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464)
> > at com.akulaku.data.main.StreamMain.main(StreamMain.java:87)
> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> > at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > at java.lang.reflect.Method.invoke(Method.java:498)
> > at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
> > ... 8 more
> > Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
> > Could not find a suitable table factory for
> > 'org.apache.flink.table.factories.TableSourceFactory' in
> > the classpath.
> >
> > Reason: Required context properties mismatch.
> >
> > The matching candidates:
> > org.apache.flink.table.sources.CsvAppendTableSourceFactory
> > Mismatched properties:
> > 'connector.type' expects 'filesystem', but is 'kafka'
> > 'format.type' expects 'csv', but is 'json'
> >
> > The following properties are requested:
> > connector.properties.bootstrap.servers=centos:9092
> > connector.properties.zookeeper.connect=centos:2181
> > connector.startup-mode=earliest-offset
> > connector.topic=test
> > connector.type=kafka
> > connector.version=0.11
> > format.type=json
> > schema.0.data-type=VARCHAR(2147483647)
> > schema.0.name=bus
> > schema.1.data-type=BIGINT
> > schema.1.name=ts
> > schema.2.data-type=VARCHAR(2147483647)
> > schema.2.name=type
> > schema.3.data-type=BIGINT
> > schema.3.name=putRowNum
> > schema.4.data-type=TIMESTAMP(3) NOT NULL
> > schema.4.expr=PROCTIME()
> > schema.4.name=proctime
> > update-mode=append
> >
> > The following factories have been considered:
> > org.apache.flink.table.sources.CsvBatchTableSourceFactory
> > org.apache.flink.table.sources.CsvAppendTableSourceFactory
> > at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
> > at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
> > at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
> > at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
> > at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:52)
> >
>
>
> --
> Best, Jingsong Lee
>
