Flink monitoring
We have an existing hadoop-2.8.0 cluster, with the Flink client installed on two of its nodes; all jobs are submitted from one of them, in per-job mode. We now want to set up monitoring for the Flink jobs and plan to use the widely recommended pushgateway + prometheus + grafana stack. Flink on YARN logs through logback, and log aggregation is not enabled on the Hadoop cluster. The open question: how should exception alerting be done for the Hadoop-side logs, i.e. the resourcemanager, nodemanager and datanode logs, as well as the taskmanager logs under the userlogs directory? Is there a recommended approach? wch...@163.com
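For the metrics half of the stack mentioned above, the push reporter is configured in flink-conf.yaml. A minimal sketch, assuming the flink-metrics-prometheus jar is on the classpath; the host and job name below are placeholders, and this does not address the log-alerting question, which the thread leaves open:

# flink-conf.yaml: push Flink metrics to a Pushgateway (host/jobName are placeholders)
metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
metrics.reporter.promgateway.host: pushgateway-host
metrics.reporter.promgateway.port: 9091
metrics.reporter.promgateway.jobName: flink-per-job
metrics.reporter.promgateway.randomJobNameSuffix: true
metrics.reporter.promgateway.deleteOnShutdown: false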
Re: How to keep data from multiple sources aligned
I also think a dual-stream JOIN, or an interval join, should cover your requirement.

On Mon, 15 Jun 2020 at 19:41, Benchao Li wrote:
> Hi,
> It sounds like what you need is a dual-stream join; an event-time interval join should work [1].
>
> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#interval-joins

On Mon, 15 Jun 2020 at 18:31, 阿华田 <a15733178...@163.com> wrote:
> I suggest using a cache: since stream B arrives 20 minutes late, buffer stream A's data for 20 minutes and join it against stream B once the time is up. For the cache I recommend Google's Guava cache (com.google.common.cache).

On 15 Jun 2020 at 14:41, steven chen wrote:
> Hi:
> 1. In our project we collect messages from different sources, merge them, compute statistics and emit the result.
> 2. Topic a holds all user PV logs, and topic b all user UV logs. A single job consumes both a and b and writes the combined PV/UV result to the downstream Kafka topic c.
> Question: when messages on a arrive early and messages on b arrive 20 minutes late, how does the job keep the two topics aligned, i.e. correlate the data from the two sides? Effectively two messages are merged into one record after processing and sent to the downstream sink; how do we make sure the records from a and b match up?
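A minimal sketch of the suggested event-time interval join, assuming both topics are registered as tables with a rowtime attribute ts and a joinable user_id; all table and column names here are hypothetical:

// Event-time interval join: tolerate topic_b arriving up to ~20 minutes after topic_a.
tableEnv.sqlUpdate(
  """
    |INSERT INTO topic_c
    |SELECT a.user_id, a.pv, b.uv
    |FROM topic_a a JOIN topic_b b
    |  ON a.user_id = b.user_id
    | AND b.ts BETWEEN a.ts - INTERVAL '5' MINUTE AND a.ts + INTERVAL '25' MINUTE
    |""".stripMargin)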
Re: Problem querying a Kafka table registered in HiveCatalog from the SQL Client
You can take the release-1.11 branch, https://github.com/apache/flink/tree/release-1.11/, and build it yourself with mvn clean install -DskipTests; the contents of the 1.11 distribution end up under build-target.

Best,
Jark

On Wed, 17 Jun 2020 at 15:30, Sun.Zhu <17626017...@163.com> wrote:
> Yes. Apart from the 1.11 jars I compiled myself, all the third-party jars are the 1.10.1 versions, but the matching 1.11 artifacts have not been released yet, have they? Where can I get them?

On 17 Jun 2020 at 13:25, Rui Li wrote:
> You mean using the 1.10.1 hive connector together with Flink 1.11? That combination is guaranteed to break. Try unifying everything on 1.11.

On Wed, Jun 17, 2020 at 12:18 PM Sun.Zhu <17626017...@163.com> wrote:
> The error above is all the SQL CLI prints; there is no further information. Which logs would show more detail?

On 17 Jun 2020 at 10:27, Benchao Li wrote:
> The missing class used to live in the flink-table-runtime-blink module and has since been refactored into flink-table-common. Using the old versions only for connectors and formats should be fine. Could you post the full error? That would show which module still depends on the old flink-table-runtime-blink.

On 17 Jun 2020 at 00:49, Sun.Zhu <17626017...@163.com> wrote:
> Yes. Apart from the jars built from 1.11, the dependencies, such as the connector jars and the jars HiveCatalog needs, are the 1.10.1 versions, because 1.11 has not been released. Neither of the two problems above occurs on 1.10.1; both appeared after upgrading to 1.11, and I don't know why. A missing dependency?

On 16 Jun 2020 at 18:38, Benchao Li wrote:
> 1.11 refactored some of the underlying data structures, so you cannot take 1.10 jars and use them inside 1.11. Running with the jars compiled from 1.11 should work fine.

On 16 Jun 2020 at 18:11, Sun.Zhu <17626017...@163.com> wrote:
> I built the 1.11 distribution. Querying a Hive table from the SQL CLI fails with:
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.NoClassDefFoundError: org/apache/flink/table/dataformat/BaseRow
>
> Querying the registered Kafka table fails with:
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.ClassNotFoundException: org.apache.flink.table.dataformat.BaseRow
>
> The dependency jars were copied over from 1.10.1.

On 13 Jun 2020 at 11:44, Sun.Zhu <17626017...@163.com> wrote:
> Got it! Thx, junbao

On 13 Jun 2020 at 09:32, zhangjunbao wrote:
> On 1.10.x, creating a table with proctime as PROCTIME() under a HiveCatalog should be hitting a bug: https://issues.apache.org/jira/browse/FLINK-17189
>
> Best,
> Junbao Zhang

On 13 Jun 2020 at 00:31, Sun.Zhu <17626017...@163.com> wrote:
> Hi all,
> While integrating the SQL Client with HiveCatalog, I registered a Kafka table in the HiveCatalog with the following DDL:
>
> CREATE TABLE user_behavior (
>   user_id BIGINT,
>   item_id BIGINT,
>   category_id BIGINT,
>   behavior STRING,
>   ts TIMESTAMP(3),
>   proctime as PROCTIME(),  -- processing-time attribute via a computed column
>   WATERMARK FOR ts as ts - INTERVAL '5' SECOND  -- watermark on ts, making ts the event-time column
> ) WITH (
>   'connector.type' = 'kafka',  -- use the kafka connector
>   'connector.version' = 'universal',  -- kafka version; universal supports 0.11+
>   'connector.topic' = 'user_behavior',  -- kafka topic
>   'connector.startup-mode' = 'earliest-offset',  -- start reading from the earliest offset
>   'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zookeeper address
>   'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker address
>   'format.type' = 'json'  -- source format is json
> );
>
> Querying with select * from user_behavior; fails with:
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes:
> validated type:
> RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME ATTRIBUTE(ROWTIME) ts, TIMESTAMP(3) NOT NULL proctime) NOT NULL
> converted type:
> RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME ATTRIBUTE(ROWTIME) ts, TIME ATTRIBUTE(PROCTIME) NOT NULL proctime) NOT NULL
> rel:
> LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2], behavior=[$3], ts=[$4], proctime=[$5])
>   LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($4, 5000:INTERVAL SECOND)])
>     LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2], behavior=[$3], ts=[$4], proctime=[PROCTIME()])
>       LogicalTableScan(table=[[myhive, my_db, user_behavior, source: [KafkaTableSource(user_id, item_id, category_id, behavior, ts)]]])
>
> Flink version: 1.10.1, blink planner, streaming mode
>
> Thx
> Sun.Zhu <17626017...@163.com>
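Until FLINK-17189 is fixed, one way to sidestep it, sketched here as an assumption rather than a verified fix, is to register the time-attribute Kafka table in the built-in in-memory catalog instead of the HiveCatalog, since the bug is in how HiveCatalog persists the computed proctime column (userBehaviorDdl below is a hypothetical val holding the CREATE TABLE statement shown above):

// Hypothetical sketch: keep the proctime/watermark DDL out of the HiveCatalog.
tableEnv.useCatalog("default_catalog")
tableEnv.useDatabase("default_database")
tableEnv.sqlUpdate(userBehaviorDdl) // the CREATE TABLE user_behavior DDL from the thread
// Hive tables remain reachable through fully qualified names, e.g. myhive.my_db.some_table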
Re: Help: real-time cumulative UV with Flink SQL 1.10
Yes, I think that workaround gets around it.

On Thu, 18 Jun 2020 at 10:34, x <35907...@qq.com> wrote:
> On 1.10 I implemented it by converting the table to a retract stream and back to a table. Does this look reasonable?
>
> val resTmpTab: Table = tabEnv.sqlQuery(
>   """
>     SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00')) time_str, COUNT(DISTINCT userkey) uv
>     FROM user_behavior
>     GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd')
>   """)
>
> val resTmpStream = tabEnv.toRetractStream[(String, Long)](resTmpTab)
>   .filter(line => line._1 == true).map(line => line._2)
>
> val res = tabEnv.fromDataStream(resTmpStream)
> tabEnv.sqlUpdate(
>   s"""
>     INSERT INTO rt_totaluv
>     SELECT _1, MAX(_2)
>     FROM $res
>     GROUP BY _1
>   """)

---------- Original message ----------
From: "Jark Wu"
Sent: Wednesday, 17 Jun 2020, 13:55
To: "user-zh"
Subject: Re: Help: real-time cumulative UV with Flink SQL 1.10

> In Flink 1.11 you can try this:
>
> CREATE TABLE mysql (
>   time_str STRING,
>   uv BIGINT,
>   PRIMARY KEY (time_str) NOT ENFORCED
> ) WITH (
>   'connector' = 'jdbc',
>   'url' = 'jdbc:mysql://localhost:3306/mydatabase',
>   'table-name' = 'myuv'
> );
>
> INSERT INTO mysql
> SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00')), COUNT(DISTINCT user_id)
> FROM user_behavior;

On Wed, 17 Jun 2020 at 13:49, x <35907...@qq.com> wrote:
> Thanks for the reply. I still don't quite follow "option 1". I need to emit one cumulative UV value per minute, with the sink table looking like this:
>
> tm                    uv
> 2020/06/17 13:46:00   1
> 2020/06/17 13:47:00   2
> 2020/06/17 13:48:00   3
>
> If I group by the date, how do I get the minute?

---------- Original message ----------
From: "Benchao Li"
Sent: Wednesday, 17 Jun 2020, 11:46

> Hi,
> I think this scenario can be handled in two ways:
> 1. group by + mini-batch
> 2. window aggregation + fast emit
>
> For #1, the group by key can include a date field, e.g. the DATE_FORMAT(rowtm, 'yyyy-MM-dd') you mentioned. State cleanup then needs a state retention time, configured as described in [1]; mini-batch also has to be switched on via the options in [2].
>
> For #2, simply open a day-level tumble window; state cleanup needs no special configuration there, it happens by default. Fast emit is still an experimental feature and therefore not listed in the docs; the options are:
> table.exec.emit.early-fire.enabled = true
> table.exec.emit.early-fire.delay = 60 s
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html

On Wed, 17 Jun 2020 at 11:14, x <35907...@qq.com> wrote:
> The requirement is to compute, every minute, the cumulative UV from 00:00 of the current day up to now. With the query below, the cumulative UV never resets at the day boundary; it just keeps accumulating:
>
> CREATE VIEW uv_per_10min AS
> SELECT
>   MAX(DATE_FORMAT(proctime, 'yyyy-MM-dd HH:mm:00')) OVER w AS time_str,
>   COUNT(DISTINCT user_id) OVER w AS uv
> FROM user_behavior
> WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);
>
> How should this be handled? Would PARTITION BY DATE_FORMAT(rowtm, 'yyyy-MM-dd') work, and how should the state be cleaned up?
> PS: the 1.10 DDL does not seem to support CREATE VIEW, does it?
> Thanks
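A minimal Scala sketch of Benchao's two options on the 1.10 blink planner; the retention window and batch settings below are illustrative assumptions, not values from the thread:

import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.StreamTableEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tabEnv = StreamTableEnvironment.create(env, settings)
val conf = tabEnv.getConfig.getConfiguration

// Option 1: GROUP BY date + mini-batch; idle-state retention cleans up yesterday's accumulator.
conf.setString("table.exec.mini-batch.enabled", "true")
conf.setString("table.exec.mini-batch.allow-latency", "5 s")
conf.setString("table.exec.mini-batch.size", "5000")
tabEnv.getConfig.setIdleStateRetentionTime(Time.hours(25), Time.hours(48))

// Option 2: day-level tumble window + experimental early-fire every 60 s.
conf.setString("table.exec.emit.early-fire.enabled", "true")
conf.setString("table.exec.emit.early-fire.delay", "60 s")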
A question about broadcast streams
Hi,

The documentation on broadcast state [1] raises a question for me. It says the state backend behind broadcast state is in-memory only, never RocksDB. In CoBroadcastWithKeyedOperator's open method, the state is created via getOperatorStateBackend().getBroadcastState(descriptor), and getOperatorStateBackend() creates the backend through stateBackend.createOperatorStateBackend. Isn't the state backend supposed to be created by the factory selected from the configuration? So where does the in-memory restriction come from?

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/broadcast_state.html#important-considerations

Best. Looking forward to your reply and help.

a511955993 <a511955...@163.com>
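The restriction comes from the operator-state path itself: in this Flink version every configured backend, RocksDB included, hands out the heap-based DefaultOperatorStateBackend from createOperatorStateBackend, and broadcast state is operator state. For reference, a minimal sketch of the API in question; all stream and type names are hypothetical:

import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

case class Event(userId: String, ruleKey: String) // hypothetical event type

val ruleDesc = new MapStateDescriptor[String, String](
  "rules", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)

// events: DataStream[Event] and rules: DataStream[String] are assumed to exist
val enriched = events
  .keyBy(_.userId)
  .connect(rules.broadcast(ruleDesc))
  .process(new KeyedBroadcastProcessFunction[String, Event, String, String] {
    override def processElement(
        e: Event,
        ctx: KeyedBroadcastProcessFunction[String, Event, String, String]#ReadOnlyContext,
        out: Collector[String]): Unit = {
      // the non-broadcast side sees a read-only view of the broadcast state
      val rule = ctx.getBroadcastState(ruleDesc).get(e.ruleKey)
      if (rule != null) out.collect(s"${e.userId} matched $rule")
    }
    override def processBroadcastElement(
        rule: String,
        ctx: KeyedBroadcastProcessFunction[String, Event, String, String]#Context,
        out: Collector[String]): Unit = {
      // the broadcast side may update the state; it is held on the JVM heap
      ctx.getBroadcastState(ruleDesc).put(rule.split('=')(0), rule)
    }
  })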
Re: flink sql sink mysql requires primary keys
Hi,

In Flink 1.10 the sink's primary key is derived from the query; when no pk can be derived from the query, you get exactly the error you saw: "UpsertStreamTableSink requires that Table has a full primary keys if it is updated." Your job is such a case. Declaring a PK in the DDL is also unsupported in 1.10.

Both problems are solved in 1.11; you can try building the release-1.11 branch yourself. In Flink 1.11 the sink's primary key is always taken from the explicit declaration in the DDL, and no query pk derivation takes place.

Best,
Jark

On Thu, 18 Jun 2020 at 09:39, Zhou Zach wrote:
> Adding a primary key gives this error:
> Exception in thread "main" org.apache.flink.table.planner.operations.SqlConversionException: Primary key and unique key are not supported yet.
>   at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertCreateTable(SqlToOperationConverter.java:169)
>   at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:130)
>   at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
>   at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
>   at org.rabbit.sql.FromKafkaSinkJdbcForUserUV$.main(FromKafkaSinkJdbcForUserUV.scala:52)
>   at org.rabbit.sql.FromKafkaSinkJdbcForUserUV.main(FromKafkaSinkJdbcForUserUV.scala)
>
> Query:
>
> streamTableEnv.sqlUpdate(
>   """
>     |CREATE TABLE user_uv(
>     |`time` VARCHAR,
>     |cnt bigint,
>     |PRIMARY KEY (`time`)
>     |) WITH (
>     |'connector.type' = 'jdbc',
>     |'connector.write.flush.max-rows' = '1'
>     |)
>     |""".stripMargin)
>
> At 2020-06-17 20:59:35, "Zhou Zach" wrote:
> > Exception in thread "main" org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated.
> >   at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:113)
> >   at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
> >   at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
> >   at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
> >   at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
> >   at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
> >   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> >   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> >   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> >   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> >   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> >   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> >   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> >   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> >   at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
> >   at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
> >   at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
> >   at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
> >   at org.rabbit.sql.FromKafkaSinkJdbcForUserUV$.main(FromKafkaSinkJdbcForUserUV.scala:68)
> >   at org.rabbit.sql.FromKafkaSinkJdbcForUserUV.main(FromKafkaSinkJdbcForUserUV.scala)
> >
> > Query:
> > Flink: 1.10.0
> > CREATE TABLE user_uv(
> > |`time` VARCHAR,
> > |cnt bigint
> > |) WITH (
> > |'connector.type' = 'jdbc')
> > |insert into user_uv
> > |select MAX(DATE_FORMAT(created_time, 'yyyy-MM-dd HH:mm:00')) as `time`, COUNT(DISTINCT uid) as cnt
> > |from `user`
> > |group by DATE_FORMAT(created_time, 'yyyy-MM-dd HH:mm:00')
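A sketch of the 1.11-style declaration Jark describes, with the primary key stated in the DDL and NOT ENFORCED; the URL and table name are placeholders, and executeSql is the 1.11 entry point:

// Flink 1.11 sketch: PK declared in DDL; connector options follow the new 'jdbc' factory.
streamTableEnv.executeSql(
  """
    |CREATE TABLE user_uv (
    |  `time` STRING,
    |  cnt BIGINT,
    |  PRIMARY KEY (`time`) NOT ENFORCED
    |) WITH (
    |  'connector' = 'jdbc',
    |  'url' = 'jdbc:mysql://localhost:3306/mydatabase',
    |  'table-name' = 'user_uv'
    |)
    |""".stripMargin)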
Re: flink sql sink mysql requires primary keys
Adding a primary key gives this error:

Exception in thread "main" org.apache.flink.table.planner.operations.SqlConversionException: Primary key and unique key are not supported yet.
  at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertCreateTable(SqlToOperationConverter.java:169)
  at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:130)
  at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
  at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
  at org.rabbit.sql.FromKafkaSinkJdbcForUserUV$.main(FromKafkaSinkJdbcForUserUV.scala:52)
  at org.rabbit.sql.FromKafkaSinkJdbcForUserUV.main(FromKafkaSinkJdbcForUserUV.scala)

Query:

streamTableEnv.sqlUpdate(
  """
    |CREATE TABLE user_uv(
    |`time` VARCHAR,
    |cnt bigint,
    |PRIMARY KEY (`time`)
    |) WITH (
    |'connector.type' = 'jdbc',
    |'connector.write.flush.max-rows' = '1'
    |)
    |""".stripMargin)

At 2020-06-17 20:59:35, "Zhou Zach" wrote:
> Exception in thread "main" org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated.
>   at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:113)
>   at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>   at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>   at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>   at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>   at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>   at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>   at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
>   at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
>   at org.rabbit.sql.FromKafkaSinkJdbcForUserUV$.main(FromKafkaSinkJdbcForUserUV.scala:68)
>   at org.rabbit.sql.FromKafkaSinkJdbcForUserUV.main(FromKafkaSinkJdbcForUserUV.scala)
>
> Query:
> Flink: 1.10.0
> CREATE TABLE user_uv(
> |`time` VARCHAR,
> |cnt bigint
> |) WITH (
> |'connector.type' = 'jdbc')
> |insert into user_uv
> |select MAX(DATE_FORMAT(created_time, 'yyyy-MM-dd HH:mm:00')) as `time`, COUNT(DISTINCT uid) as cnt
> |from `user`
> |group by DATE_FORMAT(created_time, 'yyyy-MM-dd HH:mm:00')
flink sql sink mysql requires primary keys
Exception in thread "main" org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated.
  at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:113)
  at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
  at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
  at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
  at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
  at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
  at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
  at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
  at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
  at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
  at org.rabbit.sql.FromKafkaSinkJdbcForUserUV$.main(FromKafkaSinkJdbcForUserUV.scala:68)
  at org.rabbit.sql.FromKafkaSinkJdbcForUserUV.main(FromKafkaSinkJdbcForUserUV.scala)

Query:
Flink: 1.10.0

CREATE TABLE user_uv(
|`time` VARCHAR,
|cnt bigint
|) WITH (
|'connector.type' = 'jdbc')
|insert into user_uv
|select MAX(DATE_FORMAT(created_time, 'yyyy-MM-dd HH:mm:00')) as `time`, COUNT(DISTINCT uid) as cnt
|from `user`
|group by DATE_FORMAT(created_time, 'yyyy-MM-dd HH:mm:00')
Re: [ANNOUNCE] Yu Li is now part of the Flink PMC
Congrats Yu!

Cheers, Fabian

On Wed, 17 Jun 2020 at 10:20, Till Rohrmann <trohrm...@apache.org> wrote:
> Congratulations Yu!
>
> Cheers,
> Till
>
> On Wed, Jun 17, 2020 at 7:53 AM Jingsong Li wrote:
> > Congratulations Yu, well deserved!
> >
> > Best,
> > Jingsong
> >
> > On Wed, Jun 17, 2020 at 1:42 PM Yuan Mei wrote:
> > > Congrats, Yu!
> > > GXGX & well deserved!!
> > > Best Regards,
> > > Yuan
> > >
> > > On Wed, Jun 17, 2020 at 9:15 AM jincheng sun wrote:
> > > > Hi all,
> > > > On behalf of the Flink PMC, I'm happy to announce that Yu Li is now part of the Apache Flink Project Management Committee (PMC).
> > > > Yu Li has been very active on Flink's state-backend component, working on various improvements, for example the RocksDB memory management for 1.10. He keeps checking and voting for our releases, and has successfully produced two releases (1.10.0 & 1.10.1) as release manager.
> > > > Congratulations & Welcome Yu Li!
> > > > Best,
> > > > Jincheng (on behalf of the Flink PMC)
Re: [ANNOUNCE] Yu Li is now part of the Flink PMC
Congratulations Yu!

Cheers,
Till

On Wed, Jun 17, 2020 at 7:53 AM Jingsong Li wrote:
> Congratulations Yu, well deserved!
>
> Best,
> Jingsong
>
> On Wed, Jun 17, 2020 at 1:42 PM Yuan Mei wrote:
> > Congrats, Yu!
> > GXGX & well deserved!!
> > Best Regards,
> > Yuan
> >
> > On Wed, Jun 17, 2020 at 9:15 AM jincheng sun wrote:
> > > Hi all,
> > > On behalf of the Flink PMC, I'm happy to announce that Yu Li is now part of the Apache Flink Project Management Committee (PMC).
> > > Yu Li has been very active on Flink's state-backend component, working on various improvements, for example the RocksDB memory management for 1.10. He keeps checking and voting for our releases, and has successfully produced two releases (1.10.0 & 1.10.1) as release manager.
> > > Congratulations & Welcome Yu Li!
> > > Best,
> > > Jincheng (on behalf of the Flink PMC)
Reply: Problem querying a Kafka table registered in HiveCatalog from the SQL Client
Yes. Apart from the 1.11 jars I compiled myself, all the third-party jars are the 1.10.1 versions, but the matching 1.11 artifacts have not been released yet, have they? Where can I get them?

Sun.Zhu <17626017...@163.com>

On 17 Jun 2020 at 13:25, Rui Li wrote:
> You mean using the 1.10.1 hive connector together with Flink 1.11? That combination is guaranteed to break. Try unifying everything on 1.11.

On Wed, Jun 17, 2020 at 12:18 PM Sun.Zhu <17626017...@163.com> wrote:
> The error above is all the SQL CLI prints; there is no further information. Which logs would show more detail?

On 17 Jun 2020 at 10:27, Benchao Li wrote:
> The missing class used to live in the flink-table-runtime-blink module and has since been refactored into flink-table-common. Using the old versions only for connectors and formats should be fine. Could you post the full error? That would show which module still depends on the old flink-table-runtime-blink.

On 17 Jun 2020 at 00:49, Sun.Zhu <17626017...@163.com> wrote:
> Yes. Apart from the jars built from 1.11, the dependencies, such as the connector jars and the jars HiveCatalog needs, are the 1.10.1 versions, because 1.11 has not been released. Neither of the two problems above occurs on 1.10.1; both appeared after upgrading to 1.11, and I don't know why. A missing dependency?

On 16 Jun 2020 at 18:38, Benchao Li wrote:
> 1.11 refactored some of the underlying data structures, so you cannot take 1.10 jars and use them inside 1.11. Running with the jars compiled from 1.11 should work fine.

On 16 Jun 2020 at 18:11, Sun.Zhu <17626017...@163.com> wrote:
> I built the 1.11 distribution. Querying a Hive table from the SQL CLI fails with:
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.NoClassDefFoundError: org/apache/flink/table/dataformat/BaseRow
>
> Querying the registered Kafka table fails with:
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.ClassNotFoundException: org.apache.flink.table.dataformat.BaseRow
>
> The dependency jars were copied over from 1.10.1.

On 13 Jun 2020 at 11:44, Sun.Zhu <17626017...@163.com> wrote:
> Got it! Thx, junbao

On 13 Jun 2020 at 09:32, zhangjunbao wrote:
> On 1.10.x, creating a table with proctime as PROCTIME() under a HiveCatalog should be hitting a bug: https://issues.apache.org/jira/browse/FLINK-17189
>
> Best,
> Junbao Zhang

On 13 Jun 2020 at 00:31, Sun.Zhu <17626017...@163.com> wrote:
> Hi all,
> While integrating the SQL Client with HiveCatalog, I registered a Kafka table in the HiveCatalog with the following DDL:
>
> CREATE TABLE user_behavior (
>   user_id BIGINT,
>   item_id BIGINT,
>   category_id BIGINT,
>   behavior STRING,
>   ts TIMESTAMP(3),
>   proctime as PROCTIME(),  -- processing-time attribute via a computed column
>   WATERMARK FOR ts as ts - INTERVAL '5' SECOND  -- watermark on ts, making ts the event-time column
> ) WITH (
>   'connector.type' = 'kafka',
>   'connector.version' = 'universal',
>   'connector.topic' = 'user_behavior',
>   'connector.startup-mode' = 'earliest-offset',
>   'connector.properties.zookeeper.connect' = 'localhost:2181',
>   'connector.properties.bootstrap.servers' = 'localhost:9092',
>   'format.type' = 'json'
> );
>
> Querying with select * from user_behavior; fails with:
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes:
> validated type:
> RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME ATTRIBUTE(ROWTIME) ts, TIMESTAMP(3) NOT NULL proctime) NOT NULL
> converted type:
> RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME ATTRIBUTE(ROWTIME) ts, TIME ATTRIBUTE(PROCTIME) NOT NULL proctime) NOT NULL
> rel:
> LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2], behavior=[$3], ts=[$4], proctime=[$5])
>   LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($4, 5000:INTERVAL SECOND)])
>     LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2], behavior=[$3], ts=[$4], proctime=[PROCTIME()])
>       LogicalTableScan(table=[[myhive, my_db, user_behavior, source: [KafkaTableSource(user_id, item_id, category_id, behavior, ts)]]])
>
> Flink version: 1.10.1, blink planner, streaming mode
>
> Thx
> Sun.Zhu <17626017...@163.com>