?????? flink 1.9 ????????????????

2020-06-03 文章 star
append??state?? 
distinctappend??
append??




----
??:"LakeShen"https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=147427289#FLIP105:SupporttoInterpretChangeloginFlinkSQL(IntroducingDebeziumandCanalFormat)-CanalFormat


 1048262223 <1048262...@qq.comgt; ??2020??6??3?? 2:59??

 gt; Hi
 gt; Flink ??RetractStream
 gt;
 
sinkupdatekafkaupdatesink??kafka??RetractStream
 gt;
 gt;
 gt; Best,
 gt; Yichao Yang
 gt;
 gt;
 gt; 
--amp;nbsp;amp;nbsp;--
 gt; ??:amp;nbsp;"star"<3149768...@qq.comamp;gt;;
 gt; :amp;nbsp;2020??6??3??(??) 2:47
 gt; 
??:amp;nbsp;"user-zh@flink.apache.org"

Re: flink 1.9 关于回撤流的问题

2020-06-03 文章 LakeShen
Hi,

RetractStream 目前是无法输出到 kafka 的,因为 kafka 是 Append 模式。

不过你应该可以定义一个时间窗口 T ,滚动窗口的时间就是 T,然后聚合一次,输出到 kafka,后面都使用这个 kafka topic。

Best,
LakeShen

star <3149768...@qq.com> 于2020年6月3日周三 下午4:31写道:

> 感谢两位的回复,
> 转成回撤流的这个流其实是一张轻度汇总表,
> 例如,select year,month,day,province,sub_name,sum(amount),count(*) as cou
> from mytable group by year,month,day,province,sub_name;
>
>
> 后面有几十张实时报表依赖这个流 再进行汇总 计算;
> 我现在是把这个轻度汇总表转成了回撤流输出到了kafka里面,如果后面这几十张报表能够消费这个topic并转成table,就可以做后面到运算了。
>
>
> 不知道能不能转成这样到table?
>
>
>
>
>
>
> --原始邮件--
> 发件人:"godfrey he" 发送时间:2020年6月3日(星期三) 下午3:40
> 收件人:"user-zh"
> 主题:Re: flink 1.9 关于回撤流的问题
>
>
>
> hi star,
> Flink 1.11 开始已经支持 table source 读取 retract 消息,update 消息。
> 目前支持 Debezium format,Canal format [1],其他的情况目前需要自己实现。
>
>
> Best,
> Godfrey
>
> [1]
>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=147427289#FLIP105:SupporttoInterpretChangeloginFlinkSQL(IntroducingDebeziumandCanalFormat)-CanalFormat
>
>
> 1048262223 <1048262...@qq.com 于2020年6月3日周三 下午2:59写道:
>
>  Hi
>  Flink 中RetractStream
> 
> 是必须要sink支持update的,kafka消息队列本身是不支持update的,所以基于sink为kafka的程序是不能做RetractStream的。
> 
> 
>  Best,
>  Yichao Yang
> 
> 
>  --nbsp;原始邮件nbsp;--
>  发件人:nbsp;"star"<3149768...@qq.comgt;;
>  发送时间:nbsp;2020年6月3日(星期三) 下午2:47
>  收件人:nbsp;"user-zh@flink.apache.org" gt;;
> 
>  主题:nbsp;flink 1.9 关于回撤流的问题
> 
> 
> 
>  大家好,
> 
> 
> 
> 
> 在做实时统计分析的时候我将基础汇总层做成了一个回撤流(toRetractStream)输出到kafka里(因为后面很多实时报表依赖这个流,所以就输出了)
>  问题是这个kafka里到回撤流还能转成flink 的RetractStream流吗,转成后我想注册成一张表,然后基于这个表再做聚合分析
> 
> 
> 
> 
>  谢谢


Re: Re:回复:flink1.9,如何实时查看kafka消费的挤压量

2020-06-03 文章 LakeShen
或者可以通过 Kafka-Manager 来查看

guanyq  于2020年6月3日周三 下午4:45写道:

