Re: 实时数据入库怎样过滤中间状态,保证最终一致

2022-02-28 文章 18703416...@163.com
首先确定 source 事件有 eventTime ,比如 source 的返回类型为 MySource
示例代码如下:
static class MySource {
Long ts;
String key;
Object object;
}
env.addSource(new SourceFunction() {
@Override
public void run(SourceContext ctx) throws Exception {
ctx.collect(new MySource());
}
@Override
public void cancel() {
}
}).keyBy(new KeySelector() {
@Override
public String getKey(MySource value) throws Exception {
return value.key;
}
}).timeWindow(Time.seconds(10)).process(new ProcessWindowFunction() {
@Override
public void process(String s, Context context, Iterable elements, 
Collector out) throws Exception {
List collect = 
Lists.newArrayList(elements).stream().sorted(new Comparator() {
@Override
public int compare(MySource o1, MySource o2) {
return o2.ts.compareTo(o1.ts);
}
}).collect(Collectors.toList());
if (collect.size() > 0){
out.collect(collect.get(0).object);
}
}
}).addSink(new SinkFunction() {
@Override
public void invoke(Object value, Context context) throws Exception {
System.out.println(value);
}
});





> 2022年3月1日 上午11:35,Lei Wang  写道:
> 
> 谢谢,这种是可以。
> 
> 取窗口内最新的数据怎么写合适呢,我直接这样试了下不符合预期:
> 
> env.addSource(consumer).keyBy(new KeySelector() {
>@Override
>public String getKey(String value) throws Exception {
>return value;
>}
> }).timeWindow(Time.seconds(10)).reduce((a,b)->b).addSink()
> 
> 实际上逆序输出了窗口内的所有记录。
> 
> 谢谢,
> 
> 王磊
> 
> 
> 
> On Mon, Feb 28, 2022 at 9:59 AM 18703416...@163.com <18703416...@163.com>
> wrote:
> 
>> keyBy 算子之后接 timewindow 窗口, 每个窗口如果有多条数据就取最新的一条。 至于对数据库的压力,取决于这个窗口的大小
>> 
>>> 2022年2月25日 下午6:45,Lei Wang  写道:
>>> 
>>> 场景描述:
>>> Kafka 中的数据直接入到 MySQL 数据库中,数据格式如下:
>>> order_id   status
>>> 只有两个字段, order_id 为主键,以 replace 覆盖方式写入到数据库中。
>>> 
>>> 对同一个 order_id, status 变化很频繁,为不对数据库造成压力,不会对每一条记录都做入库操作,但一定要保证这个 order_id
>>> 最终的状态不丢,但这个最终的状态也不确定是多少。
>>> 
>>> 我的做法是 KeyBy  orderId 后判断两条记录的时间间隔,如果时间间隔太小不做入库操作,但如果这个 order_id
>>> 最后来的两条记录时间间隔太小,会导致最终的状态丢失。
>>> 
>>> 请问有什么其他的解决方法吗?
>>> 
>>> 谢谢,
>>> 王磊
>> 
>> 



Re: 实时数据入库怎样过滤中间状态,保证最终一致

2022-02-27 文章 18703416...@163.com
keyBy 算子之后接 timewindow 窗口, 每个窗口如果有多条数据就取最新的一条。 至于对数据库的压力,取决于这个窗口的大小

> 2022年2月25日 下午6:45,Lei Wang  写道:
> 
> 场景描述:
> Kafka 中的数据直接入到 MySQL 数据库中,数据格式如下:
> order_id   status
> 只有两个字段, order_id 为主键,以 replace 覆盖方式写入到数据库中。
> 
> 对同一个 order_id, status 变化很频繁,为不对数据库造成压力,不会对每一条记录都做入库操作,但一定要保证这个 order_id
> 最终的状态不丢,但这个最终的状态也不确定是多少。
> 
> 我的做法是 KeyBy  orderId 后判断两条记录的时间间隔,如果时间间隔太小不做入库操作,但如果这个 order_id
> 最后来的两条记录时间间隔太小,会导致最终的状态丢失。
> 
> 请问有什么其他的解决方法吗?
> 
> 谢谢,
> 王磊



Re: 如何按比例丢弃kafka中消费的数据

2022-02-27 文章 18703416...@163.com
自定义 kafkasource 的 DeserializationSchema
丢弃的返回 null, source 的下一个filter 算子进行过滤即可

> 2022年2月26日 上午9:01,jack zhang  写道:
> 
> 1、flink程序资源有限,kafka中数据比较多,想要按一定比例丢弃数据(或者其它策略),减轻flink 程序压力,有什么方法吗?



