Re: 回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标?

2020-07-03 文章 zhisheng
我猜你是想要将 table name 作为一个标签方便后期分组查询过滤?

wangl...@geekplus.com.cn  于2020年7月3日周五 上午10:24写道:

> public void invoke(ObjectNode node, Context context) throws Exception {
>
> String tableName = node.get("metadata").get("topic").asText();
> Meter meter = getRuntimeContext().getMetricGroup().meter(tableName,
> new MeterView(10));
> meter.markEvent();
> log.info("### counter: " + meter.toString() + "\t" +
> meter.getCount());
>
> 如上面代码所示,在 invoke 方法中解析得到 tableName, 以 tableName 名字作为 metrics.
> 但这样写每一消息下来了后相当于重新定义了 这个 metrics , 又从 0 开始计数了。
>
> 谢谢,
> 王磊
>
>
> wangl...@geekplus.com.cn
>
>
> Sender: kcz
> Send Time: 2020-07-03 09:13
> Receiver: wanglei2
> Subject: 回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标?
> 按照你的描述 你就是少了tablename,那么你解析log 得到了tablename又做metric就好了吧
>
>
>
> -- 原始邮件 --
> 发件人: 王磊2 
> 发送时间: 2020年7月2日 21:46
> 收件人: user-zh , 17610775726 <17610775...@163.com>
> 主题: 回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标?
>
>
> 没有明白你说的实现方式。
>
> 我最终要得到类似的 Metrics:  myCounter_table1, myCounter_table2, ...,
> myCounter_tableX
> 但我看代码中 Metrics 的初始化都是在 open 方法中的,在这个方法中我没法得到 tableName 是什么。
>
> 谢谢,
> 王磊
>
>
>
> --
> 发件人:JasonLee <17610775...@163.com>
> 发送时间:2020年7月2日(星期四) 21:12
> 收件人:user-zh 
> 主 题:回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标?
>
> 你把tablename传到下面metric里不就行了吗
>
>
> | |
> JasonLee
> |
> |
> 邮箱:17610775...@163.com
> |
>
> Signature is customized by Netease Mail Master
>
> 在2020年07月02日 16:39,wangl...@geekplus.com.cn 写道:
>
> 全都是同一种类型的 metrics.
> 比如消息中是 mysql binlog 解析结果,我想要根据消息内容拿到 tableName, 按 tableName 生成不同名称的
> metrics(但都是 meter 类型)
>
> 谢谢,
> 王磊
>
>
>
>
> wangl...@geekplus.com.cn
>
>
> 发件人: JasonLee
> 发送时间: 2020-07-02 16:16
> 收件人: user-zh
> 主题: 回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标?
> 是要生成不同类型的metric吗 比如counter meter ?
>
>
> | |
> JasonLee
> |
> |
> 邮箱:17610775...@163.com
> |
>
> Signature is customized by Netease Mail Master
>
> 在2020年07月02日 15:34,wangl...@geekplus.com.cn 写道:
>
> 官网上的例子:
>
> public class MyMapper extends RichMapFunction {
> private transient Counter counter;
> @Override
> public void open(Configuration config) {
>   this.counter = getRuntimeContext()
> .getMetricGroup()
> .counter("myCounter");
> }
> @Override
> public String map(String value) throws Exception {
>   this.counter.inc();
>   return value;
> }
> }
>
> 我想要根据 map 方法中传入的参数生成不同的 监控指标,怎样可以实现呢?
>
> 谢谢,
> 王磊
>
>
>
> wangl...@geekplus.com.cn
>
>
>


Re: Flink job不定期就会重启,版本是1.9

2020-07-03 文章 zhisheng
我们集群一般出现这种异常大都是因为 Full GC 次数比较多,然后最后伴随着就是 TaskManager 挂掉的异常

Xintong Song  于2020年7月3日周五 上午11:06写道:

> 从报错信息看是 Akka 的 RPC 调用超时,因为是 LocalFencedMessage 所以基本上可以排除网络问题。
> 建议看一下 JM 进程的 GC 压力以及线程数量,是否存在压力过大 RPC 来不及响应的情况。
>
> Thank you~
>
> Xintong Song
>
>
>
> On Fri, Jul 3, 2020 at 10:48 AM noon cjihg  wrote:
>
> > Hi,大佬们
> >
> > Flink job经常不定期重启,看了异常日志基本都是下面这种,可以帮忙解释下什么原因吗?
> >
> > 2020-07-01 20:20:43.875 [flink-akka.actor.default-dispatcher-27] INFO
> > akka.remote.RemoteActorRefProvider$RemotingTerminator
> > flink-akka.remote.default-remote-dispatcher-22 - Remoting shut down.
> > 2020-07-01 20:20:43.875 [flink-akka.actor.default-dispatcher-27] INFO
> > akka.remote.RemoteActorRefProvider$RemotingTerminator
> > flink-akka.remote.default-remote-dispatcher-22 - Remoting shut down.
> > 2020-07-01 20:20:43.875 [flink-metrics-16] INFO
> > akka.remote.RemoteActorRefProvider$RemotingTerminator
> > flink-metrics-akka.remote.default-remote-dispatcher-14 - Remoting shut
> > down.
> > 2020-07-01 20:20:43.875 [flink-metrics-16] INFO
> > akka.remote.RemoteActorRefProvider$RemotingTerminator
> > flink-metrics-akka.remote.default-remote-dispatcher-14 - Remoting shut
> > down.
> > 2020-07-01 20:20:43.891 [flink-metrics-16] INFO
> > org.apache.flink.runtime.rpc.akka.AkkaRpcService  - Stopped Akka RPC
> > service.
> > 2020-07-01 20:20:43.895 [flink-akka.actor.default-dispatcher-15] INFO
> > org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - Terminating
> > cluster entrypoint process YarnJobClusterEntrypoint with exit code 2.
> > java.util.concurrent.CompletionException:
> > akka.pattern.AskTimeoutException: Ask timed out on
> > [Actor[akka://flink/user/resourcemanager#-781959047]] after [1
> > ms]. Message of type
> > [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical
> > reason for `AskTimeoutException` is that the recipient actor didn't
> > send a reply.
> > at
> >
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
> > at
> >
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
> > at
> >
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
> > at
> >
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
> > at
> >
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> > at
> >
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> > at
> >
> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:871)
> > at akka.dispatch.OnComplete.internal(Future.scala:263)
> > at akka.dispatch.OnComplete.internal(Future.scala:261)
> > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
> > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
> > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
> > at
> >
> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
> > at
> > scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
> > at
> >
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
> > at
> >
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:644)
> > at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
> > at
> >
> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
> > at
> >
> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
> > at
> >
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
> > at
> >
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
> > at
> >
> akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:279)
> > at
> >
> akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:283)
> > at
> >
> akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:235)
> > at java.lang.Thread.run(Thread.java:745)
> > Caused by: akka.pattern.AskTimeoutException: Ask timed out on
> > [Actor[akka://flink/user/resourcemanager#-781959047]] after [1
> > ms]. Message of type
> > [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical
> > reason for `AskTimeoutException` is that the recipient actor didn't
> > send a reply.
> > at
> > akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
> > at
> > akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
> > at
> >
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:648)
> > ... 9 common frames omitted
> >
>


Re: 如何在窗口关闭的时候清除状态

2020-07-03 文章 zhisheng
你试试在 clear 方法中清理

18579099...@163.com <18579099...@163.com> 于2020年7月3日周五 下午2:02写道:

>
> 大家好,我有一个需求,我在ProcessWindowFunction算子中定义了一个valueState,我希望在窗口关闭的时候能够将状态清理。我应该在哪里清理呢?
>
> 1.刚开始我选择在ProcessWindowFunction算子的process方法中清理,但是这里会有一个问题,我事件时间窗口开1天,我写了一个trigger,每隔一个小时输出一次结果。
>
> 如果我在process方法中清理,每隔一个小时就会被清理,而valueState中存的是我的中间结果,应该在窗口关闭的时候被清理(即一天之后)。这应该怎么办呢?
>
>
>
> 18579099...@163.com
>


Re: 回复:flink1.10 checkpoint 运行一段时间空指针异常

2020-07-03 文章 zhisheng
我们也有遇到过这个异常,但是不是很常见

Congxian Qiu  于2020年7月3日周五 下午2:08写道:

> 你可以看看是否 FLINK-17479[1] 和你的问题一样,是的话,可以尝试修改一下 jdk 版本试试
> [1]  https://issues.apache.org/jira/browse/FLINK-17479
> Best,
> Congxian
>
>
> 程龙 <13162790...@163.com> 于2020年7月1日周三 下午9:09写道:
>
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > 都是分配不到资源(slot)的错误,应该还是checkpoint 为空导致的,不知道为啥为空
> >
> >
> >
> >
> >
> > 在 2020-07-01 20:51:34,"JasonLee" <17610775...@163.com> 写道:
> > >你到具体的tm上找到相关的operator看看是不是有异常信息
> > >
> > >
> > >| |
> > >JasonLee
> > >|
> > >|
> > >邮箱:17610775...@163.com
> > >|
> > >
> > >Signature is customized by Netease Mail Master
> > >
> > >在2020年07月01日 20:43,程龙 写道:
> > >flink1.10上 程序运行几个小时后就会报不能执行checkpoint 空指针异常 具体如下:
> > >
> > >
> > >java.lang.Exception: Could not perform checkpoint 3201 for operator
> > Filter -> Map (2/8).
> > >   at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:816)
> > >   at org.apache.flink.streaming.runtime.io
> >
> .CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:86)
> > >   at org.apache.flink.streaming.runtime.io
> >
> .CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
> > >   at org.apache.flink.streaming.runtime.io
> > .CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
> > >   at org.apache.flink.streaming.runtime.io
> > .StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:133)
> > >   at org.apache.flink.streaming.runtime.io
> > .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
> > >   at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:310)
> > >   at
> >
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
> > >   at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485)
> > >   at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469)
> > >   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
> > >   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
> > >   at java.lang.Thread.run(Thread.java:745)
> > >Caused by: java.lang.NullPointerException
> > >   at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1382)
> > >   at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:974)
> > >   at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:870)
> > >   at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> > >   at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:843)
> > >   at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:803)
> >
>