>
>
>
> 找到了,原生就有的committedOffsets-currentOffsets
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/metrics.html#reporter
> Connectors
> Kafka Connectors
> | Scope | Metrics | User Variables | Description | Type |
> | Operator | commitsSucceeded | n/a | The total number of successful
> offset commits to Kafka, if offset committing is turned on and
> checkpointing is enabled. | Counter |
> | Operator | commitsFailed | n/a | The total number of offset commit
> failures to Kafka, if offset committing is turned on and checkpointing is
> enabled. Note that committing offsets back to Kafka is only a means to
> expose consumer progress, so a commit failure does not affect the integrity
> of Flink's checkpointed partition offsets. | Counter |
> | Operator | committedOffsets | topic, partition | The last successfully
> committed offsets to Kafka, for each partition. A particular partition's
> metric can be specified by topic name and partition id. | Gauge |
> | Operator | currentOffsets | topic, partition | The consumer's current
> read offset, for each partition. A particular partition's metric can be
> specified by topic name and partition id. | Gauge |
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-06-03 15:02:24,"guanyq"  写道:
> >kafka挤压量的metrics的demo有么,或者参考资料
> >
> >
> >
> >
> >
> >
> >
> >
> >在 2020-06-03 14:31:56,"1530130567" <1530130...@qq.com> 写道:
> >>Hi:
> >>  可以考虑用prometheus采集kafka的metrics,在grafana上展示
> >>
> >>
> >>
> >>
> >>--原始邮件--
> >>发件人: "Zhonghan Tang"<13122260...@163.com;
> >>发送时间: 2020年6月3日(星期三) 下午2:29
> >>收件人: "user-zh" >>抄送: "user-zh" >>主题: 回复:flink1.9,如何实时查看kafka消费的挤压量
> >>
> >>
> >>
> >>一般是kafka自带的查看消费组的命令工具可以看
> >>./kafka-consumer-groups.sh --describe --group test-consumer-group
> --bootstrap-server
> >>
> >>
> >>| |
> >>Zhonghan Tang
> >>|
> >>|
> >>13122260...@163.com
> >>|
> >>签名由网易邮箱大师定制
> >>
> >>
> >>在2020年06月3日 14:10,guanyq >>请加个问题
> >>
> >>1.消费kafka时,是如何实时查看kafka topic的挤压量的?
>


回复:转发:flink1.10整合hbase测试遇到的问题

2020-06-03 文章 Lijie Wang
这个是因为这个 class 不在路径中导致的。你需要确认一下这个 class 在哪个 jar 包中,这个 jar 包是否在 flink 的 lib 下




在2020年06月3日 22:52,liunaihua521 写道:





- 转发邮件信息 -

发件人: liunaihua521 
发送日期: 2020年6月3日 22:18
发送至: user-zh-i...@flink.apache.org  、 
user-zh-...@flink.apache.org 
主题: flink1.10整合hbase测试遇到的问题
hi!
版本说明:
flink版本1.10
HBase版本2.2.4
ZK版本3.6.1
Hadoop版本2.10.0


程序说明:

程序是简单的实现RichSourceFunction和RichSinkFunction,读取和写入hbase,程序打包后上传standalone模式的集群.


报错说明:
提交任务后,总是报如下错误(附件附文本):

或者



尝试如下:
尝试一:
flink的lib下有如下jar包:
提交的jar包中发现没有下面连个类
执行后报错


尝试二:
将guava-11.0.2.jar包移动到hadoop的lib下,再次执行依然报错


尝试结果:
反复尝试都一致报错,求大神们指点,再此先谢谢了!


回复:转发:flink1.10整合hbase测试遇到的问题

2020-06-03 文章 1048262223
Hi


java.lang.Thread.run(Thread.java:748) Caused by: 
java.lang.ClassNotFoundException: 
org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader
根据报错看,这个guava的cacheloader应该是被shade到hbase的jar包中的,你可以查看下hbase相关的jar包有没有shade进去guava的这些类。
Best,Yichao Yang



发自我的iPhone


-- 原始邮件 --
发件人: liunaihua521 

回复:flink sql 窗口场景的问题

