可以参考jdbc-connector写mysql的思路,在java里面用hashMap来存,key为 order_id ,然后定时把map的数据刷mysql
18703416...@163.com <18703416...@163.com> 于2022年3月1日周二 14:40写道: > 首先确定 source 事件有 eventTime ,比如 source 的返回类型为 MySource > 示例代码如下: > static class MySource { > Long ts; > String key; > Object object; > } > env.addSource(new SourceFunction<MySource>() { > @Override > public void run(SourceContext<MySource> ctx) throws Exception { > ctx.collect(new MySource()); > } > @Override > public void cancel() { > } > }).keyBy(new KeySelector<MySource, String>() { > @Override > public String getKey(MySource value) throws Exception { > return value.key; > } > }).timeWindow(Time.seconds(10)).process(new > ProcessWindowFunction<MySource, Object, String, TimeWindow>() { > @Override > public void process(String s, Context context, Iterable<MySource> > elements, Collector<Object> out) throws Exception { > List<MySource> collect = > Lists.newArrayList(elements).stream().sorted(new Comparator<MySource>() { > @Override > public int compare(MySource o1, MySource o2) { > return o2.ts.compareTo(o1.ts); > } > }).collect(Collectors.toList()); > if (collect.size() > 0){ > out.collect(collect.get(0).object); > } > } > }).addSink(new SinkFunction<Object>() { > @Override > public void invoke(Object value, Context context) throws Exception { > System.out.println(value); > } > }); > > > > > > > 2022年3月1日 上午11:35,Lei Wang <leiwang...@gmail.com> 写道: > > > > 谢谢,这种是可以。 > > > > 取窗口内最新的数据怎么写合适呢,我直接这样试了下不符合预期: > > > > env.addSource(consumer).keyBy(new KeySelector<String, String>() { > > @Override > > public String getKey(String value) throws Exception { > > return value; > > } > > }).timeWindow(Time.seconds(10)).reduce((a,b)->b).addSink() > > > > 实际上逆序输出了窗口内的所有记录。 > > > > 谢谢, > > > > 王磊 > > > > > > > > On Mon, Feb 28, 2022 at 9:59 AM 18703416...@163.com <18703416...@163.com > > > > wrote: > > > >> keyBy 算子之后接 timewindow 窗口, 每个窗口如果有多条数据就取最新的一条。 至于对数据库的压力,取决于这个窗口的大小 > >> > >>> 2022年2月25日 下午6:45,Lei Wang <leiwang...@gmail.com> 写道: > >>> > >>> 场景描述: > >>> Kafka 中的数据直接入到 MySQL 数据库中,数据格式如下: > >>> order_id status > >>> 只有两个字段, order_id 为主键,以 replace 覆盖方式写入到数据库中。 > >>> > >>> 对同一个 order_id, status 变化很频繁,为不对数据库造成压力,不会对每一条记录都做入库操作,但一定要保证这个 order_id > >>> 最终的状态不丢,但这个最终的状态也不确定是多少。 > >>> > >>> 我的做法是 KeyBy orderId 后判断两条记录的时间间隔,如果时间间隔太小不做入库操作,但如果这个 order_id > >>> 最后来的两条记录时间间隔太小,会导致最终的状态丢失。 > >>> > >>> 请问有什么其他的解决方法吗? > >>> > >>> 谢谢, > >>> 王磊 > >> > >> > >