Re: flink状态共享

2022-02-25 文章 18703416...@163.com
如果不同算子 需要共享状态,是否考虑 归为一个算子进行处理,同理后面的算子也是

> 2022年2月25日 下午4:30,huangzhi...@iwgame.com 写道:
> 
> 对于keyed datastream 不同的算子之间是否能够共享同一状态,或者后面的算子任务,是否可以拿到前一个算子任务中的状态?
> 
> 
> 
> huangzhi...@iwgame.com



Re: 如何给flink的输出削峰填谷?

2022-01-27 文章 18703416...@163.com
类似kafka这样的消息管道应该用来 削峰填谷,
可以先sink 至kafka,再从kafka -> db

> 2022年1月26日 上午2:11,Jing  写道:
> 
> Hi Flink中文社区,
> 
> 我碰到一个这样的问题,我的数据库有write throttle, 我的flink
> app是一个10分钟窗口的聚合操作,这样导致,每10分钟有个非常大量的写请求。导致数据库的sink有时候会destroy.
> 有什么办法把这些写请求均匀分布到10分钟吗?
> 
> 
> 谢谢,
> Jing



Re: 关于时间窗口的问题

2022-01-09 文章 18703416...@163.com
你好,我理解你的意思了。
可以看下 flink-cep 相关内容, 利用模式匹配去实现

> 2022年1月8日 下午7:10,18765295...@163.com 写道:
> 
> 您好:
> 请教一个问题,
> 例如:开启一个5秒钟的滚动窗口,当key001的两条数据进来时,没有满足时间触发,但是当key002的数据进来满足窗口触发条件,会将key001的两条数据输出出去。
> 
> 我想实现的是一个基于事件时间设置的滚动窗口,当key001的数据到来时,没有满足时间时,不会因为key002的数据到来触发key001的数据进行输出。
> 每个key都有一个属于自己的时间窗口,不会受其他分组key的影响,并且可以为每个key的时间窗口设置一个基于数量和时间的触发器,当满足数量时触发或者时间到了触发。
> 
> 经过测试发现,现在设置的时间窗口里面会有不同key的数据在一起
> 每个分组是否有属于自己的时间窗口。
> 
> 
> 数量窗口的逻辑是每个key都有一个属于自己key的数量窗口,
> 例如:设置一个数量为3的滚动窗口,输入1,2,3,4,不会触发窗口执行,但是继续输入两条1的数据,会输出三个1的数据。
> 
> 请问时间窗口可以实现类似数量窗口这样的逻辑吗。
> 
> public class Test {
>public static void main(String[] args) throws Exception {
>StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>env.setParallelism(1);
>DataStreamSource dataSource = 
> env.socketTextStream("localhost", 7788);
>SingleOutputStreamOperator map = dataSource.map(new 
> MapFunction() {
>@Override
>public OrderItem map(String s) throws Exception {
>String[] split = s.split(",");
>return new OrderItem(split[0].trim(), 
> Double.parseDouble(split[1].trim()), Long.parseLong(split[2].trim()));
>}
>});
> 
>// 时间窗口测试代码
>SingleOutputStreamOperator warter = 
> map.assignTimestampsAndWatermarks(
>
> WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(0))
>.withTimestampAssigner((event, timestamp) -> 
> event.getTimeStamp()));
>SingleOutputStreamOperator timeWindow = warter.keyBy(data -> 
> data.getOrderId())
>.window(TumblingEventTimeWindows.of(Time.seconds(5)))
>.process(new ProcessWindowFunction TimeWindow>() {
>SimpleDateFormat sdf = new SimpleDateFormat("-MM-dd 
> HH:mm:ss");
> 
>@Override
>public void process(String key,
>ProcessWindowFunction String, String, TimeWindow>.Context context,
>Iterable iterable,
>Collector collector) throws 
> Exception {
>Iterator iterator = iterable.iterator();
>StringBuilder sb = new StringBuilder();
>sb.append("key -> " + key + "窗口开始时间:" + sdf.format(new 
> Date(context.window().getStart())) + "\t\n");
>while (iterator.hasNext()) {
>OrderItem next = iterator.next();
>sb.append(next + "\t\n");
>}
>sb.append("窗口结束时间:" + sdf.format(new 
> Date(context.window().getEnd(;
>collector.collect(sb.toString());
>}
>});
> 
>timeWindow.print();
>env.execute();
> 
>}
> }