2020-06-03 文章 Sun.Zhu
hi
你是要每条数据都计算当前5分钟内的聚合值吗?如果是这样的话可以考虑使用over window


| |
Sun.Zhu
|
|
17626017...@163.com
|
签名由网易邮箱大师定制


在2020年06月3日 02:56,steven chen 写道:
hi :
我现在遇到有这样一个场景,我们需要实时去统计5分和30分的粒度,flink sql 窗口使用了处理时间滑动窗口方式
但是都是只有5分结束的时候才能把聚合结果输出,这个不满足我们需求,有没有方式可以直接实时输出结果,比如18:02 的统计+1+1 
都能直接落在18:00-18:05的窗口上,并每次+1都能实时输出,而不是等到窗口结束才sink 到mysql .30分钟我同样

Re: 回复: flink数据sink到mysql 是事务处理

2020-06-03 文章 godfrey he
hi greemqq...@163.com,15701181132mr@gmail.com
能详细介绍一下 幂等 处理的场景吗? 例如通过upsert语句能做到幂等

hi hdxg1101300...@163.com,
你的sink function的一个statement batch里有insert,delete等语句混合的情况?是用的是flink
sql,还是datastream?

Bests,
Godfrey

Bests,
Godfrey

Michael Ran  于2020年6月3日周三 下午8:07写道:

> 我们也会用幂等处理类似的东西。
> 1.你要么单条数据处理
> 2.要么保证每个事务之间不会出现冲突才行,比如楼上说了key by 之类的
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-06-03 15:41:44,"1048262223" <1048262...@qq.com> 写道:
> >Hi
> >  
> 可以辛苦贴一下上下文吗,什么场景,mysql的sql是什么。可以先判断是什么导致了MySQL死锁,我理解如果可以对上游做主键keyby,那么下游同时只有一个算子在处理同一个主键。
> >
> >
> >Best,
> >Yichao Yang
> >
> >
> >
> >
> >--原始邮件--
> >发件人:"Px New"<15701181132mr@gmail.com;
> >发送时间:2020年6月3日(星期三) 中午11:35
> >收件人:"user-zh" >
> >主题:Re: flink数据sink到mysql 是事务处理
> >
> >
> >
> >Hi 我最近在处理幂等性写入Mysql 但相关文档太少并没有实质性的操作, 所有方便参观下你这边实务写入的code吗? 非常感谢你
> >也可发code到我的email 15701181132mr@gmail.com
> >
> >1101300123  >
> >
> >
> >
> 目前使用flink处理kafka数据后sink到mysql,处理完的数据是一个用户ID对应多条记录,都要插入到mysql,所以在sink中会循环list把数据组装成batch提交,所以mysql开启了事务保证一批数据的事务。现在发现程序运行几个小时候会出现mysql死锁!想了解一下大家是怎么处理sink端
> > 多条记录的操作
> > sink的invoke代码
> > @Override
> > public void invoke(Tuple5 > List > connection.setAutoCommit(false);
> > List > for (BroadBandReq rs: f4){
> > statement.setString(1,rs.getUserId());
> > statement.setString(2,rs.getPhoneNum());
> > statement.setString(3,rs.getProvId());
> > statement.addBatch();
> > }
> > try {
> > statement.executeBatch();
> > connection.commit();
> > }catch (Exception e){
> > LOG.info(" add data for rds ; operTag:{},
> > userId:{},removeTag:{},stateCode:{}, phoneList:{}", value.f0,
> value.f1,
> > value.f2, value.f3,f4);
> > connection.rollback();
> > e.printStackTrace();
> >
> throw new Exception(e);
> > }
> > }
> >
> >
> >
> >
> > java.lang.Exception: java.sql.BatchUpdateException: Deadlock found
> when
> > trying to get lock; try restarting transaction
> > at
> com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:73)
> > at
> com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:18)
> > at
> >
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
> > at
> >
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
> > at
> >
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
> > at
> >
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:627)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:718)
> > at
> >
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:736)
> > at
> >
> org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:102)
> > at
> >
> com.unicom.ogg.ExtractOggApplicationV2$6.processElement(ExtractOggApplicationV2.java:162)
> > at
> >
> com.unicom.ogg.ExtractOggApplicationV2$6.processElement(ExtractOggApplicationV2.java:157)
> > at
> >
> org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
> > at
> >
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
> > at
> >
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
> > at
> >
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
> > at
> org.apache.flink.streaming.runtime.io
> >
> .StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
> > at
> org.apache.flink.streaming.runtime.io
> > .StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
> > at
> org.apache.flink.streaming.runtime.io
> >
> 

