Hi:
    ??????????????????????, ????????kafka??factory????????.
    ?? 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#dependencies
 ????jar????flink??lib??????(????????????????????) ??????. 
    ??????????.






??2020??01??15?? 14:59??Others<41486...@qq.com> ??????
????????flink ???? ??1.9.1
??????????????????????????????????????
2020-01-15 11:57:44,255 ERROR 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler    - Unhandled 
exception.
org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: findAndCreateTableSource failed.
&nbsp;&nbsp;&nbsp;&nbsp;at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
&nbsp;&nbsp;&nbsp;&nbsp;at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
&nbsp;&nbsp;&nbsp;&nbsp;at 
org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
&nbsp;&nbsp;&nbsp;&nbsp;at 
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
&nbsp;&nbsp;&nbsp;&nbsp;at 
org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
&nbsp;&nbsp;&nbsp;&nbsp;at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
&nbsp;&nbsp;&nbsp;&nbsp;at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
&nbsp;&nbsp;&nbsp;&nbsp;at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
&nbsp;&nbsp;&nbsp;&nbsp;at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
&nbsp;&nbsp;&nbsp;&nbsp;at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.table.api.TableException: findAndCreateTableSource 
failed.
&nbsp;&nbsp;&nbsp;&nbsp;at 
org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67)
&nbsp;&nbsp;&nbsp;&nbsp;at 
org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54)
&nbsp;&nbsp;&nbsp;&nbsp;at 
org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69)
&nbsp;&nbsp;&nbsp;&nbsp;at 
com.doumob.flink.BuoyDataJob.main(BuoyDataJob.java:86)
&nbsp;&nbsp;&nbsp;&nbsp;at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
&nbsp;&nbsp;&nbsp;&nbsp;at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
&nbsp;&nbsp;&nbsp;&nbsp;at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
&nbsp;&nbsp;&nbsp;&nbsp;at java.lang.reflect.Method.invoke(Method.java:498)
&nbsp;&nbsp;&nbsp;&nbsp;at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
&nbsp;&nbsp;&nbsp;&nbsp;... 9 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could 
not find a suitable table factory for 
'org.apache.flink.table.factories.DeserializationSchemaFactory' in
the classpath.

Reason: No factory implements 
'org.apache.flink.table.factories.DeserializationSchemaFactory'.

The following properties are requested:
connector.properties.0.key=group.id
connector.properties.0.value=consumer_flink_etl_test
connector.properties.1.key=bootstrap.servers
connector.properties.1.value=172.16.0.148:9092,172.16.0.149:9092,172.16.0.150:9092
connector.property-version=1
connector.startup-mode=latest-offset
connector.topic=flink_etl_pro
connector.type=kafka
connector.version=universal
format.derive-schema=true
format.fail-on-missing-field=false
format.property-version=1
format.type=json
schema.0.name=rowtime
schema.0.rowtime.timestamps.from=cTime
schema.0.rowtime.timestamps.type=from-field
schema.0.rowtime.watermarks.delay=2000
schema.0.rowtime.watermarks.type=periodic-bounded
schema.0.type=TIMESTAMP
schema.1.name=event
schema.1.type=VARCHAR
schema.2.name=adSpaceKey
schema.2.type=VARCHAR
schema.3.name=appkey
schema.3.type=VARCHAR
schema.4.name=build
schema.4.type=VARCHAR
schema.5.name=buoyId
schema.5.type=BIGINT
schema.6.name=gameHtmlId
schema.6.type=BIGINT
schema.7.name=uid
schema.7.type=VARCHAR
update-mode=append