回复:flink1.9读取阿里Mq问题

2020-07-03 文章 李军
您好!
自定义source继承RichSourceFuntion.open() 里去构建Conumer 可以设置AccessKey,SecretKey 
参数;



2020-7-4
| |
李军
|
|
hold_li...@163.com
|
签名由网易邮箱大师定制
在2020年7月3日 23:44,guanyq 写道:
flink1.9读取阿里RocketMQ
如何设置AccessKey,SecretKey 参数


finalRMQConnectionConfigconnectionConfig=newRMQConnectionConfig.Builder().setHost("localhost").setPort(5000)build();

Re: flink1.9读取阿里Mq问题

2020-07-03 文章 zhisheng
hi,guanyq

社区版本的 Flink 应该默认没有和 RocketMQ 连接的 Connector,在 RocketMQ 的社区项目中看到和 Flink 整合的模块:
https://github.com/apache/rocketmq-externals/tree/master/rocketmq-flink

你说的 AccessKey,SecretKey 参数应该是 ACL 权限校验,看了代码应该是不支持的,不过可以自己去进行扩展。

Best!
zhisheng

guanyq  于2020年7月3日周五 下午11:44写道:

> flink1.9读取阿里RocketMQ
> 如何设置AccessKey,SecretKey 参数
>
>
>
> finalRMQConnectionConfigconnectionConfig=newRMQConnectionConfig.Builder().setHost("localhost").setPort(5000)build();


回复: Flink sql 主动使数据延时一段时间有什么方案

2020-07-03 文章 Sun.Zhu
感谢benchao和forideal的方案,
方法1.使用udf,查不到 sleep 等一下在查
--这个可以尝试
方法2.在 join operator处数据等一会再去查
—我们使用的是flink sql,不是streaming,所以该方案可能行不通
方法3.如果没有 join 上,就把数据发到source,循环join。
--我们这个维表join的场景类似filter的功能,如果关联上则主流数据就不处理了,所以不一定非要join上,只是想延迟一会提升准确率
方法4.如果你的source的mq支持延迟消息,这个应该就不需要Flink做什么了,直接用mq的延迟消息就可以了
—我们的source是kafka,好像不支持kafka的功能
方法5.扩展了下 Flink 的 Source。比如在 kafka connector 中加了一个 time.wait 
的属性,当用户设置了这个属性,就让source 的数据等一会儿发到下游。起到等一会的效果。
--这个方案需要修改源码,也可以试一下


Best
Sun.Zhu
| |
Sun.Zhu
|
|
17626017...@163.com
|
签名由网易邮箱大师定制


在2020年07月3日 23:26,forideal 写道:
Hi




刚刚本超说了四种方法,

方法1.使用udf,查不到 sleep 等一下在查

方法2.在 join operator处数据等一会再去查

方法3.如果没有 join 上,就把数据发到source,循环join。

方法4.如果你的source的mq支持延迟消息,这个应该就不需要Flink做什么了,直接用mq的延迟消息就可以了




上述方法应该都能实现相同的效果。




我们也实现了一种方法。这种方法是扩展了下 Flink 的 Source。比如在 kafka connector 中加了一个 time.wait 
的属性,当用户设置了这个属性,就让source 的数据等一会儿发到下游。起到等一会的效果。




Best forideal

















在 2020-07-03 23:05:06,"Benchao Li"  写道:
奥,对,还有一种思路。如果你的source的mq支持延迟消息,这个应该就不需要Flink做什么了,直接用mq的延迟消息就可以了。

admin <17626017...@163.com> 于2020年7月3日周五 下午5:54写道:

Hi,all
我们有这样一个场景,双流join,一个快流,一个慢流,想让快流等一段时间,目的是能提高join的命中率。
FLink sql有什么方案实现吗?

感谢您的回复



--

Best,
Benchao Li


flink1.9读取阿里Mq问题

2020-07-03 文章 guanyq
flink1.9读取阿里RocketMQ
如何设置AccessKey,SecretKey 参数


finalRMQConnectionConfigconnectionConfig=newRMQConnectionConfig.Builder().setHost("localhost").setPort(5000)build();

Re:Re: Flink sql 主动使数据延时一段时间有什么方案

2020-07-03 文章 forideal
Hi 




刚刚本超说了四种方法,

方法1.使用udf,查不到 sleep 等一下在查

方法2.在 join operator处数据等一会再去查

方法3.如果没有 join 上,就把数据发到source,循环join。

