🤗感谢 Jark Wu <imj...@gmail.com> 于2019年8月27日周二 下午6:49写道:
> > https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#kafka-connector > < > https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#kafka-connector > > > > > 在 2019年8月27日,17:59,徐骁 <ffxrqy...@gmail.com> 写道: > > > > 这部分有文档吗,看了好几圈没看到 > > > > hb <343122...@163.com> 于2019年8月26日周一 下午3:34写道: > > > >> 感谢,解决了, 指定 'connector.version' = '0.11' 就可以了. > >> Blink SQL这方面的官方资料和文档好少啊,开发容易遇到问题. > >> > >> 在 2019-08-26 14:26:15,"hb" <343122...@163.com> 写道: > >>> kafka版本是 kafka_2.11-1.1.0, > >>> 支持的kafka版本有哪些 > >>> 在 2019-08-26 14:23:19,"pengcheng...@bonc.com.cn" < > >> pengcheng...@bonc.com.cn> 写道: > >>>> 检查一下代码的kafka版本,可能是这方面的错误 > >>>> > >>>> > >>>> > >>>> pengcheng...@bonc.com.cn > >>>> > >>>> 发件人: hb > >>>> 发送时间: 2019-08-26 15:14 > >>>> 收件人: user-zh > >>>> 主题: Re:Re: flink1.9 blink planner table ddl 使用问题 > >>>> 之前少了 flink-connector-kafka_2.11 依赖, > >>>> 现在错误变成 Caused by: java.lang.NoSuchMethodError: > >> > org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.<init>(Lorg/slf4j/Logger;Lorg/apache/flink/streaming/connectors/kafka/internal/Handover;Ljava/util/Properties;Lorg/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue;Ljava/lang/String;JZLorg/apache/flink/metrics/MetricGroup;Lorg/apache/flink/metrics/MetricGroup;)V > >>>> 了 > >>>> > >>>> > >>>> pom依赖: > >>>> ``` > >>>> <dependencies> > >>>> > >>>> > >>>> <dependency> > >>>> <groupId>org.apache.flink</groupId> > >>>> <artifactId>flink-core</artifactId> > >>>> <version>${flink.version}</version> > >>>> > >>>> > >>>> </dependency> > >>>> > >>>> > >>>> <dependency> > >>>> <groupId>org.apache.flink</groupId> > >>>> <artifactId>flink-clients_2.11</artifactId> > >>>> <version>${flink.version}</version> > >>>> > >>>> > >>>> </dependency> > >>>> > >>>> > >>>> <dependency> > >>>> <groupId>org.apache.flink</groupId> > >>>> <artifactId>flink-scala_2.11</artifactId> > >>>> <version>${flink.version}</version> > >>>> > >>>> > >>>> </dependency> > >>>> > >>>> > >>>> <dependency> > >>>> <groupId>org.apache.flink</groupId> > >>>> <artifactId>flink-streaming-scala_2.11</artifactId> > >>>> <version>${flink.version}</version> > >>>> > >>>> > >>>> </dependency> > >>>> > >>>> > >>>> <dependency> > >>>> <groupId>org.apache.flink</groupId> > >>>> <artifactId>flink-table</artifactId> > >>>> <version>1.9.0</version> > >>>> <type>pom</type> > >>>> <scope>provided</scope> > >>>> </dependency> > >>>> > >>>> > >>>> <dependency> > >>>> <groupId>org.apache.flink</groupId> > >>>> <artifactId>flink-table-common</artifactId> > >>>> <version>${flink.version}</version> > >>>> </dependency> > >>>> > >>>> > >>>> <dependency> > >>>> <groupId>org.apache.flink</groupId> > >>>> <artifactId>flink-cep-scala_2.11</artifactId> > >>>> <version>${flink.version}</version> > >>>> </dependency> > >>>> > >>>> > >>>> <dependency> > >>>> <groupId>org.apache.flink</groupId> > >>>> <artifactId>flink-connector-filesystem_2.11</artifactId> > >>>> <version>${flink.version}</version> > >>>> </dependency> > >>>> > >>>> > >>>> > >>>> > >>>> <dependency> > >>>> <groupId>org.apache.flink</groupId> > >>>> <artifactId>flink-table-api-scala-bridge_2.11</artifactId> > >>>> <version>${flink.version}</version> > >>>> <!-- <scope>provided</scope>--> > >>>> </dependency> > >>>> > >>>> > >>>> <dependency> > >>>> <groupId>org.apache.flink</groupId> > >>>> <artifactId>flink-table-api-java-bridge_2.11</artifactId> > >>>> <version>${flink.version}</version> > >>>> <!-- <scope>provided</scope>--> > >>>> </dependency> > >>>> > >>>> > >>>> <dependency> > >>>> <groupId>org.apache.flink</groupId> > >>>> <artifactId>flink-table-planner_2.11</artifactId> > >>>> <version>${flink.version}</version> > >>>> <!-- <scope>provided</scope>--> > >>>> </dependency> > >>>> > >>>> > >>>> <dependency> > >>>> <groupId>org.apache.flink</groupId> > >>>> <artifactId>flink-table-runtime-blink_2.11</artifactId> > >>>> <version>${flink.version}</version> > >>>> </dependency> > >>>> > >>>> > >>>> > >>>> > >>>> <dependency> > >>>> <groupId>org.apache.flink</groupId> > >>>> <artifactId>flink-table-planner-blink_2.11</artifactId> > >>>> <version>${flink.version}</version> > >>>> <!-- <scope>provided</scope>--> > >>>> </dependency> > >>>> > >>>> > >>>> <dependency> > >>>> <groupId>org.apache.flink</groupId> > >>>> <artifactId>flink-connector-elasticsearch6_2.11</artifactId> > >>>> <version>${flink.version}</version> > >>>> </dependency> > >>>> > >>>> > >>>> <dependency> > >>>> <groupId>org.apache.flink</groupId> > >>>> <artifactId>flink-connector-kafka-0.11_2.11</artifactId> > >>>> <version>${flink.version}</version> > >>>> </dependency> > >>>> > >>>> > >>>> <dependency> > >>>> <groupId>org.apache.flink</groupId> > >>>> <artifactId>flink-connector-kafka_2.11</artifactId> > >>>> <version>${flink.version}</version> > >>>> </dependency> > >>>> <dependency> > >>>> <groupId>org.apache.flink</groupId> > >>>> <artifactId>flink-json</artifactId> > >>>> <version>${flink.version}</version> > >>>> </dependency> > >>>> <dependency> > >>>> <groupId>org.apache.flink</groupId> > >>>> <artifactId>flink-runtime-web_2.11</artifactId> > >>>> <version>${flink.version}</version> > >>>> </dependency> > >>>> </dependencies> > >>>> > >>>> > >>>> ``` > >>>> > >>>> > >>>> > >>>> > >>>> > >>>> > >>>> > >>>> > >>>> 在 2019-08-26 13:37:51,"Jark Wu" <imj...@gmail.com> 写道: > >>>>> Maven 需要同时依赖 flink-json 和 flink-connector-kafka_2.11 > >>>>> > >>>>> Best, > >>>>> Jark > >>>>> > >>>>>> 在 2019年8月26日,13:57,hb <343122...@163.com> 写道: > >>>>>> > >>>>>> 使用了你的ddl语句,还是报一样的错误. > >>>>>> 我是在idea里面执行的,maven 配置的依赖. > >>>>>> > >>>>>> 在 2019-08-26 11:22:20,"Jark Wu" <imj...@gmail.com> 写道: > >>>>>>> Hi, > >>>>>>> > >>>>>>> 初步看下来你的 DDL 中有这几部分定义的有问题。 > >>>>>>> > >>>>>>> 1. 缺少format properties > >>>>>>> 2. 缺少 connector.version > >>>>>>> 3. bootstrap.severs 的配置方式写的不对... > >>>>>>> > >>>>>>> > >>>>>>> 你可以参考下面这个作为example: > >>>>>>> > >>>>>>> > >>>>>>> CREATE TABLE kafka_json_source ( > >>>>>>> rowtime TIMESTAMP, > >>>>>>> user_name VARCHAR, > >>>>>>> event ROW<message_type VARCHAR, message VARCHAR> > >>>>>>> ) WITH ( > >>>>>>> 'connector.type' = 'kafka', > >>>>>>> 'connector.version' = 'universal', > >>>>>>> 'connector.topic' = 'test-json', > >>>>>>> 'connector.startup-mode' = 'earliest-offset', > >>>>>>> 'connector.properties.0.key' = 'zookeeper.connect', > >>>>>>> 'connector.properties.0.value' = 'localhost:2181', > >>>>>>> 'connector.properties.1.key' = 'bootstrap.servers', > >>>>>>> 'connector.properties.1.value' = 'localhost:9092', > >>>>>>> 'update-mode' = 'append', > >>>>>>> 'format.type' = 'json', > >>>>>>> 'format.derive-schema' = 'true' > >>>>>>> ); > >>>>>>> > >>>>>>> > >>>>>>> Kafka 中的数据长这个样子: > >>>>>>> > >>>>>>> {"rowtime": "2018-03-12T08:00:00Z", "user_name": "Alice", "event": > { > >> "message_type": "WARNING", "message": "This is a warning."}} > >>>>>>> > >>>>>>> > >>>>>>> Best, > >>>>>>> Jark > >>>>>>> > >>>>>>> > >>>>>>>> 在 2019年8月26日,09:52,hb <343122...@163.com> 写道: > >>>>>>>> > >>>>>>>> flink1.9 blink planner table 使用ddl 语句,创建表不成功,不知道是少了 定义属性还是 > >> 需要实现TableSourceFactory,还是其他什么. > >>>>>>>> > >>>>>>>> > >>>>>>>> 提示: > >>>>>>>> Exception in thread "main" > >> org.apache.flink.table.api.ValidationException: SQL validation failed. > >> findAndCreateTableSource failed. > >>>>>>>> Caused by: > >> org.apache.flink.table.api.NoMatchingTableFactoryException: Could not > find > >> a suitable table factory for > >> 'org.apache.flink.table.factories.TableSourceFactory' in > >>>>>>>> the classpath. > >>>>>>>> > >>>>>>>> > >>>>>>>> > >>>>>>>> > >>>>>>>> 代码: > >>>>>>>> ``` > >>>>>>>> import > >> org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _} > >>>>>>>> import org.apache.flink.table.api.{EnvironmentSettings, Types} > >>>>>>>> import org.apache.flink.table.api.scala.{StreamTableEnvironment, > _} > >>>>>>>> import org.apache.flink.types.Row > >>>>>>>> > >>>>>>>> > >>>>>>>> object KafkaInDDL extends App { > >>>>>>>> val env = StreamExecutionEnvironment.getExecutionEnvironment > >>>>>>>> val settings: EnvironmentSettings = > >> > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() > >>>>>>>> val tEnv: StreamTableEnvironment = > >> StreamTableEnvironment.create(env, settings) > >>>>>>>> > >>>>>>>> > >>>>>>>> val sourceDDL = > >>>>>>>> """create table sourceTable( > >>>>>>>> id int, > >>>>>>>> name varchar > >>>>>>>> ) with ( > >>>>>>>> 'connector.type' = 'kafka', > >>>>>>>> 'connector.property-version' = '1', > >>>>>>>> 'update-mode' = 'append', > >>>>>>>> 'bootstrap.servers' = ' > >> 192.168.1.160:19092', > >>>>>>>> 'connector.topic' = 'hbtest1', > >>>>>>>> 'connector.startup-mode' = > >> 'earliest-offset' > >>>>>>>> ) > >>>>>>>> """ > >>>>>>>> tEnv.sqlUpdate(sourceDDL) > >>>>>>>> tEnv.sqlQuery("select * from > >> sourceTable").toAppendStream[Row].print() > >>>>>>>> tEnv.execute("") > >>>>>>>> } > >>>>>>>> ``` > >>>>>>> > >> > >