The following factories have been considered:
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
org.apache.flink.table.planner.delegation.BlinkPlannerFactory
org.apache.flink.table.planner.delegation.BlinkExecutorFactory
org.apache.flink.table.planner.StreamPlannerFactory
org.apache.flink.table.executor.StreamExecutorFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
&nbsp;&nbsp;&nbsp;&nbsp;at 
org.apache.flink.table.factories.TableFactoryService.filterByFactoryClass(TableFactoryService.java:243)
&nbsp;&nbsp;&nbsp;&nbsp;at 
org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:186)
&nbsp;&nbsp;&nbsp;&nbsp;at 
org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144)
&nbsp;&nbsp;&nbsp;&nbsp;at 
org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:114)
&nbsp;&nbsp;&nbsp;&nbsp;at 
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getDeserializationSchema(KafkaTableSourceSinkFactoryBase.java:259)
&nbsp;&nbsp;&nbsp;&nbsp;at 
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:144)
&nbsp;&nbsp;&nbsp;&nbsp;at 
org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:49)
&nbsp;&nbsp;&nbsp;&nbsp;at 
org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:65)
&nbsp;&nbsp;&nbsp;&nbsp;... 17 more


??????pom????


<name&gt;Flink Quickstart Job</name&gt;
<url&gt;http://www.myorganization.org</url&gt;
<profiles&gt;
<profile&gt;
<id&gt;dev</id&gt;
<activation&gt;
<activeByDefault&gt;true</activeByDefault&gt;
</activation&gt;
<properties&gt;
<project.scope&gt;compile</project.scope&gt;
</properties&gt;
</profile&gt;
<profile&gt;
<id&gt;pro</id&gt;
<properties&gt;
<project.scope&gt;provided</project.scope&gt;
</properties&gt;
</profile&gt;
</profiles&gt;
<properties&gt;
<project.build.sourceEncoding&gt;UTF-8</project.build.sourceEncoding&gt;
<flink.version&gt;1.9.1</flink.version&gt;
<java.version&gt;1.8</java.version&gt;
<scala.binary.version&gt;2.11</scala.binary.version&gt;
<maven.compiler.source&gt;${java.version}</maven.compiler.source&gt;
<maven.compiler.target&gt;${java.version}</maven.compiler.target&gt;
</properties&gt;

<repositories&gt;
<repository&gt;
<id&gt;apache.snapshots</id&gt;
<name&gt;Apache Development Snapshot Repository</name&gt;
<url&gt;https://repository.apache.org/content/repositories/snapshots/</url&gt;
<releases&gt;
<enabled&gt;false</enabled&gt;
</releases&gt;
<snapshots&gt;
<enabled&gt;true</enabled&gt;
</snapshots&gt;
</repository&gt;
</repositories&gt;

<dependencies&gt;
<!-- Apache Flink dependencies --&gt;
<!-- These dependencies are provided, because they should not be packaged into 
the JAR file. --&gt;
<dependency&gt;
<groupId&gt;org.apache.flink</groupId&gt;
<artifactId&gt;flink-java</artifactId&gt;
<version&gt;${flink.version}</version&gt;
<scope&gt;${project.scope}</scope&gt;
</dependency&gt;
<dependency&gt;
<groupId&gt;org.apache.flink</groupId&gt;
<artifactId&gt;flink-streaming-java_${scala.binary.version}</artifactId&gt;
<version&gt;${flink.version}</version&gt;
<scope&gt;${project.scope}</scope&gt;
</dependency&gt;

<dependency&gt;
<groupId&gt;org.apache.flink</groupId&gt;
<artifactId&gt;flink-table-api-java-bridge_2.11</artifactId&gt;
<version&gt;${flink.version}</version&gt;
</dependency&gt;
<dependency&gt;
<groupId&gt;org.apache.flink</groupId&gt;
<artifactId&gt;flink-table-planner-blink_2.11</artifactId&gt;
<version&gt;${flink.version}</version&gt;
</dependency&gt;

<dependency&gt;
<groupId&gt;org.apache.flink</groupId&gt;
<artifactId&gt;flink-connector-kafka_2.11</artifactId&gt;
<version&gt;${flink.version}</version&gt;
</dependency&gt;
<dependency&gt;
<groupId&gt;org.apache.flink</groupId&gt;
<artifactId&gt;flink-json</artifactId&gt;
<version&gt;${flink.version}</version&gt;
</dependency&gt;
<dependency&gt;
<groupId&gt;org.apache.flink</groupId&gt;
<artifactId&gt;flink-table-common</artifactId&gt;
<version&gt;${flink.version}</version&gt;
</dependency&gt;

