🤗感谢

Jark Wu <imj...@gmail.com> 于2019年8月27日周二 下午6:49写道:

>
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#kafka-connector
> <
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#kafka-connector
> >
>
> > 在 2019年8月27日,17:59,徐骁 <ffxrqy...@gmail.com> 写道:
> >
> > 这部分有文档吗,看了好几圈没看到
> >
> > hb <343122...@163.com> 于2019年8月26日周一 下午3:34写道:
> >
> >> 感谢,解决了, 指定 'connector.version' = '0.11' 就可以了.
> >> Blink SQL这方面的官方资料和文档好少啊,开发容易遇到问题.
> >>
> >> 在 2019-08-26 14:26:15,"hb" <343122...@163.com> 写道:
> >>> kafka版本是 kafka_2.11-1.1.0,
> >>> 支持的kafka版本有哪些
> >>> 在 2019-08-26 14:23:19,"pengcheng...@bonc.com.cn" <
> >> pengcheng...@bonc.com.cn> 写道:
> >>>> 检查一下代码的kafka版本,可能是这方面的错误
> >>>>
> >>>>
> >>>>
> >>>> pengcheng...@bonc.com.cn
> >>>>
> >>>> 发件人: hb
> >>>> 发送时间: 2019-08-26 15:14
> >>>> 收件人: user-zh
> >>>> 主题: Re:Re: flink1.9 blink planner table ddl 使用问题
> >>>> 之前少了 flink-connector-kafka_2.11 依赖,
> >>>> 现在错误变成  Caused by: java.lang.NoSuchMethodError:
> >>
> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.<init>(Lorg/slf4j/Logger;Lorg/apache/flink/streaming/connectors/kafka/internal/Handover;Ljava/util/Properties;Lorg/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue;Ljava/lang/String;JZLorg/apache/flink/metrics/MetricGroup;Lorg/apache/flink/metrics/MetricGroup;)V
> >>>> 了
> >>>>
> >>>>
> >>>> pom依赖:
> >>>> ```
> >>>>   <dependencies>
> >>>>
> >>>>
> >>>>       <dependency>
> >>>>           <groupId>org.apache.flink</groupId>
> >>>>           <artifactId>flink-core</artifactId>
> >>>>           <version>${flink.version}</version>
> >>>>
> >>>>
> >>>>       </dependency>
> >>>>
> >>>>
> >>>>       <dependency>
> >>>>           <groupId>org.apache.flink</groupId>
> >>>>           <artifactId>flink-clients_2.11</artifactId>
> >>>>           <version>${flink.version}</version>
> >>>>
> >>>>
> >>>>       </dependency>
> >>>>
> >>>>
> >>>>       <dependency>
> >>>>           <groupId>org.apache.flink</groupId>
> >>>>           <artifactId>flink-scala_2.11</artifactId>
> >>>>           <version>${flink.version}</version>
> >>>>
> >>>>
> >>>>       </dependency>
> >>>>
> >>>>
> >>>>       <dependency>
> >>>>           <groupId>org.apache.flink</groupId>
> >>>>           <artifactId>flink-streaming-scala_2.11</artifactId>
> >>>>           <version>${flink.version}</version>
> >>>>
> >>>>
> >>>>       </dependency>
> >>>>
> >>>>
> >>>>       <dependency>
> >>>>           <groupId>org.apache.flink</groupId>
> >>>>           <artifactId>flink-table</artifactId>
> >>>>           <version>1.9.0</version>
> >>>>           <type>pom</type>
> >>>>           <scope>provided</scope>
> >>>>       </dependency>
> >>>>
> >>>>
> >>>>       <dependency>
> >>>>           <groupId>org.apache.flink</groupId>
> >>>>           <artifactId>flink-table-common</artifactId>
> >>>>           <version>${flink.version}</version>
> >>>>       </dependency>
> >>>>
> >>>>
> >>>>       <dependency>
> >>>>           <groupId>org.apache.flink</groupId>
> >>>>           <artifactId>flink-cep-scala_2.11</artifactId>
> >>>>           <version>${flink.version}</version>
> >>>>       </dependency>
> >>>>
> >>>>
> >>>>       <dependency>
> >>>>           <groupId>org.apache.flink</groupId>
> >>>>           <artifactId>flink-connector-filesystem_2.11</artifactId>
> >>>>           <version>${flink.version}</version>
> >>>>       </dependency>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>       <dependency>
> >>>>           <groupId>org.apache.flink</groupId>
> >>>>           <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
> >>>>           <version>${flink.version}</version>
> >>>> <!--            <scope>provided</scope>-->
> >>>>       </dependency>
> >>>>
> >>>>
> >>>>       <dependency>
> >>>>           <groupId>org.apache.flink</groupId>
> >>>>           <artifactId>flink-table-api-java-bridge_2.11</artifactId>
> >>>>           <version>${flink.version}</version>
> >>>>           <!--            <scope>provided</scope>-->
> >>>>       </dependency>
> >>>>
> >>>>
> >>>>       <dependency>
> >>>>           <groupId>org.apache.flink</groupId>
> >>>>           <artifactId>flink-table-planner_2.11</artifactId>
> >>>>           <version>${flink.version}</version>
> >>>>           <!--            <scope>provided</scope>-->
> >>>>       </dependency>
> >>>>
> >>>>
> >>>>       <dependency>
> >>>>           <groupId>org.apache.flink</groupId>
> >>>>           <artifactId>flink-table-runtime-blink_2.11</artifactId>
> >>>>           <version>${flink.version}</version>
> >>>>       </dependency>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>       <dependency>
> >>>>           <groupId>org.apache.flink</groupId>
> >>>>           <artifactId>flink-table-planner-blink_2.11</artifactId>
> >>>>           <version>${flink.version}</version>
> >>>>           <!--            <scope>provided</scope>-->
> >>>>       </dependency>
> >>>>
> >>>>
> >>>>       <dependency>
> >>>>           <groupId>org.apache.flink</groupId>
> >>>>           <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
> >>>>           <version>${flink.version}</version>
> >>>>       </dependency>
> >>>>
> >>>>
> >>>>       <dependency>
> >>>>           <groupId>org.apache.flink</groupId>
> >>>>           <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
> >>>>           <version>${flink.version}</version>
> >>>>       </dependency>
> >>>>
> >>>>
> >>>>       <dependency>
> >>>>           <groupId>org.apache.flink</groupId>
> >>>>           <artifactId>flink-connector-kafka_2.11</artifactId>
> >>>>           <version>${flink.version}</version>
> >>>>       </dependency>
> >>>>       <dependency>
> >>>>           <groupId>org.apache.flink</groupId>
> >>>>           <artifactId>flink-json</artifactId>
> >>>>           <version>${flink.version}</version>
> >>>>       </dependency>
> >>>>       <dependency>
> >>>>           <groupId>org.apache.flink</groupId>
> >>>>           <artifactId>flink-runtime-web_2.11</artifactId>
> >>>>           <version>${flink.version}</version>
> >>>>       </dependency>
> >>>>   </dependencies>
> >>>>
> >>>>
> >>>> ```
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> 在 2019-08-26 13:37:51,"Jark Wu" <imj...@gmail.com> 写道:
> >>>>> Maven 需要同时依赖 flink-json 和 flink-connector-kafka_2.11
> >>>>>
> >>>>> Best,
> >>>>> Jark
> >>>>>
> >>>>>> 在 2019年8月26日,13:57,hb <343122...@163.com> 写道:
> >>>>>>
> >>>>>> 使用了你的ddl语句,还是报一样的错误.
> >>>>>> 我是在idea里面执行的,maven 配置的依赖.
> >>>>>>
> >>>>>> 在 2019-08-26 11:22:20,"Jark Wu" <imj...@gmail.com> 写道:
> >>>>>>> Hi,
> >>>>>>>
> >>>>>>> 初步看下来你的 DDL 中有这几部分定义的有问题。
> >>>>>>>
> >>>>>>> 1. 缺少format properties
> >>>>>>> 2. 缺少 connector.version
> >>>>>>> 3. bootstrap.severs 的配置方式写的不对...
> >>>>>>>
> >>>>>>>
> >>>>>>> 你可以参考下面这个作为example:
> >>>>>>>
> >>>>>>>
> >>>>>>> CREATE TABLE kafka_json_source (
> >>>>>>>  rowtime TIMESTAMP,
> >>>>>>>  user_name VARCHAR,
> >>>>>>>  event ROW<message_type VARCHAR, message VARCHAR>
> >>>>>>> ) WITH (
> >>>>>>>  'connector.type' = 'kafka',
> >>>>>>>  'connector.version' = 'universal',
> >>>>>>>  'connector.topic' = 'test-json',
> >>>>>>>  'connector.startup-mode' = 'earliest-offset',
> >>>>>>>  'connector.properties.0.key' = 'zookeeper.connect',
> >>>>>>>  'connector.properties.0.value' = 'localhost:2181',
> >>>>>>>  'connector.properties.1.key' = 'bootstrap.servers',
> >>>>>>>  'connector.properties.1.value' = 'localhost:9092',
> >>>>>>>  'update-mode' = 'append',
> >>>>>>>  'format.type' = 'json',
> >>>>>>>  'format.derive-schema' = 'true'
> >>>>>>> );
> >>>>>>>
> >>>>>>>
> >>>>>>> Kafka 中的数据长这个样子:
> >>>>>>>
> >>>>>>> {"rowtime": "2018-03-12T08:00:00Z", "user_name": "Alice", "event":
> {
> >> "message_type": "WARNING", "message": "This is a warning."}}
> >>>>>>>
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Jark
> >>>>>>>
> >>>>>>>
> >>>>>>>> 在 2019年8月26日,09:52,hb <343122...@163.com> 写道:
> >>>>>>>>
> >>>>>>>> flink1.9 blink planner  table  使用ddl 语句,创建表不成功,不知道是少了 定义属性还是
> >> 需要实现TableSourceFactory,还是其他什么.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> 提示:
> >>>>>>>> Exception in thread "main"
> >> org.apache.flink.table.api.ValidationException: SQL validation failed.
> >> findAndCreateTableSource failed.
> >>>>>>>> Caused by:
> >> org.apache.flink.table.api.NoMatchingTableFactoryException: Could not
> find
> >> a suitable table factory for
> >> 'org.apache.flink.table.factories.TableSourceFactory' in
> >>>>>>>> the classpath.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> 代码:
> >>>>>>>> ```
> >>>>>>>> import
> >> org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
> >>>>>>>> import org.apache.flink.table.api.{EnvironmentSettings, Types}
> >>>>>>>> import org.apache.flink.table.api.scala.{StreamTableEnvironment,
> _}
> >>>>>>>> import org.apache.flink.types.Row
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> object KafkaInDDL extends App {
> >>>>>>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
> >>>>>>>> val settings: EnvironmentSettings =
> >>
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> >>>>>>>> val tEnv: StreamTableEnvironment =
> >> StreamTableEnvironment.create(env, settings)
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> val sourceDDL =
> >>>>>>>>  """create table sourceTable(
> >>>>>>>>                          id int,
> >>>>>>>>                          name varchar
> >>>>>>>>                        ) with (
> >>>>>>>>                          'connector.type' = 'kafka',
> >>>>>>>>                          'connector.property-version' = '1',
> >>>>>>>>                          'update-mode' = 'append',
> >>>>>>>>                          'bootstrap.servers' = '
> >> 192.168.1.160:19092',
> >>>>>>>>                          'connector.topic' = 'hbtest1',
> >>>>>>>>                          'connector.startup-mode' =
> >> 'earliest-offset'
> >>>>>>>>                        )
> >>>>>>>>  """
> >>>>>>>> tEnv.sqlUpdate(sourceDDL)
> >>>>>>>> tEnv.sqlQuery("select * from
> >> sourceTable").toAppendStream[Row].print()
> >>>>>>>> tEnv.execute("")
> >>>>>>>> }
> >>>>>>>> ```
> >>>>>>>
> >>
>
>

回复