Re: 在yarn-session模式下怎么用rest api 触发savepoint并停止任务

2020-06-03 文章 Weihua Hu
HI, Junbao

 可以参考 API 文档检查一下 HTTP method 是否正确
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jobs-jobid-stop
 



Best
Weihua Hu

> 2020年6月1日 16:21,wind.fly@outlook.com 写道:
> 
> Hi,all:
> 本人当前用的flink版本1.10,通过yarn-session发布job,通过jobs/job/stop api 停止任务会报Unable to 
> load requested file,问一下在yarn-session模式下没有这个api吗?
> 
> Best,
> Junbao Zhang



Re:回复: flink数据sink到mysql 是事务处理

2020-06-03 文章 Michael Ran
我们也会用幂等处理类似的东西。
1.你要么单条数据处理
2.要么保证每个事务之间不会出现冲突才行,比如楼上说了key by 之类的

















在 2020-06-03 15:41:44,"1048262223" <1048262...@qq.com> 写道:
>Hi
>   
>可以辛苦贴一下上下文吗,什么场景,mysql的sql是什么。可以先判断是什么导致了MySQL死锁,我理解如果可以对上游做主键keyby,那么下游同时只有一个算子在处理同一个主键。
>
>
>Best,
>Yichao Yang
>
>
>
>
>--原始邮件--
>发件人:"Px New"<15701181132mr@gmail.com;
>发送时间:2020年6月3日(星期三) 中午11:35
>收件人:"user-zh"
>主题:Re: flink数据sink到mysql 是事务处理
>
>
>
>Hi 我最近在处理幂等性写入Mysql 但相关文档太少并没有实质性的操作, 所有方便参观下你这边实务写入的code吗? 非常感谢你
>也可发code到我的email 15701181132mr@gmail.com
>
>1101300123 
>
>
> 
>目前使用flink处理kafka数据后sink到mysql,处理完的数据是一个用户ID对应多条记录,都要插入到mysql,所以在sink中会循环list把数据组装成batch提交,所以mysql开启了事务保证一批数据的事务。现在发现程序运行几个小时候会出现mysql死锁!想了解一下大家是怎么处理sink端
> 多条记录的操作
> sink的invoke代码
> @Override
> public void invoke(Tuple5 List connection.setAutoCommit(false);
> List for (BroadBandReq rs: f4){
> statement.setString(1,rs.getUserId());
> statement.setString(2,rs.getPhoneNum());
> statement.setString(3,rs.getProvId());
> statement.addBatch();
> }
> try {
> statement.executeBatch();
> connection.commit();
> }catch (Exception e){
> LOG.info(" add data for rds ; operTag:{},
> userId:{},removeTag:{},stateCode:{}, phoneList:{}", value.f0, value.f1,
> value.f2, value.f3,f4);
> connection.rollback();
> e.printStackTrace();
> 
>throw new Exception(e);
> }
> }
>
>
>
>
> java.lang.Exception: java.sql.BatchUpdateException: Deadlock found when
> trying to get lock; try restarting transaction
> at 
>com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:73)
> at 
>com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:18)
> at
> 
>org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> at
> 
>org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
> at
> 
>org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
> at
> 
>org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
> at
> 
>org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
> at
> 
>org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
> at
> 
>org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
> at
> 
>org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
> at
> 
>org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:627)
> at
> 
>org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:718)
> at
> 
>org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:736)
> at
> 
>org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:102)
> at
> 
>com.unicom.ogg.ExtractOggApplicationV2$6.processElement(ExtractOggApplicationV2.java:162)
> at
> 
>com.unicom.ogg.ExtractOggApplicationV2$6.processElement(ExtractOggApplicationV2.java:157)
> at
> 
>org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
> at
> 
>org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
> at
> 
>org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
> at
> 
>org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
> at
> 
>org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
> at
> 
>org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
> at
> 
>org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
> at
> 
>org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
> at 
>org.apache.flink.streaming.runtime.io
> .StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
> at 
>org.apache.flink.streaming.runtime.io
> .StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
> at 
>org.apache.flink.streaming.runtime.io
> .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
> at
> 
>org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
> at
> 
>org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
> at
> 
>org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
> at
> 
>org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
> at 
>org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> 