<dependency&gt;
<groupId&gt;org.apache.flink</groupId&gt;
<artifactId&gt;flink-jdbc_2.11</artifactId&gt;
<version&gt;${flink.version}</version&gt;
</dependency&gt;



<!--mysql--&gt;
<dependency&gt;
<groupId&gt;mysql</groupId&gt;
<artifactId&gt;mysql-connector-java</artifactId&gt;
<version&gt;5.1.48</version&gt;
</dependency&gt;
<!-- Gson--&gt;
<dependency&gt;
<groupId&gt;com.google.code.gson</groupId&gt;
<artifactId&gt;gson</artifactId&gt;
<version&gt;2.8.5</version&gt;
</dependency&gt;
<!-- Add logging framework, to produce console output when running in the IDE. 
--&gt;
<!-- These dependencies are excluded from the application JAR by default. --&gt;
<dependency&gt;
<groupId&gt;org.slf4j</groupId&gt;
<artifactId&gt;slf4j-log4j12</artifactId&gt;
<version&gt;1.7.7</version&gt;
<scope&gt;runtime</scope&gt;
</dependency&gt;
<dependency&gt;
<groupId&gt;log4j</groupId&gt;
<artifactId&gt;log4j</artifactId&gt;
<version&gt;1.2.17</version&gt;
<scope&gt;runtime</scope&gt;
</dependency&gt;
</dependencies&gt;

<build&gt;
<plugins&gt;

<!-- Java Compiler --&gt;
<plugin&gt;
<groupId&gt;org.apache.maven.plugins</groupId&gt;
<artifactId&gt;maven-compiler-plugin</artifactId&gt;
<version&gt;3.1</version&gt;
<configuration&gt;
<source&gt;${java.version}</source&gt;
<target&gt;${java.version}</target&gt;
</configuration&gt;
</plugin&gt;
<!-- We use the maven-shade plugin to create a fat jar that contains all 
necessary dependencies. --&gt;
<!-- Change the value of <mainClass&gt;...</mainClass&gt; if your program entry 
point changes. --&gt;
<plugin&gt;
<groupId&gt;org.apache.maven.plugins</groupId&gt;
<artifactId&gt;maven-shade-plugin</artifactId&gt;
<version&gt;3.0.0</version&gt;
<executions&gt;
<!-- Run shade goal on package phase --&gt;
<execution&gt;
<phase&gt;package</phase&gt;
<goals&gt;
<goal&gt;shade</goal&gt;
</goals&gt;
<configuration&gt;
<artifactSet&gt;
<excludes&gt;
<exclude&gt;org.apache.flink:force-shading</exclude&gt;
<exclude&gt;com.google.code.findbugs:jsr305</exclude&gt;
<exclude&gt;org.slf4j:*</exclude&gt;
<exclude&gt;log4j:*</exclude&gt;
</excludes&gt;
</artifactSet&gt;
<filters&gt;
<filter&gt;
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. --&gt;
<artifact&gt;*:*</artifact&gt;
<excludes&gt;
<exclude&gt;META-INF/*.SF</exclude&gt;
<exclude&gt;META-INF/*.DSA</exclude&gt;
<exclude&gt;META-INF/*.RSA</exclude&gt;
</excludes&gt;
</filter&gt;
</filters&gt;
<transformers&gt;
<transformer 
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"&gt;
<mainClass&gt;com.doumob.flink.BuoyDataJob</mainClass&gt;
</transformer&gt;
</transformers&gt;
</configuration&gt;
</execution&gt;
</executions&gt;
</plugin&gt;
</plugins&gt;


</build&gt;


???? ???????????????? ????

回复