> 如果是这样,听起来 client 的 classloading 策略没啥问题,似乎是 SPI 加载那边的 ClassLoader
有问题。之前FileSystem 相关解析就出过类似的 ClassLoader 的 BUG

@tison 不管怎么样,也得保证jar里的SPI文件包含Kafka的类,不然SPI没法找

@宇张 建议你仔细看下[1],这个pom是能打出正确的SPI文件的

[1]
https://github.com/apache/flink/blob/master/flink-table/flink-table-uber-blink/pom.xml#L104

Best,
Jingsong Lee

On Thu, Apr 23, 2020 at 3:35 PM tison <wander4...@gmail.com> wrote:

> 另外你 shaded 里面去 shaded com.ibm.icu 也意义不明...
>
> Best,
> tison.
>
>
> tison <wander4...@gmail.com> 于2020年4月23日周四 下午3:34写道:
>
> > 这个问题我建议你记一个 JIRA 然后提供一个可复现的程序。因为你如果是 Flink Standalone Session 模式,在 Client
> > 端编译失败抛出如上异常,不应该跟放不放在 lib 下有什么关系。这边听你说感觉也很奇怪,可能需要本地复现一下比较好判断。
> >
> > Best,
> > tison.
> >
> >
> > 宇张 <zhan...@akulaku.com> 于2020年4月23日周四 上午11:53写道:
> >
> >> 你的意思是,UberJar 不放在 lib 里,在用户程序里通过线程上下文 ClassLoader 能加载到
> >> KafkaTableSourceSinkFactory
> >> 吗?(同时 class loading 为 child-first)
> >> 》》是的
> >>
> >> On Thu, Apr 23, 2020 at 11:42 AM tison <wander4...@gmail.com> wrote:
> >>
> >> > >》拿到ClassLoader后看下能不能取到KafkaTableSourceSinkFactory的class
> >> > >这个能拿到
> >> >
> >> > 你的意思是,UberJar 不放在 lib 里,在用户程序里通过线程上下文 ClassLoader 能加载到
> >> > KafkaTableSourceSinkFactory
> >> > 吗?(同时 class loading 为 child-first)
> >> >
> >> > 如果是这样,听起来 client 的 classloading 策略没啥问题,似乎是 SPI 加载那边的 ClassLoader
> 有问题。之前
> >> > FileSystem 相关解析就出过类似的 ClassLoader 的 BUG
> >> >
> >> > Best,
> >> > tison.
> >> >
> >> >
> >> > 宇张 <zhan...@akulaku.com> 于2020年4月23日周四 上午11:36写道:
> >> >
> >> > > 我尝试进行了添加,程序依然无法运行,异常信息和上面一致,下面是我的shade配置:
> >> > >
> >> > > <plugin>
> >> > >     <groupId>org.apache.maven.plugins</groupId>
> >> > >     <artifactId>maven-shade-plugin</artifactId>
> >> > >     <!--<version>3.2.3</version>-->
> >> > >     <executions>
> >> > >         <execution>
> >> > >             <phase>package</phase>
> >> > >             <goals>
> >> > >                 <goal>shade</goal>
> >> > >             </goals>
> >> > >             <configuration>
> >> > >                 <transformers>
> >> > >                     <transformer
> >> > >
> >> > >
> >> > >
> >> >
> >>
> implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
> >> > >
> >> > > <mainClass>com.akulaku.data.main.StreamMain</mainClass>
> >> > >                     </transformer>
> >> > >                 </transformers>
> >> > >
> >> > >                 <filters>
> >> > >                     <filter>
> >> > >                         <artifact>*:*</artifact>
> >> > >                         <excludes>
> >> > >                             <exclude>META-INF/*.SF</exclude>
> >> > >                             <exclude>META-INF/*.DSA</exclude>
> >> > >                             <exclude>META-INF/*.RSA</exclude>
> >> > >                         </excludes>
> >> > >                     </filter>
> >> > >                 </filters>
> >> > >
> >> > >                 <artifactSet>
> >> > >                     <includes combine.children="append">
> >> > >                         <!--Sql parser is included in planners-->
> >> > >
> >> > > <include>org.apache.flink:flink-table-common</include>
> >> > >
> >> > > <include>org.apache.flink:flink-table-api-java</include>
> >> > >
> >> > > <include>org.apache.flink:flink-table-api-java-bridge_2.11</include>
> >> > >
> >> > > <include>org.apache.flink:flink-table-planner-blink_2.11</include>
> >> > >
> >> > > <include>org.apache.flink:flink-connector-kafka-0.11_2.11</include>
> >> > >
> >> > > <include>org.apache.flink:flink-connector-kafka-base_2.11</include>
> >> > >
>  <include>org.apache.flink:flink-json</include>
> >> > >                     </includes>
> >> > >                 </artifactSet>
> >> > >                 <relocations>
> >> > >                     <relocation>
> >> > >                         <!-- icu4j's dependencies -->
> >> > >                         <pattern>com.ibm.icu</pattern>
> >> > >
> >> > >
> >> <shadedPattern>org.apache.flink.table.shaded.com.ibm.icu</shadedPattern>
> >> > >                     </relocation>
> >> > >                 </relocations>
> >> > >             </configuration>
> >> > >         </execution>
> >> > >     </executions>
> >> > > </plugin>
> >> > >
> >> > >
> >> > > On Thu, Apr 23, 2020 at 10:53 AM Jingsong Li <
> jingsongl...@gmail.com>
> >> > > wrote:
> >> > >
> >> > > > Hi,
> >> > > >
> >> > > > Flink的connector发现机制是通过java
> >> > spi服务发现机制的,所以你的services下文件不包含Kafka相关的内容就不会加载到。
> >> > > >
> >> > > > > 而且两种打包方式运行时是都能加载到KafkaFactory类文件的
> >> > > >
> >> > > > 只是类文件是没有用的,没地方引用到它。
> >> > > >
> >> > > > 你试试[1]中的方法?添加combine.children
> >> > > >
> >> > > > [1]
> >> > > >
> >> > > >
> >> > >
> >> >
> >>
> https://github.com/apache/flink/blob/master/flink-table/flink-table-uber-blink/pom.xml#L104
> >> > > >
> >> > > > Best,
> >> > > > Jingsong Lee
> >> > > >
> >> > > > On Thu, Apr 23, 2020 at 10:37 AM 宇张 <zhan...@akulaku.com> wrote:
> >> > > >
> >> > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> 我这面采用shade打包方式进行了尝试,发现依然运行出错,运行错误日志与assembly打包产生的错误日志一致,就是上面提到的错误,而且shade和assembly打包产生的
> >> > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> META-INF/services/org.apache.flink.table.factories.TableFactory文件及里面的内容一致,而且两种打包方式运行时是都能加载到KafkaFactory类文件的,所以貌似不是打包导致的问题,而更像是bug
> >> > > > > 下面是我maven插件配置:
> >> > > > >
> >> > > > > <!--<plugin>-->
> >> > > > > <!--<groupId>org.apache.maven.plugins</groupId>-->
> >> > > > > <!--<artifactId>maven-assembly-plugin</artifactId>-->
> >> > > > > <!--<version>3.0.0</version>-->
> >> > > > > <!--<configuration>-->
> >> > > > > <!--<descriptorRefs>-->
> >> > > > > <!--<descriptorRef>jar-with-dependencies</descriptorRef>-->
> >> > > > > <!--</descriptorRefs>-->
> >> > > > > <!--<archive>-->
> >> > > > > <!--<manifest>-->
> >> > > > > <!--<mainClass>com.akulaku.data.main.StreamMain</mainClass>-->
> >> > > > > <!--</manifest>-->
> >> > > > > <!--</archive>-->
> >> > > > > <!--</configuration>-->
> >> > > > > <!--<executions>-->
> >> > > > > <!--<execution>-->
> >> > > > > <!--<id>assemble-all</id>-->
> >> > > > > <!--<phase>package</phase>-->
> >> > > > > <!--<goals>-->
> >> > > > > <!--<goal>single</goal>-->
> >> > > > > <!--</goals>-->
> >> > > > > <!--</execution>-->
> >> > > > > <!--</executions>-->
> >> > > > > <!--</plugin>-->
> >> > > > > <!--<plugin>-->
> >> > > > > <!--<groupId>org.apache.maven.plugins</groupId>-->
> >> > > > > <!--<artifactId>maven-compiler-plugin</artifactId>-->
> >> > > > > <!--<configuration>-->
> >> > > > > <!--<source>8</source>-->
> >> > > > > <!--<target>8</target>-->
> >> > > > > <!--</configuration>-->
> >> > > > > <!--</plugin>-->
> >> > > > > <plugin>
> >> > > > >     <groupId>org.apache.maven.plugins</groupId>
> >> > > > >     <artifactId>maven-shade-plugin</artifactId>
> >> > > > >     <!--<version>3.2.3</version>-->
> >> > > > >     <executions>
> >> > > > >         <execution>
> >> > > > >             <phase>package</phase>
> >> > > > >             <goals>
> >> > > > >                 <goal>shade</goal>
> >> > > > >             </goals>
> >> > > > >             <configuration>
> >> > > > >                 <transformers>
> >> > > > >                     <transformer
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
> >> > > > >
> >> > > > > <mainClass>com.akulaku.data.main.StreamMain</mainClass>
> >> > > > >                     </transformer>
> >> > > > >                 </transformers>
> >> > > > >
> >> > > > >                     <filters>
> >> > > > >                         <filter>
> >> > > > >                             <artifact>*:*</artifact>
> >> > > > >                             <excludes>
> >> > > > >                                 <exclude>META-INF/*.SF</exclude>
> >> > > > >
>  <exclude>META-INF/*.DSA</exclude>
> >> > > > >
>  <exclude>META-INF/*.RSA</exclude>
> >> > > > >                             </excludes>
> >> > > > >                         </filter>
> >> > > > >                     </filters>
> >> > > > >             </configuration>
> >> > > > >         </execution>
> >> > > > >     </executions>
> >> > > > > </plugin>
> >> > > > >
> >> > > > >
> >> > > > > On Wed, Apr 22, 2020 at 8:00 PM Jingsong Li <
> >> jingsongl...@gmail.com>
> >> > > > > wrote:
> >> > > > >
> >> > > > > > Hi,
> >> > > > > >
> >> > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> 如果org.apache.flink.table.factories.TableFactory里面没有KafkaTableSourceSinkFactory,那就是打包有问题。不清楚1.9的是怎么运行起来的,但是所有的jar的meta-inf-services文件都没有KafkaTableSourceSinkFactory,那也不应该能运行起来的。
> >> > > > > >
> >> > > > > > 推荐打包方式用shade,shade会merge meta-inf-services的文件的。
> >> > > > > >
> >> > > > > > Best,
> >> > > > > > Jingsong Lee
> >> > > > > >
> >> > > > > > On Wed, Apr 22, 2020 at 7:31 PM 宇张 <zhan...@akulaku.com>
> wrote:
> >> > > > > >
> >> > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> 》也确认下org.apache.flink.table.factories.TableFactory的内容,里面有没有KafkaTableSourceSinkFactory
> >> > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> 这个我看了一下我先前flink1.9的工程,应用程序Jar里面也是没有这个类的,但是程序运行加载是没问题的,这么对比貌似就不是maven打包的问题了。。。。。
> >> > > > > > >
> >> > > > > > > On Wed, Apr 22, 2020 at 7:22 PM 宇张 <zhan...@akulaku.com>
> >> wrote:
> >> > > > > > >
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> 》也确认下org.apache.flink.table.factories.TableFactory的内容,里面有没有KafkaTableSourceSinkFactory
> >> > > > > > > > 这个没有,只有org.apache.flink.formats.json.JsonRowFormatFactory
> >> > > > > > > > 》拿到ClassLoader后看下能不能取到KafkaTableSourceSinkFactory的class
> >> > > > > > > > 这个能拿到
> >> > > > > > > >
> >> > > > > > > > 这么看来 貌似是 mvn打包有问题:
> >> > > > > > > > mvn clean package -DskipTests
> >> > > > > > > > 依赖范围为默认
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > > > On Wed, Apr 22, 2020 at 7:05 PM Jingsong Li <
> >> > > > jingsongl...@gmail.com>
> >> > > > > > > > wrote:
> >> > > > > > > >
> >> > > > > > > >> Hi,
> >> > > > > > > >>
> >> > > > > > > >>
> >> > > > > > > >>
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> 也确认下org.apache.flink.table.factories.TableFactory的内容,里面有没有KafkaTableSourceSinkFactory
> >> > > > > > > >>
> >> > > > > > > >> >
> >> > > > > >
> >> > >
> 这个指的是打印当前线程的classloader嘛Thread.currentThread().getContextClassLoader()
> >> > > > > > > >> 是的,拿到ClassLoader后看下能不能取到KafkaTableSourceSinkFactory的class
> >> > > > > > > >>
> >> > > > > > > >> Best,
> >> > > > > > > >> Jingsong Lee
> >> > > > > > > >>
> >> > > > > > > >> On Wed, Apr 22, 2020 at 7:00 PM 宇张 <zhan...@akulaku.com>
> >> > wrote:
> >> > > > > > > >>
> >> > > > > > > >> > 看下你打包的 UberJar 里有没一个内容包括
> >> > > > > > > >> > 1、下面这个文件是存在的
> >> > > > > > > >> >
> >> > > > > > >
> >> > > >
> >> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
> >> > > > > > > >> > 的文件
> >> > > > > > > >> >
> >> > > META-INF/services/org.apache.flink.table.factories.TableFactory
> >> > > > > > > >> > 2、flink版本1.10,Standalone模式启动服务(start-cluster.sh),flink
> >> > > > > > > >> > run运行(/software/flink-1.10.0/bin/flink run -c
> >> > > > > > com.data.main.StreamMain
> >> > > > > > > >> > ./flink_1.10_test-1.0-jar-with-dependencies.jar)
> >> > > > > > > >> >
> >> 3、再确认下"TableEnvironmentImpl.sqlQuery"调用时候的ThreadClassLoader?
> >> > > > > > > >> >
> >> > > > > >
> >> > >
> 这个指的是打印当前线程的classloader嘛Thread.currentThread().getContextClassLoader()
> >> > > > > > > >> >
> >> > > > > > > >> >
> >> > > > > > > >> >
> >> > > > > > > >> > On Wed, Apr 22, 2020 at 6:00 PM Jingsong Li <
> >> > > > > jingsongl...@gmail.com
> >> > > > > > >
> >> > > > > > > >> > wrote:
> >> > > > > > > >> >
> >> > > > > > > >> > > Hi,
> >> > > > > > > >> > >
> >> > > > > > > >> > > 先确认下你的Jar包里有没有 meta-inf-services的文件?里面确定有Kafka?
> >> > > > > > > >> > >
> >> > > > > > > >> > >
> >> > > 如果有,再确认下"TableEnvironmentImpl.sqlQuery"调用时候的ThreadClassLoader?
> >> > > > > > > >> > > 因为现在默认是通过ThreadClassLoader来获取Factory的。
> >> > > > > > > >> > >
> >> > > > > > > >> > > Best,
> >> > > > > > > >> > > Jingsong Lee
> >> > > > > > > >> > >
> >> > > > > > > >> > > On Wed, Apr 22, 2020 at 5:30 PM 宇张 <
> >> zhan...@akulaku.com>
> >> > > > wrote:
> >> > > > > > > >> > >
> >> > > > > > > >> > > > 我这面使用Standalone模式运行Flink任务,但是Uber
> >> > > > > > > >> > > >
> >> > > > Jar里面的TableSourceFactory不能被加载,即使设置了classloader.resolve-order:
> >> > > > > > > >> > > >
> >> child-first,只有放在lib目录才能加载得到,我看发布文档跟改了类加载策略,但是我不知道为什么Uber
> >> > > > > > > >> > > Jar里面的Factory不能被加载
> >> > > > > > > >> > > > Flink Client respects Classloading Policy
> >> (FLINK-13749
> >> > > > > > > >> > > > <https://issues.apache.org/jira/browse/FLINK-13749
> >)
> >> > > > > > > >> > > > <
> >> > > > > > > >> > > >
> >> > > > > > > >> > >
> >> > > > > > > >> >
> >> > > > > > > >>
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/release-notes/flink-1.10.html#flink-client-respects-classloading-policy-flink-13749
> >> > > > > > > >> > > > >
> >> > > > > > > >> > > >
> >> > > > > > > >> > > > The Flink client now also respects the configured
> >> > > > classloading
> >> > > > > > > >> policy,
> >> > > > > > > >> > > > i.e., parent-first or child-first classloading.
> >> > > Previously,
> >> > > > > only
> >> > > > > > > >> > cluster
> >> > > > > > > >> > > > components such as the job manager or task manager
> >> > > supported
> >> > > > > > this
> >> > > > > > > >> > > setting.
> >> > > > > > > >> > > > This does mean that users might get different
> >> behaviour
> >> > in
> >> > > > > their
> >> > > > > > > >> > > programs,
> >> > > > > > > >> > > > in which case they should configure the
> classloading
> >> > > policy
> >> > > > > > > >> explicitly
> >> > > > > > > >> > to
> >> > > > > > > >> > > > use parent-first classloading, which was the
> previous
> >> > > > > > (hard-coded)
> >> > > > > > > >> > > > behaviour.
> >> > > > > > > >> > > >
> >> > > > > > > >> > > > 异常信息:
> >> > > > > > > >> > > >
> >> > > > > > > >> > > >
> >> > >  rg.apache.flink.client.program.ProgramInvocationException:
> >> > > > > The
> >> > > > > > > >> main
> >> > > > > > > >> > > > method caused an error: findAndCreateTableSource
> >> failed.
> >> > > > > > > >> > > > at
> >> > > > > > > >> > > >
> >> > > > > > > >> > > >
> >> > > > > > > >> > >
> >> > > > > > > >> >
> >> > > > > > > >>
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> >> > > > > > > >> > > > at
> >> > > > > > > >> > > >
> >> > > > > > > >> > > >
> >> > > > > > > >> > >
> >> > > > > > > >> >
> >> > > > > > > >>
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> >> > > > > > > >> > > > at
> >> > > > > > > >> > >
> >> > > > > > > >>
> >> > > > > >
> >> > > >
> >> >
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
> >> > > > > > > >> > > > at
> >> > > > > > > >> > > >
> >> > > > > > > >> > > >
> >> > > > > > > >> > >
> >> > > > > > > >> >
> >> > > > > > > >>
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
> >> > > > > > > >> > > > at
> >> > > > > > >
> >> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> >> > > > > > > >> > > > at
> >> > > > > > > >> > > >
> >> > > > > > > >> > > >
> >> > > > > > > >> > >
> >> > > > > > > >> >
> >> > > > > > > >>
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
> >> > > > > > > >> > > > at
> >> > > > > > > >> > > >
> >> > > > > > > >> > > >
> >> > > > > > > >> > >
> >> > > > > > > >> >
> >> > > > > > > >>
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
> >> > > > > > > >> > > > at
> >> > > > > > > >> > > >
> >> > > > > > > >> > > >
> >> > > > > > > >> > >
> >> > > > > > > >> >
> >> > > > > > > >>
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> >> > > > > > > >> > > > at
> >> > > > > > > >>
> >> > > org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> >> > > > > > > >> > > > Caused by:
> org.apache.flink.table.api.TableException:
> >> > > > > > > >> > > > findAndCreateTableSource failed.
> >> > > > > > > >> > > > at
> >> > > > > > > >> > > >
> >> > > > > > > >> > > >
> >> > > > > > > >> > >
> >> > > > > > > >> >
> >> > > > > > > >>
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)
> >> > > > > > > >> > > > at
> >> > > > > > > >> > > >
> >> > > > > > > >> > > >
> >> > > > > > > >> > >
> >> > > > > > > >> >
> >> > > > > > > >>
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:92)
> >> > > > > > > >> > > > at
> >> > > > > > > >> > > >
> >> > > > > > > >> > > >
> >> > > > > > > >> > >
> >> > > > > > > >> >
> >> > > > > > > >>
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.findAndCreateTableSource(CatalogSourceTable.scala:156)
> >> > > > > > > >> > > > at
> >> > > > > > > >> > > >
> >> > > > > > > >> > > >
> >> > > > > > > >> > >
> >> > > > > > > >> >
> >> > > > > > > >>
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource$lzycompute(CatalogSourceTable.scala:65)
> >> > > > > > > >> > > > at
> >> > > > > > > >> > > >
> >> > > > > > > >> > > >
> >> > > > > > > >> > >
> >> > > > > > > >> >
> >> > > > > > > >>
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource(CatalogSourceTable.scala:65)
> >> > > > > > > >> > > > at
> >> > > > > > > >> > > >
> >> > > > > > > >> > > >
> >> > > > > > > >> > >
> >> > > > > > > >> >
> >> > > > > > > >>
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:76)
> >> > > > > > > >> > > > at
> >> > > > > > > >> > > >
> >> > > > > > > >> > > >
> >> > > > > > > >> > >
> >> > > > > > > >> >
> >> > > > > > > >>
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
> >> > > > > > > >> > > > at
> >> > > > > > > >> > > >
> >> > > > > > > >> > > >
> >> > > > > > > >> > >
> >> > > > > > > >> >
> >> > > > > > > >>
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
> >> > > > > > > >> > > > at
> >> > > > > > > >> > > >
> >> > > > > > > >> > > >
> >> > > > > > > >> > >
> >> > > > > > > >> >
> >> > > > > > > >>
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
> >> > > > > > > >> > > > at
> >> > > > > > > >> > > >
> >> > > > > > > >> > > >
> >> > > > > > > >> > >
> >> > > > > > > >> >
> >> > > > > > > >>
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
> >> > > > > > > >> > > > at
> >> > > > > > > >> > > >
> >> > > > > > > >> > > >
> >> > > > > > > >> > >
> >> > > > > > > >> >
> >> > > > > > > >>
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2083)
> >> > > > > > > >> > > > at
> >> > > > > > > >> > > >
> >> > > > > > > >> > > >
> >> > > > > > > >> > >
> >> > > > > > > >> >
> >> > > > > > > >>
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
> >> > > > > > > >> > > > at
> >> > > > > > > >> > > >
> >> > > > > > > >> > > >
> >> > > > > > > >> > >
> >> > > > > > > >> >
> >> > > > > > > >>
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
> >> > > > > > > >> > > > at
> >> > > > > > > >> > > >
> >> > > > > > > >> > > >
> >> > > > > > > >> > >
> >> > > > > > > >> >
> >> > > > > > > >>
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
> >> > > > > > > >> > > > at
> >> > > > > > > >> > > >
> >> > > > > > > >> > > >
> >> > > > > > > >> > >
> >> > > > > > > >> >
> >> > > > > > > >>
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
> >> > > > > > > >> > > > at
> >> > > > > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> >> > > > > > > >> > > >
> >> > > > > > > >> > > >
> >> > > > > > > >> > >
> >> > > > > > > >> >
> >> > > > > > > >>
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
> >> > > > > > > >> > > > at
> >> > > > > > > >> > > >
> >> > > > > > > >> > > >
> >> > > > > > > >> > >
> >> > > > > > > >> >
> >> > > > > > > >>
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
> >> > > > > > > >> > > > at
> >> > > > > > > >> > > >
> >> > > > > > > >> > > >
> >> > > > > > > >> > >
> >> > > > > > > >> >
> >> > > > > > > >>
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
> >> > > > > > > >> > > > at
> >> > > > > > > >> > > >
> >> > > > > > > >> > > >
> >> > > > > > > >> > >
> >> > > > > > > >> >
> >> > > > > > > >>
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
> >> > > > > > > >> > > > at
> >> > > > > > > >> > > >
> >> > > > > > > >> > > >
> >> > > > > > > >> > >
> >> > > > > > > >> >
> >> > > > > > > >>
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
> >> > > > > > > >> > > > at
> >> > > > > > > >> > > >
> >> > > > > > > >> > > >
> >> > > > > > > >> > >
> >> > > > > > > >> >
> >> > > > > > > >>
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
> >> > > > > > > >> > > > at
> >> > > > > > > >> > > >
> >> > > > > > > >> > > >
> >> > > > > > > >> > >
> >> > > > > > > >> >
> >> > > > > > > >>
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464)
> >> > > > > > > >> > > > at
> >> > > com.akulaku.data.main.StreamMain.main(StreamMain.java:87)
> >> > > > > > > >> > > > at
> >> sun.reflect.NativeMethodAccessorImpl.invoke0(Native
> >> > > > Method)
> >> > > > > > > >> > > > at
> >> > > > > > > >> > > >
> >> > > > > > > >> > > >
> >> > > > > > > >> > >
> >> > > > > > > >> >
> >> > > > > > > >>
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >> > > > > > > >> > > > at
> >> > > > > > > >> > > >
> >> > > > > > > >> > > >
> >> > > > > > > >> > >
> >> > > > > > > >> >
> >> > > > > > > >>
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >> > > > > > > >> > > > at java.lang.reflect.Method.invoke(Method.java:498)
> >> > > > > > > >> > > > at
> >> > > > > > > >> > > >
> >> > > > > > > >> > > >
> >> > > > > > > >> > >
> >> > > > > > > >> >
> >> > > > > > > >>
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
> >> > > > > > > >> > > > ... 8 more
> >> > > > > > > >> > > > Caused by:
> >> > > > > > > >>
> org.apache.flink.table.api.NoMatchingTableFactoryException:
> >> > > > > > > >> > > > Could not find a suitable table factory for
> >> > > > > > > >> > > >
> >> 'org.apache.flink.table.factories.TableSourceFactory' in
> >> > > > > > > >> > > > the classpath.
> >> > > > > > > >> > > >
> >> > > > > > > >> > > > Reason: Required context properties mismatch.
> >> > > > > > > >> > > >
> >> > > > > > > >> > > > The matching candidates:
> >> > > > > > > >> > > >
> >> > org.apache.flink.table.sources.CsvAppendTableSourceFactory
> >> > > > > > > >> > > > Mismatched properties:
> >> > > > > > > >> > > > 'connector.type' expects 'filesystem', but is
> 'kafka'
> >> > > > > > > >> > > > 'format.type' expects 'csv', but is 'json'
> >> > > > > > > >> > > >
> >> > > > > > > >> > > > The following properties are requested:
> >> > > > > > > >> > > > connector.properties.bootstrap.servers=centos:9092
> >> > > > > > > >> > > > connector.properties.zookeeper.connect=centos:2181
> >> > > > > > > >> > > > connector.startup-mode=earliest-offset
> >> > > > > > > >> > > > connector.topic=test
> >> > > > > > > >> > > > connector.type=kafka
> >> > > > > > > >> > > > connector.version=0.11
> >> > > > > > > >> > > > format.type=json
> >> > > > > > > >> > > > schema.0.data-type=VARCHAR(2147483647)
> >> > > > > > > >> > > > schema.0.name=bus
> >> > > > > > > >> > > > schema.1.data-type=BIGINT
> >> > > > > > > >> > > > schema.1.name=ts
> >> > > > > > > >> > > > schema.2.data-type=VARCHAR(2147483647)
> >> > > > > > > >> > > > schema.2.name=type
> >> > > > > > > >> > > > schema.3.data-type=BIGINT
> >> > > > > > > >> > > > schema.3.name=putRowNum
> >> > > > > > > >> > > > schema.4.data-type=TIMESTAMP(3) NOT NULL
> >> > > > > > > >> > > > schema.4.expr=PROCTIME()
> >> > > > > > > >> > > > schema.4.name=proctime
> >> > > > > > > >> > > > update-mode=append
> >> > > > > > > >> > > >
> >> > > > > > > >> > > > The following factories have been considered:
> >> > > > > > > >> > > >
> >> > org.apache.flink.table.sources.CsvBatchTableSourceFactory
> >> > > > > > > >> > > >
> >> > org.apache.flink.table.sources.CsvAppendTableSourceFactory
> >> > > > > > > >> > > > at
> >> > > > > > > >> > > >
> >> > > > > > > >> > > >
> >> > > > > > > >> > >
> >> > > > > > > >> >
> >> > > > > > > >>
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
> >> > > > > > > >> > > > at
> >> > > > > > > >> > > >
> >> > > > > > > >> > > >
> >> > > > > > > >> > >
> >> > > > > > > >> >
> >> > > > > > > >>
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
> >> > > > > > > >> > > > at
> >> > > > > > > >> > > >
> >> > > > > > > >> > > >
> >> > > > > > > >> > >
> >> > > > > > > >> >
> >> > > > > > > >>
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
> >> > > > > > > >> > > > at
> >> > > > > > > >> > > >
> >> > > > > > > >> > > >
> >> > > > > > > >> > >
> >> > > > > > > >> >
> >> > > > > > > >>
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
> >> > > > > > > >> > > > at
> >> > > > > > > >> > > >
> >> > > > > > > >> > > >
> >> > > > > > > >> > >
> >> > > > > > > >> >
> >> > > > > > > >>
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:52)
> >> > > > > > > >> > > >
> >> > > > > > > >> > >
> >> > > > > > > >> > >
> >> > > > > > > >> > > --
> >> > > > > > > >> > > Best, Jingsong Lee
> >> > > > > > > >> > >
> >> > > > > > > >> >
> >> > > > > > > >>
> >> > > > > > > >>
> >> > > > > > > >> --
> >> > > > > > > >> Best, Jingsong Lee
> >> > > > > > > >>
> >> > > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > > >
> >> > > > > > --
> >> > > > > > Best, Jingsong Lee
> >> > > > > >
> >> > > > >
> >> > > >
> >> > > >
> >> > > > --
> >> > > > Best, Jingsong Lee
> >> > > >
> >> > >
> >> >
> >>
> >
>


-- 
Best, Jingsong Lee

回复