Re:flink sink to mysql

2020-06-03 文章 chaojianok
推荐 JDBCOutputFormat 吧,简单易用。

















在 2020-06-03 18:11:38,"Zhou Zach"  写道:
>hi all,
> flink sink to mysql 是推荐 继承RichSinkFunction的方式,还是 通过JDBCOutputFormat的方式?


Re:Re: pyflink window创建失败

2020-06-03 文章 元灵



谢谢回复,等1.11.0出来我再试试。














在 2020-06-03 18:00:06,"godfrey he"  写道:
> hi 元灵,
> 这个是个已知bug: https://issues.apache.org/jira/browse/FLINK-17753,目前正在fix。
>
>Bests,
>Godfrey
>
>元灵  于2020年6月3日周三 下午5:39写道:
>
>> 大家好,请教个问题:
>> 我在pyflink中使用SQL DDL创建kafka源,如下:
>> kafka_source_ddl = """
>> CREATE TABLE kafka_source_tb (
>>  name VARCHAR,
>>  number INT,
>>  msgtime TIMESTAMP,
>>  WATERMARK FOR msgtime AS msgtime
>> ) WITH (
>>  'connector.type' = 'kafka',
>>  'connector.version' = 'universal',
>>  'connector.topic' = 'mytopic',
>>  'connector.properties.zookeeper.connect' = 'localhost:2181',
>>  'connector.properties.bootstrap.servers' = 'localhost:9092',
>>  'format.type' = 'json',
>>  'format.derive-schema' = 'true'
>> )
>> """
>> st_env.sql_update(kafka_source_ddl)
>>
>>
>> 在使用窗口时报错,代码如下:
>> st_env.from_path("kafka_source_tb") \
>>
>> .window(Slide.over("10.secends").every("1.secends").on("msgtime").alias("msgtime"))
>> \
>>   .group_by("msgtime") \
>>   .select("msgtime.start as b, msgtime.end as c, msgtime.rowtime
>> as d") \
>>
>>
>> 报错如下
>> : org.apache.flink.table.api.ValidationException: A group window expects a
>> time attribute for grouping in a stream environment.
>> at
>> org.apache.flink.table.operations.utils.factories.AggregateOperationFactory.validateStreamTimeAttribute(AggregateOperationFactory.java:293)
>> at
>> org.apache.flink.table.operations.utils.factories.AggregateOperationFactory.validateTimeAttributeType(AggregateOperationFactory.java:278)
>> at
>> org.apache.flink.table.operations.utils.factories.AggregateOperationFactory.getValidatedTimeAttribute(AggregateOperationFactory.java:271)
>> at
>> org.apache.flink.table.operations.utils.factories.AggregateOperationFactory.createResolvedWindow(AggregateOperationFactory.java:233)
>> at
>> org.apache.flink.table.operations.utils.OperationTreeBuilder.windowAggregate(OperationTreeBuilder.java:243)
>> at
>> org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:762)
>> at
>> org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:747)
>>
>>
>> 我看到有人说直接使用api的rowtime函数好像有bug,就选了用DDL的,这个error是否相关?或者是我哪里写错了?
>>
>>
>> 请大家帮忙看一下
>> 谢谢!
>>
>>


flink sink to mysql

2020-06-03 文章 Zhou Zach
hi all,
 flink sink to mysql 是推荐 继承RichSinkFunction的方式,还是 通过JDBCOutputFormat的方式?

Re: pyflink window创建失败

2020-06-03 文章 godfrey he
 hi 元灵,
 这个是个已知bug: https://issues.apache.org/jira/browse/FLINK-17753,目前正在fix。

