Flink monitoring

2020-06-17 Post by wch...@163.com
Our cluster runs hadoop-2.8.0, and the Flink client package is installed on two of its nodes.
All jobs are submitted from one of those nodes, in per-job mode.
We now want to monitor the Flink jobs and plan to use the commonly recommended pushgateway + prometheus + grafana stack.
Flink on YARN logs via logback, and log aggregation is not enabled on the Hadoop side.
The question is:

For the Hadoop-side logs, e.g. the resourcemanager, nodemanager and datanode logs, as well as the
taskmanager logs under the userlogs directory, how should exception alerting be set up?
Is there a recommended approach?
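
For the metrics side of the pushgateway + prometheus + grafana stack mentioned above, a minimal flink-conf.yaml sketch using the PrometheusPushGatewayReporter could look like the following; the host, port and job name are placeholders, and the flink-metrics-prometheus jar needs to be in Flink's lib/ directory. This covers metrics only, not alerting on the Hadoop daemon logs:

metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
metrics.reporter.promgateway.host: pushgateway-host
metrics.reporter.promgateway.port: 9091
metrics.reporter.promgateway.jobName: flink-jobs
metrics.reporter.promgateway.randomJobNameSuffix: true
metrics.reporter.promgateway.deleteOnShutdown: false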



wch...@163.com


Re: With multiple sources, how to keep the data aligned

2020-06-17 Post by Jark Wu
I also think a dual-stream JOIN or an interval join should cover your requirement.

On Mon, 15 Jun 2020 at 19:41, Benchao Li  wrote:

> Hi,
> It sounds like what you need is a dual-stream join; you can use an event-time based interval join [1].
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#interval-joins
>
> 阿华田 wrote on Mon, 15 Jun 2020 at 18:31:
>
> > I would suggest using a cache: since stream B arrives 20 minutes late, buffer stream A's records for 20 minutes and join them with stream B once the time is up. For the cache I recommend Google's Guava cache,
> > com.google.common.cache;
> >
> >
> > 阿华田 <a15733178...@163.com>
> >
> >
> > On 15 Jun 2020 at 14:41, steven chen wrote:
> > hi:
> > 1. In our project we collect messages from different sources, merge them, and output aggregated results.
> > 2. Topic a holds all user pv logs and topic b holds all user uv logs. One job consumes both topics a and b
> > and writes the pv and uv results to the downstream kafka topic c.
> > Question: when messages from a arrive early and messages from b arrive 20 minutes late, how can the job keep the data from the two topics aligned, i.e. correlate and merge the two sides?
> > In effect two messages are processed and merged into one record before sinking downstream; how do we make sure the records from a and b match up?
> >
> >
> >
> >
> >
>
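
A minimal sketch of the event-time interval join suggested above, assuming the two topics are already registered as tables pv_log(user_id, pv_ts) and uv_log(user_id, uv_ts) with event-time attributes, and that tableEnv is an existing StreamTableEnvironment (all of these names are hypothetical):

import org.apache.flink.table.api.Table

// assuming tableEnv is an existing StreamTableEnvironment and both Kafka topics
// are already registered as tables with event-time attributes pv_ts / uv_ts
val joined: Table = tableEnv.sqlQuery(
  """
    |SELECT a.user_id, a.pv_ts, b.uv_ts
    |FROM pv_log a, uv_log b
    |WHERE a.user_id = b.user_id
    |  AND b.uv_ts BETWEEN a.pv_ts AND a.pv_ts + INTERVAL '20' MINUTE
    |""".stripMargin)

The 20-minute upper bound mirrors the expected lateness of stream B; rows from A are only kept in state while the interval condition can still match, so no manual cache expiry is needed.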


Re: Problem querying a Kafka table with sql-client + HiveCatalog

2020-06-17 Post by Jark Wu
You can take the release-1.11 branch: https://github.com/apache/flink/tree/release-1.11/
and build it yourself: mvn clean install -DskipTests
The built 1.11 distribution is then found under build-target.

Best,
Jark



On Wed, 17 Jun 2020 at 15:30, Sun.Zhu <17626017...@163.com> wrote:

>
>
> Yes. Apart from the 1.11 package I built myself, all the third-party jars were taken from 1.10.1, but the corresponding 1.11 jars have not been released yet, have they? Where can I get them?
>
>
> Sun.Zhu <17626017...@163.com>
>
>
> On 17 Jun 2020 at 13:25, Rui Li wrote:
> Do you mean using the 1.10.1 hive connector together with Flink 1.11? That combination will definitely cause problems. Try unifying everything on 1.11.
>
> On Wed, Jun 17, 2020 at 12:18 PM Sun.Zhu <17626017...@163.com> wrote:
>
> The error from the SQL CLI is only what I posted above; there is no further information. Which logs would show more detail?
>
>
>
>
> On 17 Jun 2020 at 10:27, Benchao Li wrote:
> The missing dependency used to live in the flink-table-runtime-blink module; it has now been refactored into the flink-table-common
> module.
> If only the connector and format jars use the old version, that should be fine.
> Could you post the full error message, so we can see which module still depends on the old flink-table-runtime-blink?
>
> Sun.Zhu <17626017...@163.com> wrote on Wed, 17 Jun 2020 at 00:49:
>
> Yes. Apart from the jars built from 1.11, the dependency jars (the connector jars and the jars HiveCatalog needs) are the 1.10.1 versions, since 1.11
> has not been released yet. Neither of the two problems above occurs on 1.10.1; after upgrading to 1.11 they appear and I don't know why. Is a dependency missing?
>
>
>
>
> On 16 Jun 2020 at 18:38, Benchao Li wrote:
> 1.11 refactored some of the underlying data structures, so you cannot take 1.10 jars and use them directly with 1.11.
> Using the jars built from 1.11 directly should work fine.
>
> Sun.Zhu <17626017...@163.com> wrote on Tue, 16 Jun 2020 at 18:11:
>
> I built the 1.11 package.
> Querying a Hive table in sql-cli reports the following error:
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.NoClassDefFoundError:
> org/apache/flink/table/dataformat/BaseRow
>
>
> Querying the registered Kafka table reports:
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.ClassNotFoundException:
> org.apache.flink.table.dataformat.BaseRow
>
>
> The dependency jars were copied over from 1.10.1.
> Sun.Zhu <17626017...@163.com>
>
>
> On 13 Jun 2020 at 11:44, Sun.Zhu <17626017...@163.com> wrote:
> Got it!
> Thx,junbao
>
>
> Sun.Zhu <17626017...@163.com>
>
>
> On 13 Jun 2020 at 09:32, zhangjunbao wrote:
> In 1.10.x, creating a table with proctime as PROCTIME() under HiveCatalog appears to hit a bug:
> https://issues.apache.org/jira/browse/FLINK-17189
>
> Best,
> Junbao Zhang
>
> On 13 Jun 2020 at 00:31, Sun.Zhu <17626017...@163.com> wrote:
>
> hi, all
> When integrating HiveCatalog with the sql client, I registered a Kafka table in the HiveCatalog.
> The DDL is as follows:
> |
> CREATE TABLE user_behavior (
> user_id BIGINT,
> item_id BIGINT,
> category_id BIGINT,
> behavior STRING,
> ts TIMESTAMP(3),
> proctime as PROCTIME(),   -- processing-time attribute via a computed column
> WATERMARK FOR ts as ts - INTERVAL '5' SECOND  -- watermark on ts, making ts the event-time column
> ) WITH (
> 'connector.type' = 'kafka',  -- use the kafka connector
> 'connector.version' = 'universal',  -- kafka version; universal supports 0.11 and above
> 'connector.topic' = 'user_behavior',  -- kafka topic
> 'connector.startup-mode' = 'earliest-offset',  -- read from the earliest offset
> 'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zookeeper address
> 'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker address
> 'format.type' = 'json'  -- source format is json
> );
> |
> Running select * from user_behavior; fails with:
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.AssertionError: Conversion to relational algebra failed to
> preserve datatypes:
> validated type:
> RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id,
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME
> ATTRIBUTE(ROWTIME) ts, TIMESTAMP(3) NOT NULL proctime) NOT NULL
> converted type:
> RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id,
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME
> ATTRIBUTE(ROWTIME) ts, TIME ATTRIBUTE(PROCTIME) NOT NULL proctime) NOT
> NULL
> rel:
> LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2],
> behavior=[$3], ts=[$4], proctime=[$5])
> LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($4, 5000:INTERVAL
> SECOND)])
> LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2],
> behavior=[$3], ts=[$4], proctime=[PROCTIME()])
> LogicalTableScan(table=[[myhive, my_db, user_behavior, source:
> [KafkaTableSource(user_id, item_id, category_id, behavior, ts)]]])
>
>
> Flink version: 1.10.1
> blink planner, streaming mode
>
>
> Thx
> Sun.Zhu <17626017...@163.com>
>
>
>
>
>
>
>
> --
> Best regards!
> Rui Li
>


Re: Help: real-time cumulative UV with Flink SQL 1.10

2020-06-17 Post by Jark Wu
Yes, I think that works around it.

On Thu, 18 Jun 2020 at 10:34, x <35907...@qq.com> wrote:

> On 1.10 I implemented it by converting the table to a retract stream and then back to a table. Does that look reasonable to you?
> val resTmpTab: Table = tabEnv.sqlQuery(
>   """
> SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00')) time_str,
>        COUNT(DISTINCT userkey) uv
> FROM user_behavior
> GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd')""")
>
> val resTmpStream = tabEnv.toRetractStream[(String, Long)](resTmpTab)
>   .filter(line => line._1 == true).map(line => line._2)
>
> val res = tabEnv.fromDataStream(resTmpStream)
> tabEnv.sqlUpdate(
>   s"""
> INSERT INTO rt_totaluv
> SELECT _1, MAX(_2)
> FROM $res
> GROUP BY _1
> """)
>
>
> -- Original message --
> From: "Jark Wu"  Sent: Wed, 17 Jun 2020 at 13:55
> To: "user-zh"
> Subject: Re: Help: real-time cumulative UV with Flink SQL 1.10
>
>
>
> In Flink 1.11 you could try something like this:
>
> CREATE TABLE mysql (
>    time_str STRING,
>    uv BIGINT,
>    PRIMARY KEY (time_str) NOT ENFORCED
> ) WITH (
>    'connector' = 'jdbc',
>    'url' = 'jdbc:mysql://localhost:3306/mydatabase',
>    'table-name' = 'myuv'
> );
>
> INSERT INTO mysql
> SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00')), COUNT(DISTINCT user_id)
> FROM user_behavior;
>
> On Wed, 17 Jun 2020 at 13:49, x <35907...@qq.com> wrote:
>
> > Thanks for the reply. I still don't quite follow "approach #1". I need to output a cumulative UV every minute,
> > with the sink table looking like this:
> > tm uv
> > 2020/06/17 13:46:00 1
> > 2020/06/17 13:47:00 2
> > 2020/06/17 13:48:00 3
> >
> >
> > If I group by the date, how do I get the minute?
> >
> >
> > -- Original message --
> > From: "Benchao Li"  Sent: Wed, 17 Jun 2020 at 11:46
> > To: "user-zh"
> > Subject: Re: Help: real-time cumulative UV with Flink SQL 1.10
> >
> >
> >
> > Hi,
> > I see two ways to handle this scenario:
> > 1. group by + mini batch
> > 2. window aggregation + fast emit
> >
> > For #1, the group by key can include a date field, e.g. the DATE_FORMAT(rowtm, 'yyyy-MM-dd') you mentioned.
> > In that case state cleanup requires configuring a state retention time, see [1]. Mini batch also has to be
> > enabled explicitly via the options in [2].
> >
> > For #2, just use a day-level tumbling window. State cleanup then needs no special configuration; it is cleaned up by default.
> > Fast emit is still an experimental feature, so it is not listed in the docs; here are the options for reference:
> > table.exec.emit.early-fire.enabled = true
> > table.exec.emit.early-fire.delay = 60 s
> >
> > [1]
> > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
> > [2]
> > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html
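
A minimal sketch of approach #2 above (a day-level tumble window plus the experimental early-fire options quoted in the message), assuming tabEnv is an existing StreamTableEnvironment, user_behavior has an event-time attribute ts, and a retract/upsert sink table rt_totaluv(time_str, uv) is already registered (the table and column names follow the workaround code quoted earlier in this thread):

// enable the experimental early-fire options quoted above: emit partial
// window results every 60 seconds instead of waiting for the end of the day
val conf = tabEnv.getConfig.getConfiguration
conf.setString("table.exec.emit.early-fire.enabled", "true")
conf.setString("table.exec.emit.early-fire.delay", "60 s")

tabEnv.sqlUpdate(
  """
    |INSERT INTO rt_totaluv
    |SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00')) AS time_str,
    |       COUNT(DISTINCT user_id) AS uv
    |FROM user_behavior
    |GROUP BY TUMBLE(ts, INTERVAL '1' DAY)
    |""".stripMargin)

Because the window is a daily tumble, the accumulated distinct count resets automatically at midnight, which addresses the state-cleanup concern in the original question.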
> >
> > x <35907...@qq.com> wrote on Wed, 17 Jun 2020 at 11:14:
> >
> > > The requirement is to compute, every minute, the cumulative UV from 00:00 of the current day up to now. With the query below, the cumulative UV never resets when the day rolls over; it just keeps accumulating:
> > > CREATE VIEW uv_per_10min AS
> > > SELECT
> > >   MAX(DATE_FORMAT(proctime, 'yyyy-MM-dd HH:mm:00')) OVER w AS time_str,
> > >   COUNT(DISTINCT user_id) OVER w AS uv
> > > FROM user_behavior
> > > WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND
> > > CURRENT ROW);
> > >
> > >
> > > How should this be handled?
> > > Would PARTITION BY DATE_FORMAT(rowtm, 'yyyy-MM-dd') work, and how should the state be cleaned up?
> > > PS: the DDL in 1.10 does not seem to support CREATE VIEW.
> > > Thanks


Question about broadcast streams

2020-06-17 Post by a511955993

Hi

I have some questions about the description of broadcast state on the website. The docs [1] say the state backend behind broadcast state is in-memory only, with no RocksDB. In CoBroadcastWithKeyedOperator's open method the state is created via getOperatorStateBackend().getBroadcastState(descriptor), and getOperatorStateBackend() creates its backend through stateBackend.createOperatorStateBackend.
Isn't the state backend supposed to be created by the factory chosen from the configuration? Where exactly is the in-memory restriction enforced?

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/broadcast_state.html#important-considerations
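
For reference, a minimal sketch of the broadcast state API being discussed; the rule/event types, descriptor name and sources are hypothetical. Whatever state backend is configured, the MapStateDescriptor below is registered through the operator state backend mentioned above:

import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// hypothetical example: plain string events on one stream, (ruleName -> keyword) rules on another
val ruleDescriptor = new MapStateDescriptor[String, String](
  "rules", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)

val env = StreamExecutionEnvironment.getExecutionEnvironment
val events: DataStream[String] = env.socketTextStream("localhost", 9000)
val rules: DataStream[(String, String)] = env.fromElements(("r1", "error"))

events
  .connect(rules.broadcast(ruleDescriptor))   // the descriptor is registered as broadcast (operator) state
  .process(new BroadcastProcessFunction[String, (String, String), String] {

    override def processElement(
        value: String,
        ctx: BroadcastProcessFunction[String, (String, String), String]#ReadOnlyContext,
        out: Collector[String]): Unit = {
      // non-broadcast side: read-only access to the broadcast state
      val ruleState = ctx.getBroadcastState(ruleDescriptor)
      if (ruleState.contains("r1") && value.contains(ruleState.get("r1"))) {
        out.collect(value)
      }
    }

    override def processBroadcastElement(
        rule: (String, String),
        ctx: BroadcastProcessFunction[String, (String, String), String]#Context,
        out: Collector[String]): Unit = {
      // broadcast side: may update the broadcast state
      ctx.getBroadcastState(ruleDescriptor).put(rule._1, rule._2)
    }
  })
  .print()

env.execute("broadcast-state-sketch")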


Best

Looking forward to your reply and help.

a511955993 <a511955...@163.com>

Reply: Help: real-time cumulative UV with Flink SQL 1.10

2020-06-17 Post by x
On 1.10 I implemented it by converting the table to a retract stream and then back to a table. Does that look reasonable to you?
val resTmpTab: Table = tabEnv.sqlQuery(
  """
SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00')) time_str,
       COUNT(DISTINCT userkey) uv
FROM user_behavior
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd')""")

val resTmpStream = tabEnv.toRetractStream[(String, Long)](resTmpTab)
  .filter(line => line._1 == true).map(line => line._2)

val res = tabEnv.fromDataStream(resTmpStream)
tabEnv.sqlUpdate(
  s"""
INSERT INTO rt_totaluv
SELECT _1, MAX(_2)
FROM $res
GROUP BY _1
""")



Re: flink sql sink mysql requires primary keys

2020-06-17 Post by Jark Wu
Hi,

In Flink 1.10 the sink's primary key is derived from the query; if no pk can be derived from the query, you get exactly the error you saw:
"UpsertStreamTableSink requires that Table has a full primary keys if it is
updated."
Your job is a case where the query pk cannot be derived.

Declaring a PK in the DDL is not supported in 1.10 either.

Both issues are solved in 1.11; you can try building the release-1.11 branch yourself and give it a go.
In Flink 1.11 the sink's primary key is declared explicitly by the user in the DDL, and the query pk is no longer derived.

Best,
Jark
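
As a sketch of what the 1.11-style DDL could look like once the PRIMARY KEY clause is supported (the connector options follow the new 1.11 'connector' = 'jdbc' naming; the URL and table name are placeholders, and streamTableEnv matches the variable name used in the code below):

// hypothetical 1.11 DDL sketch: the sink primary key comes from the DDL, not from the query
streamTableEnv.sqlUpdate(
  """
    |CREATE TABLE user_uv (
    |  `time` VARCHAR,
    |  cnt BIGINT,
    |  PRIMARY KEY (`time`) NOT ENFORCED
    |) WITH (
    |  'connector' = 'jdbc',
    |  'url' = 'jdbc:mysql://localhost:3306/mydatabase',
    |  'table-name' = 'user_uv'
    |)
    |""".stripMargin)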


On Thu, 18 Jun 2020 at 09:39, Zhou Zach  wrote:

> Adding a primary key gives this error:
> Exception in thread "main"
> org.apache.flink.table.planner.operations.SqlConversionException: Primary
> key and unique key are not supported yet.
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertCreateTable(SqlToOperationConverter.java:169)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:130)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
> at
> org.rabbit.sql.FromKafkaSinkJdbcForUserUV$.main(FromKafkaSinkJdbcForUserUV.scala:52)
> at
> org.rabbit.sql.FromKafkaSinkJdbcForUserUV.main(FromKafkaSinkJdbcForUserUV.scala)
>
>
> Query:
>
>
> streamTableEnv.sqlUpdate(
> """
> |
> |CREATE TABLE user_uv(
> |`time` VARCHAR,
> |cnt bigint,
> |PRIMARY KEY (`time`)
> |) WITH (
> |'connector.type' = 'jdbc',
> |'connector.write.flush.max-rows' = '1'
> |)
> |""".stripMargin)
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> At 2020-06-17 20:59:35, "Zhou Zach"  wrote:
> >Exception in thread "main" org.apache.flink.table.api.TableException:
> UpsertStreamTableSink requires that Table has a full primary keys if it is
> updated.
> >   at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:113)
> >   at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
> >   at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
> >   at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
> >   at
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
> >   at
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
> >   at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> >   at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> >   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> >   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> >   at
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> >   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> >   at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> >   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> >   at
> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
> >   at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
> >   at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
> >   at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
> >   at
> org.rabbit.sql.FromKafkaSinkJdbcForUserUV$.main(FromKafkaSinkJdbcForUserUV.scala:68)
> >   at
> org.rabbit.sql.FromKafkaSinkJdbcForUserUV.main(FromKafkaSinkJdbcForUserUV.scala)
> >
> >
> >
> >
> >
> >Query:
> >Flink :1.10.0
> >CREATE TABLE user_uv(
> >|`time` VARCHAR,
> >|cnt bigint
> >|) WITH (
> >|'connector.type' = 'jdbc')
> >|insert into user_uv
> >|select  MAX(DATE_FORMAT(created_time, 'yyyy-MM-dd HH:mm:00')) as `time`,
> COUNT(DISTINCT  uid) as cnt
> >|from `user`
> >|group by DATE_FORMAT(created_time, 'yyyy-MM-dd HH:mm:00')
>


Re: flink sql sink mysql requires primary keys

2020-06-17 Post by Zhou Zach
Adding a primary key gives this error:
Exception in thread "main" 
org.apache.flink.table.planner.operations.SqlConversionException: Primary key 
and unique key are not supported yet.
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertCreateTable(SqlToOperationConverter.java:169)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:130)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
at 
org.rabbit.sql.FromKafkaSinkJdbcForUserUV$.main(FromKafkaSinkJdbcForUserUV.scala:52)
at 
org.rabbit.sql.FromKafkaSinkJdbcForUserUV.main(FromKafkaSinkJdbcForUserUV.scala)


Query:


streamTableEnv.sqlUpdate(
"""
|
|CREATE TABLE user_uv(
|`time` VARCHAR,
|cnt bigint,
|PRIMARY KEY (`time`)
|) WITH (
|'connector.type' = 'jdbc',
|'connector.write.flush.max-rows' = '1'
|)
|""".stripMargin)

















At 2020-06-17 20:59:35, "Zhou Zach"  wrote:
>Exception in thread "main" org.apache.flink.table.api.TableException: 
>UpsertStreamTableSink requires that Table has a full primary keys if it is 
>updated.
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:113)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>   at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>   at 
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>   at 
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
>   at 
> org.rabbit.sql.FromKafkaSinkJdbcForUserUV$.main(FromKafkaSinkJdbcForUserUV.scala:68)
>   at 
> org.rabbit.sql.FromKafkaSinkJdbcForUserUV.main(FromKafkaSinkJdbcForUserUV.scala)
>
>
>
>
>
>Query:
>Flink :1.10.0
>CREATE TABLE user_uv(
>|`time` VARCHAR,
>|cnt bigint
>|) WITH (
>|'connector.type' = 'jdbc')
>|insert into user_uv
>|select  MAX(DATE_FORMAT(created_time, 'yyyy-MM-dd HH:mm:00')) as `time`,
>COUNT(DISTINCT  uid) as cnt
>|from `user`
>|group by DATE_FORMAT(created_time, 'yyyy-MM-dd HH:mm:00')


flink sql sink mysql requires primary keys

2020-06-17 Post by Zhou Zach
Exception in thread "main" org.apache.flink.table.api.TableException: 
UpsertStreamTableSink requires that Table has a full primary keys if it is 
updated.
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:113)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
at 
org.rabbit.sql.FromKafkaSinkJdbcForUserUV$.main(FromKafkaSinkJdbcForUserUV.scala:68)
at 
org.rabbit.sql.FromKafkaSinkJdbcForUserUV.main(FromKafkaSinkJdbcForUserUV.scala)





Query:
Flink :1.10.0
CREATE TABLE user_uv(
|`time` VARCHAR,
|cnt bigint
|) WITH (
|'connector.type' = 'jdbc')
|insert into user_uv
|select  MAX(DATE_FORMAT(created_time, 'yyyy-MM-dd HH:mm:00')) as `time`,
COUNT(DISTINCT  uid) as cnt
|from `user`
|group by DATE_FORMAT(created_time, 'yyyy-MM-dd HH:mm:00')

Re: [ANNOUNCE] Yu Li is now part of the Flink PMC

2020-06-17 Post by Fabian Hueske
Congrats Yu!

Cheers, Fabian

On Wed, 17 Jun 2020 at 10:20, Till Rohrmann <trohrm...@apache.org> wrote:

> Congratulations Yu!
>
> Cheers,
> Till
>
> On Wed, Jun 17, 2020 at 7:53 AM Jingsong Li 
> wrote:
>
> > Congratulations Yu, well deserved!
> >
> > Best,
> > Jingsong
> >
> > On Wed, Jun 17, 2020 at 1:42 PM Yuan Mei  wrote:
> >
> >> Congrats, Yu!
> >>
> >> GXGX & well deserved!!
> >>
> >> Best Regards,
> >>
> >> Yuan
> >>
> >> On Wed, Jun 17, 2020 at 9:15 AM jincheng sun 
> >> wrote:
> >>
> >>> Hi all,
> >>>
> >>> On behalf of the Flink PMC, I'm happy to announce that Yu Li is now
> >>> part of the Apache Flink Project Management Committee (PMC).
> >>>
> >>> Yu Li has been very active on Flink's Statebackend component, working
> on
> >>> various improvements, for example the RocksDB memory management for
> 1.10.
> >>> and keeps checking and voting for our releases, and also has
> successfully
> >>> produced two releases(1.10.0&1.10.1) as RM.
> >>>
> >>> Congratulations & Welcome Yu Li!
> >>>
> >>> Best,
> >>> Jincheng (on behalf of the Flink PMC)
> >>>
> >>
> >
> > --
> > Best, Jingsong Lee
> >
>


Re: [ANNOUNCE] Yu Li is now part of the Flink PMC

2020-06-17 Post by Till Rohrmann
Congratulations Yu!

Cheers,
Till

On Wed, Jun 17, 2020 at 7:53 AM Jingsong Li  wrote:

> Congratulations Yu, well deserved!
>
> Best,
> Jingsong
>
> On Wed, Jun 17, 2020 at 1:42 PM Yuan Mei  wrote:
>
>> Congrats, Yu!
>>
>> GXGX & well deserved!!
>>
>> Best Regards,
>>
>> Yuan
>>
>> On Wed, Jun 17, 2020 at 9:15 AM jincheng sun 
>> wrote:
>>
>>> Hi all,
>>>
>>> On behalf of the Flink PMC, I'm happy to announce that Yu Li is now
>>> part of the Apache Flink Project Management Committee (PMC).
>>>
>>> Yu Li has been very active on Flink's Statebackend component, working on
>>> various improvements, for example the RocksDB memory management for 1.10.
>>> and keeps checking and voting for our releases, and also has successfully
>>> produced two releases(1.10.0&1.10.1) as RM.
>>>
>>> Congratulations & Welcome Yu Li!
>>>
>>> Best,
>>> Jincheng (on behalf of the Flink PMC)
>>>
>>
>
> --
> Best, Jingsong Lee
>


Reply: Problem querying a Kafka table with sql-client + HiveCatalog

2020-06-17 Post by Sun.Zhu


Yes. Apart from the 1.11 package I built myself, all the third-party jars were taken from 1.10.1, but the corresponding 1.11 jars have not been released yet, have they? Where can I get them?


Sun.Zhu <17626017...@163.com>


On 17 Jun 2020 at 13:25, Rui Li wrote:
Do you mean using the 1.10.1 hive connector together with Flink 1.11? That combination will definitely cause problems. Try unifying everything on 1.11.

On Wed, Jun 17, 2020 at 12:18 PM Sun.Zhu <17626017...@163.com> wrote:

The error from the SQL CLI is only what I posted above; there is no further information. Which logs would show more detail?




On 17 Jun 2020 at 10:27, Benchao Li wrote:
The missing dependency used to live in the flink-table-runtime-blink module; it has now been refactored into the flink-table-common
module.
If only the connector and format jars use the old version, that should be fine.
Could you post the full error message, so we can see which module still depends on the old flink-table-runtime-blink?

Sun.Zhu <17626017...@163.com> wrote on Wed, 17 Jun 2020 at 00:49:

Yes. Apart from the jars built from 1.11, the dependency jars (the connector jars and the jars HiveCatalog needs) are the 1.10.1 versions, since 1.11
has not been released yet. Neither of the two problems above occurs on 1.10.1; after upgrading to 1.11 they appear and I don't know why. Is a dependency missing?




On 16 Jun 2020 at 18:38, Benchao Li wrote:
1.11 refactored some of the underlying data structures, so you cannot take 1.10 jars and use them directly with 1.11.
Using the jars built from 1.11 directly should work fine.

Sun.Zhu <17626017...@163.com> wrote on Tue, 16 Jun 2020 at 18:11:

I built the 1.11 package.
Querying a Hive table in sql-cli reports the following error:
[ERROR] Could not execute SQL statement. Reason:
java.lang.NoClassDefFoundError:
org/apache/flink/table/dataformat/BaseRow


Querying the registered Kafka table reports:
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException:
org.apache.flink.table.dataformat.BaseRow


The dependency jars were copied over from 1.10.1.
Sun.Zhu <17626017...@163.com>


On 13 Jun 2020 at 11:44, Sun.Zhu <17626017...@163.com> wrote:
Got it!
Thx,junbao


Sun.Zhu <17626017...@163.com>


On 13 Jun 2020 at 09:32, zhangjunbao wrote:
In 1.10.x, creating a table with proctime as PROCTIME() under HiveCatalog appears to hit a bug:
https://issues.apache.org/jira/browse/FLINK-17189

Best,
Junbao Zhang

On 13 Jun 2020 at 00:31, Sun.Zhu <17626017...@163.com> wrote:

hi, all
When integrating HiveCatalog with the sql client, I registered a Kafka table in the HiveCatalog.
The DDL is as follows:
|
CREATE TABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3),
proctime as PROCTIME(),   -- processing-time attribute via a computed column
WATERMARK FOR ts as ts - INTERVAL '5' SECOND  -- watermark on ts, making ts the event-time column
) WITH (
'connector.type' = 'kafka',  -- use the kafka connector
'connector.version' = 'universal',  -- kafka version; universal supports 0.11 and above
'connector.topic' = 'user_behavior',  -- kafka topic
'connector.startup-mode' = 'earliest-offset',  -- read from the earliest offset
'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zookeeper address
'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker address
'format.type' = 'json'  -- source format is json
);
|
Running select * from user_behavior; fails with:
[ERROR] Could not execute SQL statement. Reason:
java.lang.AssertionError: Conversion to relational algebra failed to
preserve datatypes:
validated type:
RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id,
VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME
ATTRIBUTE(ROWTIME) ts, TIMESTAMP(3) NOT NULL proctime) NOT NULL
converted type:
RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id,
VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME
ATTRIBUTE(ROWTIME) ts, TIME ATTRIBUTE(PROCTIME) NOT NULL proctime) NOT
NULL
rel:
LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2],
behavior=[$3], ts=[$4], proctime=[$5])
LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($4, 5000:INTERVAL
SECOND)])
LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2],
behavior=[$3], ts=[$4], proctime=[PROCTIME()])
LogicalTableScan(table=[[myhive, my_db, user_behavior, source:
[KafkaTableSource(user_id, item_id, category_id, behavior, ts)]]])


Flink version: 1.10.1
blink planner, streaming mode


Thx
Sun.Zhu <17626017...@163.com>







--
Best regards!
Rui Li