去重后相加的流程怎样设计效率更好一些?

2020-02-06 文章 Kevin Liao
场景是这样的: 数据要按 id 字段去重,然后再根据其他 field 做 key 然后对其中数值字段相加后输出,大概是 datastream.keyby("id").filter(new DeduplicateFilter()).keyby("field2", "field3").window(TumblingEventTimeWindows.of(Time.minutes(1))).aggregate(...) 其中DeduplicateFilter继承了 RichFunction 继续 MapState 做去重, 但这个方案实测性能比较差(如果不做第一个 keyby

Re: blink planner的org.apache.flink.table.api.ValidationException报错

2020-01-19 文章 Kevin Liao
quot;_rowtime", Types.SQL_TIMESTAMP) //.rowtime( //new Rowtime().timestampsFromField("@timestamp").watermarksPeriodicBounded(1000)) .field("doc", Types.POJO(Doc.class)) ) 改成使用 DataTypes 后可以跑通 Kevin Liao 于2020年1月14日周二 上午11

Re: 怎么使用flink的内存作为缓存

2020-01-16 文章 Kevin Liao
valueState https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html sun <1392427...@qq.com> 于2020年1月16日周四 下午6:05写道: > > 以前我们使用flink的时候,用到了redis,将redis作为缓存的中间件,存放一些缓存的数据,好像flink可以把这些缓存的数据放在flink内存里面,请问怎么使用flink内存来替代redis

Re: blink planner的org.apache.flink.table.api.ValidationException报错

2020-01-13 文章 Kevin Liao
我用的是 https://www.apache.org/dyn/closer.lua/flink/flink-1.9.1/flink-1.9.1-bin-scala_2.11.tgz 官网下载的 您说的 master 最新的版本我稍后试一下,谢谢 JingsongLee 于2020年1月14日周二 上午11:51写道: > 谢谢, > 你可以试下最新的1.9版本或是1.10或是master吗?因为这里修了一些bug,不确定还存在不。 > > Best, > Jingsong Lee > >

Re: blink planner的org.apache.flink.table.api.ValidationException报错

2020-01-13 文章 Kevin Liao
flink 版本是 1.9.1 release Doc 完整不太好给因为涉及到业务信息了,抱歉,但可以给一个片段,这就是一个普通Pojo,里面只有一层,所有类型都是基础类型(及衍生)+String,大约 30 多个字段,我理解这跟字段数关系不大 ``` import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; /** * @author

blink planner的org.apache.flink.table.api.ValidationException报错

2020-01-13 文章 Kevin Liao
tEnv.connect(new Kafka() .version("universal") .topic("xxx") .startFromLatest() .property("bootstrap.servers", "") .property("group.id", "")) .withFormat(new Json().failOnMissingField(false).deriveSchema()) .withSchema(new

Re: flink遇到 valueState 自身的 NPE

2020-01-09 文章 Kevin Liao
tractTtlDecorator.java#L96 > > 祝好 > 唐云 > > -- > *From:* Kevin Liao > *Sent:* Friday, January 10, 2020 1:08 > *To:* Yun Tang > *Cc:* user-zh@flink.apache.org > *Subject:* Re: flink遇到 valueState 自身的 NPE > > 谢答,首先贴的代码确实是运行的程序 > > 此外刚刚又通过打印 log 确认了 uniqMark ==

Re: flink遇到 valueState 自身的 NPE

2020-01-09 文章 Kevin Liao
null,这也符合作业成功restore之后,又再次立即遇到failover的场景,也就是处理到了非法“脏数据”,导致作业不断failover。建议你按照这个思路确认排除一下。 > > 祝好 > 唐云 > -- > *From:* Kevin Liao > *Sent:* Thursday, January 9, 2020 23:17 > *To:* user-zh@flink.apache.org > *Subject:* Re: flink遇到 valueState 自身的 NPE

Re: flink遇到 valueState 自身的 NPE

2020-01-09 文章 Kevin Liao
p.open()`. > 但是看起来`StreamTask`是可以保证先调用`open`,再调用operator的处理函数的。我也看不出来这个地方有什么问题。 > > Kevin Liao 于2020年1月9日周四 下午8:15写道: > > > https://tva4.sinaimg.cn/large/63137227ly1gaqkn1nlykj20mm0wvgq8.jpg > > > > 抱歉,再试试这个 > > > > Benchao Li 于2020年1月9日周四 下午8:13写道: > >

Re: flink遇到 valueState 自身的 NPE

2020-01-09 文章 Kevin Liao
https://tva4.sinaimg.cn/large/63137227ly1gaqkn1nlykj20mm0wvgq8.jpg 抱歉,再试试这个 Benchao Li 于2020年1月9日周四 下午8:13写道: > 我这边点开是 403 Forbidden > > Kevin Liao 于2020年1月9日周四 下午8:09写道: > > > > > > https://gm1.ggpht.com/FZGtbLggyPPZ_BoU0gt2SQTv7fyhNOKu0

Re: flink遇到 valueState 自身的 NPE

2020-01-09 文章 Kevin Liao
用一些第三方的图床工具。 > 或者你可以直接贴文字? > > Kevin Liao 于2020年1月9日周四 下午7:10写道: > > > [image: B40C260D-DCC3-4B7D-A024-3839803C2234.png] > > > > Benchao Li 于2020年1月9日周四 下午6:42写道: > > > >> hi Kevin, > >> > >> 能贴一下MyMapFunction2.java:39 这里的代码吗?

Re: flink遇到 valueState 自身的 NPE

2020-01-09 文章 Kevin Liao
[image: B40C260D-DCC3-4B7D-A024-3839803C2234.png] Benchao Li 于2020年1月9日周四 下午6:42写道: > hi Kevin, > > 能贴一下MyMapFunction2.java:39 这里的代码吗? 从上面的日志看不出来是valueState是null呢。 > > Kevin Liao 于2020年1月9日周四 下午5:57写道: > > > 早上发现任务异常,task 在不停重启,遂查看 jm 日志,最开始的报错是这样的 > > >

flink遇到 valueState 自身的 NPE

2020-01-09 文章 Kevin Liao
早上发现任务异常,task 在不停重启,遂查看 jm 日志,最开始的报错是这样的 ``` 2020-01-09 05:14:04.087 [flink-akka.actor.default-dispatcher-28] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map -> Filter -> Sink: Unnamed (3/6) (d0e6c4a05d0274c18a4a3df41ab5ff1b) switched from RUNNING to FAILED.

请教 flink 如何读取存放于 hdfs 的 lzo 压缩文件?

2019-09-09 文章 Kevin Liao
Hi, ALL 现在想讲过去的一组 mr 程序迁移到 flink 平台,原始日志存储在 hdfs 上,以 lzo 压缩,想读取成一个 datastream 处理,看到 StreamExecutionEnvironment.readFile(FileInputFormat inputFormat, String filePath), 这个方法似乎符合要求,但是 lzo 的解压应该用哪个包呢? google 没有找到什么明确的线索 请教各位,谢谢