方法4.如果你的source的mq支持延迟消息,这个应该就不需要Flink做什么了,直接用mq的延迟消息就可以了




上述方法应该都能实现相同的效果。




我们也实现了一种方法。这种方法是扩展了下 Flink 的 Source。比如在 kafka connector 中加了一个 time.wait 
的属性,当用户设置了这个属性,就让source 的数据等一会儿发到下游。起到等一会的效果。




Best forideal

















在 2020-07-03 23:05:06,"Benchao Li"  写道:
>奥,对,还有一种思路。如果你的source的mq支持延迟消息,这个应该就不需要Flink做什么了,直接用mq的延迟消息就可以了。
>
>admin <17626017...@163.com> 于2020年7月3日周五 下午5:54写道:
>
>> Hi,all
>> 我们有这样一个场景,双流join,一个快流,一个慢流,想让快流等一段时间,目的是能提高join的命中率。
>> FLink sql有什么方案实现吗?
>>
>> 感谢您的回复
>
>
>
>-- 
>
>Best,
>Benchao Li


Re: Flink sql 主动使数据延时一段时间有什么方案

2020-07-03 文章 Benchao Li
奥,对,还有一种思路。如果你的source的mq支持延迟消息,这个应该就不需要Flink做什么了,直接用mq的延迟消息就可以了。

admin <17626017...@163.com> 于2020年7月3日周五 下午5:54写道:

> Hi,all
> 我们有这样一个场景,双流join,一个快流,一个慢流,想让快流等一段时间,目的是能提高join的命中率。
> FLink sql有什么方案实现吗?
>
> 感谢您的回复



-- 

Best,
Benchao Li


Re: Flink sql 主动使数据延时一段时间有什么方案

2020-07-03 文章 Benchao Li
还有一种很有意思的思路。
如果你不考虑数据是否会有乱序,而且保证维表中一定能join到结果,那就可以正常join,如果join不到,就把这条数据再发送到source的topic里,实现了一种类似于for循环的能力。。

admin <17626017...@163.com> 于2020年7月3日周五 下午5:54写道:

> Hi,all
> 我们有这样一个场景,双流join,一个快流,一个慢流,想让快流等一段时间,目的是能提高join的命中率。
> FLink sql有什么方案实现吗?
>
> 感谢您的回复



-- 

Best,
Benchao Li


Re: Flink sql 主动使数据延时一段时间有什么方案

2020-07-03 文章 Benchao Li
我们也遇到过类似场景。
如果你的数据里面有事件时间,可以写个udf来判断下,如果事件时间-当前时间 小于某个阈值,可以sleep一下。
如果没有事件时间,那就不太好直接搞了,我们是自己搞了一个延迟维表,就是保证每条数据进到维表join算子后等固定时间后再去join。

admin <17626017...@163.com> 于2020年7月3日周五 下午5:54写道:

> Hi,all
> 我们有这样一个场景,双流join,一个快流,一个慢流,想让快流等一段时间,目的是能提高join的命中率。
> FLink sql有什么方案实现吗?
>
> 感谢您的回复



-- 

Best,
Benchao Li


Re: table execution-options 能否通过 -yd 生效

2020-07-03 文章 Benchao Li
这个应该可以生效的,我们就是这样用的。

如果没理解错,在`PlannerBase#mergeParameters`会把ExecutionEnvironment中的参数和TableConfig的参数合并的。

Yang Wang  于2020年7月3日周五 下午5:10写道:

> 其实是没有Public的API去从文件load Configuration的,因为我理解这是个Client端的内部逻辑
>
> 在用户调用了flink run以后,client会把conf/flink-conf.yaml加载,并apply上dynamic options,
> 然后会把这个Configuration传给各个Environment去使用
>
> 如果TableEnvironment在构建的时候没有使用传过来的Configuration,那-yD就没有办法生效了
> 只能用户在代码里面再设置一次
>
>
> Best,
> Yang
>
> Jingsong Li  于2020年7月3日周五 下午3:19写道:
>
> > Hi,
> >
> > 如果你是写代码来使用TableEnvironment的,
> > 你要显示的在代码中塞进TableConfig中:
> >
> > Configuration configuration = tEnv.getConfig().getConfiguration();
> > configuration.addAll(GlobalConfiguration.loadConfiguration());
> >
> > CC: @Yang Wang 
> GlobalConfiguration是个internal的类,有没有public
> > API获取对应的Configuration?
> >
> > Best,
> > Jingsong
> >
> > On Fri, Jul 3, 2020 at 3:07 PM Yang Wang  wrote:
> >
> >> 我理解在Yarn上运行,通过-yD传入和写在flink-conf.yaml里面都是可以生效的
> >>
> >>
> >> Best,
> >> Yang
> >>
> >> liangji  于2020年7月2日周四 下午6:12写道:
> >>
> >> >
> >> >
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html#execution-options
> >> > // instantiate table environment
> >> > TableEnvironment tEnv = ...
> >> >
> >> > // access flink configuration
> >> > Configuration configuration = tEnv.getConfig().getConfiguration();
> >> > // set low-level key-value options
> >> > configuration.setString("table.exec.mini-batch.enabled", "true");
> >> > configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
> >> > configuration.setString("table.exec.mini-batch.size", "5000");
> >> >
> >> > 请问下,table的这些参数是不是只能在代码里面设置,通过 -yd 传入可否生效呢?
> >> >
> >> >
> >> >
> >> > --
> >> > Sent from: http://apache-flink.147419.n8.nabble.com/
> >> >
> >>
> >
> >
> > --
> > Best, Jingsong Lee
> >
>


-- 

Best,
Benchao Li


Re: 求助:FLINKSQL1.10实时统计累计UV

2020-07-03 文章 Benchao Li
你用的是哪个版本?之前是存在一个类似问题的[1],是在window里面做count distinct会有这个问题,
这个已经在1.11中修复了。

[1] https://issues.apache.org/jira/browse/FLINK-17942

x <35907...@qq.com> 于2020年7月3日周五 下午4:34写道:

> 您好,我程序运行一段时间后,发现checkpoint文件总在增长,应该是状态没有过期,
>
> 我配置了tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)),按理说,日期是前一天的key对应的状态会在第二天过期的。
>
>
>
>
> -- 原始邮件 --
> 发件人: "Jark Wu" 发送时间: 2020年6月18日(星期四) 中午12:16
> 收件人: "user-zh"
> 主题: Re: 求助:FLINKSQL1.10实时统计累计UV
>
>
>
> 是的,我觉得这样子是能绕过的。
>
> On Thu, 18 Jun 2020 at 10:34, x <35907...@qq.com> wrote:
>
> > 如果是1.10的话,我通过表转流,再转表的方式实现了,您看合理吗?
> > val resTmpTab: Table = tabEnv.sqlQuery(
> >   """
> > SELECT MAX(DATE_FORMAT(ts, '-MM-dd
> HH:mm:00'))
> > time_str,COUNT(DISTINCT userkey) uv
> > FROM user_behavior    GROUP BY
> DATE_FORMAT(ts, '-MM-dd')    """)
> >
> > val resTmpStream=tabEnv.toRetractStream[(String,Long)](resTmpTab)
> >  
> .filter(line=>line._1==true).map(line=>line._2)
> >
> > val res= tabEnv.fromDataStream(resTmpStream)
> > tabEnv.sqlUpdate(
> >   s"""
> > INSERT INTO rt_totaluv
> > SELECT _1,MAX(_2)
> > FROM $res
> > GROUP BY _1
> > """)
> >
> >
> > -- 原始邮件 --
> > 发件人: "Jark Wu" > 发送时间: 2020年6月17日(星期三) 中午1:55
> > 收件人: "user-zh" >
> > 主题: Re: 求助:FLINKSQL1.10实时统计累计UV
> >
> >
> >
> > 在 Flink 1.11 中,你可以尝试这样:
> >
> > CREATE TABLE mysql (
> >    time_str STRING,
> >    uv BIGINT,
> >    PRIMARY KEY (ts) NOT ENFORCED
> > ) WITH (
> >    'connector' = 'jdbc',
> >    'url' = 'jdbc:mysql://localhost:3306/mydatabase',
> >    'table-name' = 'myuv'
> > );
> >
> > INSERT INTO mysql
> > SELECT MAX(DATE_FORMAT(ts, '-MM-dd HH:mm:00')),
> COUNT(DISTINCT 
> > user_id)
> > FROM user_behavior;
> >
> > On Wed, 17 Jun 2020 at 13:49, x <35907...@qq.com> wrote:
> >
> > > 感谢您的回复,您提到的"方法一"我还是有点不太理解,我需要每分钟输出一个累计UV,
> > > sink表这个样式
> > > tm uv
> > > 2020/06/17 13:46:00 1
> > > 2020/06/17 13:47:00 2
> > > 2020/06/17 13:48:00 3
> > >
> > >
> > > group by 日期的话,分钟如何获取
> > >
> > >
> > >
> --&nbsp;原始邮件&nbsp;--
> > > 发件人:&nbsp;"Benchao Li" &gt;;
> > > 发送时间:&nbsp;2020年6月17日(星期三) 中午11:46
> > > 收件人:&nbsp;"user-zh" &gt;;
> > >
> > > 主题:&nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV
> > >
> > >
> > >
> > > Hi,
> > > 我感觉这种场景可以有两种方式,
> > > 1. 可以直接用group by + mini batch
> > > 2. window聚合 + fast emit
> > >
> > > 对于#1,group by的字段里面可以有一个日期的字段,例如你上面提到的DATE_FORMAT(rowtm,
> '-MM-dd')。
> > > 这种情况下的状态清理,需要配置state retention时间,配置方法可以参考[1] 。同时,mini
> batch的开启也需要
> > > 用参数[2] 来打开。
> > >
> > > 对于#2,这种直接开一个天级别的tumble窗口就行。然后状态清理不用特殊配置,默认就可以清理。
> > > fast
> emit这个配置现在还是一个experimental的feature,所以没有在文档中列出来,我把配置贴到这里,你可以参考一下:
> > > table.exec.emit.early-fire.enabled = true
> > > table.exec.emit.early-fire.delay = 60 s
> > >
> > > [1]
> > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
> > > [2]
> > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html
> > >
> > > x <35907...@qq.com&gt; 于2020年6月17日周三 上午11:14写道:
> > >
> > > &gt;
> 需求是,每分钟统计当日0时到当前的累计UV数,用下面的这种方式,跨天的时候,累计UV不会清零,还会一直累计
> > > &gt; CREATE VIEW uv_per_10min AS
> > > &gt; SELECT&amp;nbsp;
> > > &gt; &amp;nbsp;
> MAX(DATE_FORMAT(proctime&amp;nbsp;,
> > '-MM-dd
> > > HH:mm:00'))&amp;nbsp;OVER w
> > > &gt; AS time_str,&amp;nbsp;
> > > &gt; &amp;nbsp; COUNT(DISTINCT user_id) OVER
> w AS uv
> > > &gt; FROM user_behavior
> > > &gt; WINDOW w AS (ORDER BY proctime ROWS BETWEEN
> UNBOUNDED
> > PRECEDING AND
> > > &gt; CURRENT ROW);
> > > &gt;
> > > &gt;
> > > &gt; 想请教一下,应该如何处理?
> > > &gt; PARTITION BY DATE_FORMAT(rowtm, '-MM-dd')
> > 这样可以吗,另外状态应该如何清理?
> > > &gt; PS:1.10貌似不支持DDL貌似不支持CREATE VIEW吧
> > > &gt; 多谢



-- 

Best,
Benchao Li


回复:Flink sql 主动使数据延时一段时间有什么方案

2020-07-03 文章 Sun.Zhu


窗口得用group by,字段会丢失
在2020年07月03日 19:11,kcz 写道:
设置一个窗口时间,如果有需要取最新的,可以再做一下处理。





-- 原始邮件 --
发件人: admin <17626017...@163.com>
发送时间: 2020年7月3日 18:01
收件人: user-zh 

flink1.9自定义实现source的问题

2020-07-03 文章 guanyq
附件图片,想把listener出来的数据,传给ctx。
如何实现这个数据的传递。
public class RMQRichParallelSource extends RichParallelSourceFunction 
implements MessageOrderListener {

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
Properties properties = new Properties();

// 在订阅消息前,必须调用 start 方法来启动 Consumer,只需调用一次即可。
OrderConsumer consumer = ONSFactory.createOrderedConsumer(properties);

consumer.subscribe(
"PRODNOC_KB_SYNC_CUST_ORDER",
"*",
this);
consumer.start();
}

@Override
public void run(SourceContext ctx) {


}

@Override
public OrderAction consume(Message message, ConsumeOrderContext 
consumeOrderContext) {
try {
System.out.println(new String(message.getBody(), "UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return OrderAction.Success;
}

@Override
public void cancel() {
}

@Override
public void close() throws Exception {
super.close();
}
}

Re:Re:flink asynctablefunction调用异常

2020-07-03 文章 sunfulin



hi
抱歉忘记回复了。经过进一步调试发现,是因为定义的schema的column类型,与实际获取到的字段类型不一致导致。主要是在调试的过程中,ComplettedFuture.complete会吃掉这种类型不一致的异常,也不下发数据。看源码发现只会在timeout的时候才调用future.completeException。记录下。














在 2020-07-03 17:01:19,"forideal"  写道:
>Hi sunfulin:
> 
>  我这么实现是可以的。
>public void eval(CompletableFuture> result, String key) {
>executorService.submit(() -> {
>try {
>Row row = fetchdata(key);
>if (row != null) {
>result.complete(Collections.singletonList(row));
>} else {
>result.complete(Collections.singletonList(new Row(this.fieldNames.length)));
>}
>} catch (Exception e) {
>result.complete(Collections.singletonList(new Row(this.fieldNames.length)));
>}
>});
>}
>
>
>
>
>Best forideal.
>
>
>
>
>
>在 2020-07-02 15:56:46,"sunfulin"  写道:
>>hi,
>>我在使用flink 1.10.1 blink 
>>planner,通过扩展tablesourcesinkfactory和asynctablefunction扩展了一个维表,维表会初始化并调用rpc服务。
>>遇到一个比较诡异的问题,作业在执行如下join的sql时,没有任何输出,等一段时间后,抛出了异常:Caused by : 
>>java.lang.Exception: Could not complete the stream element: 
>>org.apache.flink.table.dataformat.BinaryRow  caused by : 
>>java.util.concurrent.TimeoutException: Async function call has timed out.
>>
>>
>>我开了debug日志,debug时在我的lookupfunction.eval里,可以正常调用rpc接口服务并future.complete,但是并不输出任何结果。不确定可能是啥原因。望指导。谢谢。


Re: 回复:rocksdb的block cache usage应该如何使用

2020-07-03 文章 Yun Tang
hi

有采集过内存使用情况么,推荐使用jemalloc的预先加载方式[1][2]来sample 
JVM的内存使用,观察是否有malloc的内存存在超用的场景。需要配置相关参数 
containerized.taskmanager.env.MALLOC_CONF 和 
containerized.taskmanager.env.LD_PRELOAD


[1] https://github.com/jemalloc/jemalloc/wiki/Use-Case%3A-Heap-Profiling
[2] https://www.evanjones.ca/java-native-leak-bug.html

祝好
唐云



From: SmileSmile 
Sent: Friday, July 3, 2020 15:22
To: Yun Tang 
Cc: user-zh@flink.apache.org 
Subject: 回复:rocksdb的block cache usage应该如何使用

Hi

作业只配置了重启策略,作业如果fail了,只会重启,没有恢复历史数据。

【作业一旦发生failover,state backend的数据都需要清空然后再启动的时候进行加载。】
我目前遇到的情况是作业fail重启,pod就很容易被os kill,只能重构集群解决。

详情可见
http://apache-flink.147419.n8.nabble.com/Checkpoint-td4406.html


| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制

在2020年07月03日 15:13,Yun Tang 写道:
Hi

如果是没有开启Checkpoint,作业是如何做到failover的?failover的时候一定需要从Checkpoint加载数据的。还是说你其实开启了Checkpoint,但是Checkpoint的interval设置的很大,所以每次failover相当于作业重新运行。
如果都没有从checkpoint加载数据,哪里来的历史数据呢?作业一旦发生failover,state backend的数据都需要清空然后再启动的时候进行加载。

祝好
唐云

From: SmileSmile 
Sent: Friday, July 3, 2020 15:07
To: Yun Tang 
Cc: user-zh@flink.apache.org 
Subject: 回复:rocksdb的block cache usage应该如何使用

hi yun tang!

因为网络或者不可抗力导致pod重生,作业会重启,目前作业没有开启checkpoint,恢复等价继续消费最新数据计算,运行一段时间很容易内存超用被os 
kill,然后重启,再运行一段时间,间隔变短,死的越来越频繁。

从现象上看很像是内存没有释放,这种场景下,上一次作业残留的未到水位线还没有被触发计算的数据是否在作业重启过程中被清除了?




;
[https://mail-online.nosdn.127.net/qiyelogo/defaultAvatar.png]
a511955993
邮箱:a511955...@163.com

签名由 网易邮箱大师; 定制

在2020年07月03日 14:59,Yun Tang 写道:
Hi

观察block cache usage的显示数值是否超过你的单个slot的managed memory,计算方法是 managed memory / 
slot数目,得到一个slot的managed memory,将该数值与block cache usage比较,看内存是否超用。重启之后容易被os 
kill,使用的是从savepoint恢复数据么?

祝好
唐云

From: SmileSmile 
Sent: Friday, July 3, 2020 14:20
To: Yun Tang 
Cc: Flink user-zh mailing list 
Subject: 回复:rocksdb的block cache usage应该如何使用

thanks yun tang!

那如果想通过block cache usage判断是否超过managed memory,该如何配置呢? 最近遇到作业只要重启后很容易被os 
kill的情况,想对比下




| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制

在2020年07月03日 14:10,Yun Tang 写道:
Hi

默认Flink启用了rocksDB 的managed 
memory,这里涉及到这个功能的实现原理,简单来说,一个slot里面的所有rocksDB实例底层“托管”内存的LRU block 
cache均是一个,这样你可以根据taskmanager和subtask_index 
作为tag来区分,你会发现在同一个TM里面的某个subtask对应的不同column_family 的block 
cache的数值均是完全相同的。所以不需要将这个数值进行求和统计。

祝好
唐云

From: SmileSmile 
Sent: Thursday, July 2, 2020 18:05
To: Flink user-zh mailing list 
Subject: rocksdb的block cache usage应该如何使用


通过 state.backend.rocksdb.metrics.block-cache-usage: true开启 
rocksdb_block_cache_usage监控,上报到prometheus,对应的指标名称是
flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_usage。


我们的作业一个TM的内存设置如下:

taskmanager.memory.process.size: 23000m
taskmanager.memory.managed.fraction: 0.4

ui上显示的Flink Managed MEM是8.48G。


通过grafana配置出来的图,如果group by的维度是host,得出来的每个TM在作业稳定后是45G,超过8.48G了。

sum(flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_usage{reportName=~"$reportName"})
 by (host)



如果维度是host,operator_name,每个operator_name维度是22G。

sum(flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_usage{reportName=~"$reportName"})
 by (host,operator_name)


请问这个指标应该如何使用?

| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制


回复:Flink sql 主动使数据延时一段时间有什么方案

2020-07-03 文章 kcz
设置一个窗口时间,如果有需要取最新的,可以再做一下处理。





-- 原始邮件 --
发件人: admin <17626017...@163.com>
发送时间: 2020年7月3日 18:01
收件人: user-zh 

Re: Flink sql 主动使数据延时一段时间有什么方案

2020-07-03 文章 admin
补充一下:明确的说是维表的join,A表关联B表(维表),想让A表延迟一会再关联B表

> 2020年7月3日 下午5:53,admin <17626017...@163.com> 写道:
> 
> Hi,all
> 我们有这样一个场景,双流join,一个快流,一个慢流,想让快流等一段时间,目的是能提高join的命中率。
> FLink sql有什么方案实现吗?
> 
> 感谢您的回复



Flink sql 主动使数据延时一段时间有什么方案

2020-07-03 文章 admin
Hi,all
我们有这样一个场景,双流join,一个快流,一个慢流,想让快流等一段时间,目的是能提高join的命中率。
FLink sql有什么方案实现吗?

感谢您的回复

Re: table execution-options 能否通过 -yd 生效

2020-07-03 文章 Yang Wang
其实是没有Public的API去从文件load Configuration的,因为我理解这是个Client端的内部逻辑

在用户调用了flink run以后,client会把conf/flink-conf.yaml加载,并apply上dynamic options,
然后会把这个Configuration传给各个Environment去使用

如果TableEnvironment在构建的时候没有使用传过来的Configuration,那-yD就没有办法生效了
只能用户在代码里面再设置一次


Best,
Yang

Jingsong Li  于2020年7月3日周五 下午3:19写道:

> Hi,
>
> 如果你是写代码来使用TableEnvironment的,
> 你要显示的在代码中塞进TableConfig中:
>
> Configuration configuration = tEnv.getConfig().getConfiguration();
> configuration.addAll(GlobalConfiguration.loadConfiguration());
>
> CC: @Yang Wang  
> GlobalConfiguration是个internal的类,有没有public
> API获取对应的Configuration?
>
> Best,
> Jingsong
>
> On Fri, Jul 3, 2020 at 3:07 PM Yang Wang  wrote:
>
>> 我理解在Yarn上运行,通过-yD传入和写在flink-conf.yaml里面都是可以生效的
>>
>>
>> Best,
>> Yang
>>
>> liangji  于2020年7月2日周四 下午6:12写道:
>>
>> >
>> >
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html#execution-options
>> > // instantiate table environment
>> > TableEnvironment tEnv = ...
>> >
>> > // access flink configuration
>> > Configuration configuration = tEnv.getConfig().getConfiguration();
>> > // set low-level key-value options
>> > configuration.setString("table.exec.mini-batch.enabled", "true");
>> > configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
>> > configuration.setString("table.exec.mini-batch.size", "5000");
>> >
>> > 请问下,table的这些参数是不是只能在代码里面设置,通过 -yd 传入可否生效呢?
>> >
>> >
>> >
>> > --
>> > Sent from: http://apache-flink.147419.n8.nabble.com/
>> >
>>
>
>
> --
> Best, Jingsong Lee
>


Re:flink asynctablefunction调用异常

2020-07-03 文章 forideal
Hi sunfulin:
 
  我这么实现是可以的。
public void eval(CompletableFuture> result, String key) {
executorService.submit(() -> {
try {
Row row = fetchdata(key);
if (row != null) {
result.complete(Collections.singletonList(row));
} else {
result.complete(Collections.singletonList(new Row(this.fieldNames.length)));
}
} catch (Exception e) {
result.complete(Collections.singletonList(new Row(this.fieldNames.length)));
}
});
}




Best forideal.





在 2020-07-02 15:56:46,"sunfulin"  写道:
>hi,
>我在使用flink 1.10.1 blink 
>planner,通过扩展tablesourcesinkfactory和asynctablefunction扩展了一个维表,维表会初始化并调用rpc服务。
>遇到一个比较诡异的问题,作业在执行如下join的sql时,没有任何输出,等一段时间后,抛出了异常:Caused by : 
>java.lang.Exception: Could not complete the stream element: 
>org.apache.flink.table.dataformat.BinaryRow  caused by : 
>java.util.concurrent.TimeoutException: Async function call has timed out.
>
>
>我开了debug日志,debug时在我的lookupfunction.eval里,可以正常调用rpc接口服务并future.complete,但是并不输出任何结果。不确定可能是啥原因。望指导。谢谢。


?????? ??????FLINKSQL1.10????????????UV

2020-07-03 文章 x
checkpoint??
tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)),??key




--  --
??: "Jark Wu"https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
> > [2]
> >
> >
> 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html
> >
> > x <35907...@qq.com&gt; ??2020??6??17?? 11:14??
> >
> > &gt; 
??0??UV??UV??
> > &gt; CREATE VIEW uv_per_10min AS
> > &gt; SELECT&amp;nbsp;
> > &gt; &amp;nbsp; 
MAX(DATE_FORMAT(proctime&amp;nbsp;,
> '-MM-dd
> > HH:mm:00'))&amp;nbsp;OVER w
> > &gt; AS time_str,&amp;nbsp;
> > &gt; &amp;nbsp; COUNT(DISTINCT user_id) OVER w AS 
uv
> > &gt; FROM user_behavior
> > &gt; WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED
> PRECEDING AND
> > &gt; CURRENT ROW);
> > &gt;
> > &gt;
> > &gt; ??
> > &gt; PARTITION BY DATE_FORMAT(rowtm, '-MM-dd')
> ??
> > &gt; PS??1.10??DDL??CREATE VIEW??
> > &gt; 

Re: flink 1.10 kafka collector topic 配置pattern

2020-07-03 文章 Jark Wu
可以关注下:https://issues.apache.org/jira/browse/FLINK-18449

预计1.12会支持。

Best,
Jark

On Fri, 3 Jul 2020 at 16:27, Peihui He  wrote:

> 好的,感谢🤗
>
> Leonard Xu  于2020年7月3日周五 下午4:07写道:
>
> > Hello
> >
> > 我了解到社区有人在做了,1.12 应该会支持
> >
> > 祝好
> > Leonard Xu
> >
> > > 在 2020年7月3日,16:02,Peihui He  写道:
> > >
> > > hello
> > >
> > >  请教大家,flink 1.10里面kafka connector 不能配置topic pattern,后续会支持吗?
> > >
> > > best wishes
> >
> >
>


Re: flink asynctablefunction调用异常

2020-07-03 文章 Jark Wu
可以分享下你的 AsyncTableFunction 的实现吗?

Best,
Jark

> 2020年7月2日 15:56,sunfulin  写道:
> 
> hi,
> 我在使用flink 1.10.1 blink 
> planner,通过扩展tablesourcesinkfactory和asynctablefunction扩展了一个维表,维表会初始化并调用rpc服务。
> 遇到一个比较诡异的问题,作业在执行如下join的sql时,没有任何输出,等一段时间后,抛出了异常:Caused by : 
> java.lang.Exception: Could not complete the stream element: 
> org.apache.flink.table.dataformat.BinaryRow  caused by : 
> java.util.concurrent.TimeoutException: Async function call has timed out.
> 
> 
> 我开了debug日志,debug时在我的lookupfunction.eval里,可以正常调用rpc接口服务并future.complete,但是并不输出任何结果。不确定可能是啥原因。望指导。谢谢。



Re: flink 1.10 kafka collector topic 配置pattern

2020-07-03 文章 Peihui He
好的,感谢🤗

Leonard Xu  于2020年7月3日周五 下午4:07写道:

> Hello
>
> 我了解到社区有人在做了,1.12 应该会支持
>
> 祝好
> Leonard Xu
>
> > 在 2020年7月3日,16:02,Peihui He  写道:
> >
> > hello
> >
> >  请教大家,flink 1.10里面kafka connector 不能配置topic pattern,后续会支持吗?
> >
> > best wishes
>
>


Re: flink 1.10 kafka collector topic 配置pattern

2020-07-03 文章 Leonard Xu
Hello

我了解到社区有人在做了,1.12 应该会支持

祝好
Leonard Xu

> 在 2020年7月3日,16:02,Peihui He  写道:
> 
> hello
> 
>  请教大家,flink 1.10里面kafka connector 不能配置topic pattern,后续会支持吗?
> 
> best wishes



flink 1.10 kafka collector topic 配置pattern

2020-07-03 文章 Peihui He
hello

  请教大家,flink 1.10里面kafka connector 不能配置topic pattern,后续会支持吗?

best wishes


flink 1.9 中 StreamTableEnvironment 注册 registerDataStream处理嵌套别名

2020-07-03 文章 Jun Zou
Hi,
我在使用flink 1.9版本的 StreamTableEnvironment 注册 table 时,想指定一个嵌套字段的 cloumns
alianame,
例如:
String fieldExprsStr = "modbus.parsedResponse,timestamp";
tableEnv.registerDataStream(src.getName(), srcStream, fieldExprsStr);
在对 modbus.parsedResponse 进行校验的时候
抛出了如下错误:
org.apache.flink.table.api.ValidationException: Field reference expression
or alias on field expression expected.
at
org.apache.flink.table.typeutils.FieldInfoUtils$IndexedExprToFieldInfo.defaultMethod(FieldInfoUtils.java:543)
at
org.apache.flink.table.typeutils.FieldInfoUtils$IndexedExprToFieldInfo.defaultMethod(FieldInfoUtils.java:470)
at
org.apache.flink.table.expressions.utils.ApiExpressionDefaultVisitor.visit(ApiExpressionDefaultVisitor.java:92)
at
org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37)
at
org.apache.flink.table.expressions.LookupCallExpression.accept(LookupCallExpression.java:67)

请问是否有方法来指定这种cloumns 别名呢?


回复:rocksdb的block cache usage应该如何使用

2020-07-03 文章 SmileSmile
Hi

作业只配置了重启策略,作业如果fail了,只会重启,没有恢复历史数据。

【作业一旦发生failover,state backend的数据都需要清空然后再启动的时候进行加载。】
我目前遇到的情况是作业fail重启,pod就很容易被os kill,只能重构集群解决。

详情可见
http://apache-flink.147419.n8.nabble.com/Checkpoint-td4406.html


| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制

在2020年07月03日 15:13,Yun Tang 写道:
Hi

如果是没有开启Checkpoint,作业是如何做到failover的?failover的时候一定需要从Checkpoint加载数据的。还是说你其实开启了Checkpoint,但是Checkpoint的interval设置的很大,所以每次failover相当于作业重新运行。
如果都没有从checkpoint加载数据,哪里来的历史数据呢?作业一旦发生failover,state backend的数据都需要清空然后再启动的时候进行加载。

祝好
唐云

From: SmileSmile 
Sent: Friday, July 3, 2020 15:07
To: Yun Tang 
Cc: user-zh@flink.apache.org 
Subject: 回复:rocksdb的block cache usage应该如何使用

hi yun tang!

因为网络或者不可抗力导致pod重生,作业会重启,目前作业没有开启checkpoint,恢复等价继续消费最新数据计算,运行一段时间很容易内存超用被os 
kill,然后重启,再运行一段时间,间隔变短,死的越来越频繁。

从现象上看很像是内存没有释放,这种场景下,上一次作业残留的未到水位线还没有被触发计算的数据是否在作业重启过程中被清除了?




;
[https://mail-online.nosdn.127.net/qiyelogo/defaultAvatar.png]
a511955993
邮箱:a511955...@163.com

签名由 网易邮箱大师; 定制

在2020年07月03日 14:59,Yun Tang 写道:
Hi

观察block cache usage的显示数值是否超过你的单个slot的managed memory,计算方法是 managed memory / 
slot数目,得到一个slot的managed memory,将该数值与block cache usage比较,看内存是否超用。重启之后容易被os 
kill,使用的是从savepoint恢复数据么?

祝好
唐云

From: SmileSmile 
Sent: Friday, July 3, 2020 14:20
To: Yun Tang 
Cc: Flink user-zh mailing list 
Subject: 回复:rocksdb的block cache usage应该如何使用

thanks yun tang!

那如果想通过block cache usage判断是否超过managed memory,该如何配置呢? 最近遇到作业只要重启后很容易被os 
kill的情况,想对比下




| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制

在2020年07月03日 14:10,Yun Tang 写道:
Hi

默认Flink启用了rocksDB 的managed 
memory,这里涉及到这个功能的实现原理,简单来说,一个slot里面的所有rocksDB实例底层“托管”内存的LRU block 
cache均是一个,这样你可以根据taskmanager和subtask_index 
作为tag来区分,你会发现在同一个TM里面的某个subtask对应的不同column_family 的block 
cache的数值均是完全相同的。所以不需要将这个数值进行求和统计。

祝好
唐云

From: SmileSmile 
Sent: Thursday, July 2, 2020 18:05
To: Flink user-zh mailing list 
Subject: rocksdb的block cache usage应该如何使用


通过 state.backend.rocksdb.metrics.block-cache-usage: true开启 
rocksdb_block_cache_usage监控,上报到prometheus,对应的指标名称是
flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_usage。


我们的作业一个TM的内存设置如下:

taskmanager.memory.process.size: 23000m
taskmanager.memory.managed.fraction: 0.4

ui上显示的Flink Managed MEM是8.48G。


通过grafana配置出来的图,如果group by的维度是host,得出来的每个TM在作业稳定后是45G,超过8.48G了。

sum(flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_usage{reportName=~"$reportName"})
 by (host)



如果维度是host,operator_name,每个operator_name维度是22G。

sum(flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_usage{reportName=~"$reportName"})
 by (host,operator_name)


请问这个指标应该如何使用?

| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制


Re: table execution-options 能否通过 -yd 生效

2020-07-03 文章 Jingsong Li
Hi,

如果你是写代码来使用TableEnvironment的,
你要显示的在代码中塞进TableConfig中:

Configuration configuration = tEnv.getConfig().getConfiguration();
configuration.addAll(GlobalConfiguration.loadConfiguration());

CC: @Yang Wang 
GlobalConfiguration是个internal的类,有没有public
API获取对应的Configuration?

Best,
Jingsong

On Fri, Jul 3, 2020 at 3:07 PM Yang Wang  wrote:

> 我理解在Yarn上运行,通过-yD传入和写在flink-conf.yaml里面都是可以生效的
>
>
> Best,
> Yang
>
> liangji  于2020年7月2日周四 下午6:12写道:
>
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html#execution-options
> > // instantiate table environment
> > TableEnvironment tEnv = ...
> >
> > // access flink configuration
> > Configuration configuration = tEnv.getConfig().getConfiguration();
> > // set low-level key-value options
> > configuration.setString("table.exec.mini-batch.enabled", "true");
> > configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
> > configuration.setString("table.exec.mini-batch.size", "5000");
> >
> > 请问下,table的这些参数是不是只能在代码里面设置,通过 -yd 传入可否生效呢?
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
> >
>


-- 
Best, Jingsong Lee


Re: 回复:rocksdb的block cache usage应该如何使用

2020-07-03 文章 Yun Tang
Hi

如果是没有开启Checkpoint,作业是如何做到failover的?failover的时候一定需要从Checkpoint加载数据的。还是说你其实开启了Checkpoint,但是Checkpoint的interval设置的很大,所以每次failover相当于作业重新运行。
如果都没有从checkpoint加载数据,哪里来的历史数据呢?作业一旦发生failover,state backend的数据都需要清空然后再启动的时候进行加载。

祝好
唐云

From: SmileSmile 
Sent: Friday, July 3, 2020 15:07
To: Yun Tang 
Cc: user-zh@flink.apache.org 
Subject: 回复:rocksdb的block cache usage应该如何使用

hi yun tang!

因为网络或者不可抗力导致pod重生,作业会重启,目前作业没有开启checkpoint,恢复等价继续消费最新数据计算,运行一段时间很容易内存超用被os 
kill,然后重启,再运行一段时间,间隔变短,死的越来越频繁。

从现象上看很像是内存没有释放,这种场景下,上一次作业残留的未到水位线还没有被触发计算的数据是否在作业重启过程中被清除了?





[https://mail-online.nosdn.127.net/qiyelogo/defaultAvatar.png]
a511955993
邮箱:a511955...@163.com

签名由 网易邮箱大师 定制

在2020年07月03日 14:59,Yun Tang 写道:
Hi

观察block cache usage的显示数值是否超过你的单个slot的managed memory,计算方法是 managed memory / 
slot数目,得到一个slot的managed memory,将该数值与block cache usage比较,看内存是否超用。重启之后容易被os 
kill,使用的是从savepoint恢复数据么?

祝好
唐云

From: SmileSmile 
Sent: Friday, July 3, 2020 14:20
To: Yun Tang 
Cc: Flink user-zh mailing list 
Subject: 回复:rocksdb的block cache usage应该如何使用

thanks yun tang!

那如果想通过block cache usage判断是否超过managed memory,该如何配置呢? 最近遇到作业只要重启后很容易被os 
kill的情况,想对比下




| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制

在2020年07月03日 14:10,Yun Tang 写道:
Hi

默认Flink启用了rocksDB 的managed 
memory,这里涉及到这个功能的实现原理,简单来说,一个slot里面的所有rocksDB实例底层“托管”内存的LRU block 
cache均是一个,这样你可以根据taskmanager和subtask_index 
作为tag来区分,你会发现在同一个TM里面的某个subtask对应的不同column_family 的block 
cache的数值均是完全相同的。所以不需要将这个数值进行求和统计。

祝好
唐云

From: SmileSmile 
Sent: Thursday, July 2, 2020 18:05
To: Flink user-zh mailing list 
Subject: rocksdb的block cache usage应该如何使用


通过 state.backend.rocksdb.metrics.block-cache-usage: true开启 
rocksdb_block_cache_usage监控,上报到prometheus,对应的指标名称是
flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_usage。


我们的作业一个TM的内存设置如下:

taskmanager.memory.process.size: 23000m
taskmanager.memory.managed.fraction: 0.4

ui上显示的Flink Managed MEM是8.48G。


通过grafana配置出来的图,如果group by的维度是host,得出来的每个TM在作业稳定后是45G,超过8.48G了。

sum(flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_usage{reportName=~"$reportName"})
 by (host)



如果维度是host,operator_name,每个operator_name维度是22G。

sum(flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_usage{reportName=~"$reportName"})
 by (host,operator_name)


请问这个指标应该如何使用?

| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制


回复:rocksdb的block cache usage应该如何使用

2020-07-03 文章 SmileSmile
hi yun tang!

因为网络或者不可抗力导致pod重生,作业会重启,目前作业没有开启checkpoint,恢复等价继续消费最新数据计算,运行一段时间很容易内存超用被os 
kill,然后重启,再运行一段时间,间隔变短,死的越来越频繁。

从现象上看很像是内存没有释放,这种场景下,上一次作业残留的未到水位线还没有被触发计算的数据是否在作业重启过程中被清除了?





| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制

在2020年07月03日 14:59,Yun Tang 写道:
Hi

观察block cache usage的显示数值是否超过你的单个slot的managed memory,计算方法是 managed memory / 
slot数目,得到一个slot的managed memory,将该数值与block cache usage比较,看内存是否超用。重启之后容易被os 
kill,使用的是从savepoint恢复数据么?

祝好
唐云

From: SmileSmile 
Sent: Friday, July 3, 2020 14:20
To: Yun Tang 
Cc: Flink user-zh mailing list 
Subject: 回复:rocksdb的block cache usage应该如何使用

thanks yun tang!

那如果想通过block cache usage判断是否超过managed memory,该如何配置呢? 最近遇到作业只要重启后很容易被os 
kill的情况,想对比下




| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制

在2020年07月03日 14:10,Yun Tang 写道:
Hi

默认Flink启用了rocksDB 的managed 
memory,这里涉及到这个功能的实现原理,简单来说,一个slot里面的所有rocksDB实例底层“托管”内存的LRU block 
cache均是一个,这样你可以根据taskmanager和subtask_index 
作为tag来区分,你会发现在同一个TM里面的某个subtask对应的不同column_family 的block 
cache的数值均是完全相同的。所以不需要将这个数值进行求和统计。

祝好
唐云

From: SmileSmile 
Sent: Thursday, July 2, 2020 18:05
To: Flink user-zh mailing list 
Subject: rocksdb的block cache usage应该如何使用


通过 state.backend.rocksdb.metrics.block-cache-usage: true开启 
rocksdb_block_cache_usage监控,上报到prometheus,对应的指标名称是
flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_usage。


我们的作业一个TM的内存设置如下:

taskmanager.memory.process.size: 23000m
taskmanager.memory.managed.fraction: 0.4

ui上显示的Flink Managed MEM是8.48G。


通过grafana配置出来的图,如果group by的维度是host,得出来的每个TM在作业稳定后是45G,超过8.48G了。

sum(flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_usage{reportName=~"$reportName"})
 by (host)



如果维度是host,operator_name,每个operator_name维度是22G。

sum(flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_usage{reportName=~"$reportName"})
 by (host,operator_name)


请问这个指标应该如何使用?

| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制


Re: table execution-options 能否通过 -yd 生效

2020-07-03 文章 Yang Wang
我理解在Yarn上运行,通过-yD传入和写在flink-conf.yaml里面都是可以生效的


Best,
Yang

liangji  于2020年7月2日周四 下午6:12写道:

>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html#execution-options
> // instantiate table environment
> TableEnvironment tEnv = ...
>
> // access flink configuration
> Configuration configuration = tEnv.getConfig().getConfiguration();
> // set low-level key-value options
> configuration.setString("table.exec.mini-batch.enabled", "true");
> configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
> configuration.setString("table.exec.mini-batch.size", "5000");
>
> 请问下,table的这些参数是不是只能在代码里面设置,通过 -yd 传入可否生效呢?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: 回复:rocksdb的block cache usage应该如何使用

2020-07-03 文章 Yun Tang
Hi

观察block cache usage的显示数值是否超过你的单个slot的managed memory,计算方法是 managed memory / 
slot数目,得到一个slot的managed memory,将该数值与block cache usage比较,看内存是否超用。重启之后容易被os 
kill,使用的是从savepoint恢复数据么?

祝好
唐云

From: SmileSmile 
Sent: Friday, July 3, 2020 14:20
To: Yun Tang 
Cc: Flink user-zh mailing list 
Subject: 回复:rocksdb的block cache usage应该如何使用

thanks yun tang!

那如果想通过block cache usage判断是否超过managed memory,该如何配置呢? 最近遇到作业只要重启后很容易被os 
kill的情况,想对比下




| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制

在2020年07月03日 14:10,Yun Tang 写道:
Hi

默认Flink启用了rocksDB 的managed 
memory,这里涉及到这个功能的实现原理,简单来说,一个slot里面的所有rocksDB实例底层“托管”内存的LRU block 
cache均是一个,这样你可以根据taskmanager和subtask_index 
作为tag来区分,你会发现在同一个TM里面的某个subtask对应的不同column_family 的block 
cache的数值均是完全相同的。所以不需要将这个数值进行求和统计。

祝好
唐云

From: SmileSmile 
Sent: Thursday, July 2, 2020 18:05
To: Flink user-zh mailing list 
Subject: rocksdb的block cache usage应该如何使用


通过 state.backend.rocksdb.metrics.block-cache-usage: true开启 
rocksdb_block_cache_usage监控,上报到prometheus,对应的指标名称是
flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_usage。


我们的作业一个TM的内存设置如下:

taskmanager.memory.process.size: 23000m
taskmanager.memory.managed.fraction: 0.4

ui上显示的Flink Managed MEM是8.48G。


通过grafana配置出来的图,如果group by的维度是host,得出来的每个TM在作业稳定后是45G,超过8.48G了。

sum(flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_usage{reportName=~"$reportName"})
 by (host)



如果维度是host,operator_name,每个operator_name维度是22G。

sum(flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_usage{reportName=~"$reportName"})
 by (host,operator_name)


请问这个指标应该如何使用?

| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制