Bests,
Godfrey

元灵  于2020年6月3日周三 下午5:39写道:

> 大家好,请教个问题:
> 我在pyflink中使用SQL DDL创建kafka源,如下:
> kafka_source_ddl = """
> CREATE TABLE kafka_source_tb (
>  name VARCHAR,
>  number INT,
>  msgtime TIMESTAMP,
>  WATERMARK FOR msgtime AS msgtime
> ) WITH (
>  'connector.type' = 'kafka',
>  'connector.version' = 'universal',
>  'connector.topic' = 'mytopic',
>  'connector.properties.zookeeper.connect' = 'localhost:2181',
>  'connector.properties.bootstrap.servers' = 'localhost:9092',
>  'format.type' = 'json',
>  'format.derive-schema' = 'true'
> )
> """
> st_env.sql_update(kafka_source_ddl)
>
>
> 在使用窗口时报错,代码如下:
> st_env.from_path("kafka_source_tb") \
>
> .window(Slide.over("10.secends").every("1.secends").on("msgtime").alias("msgtime"))
> \
>   .group_by("msgtime") \
>   .select("msgtime.start as b, msgtime.end as c, msgtime.rowtime
> as d") \
>
>
> 报错如下
> : org.apache.flink.table.api.ValidationException: A group window expects a
> time attribute for grouping in a stream environment.
> at
> org.apache.flink.table.operations.utils.factories.AggregateOperationFactory.validateStreamTimeAttribute(AggregateOperationFactory.java:293)
> at
> org.apache.flink.table.operations.utils.factories.AggregateOperationFactory.validateTimeAttributeType(AggregateOperationFactory.java:278)
> at
> org.apache.flink.table.operations.utils.factories.AggregateOperationFactory.getValidatedTimeAttribute(AggregateOperationFactory.java:271)
> at
> org.apache.flink.table.operations.utils.factories.AggregateOperationFactory.createResolvedWindow(AggregateOperationFactory.java:233)
> at
> org.apache.flink.table.operations.utils.OperationTreeBuilder.windowAggregate(OperationTreeBuilder.java:243)
> at
> org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:762)
> at
> org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:747)
>
>
> 我看到有人说直接使用api的rowtime函数好像有bug,就选了用DDL的,这个error是否相关?或者是我哪里写错了?
>
>
> 请大家帮忙看一下
> 谢谢!
>
>


pyflink window创建失败

2020-06-03 文章 元灵
大家好,请教个问题:
我在pyflink中使用SQL DDL创建kafka源,如下:
kafka_source_ddl = """
CREATE TABLE kafka_source_tb (
 name VARCHAR,
 number INT,
 msgtime TIMESTAMP,
 WATERMARK FOR msgtime AS msgtime
) WITH (
 'connector.type' = 'kafka',
 'connector.version' = 'universal',
 'connector.topic' = 'mytopic',
 'connector.properties.zookeeper.connect' = 'localhost:2181',
 'connector.properties.bootstrap.servers' = 'localhost:9092',
 'format.type' = 'json',
 'format.derive-schema' = 'true'
)
"""
st_env.sql_update(kafka_source_ddl)


在使用窗口时报错,代码如下:
st_env.from_path("kafka_source_tb") \
  
.window(Slide.over("10.secends").every("1.secends").on("msgtime").alias("msgtime"))
 \
  .group_by("msgtime") \
  .select("msgtime.start as b, msgtime.end as c, msgtime.rowtime as d") 
\


报错如下
: org.apache.flink.table.api.ValidationException: A group window expects a time 
attribute for grouping in a stream environment.
at 
org.apache.flink.table.operations.utils.factories.AggregateOperationFactory.validateStreamTimeAttribute(AggregateOperationFactory.java:293)
at 
org.apache.flink.table.operations.utils.factories.AggregateOperationFactory.validateTimeAttributeType(AggregateOperationFactory.java:278)
at 
org.apache.flink.table.operations.utils.factories.AggregateOperationFactory.getValidatedTimeAttribute(AggregateOperationFactory.java:271)
at 
org.apache.flink.table.operations.utils.factories.AggregateOperationFactory.createResolvedWindow(AggregateOperationFactory.java:233)
at 
org.apache.flink.table.operations.utils.OperationTreeBuilder.windowAggregate(OperationTreeBuilder.java:243)
at 
org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:762)
at 
org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:747)


