场景是这样的:
数据要按 id 字段去重,然后再根据其他 field 做 key 然后对其中数值字段相加后输出,大概是
datastream.keyby("id").filter(new DeduplicateFilter()).keyby("field2",
"field3").window(TumblingEventTimeWindows.of(Time.minutes(1))).aggregate(...)
其中DeduplicateFilter继承了 RichFunction 继续 MapState 做去重,
但这个方案实测性能比较差(如果不做第一个 keyby
quot;_rowtime", Types.SQL_TIMESTAMP)
//.rowtime(
//new
Rowtime().timestampsFromField("@timestamp").watermarksPeriodicBounded(1000))
.field("doc", Types.POJO(Doc.class))
)
改成使用 DataTypes 后可以跑通
Kevin Liao 于2020年1月14日周二 上午11
valueState
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html
sun <1392427...@qq.com> 于2020年1月16日周四 下午6:05写道:
>
> 以前我们使用flink的时候,用到了redis,将redis作为缓存的中间件,存放一些缓存的数据,好像flink可以把这些缓存的数据放在flink内存里面,请问怎么使用flink内存来替代redis
我用的是
https://www.apache.org/dyn/closer.lua/flink/flink-1.9.1/flink-1.9.1-bin-scala_2.11.tgz
官网下载的
您说的 master 最新的版本我稍后试一下,谢谢
JingsongLee 于2020年1月14日周二 上午11:51写道:
> 谢谢,
> 你可以试下最新的1.9版本或是1.10或是master吗?因为这里修了一些bug,不确定还存在不。
>
> Best,
> Jingsong Lee
>
>
flink 版本是 1.9.1 release
Doc
完整不太好给因为涉及到业务信息了,抱歉,但可以给一个片段,这就是一个普通Pojo,里面只有一层,所有类型都是基础类型(及衍生)+String,大约
30 多个字段,我理解这跟字段数关系不大
```
import org.apache.commons.lang3.builder.ToStringBuilder;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
/**
* @author
tEnv.connect(new Kafka()
.version("universal")
.topic("xxx")
.startFromLatest()
.property("bootstrap.servers",
"")
.property("group.id", ""))
.withFormat(new Json().failOnMissingField(false).deriveSchema())
.withSchema(new
tractTtlDecorator.java#L96
>
> 祝好
> 唐云
>
> --
> *From:* Kevin Liao
> *Sent:* Friday, January 10, 2020 1:08
> *To:* Yun Tang
> *Cc:* user-zh@flink.apache.org
> *Subject:* Re: flink遇到 valueState 自身的 NPE
>
> 谢答,首先贴的代码确实是运行的程序
>
> 此外刚刚又通过打印 log 确认了 uniqMark ==
null,这也符合作业成功restore之后,又再次立即遇到failover的场景,也就是处理到了非法“脏数据”,导致作业不断failover。建议你按照这个思路确认排除一下。
>
> 祝好
> 唐云
> --
> *From:* Kevin Liao
> *Sent:* Thursday, January 9, 2020 23:17
> *To:* user-zh@flink.apache.org
> *Subject:* Re: flink遇到 valueState 自身的 NPE
p.open()`.
> 但是看起来`StreamTask`是可以保证先调用`open`,再调用operator的处理函数的。我也看不出来这个地方有什么问题。
>
> Kevin Liao 于2020年1月9日周四 下午8:15写道:
>
> > https://tva4.sinaimg.cn/large/63137227ly1gaqkn1nlykj20mm0wvgq8.jpg
> >
> > 抱歉,再试试这个
> >
> > Benchao Li 于2020年1月9日周四 下午8:13写道:
> >
https://tva4.sinaimg.cn/large/63137227ly1gaqkn1nlykj20mm0wvgq8.jpg
抱歉,再试试这个
Benchao Li 于2020年1月9日周四 下午8:13写道:
> 我这边点开是 403 Forbidden
>
> Kevin Liao 于2020年1月9日周四 下午8:09写道:
>
> >
> >
> https://gm1.ggpht.com/FZGtbLggyPPZ_BoU0gt2SQTv7fyhNOKu0
用一些第三方的图床工具。
> 或者你可以直接贴文字?
>
> Kevin Liao 于2020年1月9日周四 下午7:10写道:
>
> > [image: B40C260D-DCC3-4B7D-A024-3839803C2234.png]
> >
> > Benchao Li 于2020年1月9日周四 下午6:42写道:
> >
> >> hi Kevin,
> >>
> >> 能贴一下MyMapFunction2.java:39 这里的代码吗?
[image: B40C260D-DCC3-4B7D-A024-3839803C2234.png]
Benchao Li 于2020年1月9日周四 下午6:42写道:
> hi Kevin,
>
> 能贴一下MyMapFunction2.java:39 这里的代码吗? 从上面的日志看不出来是valueState是null呢。
>
> Kevin Liao 于2020年1月9日周四 下午5:57写道:
>
> > 早上发现任务异常,task 在不停重启,遂查看 jm 日志,最开始的报错是这样的
> >
>
早上发现任务异常,task 在不停重启,遂查看 jm 日志,最开始的报错是这样的
```
2020-01-09 05:14:04.087 [flink-akka.actor.default-dispatcher-28] INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph - Map -> Filter ->
Sink: Unnamed (3/6) (d0e6c4a05d0274c18a4a3df41ab5ff1b) switched from
RUNNING to FAILED.
Hi, ALL
现在想讲过去的一组 mr 程序迁移到 flink 平台,原始日志存储在 hdfs 上,以 lzo 压缩,想读取成一个 datastream 处理,看到
StreamExecutionEnvironment.readFile(FileInputFormat inputFormat, String
filePath), 这个方法似乎符合要求,但是 lzo 的解压应该用哪个包呢?
google 没有找到什么明确的线索
请教各位,谢谢
14 matches
Mail list logo