我看到有人说直接使用api的rowtime函数好像有bug,就选了用DDL的,这个error是否相关?或者是我哪里写错了?


请大家帮忙看一下
谢谢!



Re:Re:回复:flink1.9,如何实时查看kafka消费的挤压量

2020-06-03 文章 guanyq



找到了,原生就有的committedOffsets-currentOffsets
https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/metrics.html#reporter
Connectors
Kafka Connectors
| Scope | Metrics | User Variables | Description | Type |
| Operator | commitsSucceeded | n/a | The total number of successful offset 
commits to Kafka, if offset committing is turned on and checkpointing is 
enabled. | Counter |
| Operator | commitsFailed | n/a | The total number of offset commit failures 
to Kafka, if offset committing is turned on and checkpointing is enabled. Note 
that committing offsets back to Kafka is only a means to expose consumer 
progress, so a commit failure does not affect the integrity of Flink's 
checkpointed partition offsets. | Counter |
| Operator | committedOffsets | topic, partition | The last successfully 
committed offsets to Kafka, for each partition. A particular partition's metric 
can be specified by topic name and partition id. | Gauge |
| Operator | currentOffsets | topic, partition | The consumer's current read 
offset, for each partition. A particular partition's metric can be specified by 
topic name and partition id. | Gauge |











在 2020-06-03 15:02:24,"guanyq"  写道:
>kafka挤压量的metrics的demo有么,或者参考资料
>
>
>
>
>
>
>
>
>在 2020-06-03 14:31:56,"1530130567" <1530130...@qq.com> 写道:
>>Hi:
>>  可以考虑用prometheus采集kafka的metrics,在grafana上展示
>>
>>
>>
>>
>>--原始邮件--
>>发件人: "Zhonghan Tang"<13122260...@163.com; 
>>发送时间: 2020年6月3日(星期三) 下午2:29
>>收件人: "user-zh">抄送: "user-zh">主题: 回复:flink1.9,如何实时查看kafka消费的挤压量
>>
>>
>>
>>一般是kafka自带的查看消费组的命令工具可以看
>>./kafka-consumer-groups.sh --describe --group test-consumer-group 
>>--bootstrap-server 
>>
>>
>>| |
>>Zhonghan Tang
>>|
>>|
>>13122260...@163.com
>>|
>>签名由网易邮箱大师定制
>>
>>
>>在2020年06月3日 14:10,guanyq>请加个问题
>>
>>1.消费kafka时,是如何实时查看kafka topic的挤压量的?


?????? flink 1.9 ????????????????

2020-06-03 文章 star


??select year,month,day,province,sub_name,sum(amount),count(*) as cou from 
mytable group by year,month,day,province,sub_name;


?? ?? ??
kafka??topic??table


??table??






----
??:"godfrey he"https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=147427289#FLIP105:SupporttoInterpretChangeloginFlinkSQL(IntroducingDebeziumandCanalFormat)-CanalFormat


1048262223 <1048262...@qq.com ??2020??6??3?? 2:59??

 Hi
 Flink ??RetractStream
 
sinkupdatekafkaupdatesink??kafka??RetractStream


 Best,
 Yichao Yang


 --nbsp;nbsp;--
 ??:nbsp;"star"<3149768...@qq.comgt;;
 :nbsp;2020??6??3??(??) 2:47
 
??:nbsp;"user-zh@flink.apache.org"

回复: flink数据sink到mysql 是事务处理

2020-06-03 文章 1048262223
Hi
   
可以辛苦贴一下上下文吗,什么场景,mysql的sql是什么。可以先判断是什么导致了MySQL死锁,我理解如果可以对上游做主键keyby,那么下游同时只有一个算子在处理同一个主键。


Best,
Yichao Yang




--原始邮件--
发件人:"Px New"<15701181132mr@gmail.com;
发送时间:2020年6月3日(星期三) 中午11:35
收件人:"user-zh"

Re: flink 1.9 关于回撤流的问题

2020-06-03 文章 godfrey he
hi star,
Flink 1.11 开始已经支持 table source 读取 retract 消息,update 消息。
目前支持 Debezium format,Canal format [1],其他的情况目前需要自己实现。


Best,
Godfrey

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=147427289#FLIP105:SupporttoInterpretChangeloginFlinkSQL(IntroducingDebeziumandCanalFormat)-CanalFormat


1048262223 <1048262...@qq.com> 于2020年6月3日周三 下午2:59写道:

> Hi
> Flink 中RetractStream
> 是必须要sink支持update的,kafka消息队列本身是不支持update的,所以基于sink为kafka的程序是不能做RetractStream的。
>
>
> Best,
> Yichao Yang
>
>
> --原始邮件--
> 发件人:"star"<3149768...@qq.com;
> 发送时间:2020年6月3日(星期三) 下午2:47
> 收件人:"user-zh@flink.apache.org"
> 主题:flink 1.9 关于回撤流的问题
>
>
>
> 大家好,
>
>
>
> 在做实时统计分析的时候我将基础汇总层做成了一个回撤流(toRetractStream)输出到kafka里(因为后面很多实时报表依赖这个流,所以就输出了)
> 问题是这个kafka里到回撤流还能转成flink 的RetractStream流吗,转成后我想注册成一张表,然后基于这个表再做聚合分析
>
>
>
>
> 谢谢


Re:回复:flink1.9,如何实时查看kafka消费的挤压量

2020-06-03 文章 guanyq
kafka挤压量的metrics的demo有么,或者参考资料








在 2020-06-03 14:31:56,"1530130567" <1530130...@qq.com> 写道:
>Hi:
>  可以考虑用prometheus采集kafka的metrics,在grafana上展示
>
>
>
>
>--原始邮件--
>发件人: "Zhonghan Tang"<13122260...@163.com; 
>发送时间: 2020年6月3日(星期三) 下午2:29
>收件人: "user-zh"抄送: "user-zh"主题: 回复:flink1.9,如何实时查看kafka消费的挤压量
>
>
>
>一般是kafka自带的查看消费组的命令工具可以看
>./kafka-consumer-groups.sh --describe --group test-consumer-group 
>--bootstrap-server 
>
>
>| |
>Zhonghan Tang
>|
>|
>13122260...@163.com
>|
>签名由网易邮箱大师定制
>
>
>在2020年06月3日 14:10,guanyq请加个问题
>
>1.消费kafka时,是如何实时查看kafka topic的挤压量的?


??????flink 1.9 ????????????????

2020-06-03 文章 1048262223
Hi
Flink ??RetractStream 
sinkupdatekafkaupdatesink??kafka??RetractStream


Best,
Yichao Yang


----
??:"star"<3149768...@qq.com;
:2020??6??3??(??) 2:47
??:"user-zh@flink.apache.org"

flink 1.9 ????????????????

2020-06-03 文章 star



??toRetractStreamkafka??
??kafka??flink 
??RetractStream






??????flink1.9??????????????kafka????????????

2020-06-03 文章 1530130567
Hi:
  ??prometheuskafka??metricsgrafana??




----
??: "Zhonghan Tang"<13122260...@163.com; 
: 2020??6??3??(??) 2:29
??: "user-zh"

回复:flink1.9,如何实时查看kafka消费的挤压量

2020-06-03 文章 Zhonghan Tang
一般是kafka自带的查看消费组的命令工具可以看
./kafka-consumer-groups.sh --describe --group test-consumer-group  
--bootstrap-server 


| |
Zhonghan Tang
|
|
13122260...@163.com
|
签名由网易邮箱大师定制


在2020年06月3日 14:10,guanyq 写道:
请加个问题

1.消费kafka时,是如何实时查看kafka topic的挤压量的?

flink1.9,如何实时查看kafka消费的挤压量

2020-06-03 文章 guanyq
请加个问题

1.消费kafka时,是如何实时查看kafka topic的挤压量的?