你好,我理解你的意思了。
可以看下 flink-cep 相关内容, 利用模式匹配去实现

> 2022年1月8日 下午7:10,18765295...@163.com 写道:
> 
> 您好:
> 请教一个问题,
> 例如:开启一个5秒钟的滚动窗口,当key001的两条数据进来时,没有满足时间触发,但是当key002的数据进来满足窗口触发条件,会将key001的两条数据输出出去。
> 
> 我想实现的是一个基于事件时间设置的滚动窗口,当key001的数据到来时,没有满足时间时,不会因为key002的数据到来触发key001的数据进行输出。
> 每个key都有一个属于自己的时间窗口,不会受其他分组key的影响,并且可以为每个key的时间窗口设置一个基于数量和时间的触发器,当满足数量时触发或者时间到了触发。
> 
> 经过测试发现,现在设置的时间窗口里面会有不同key的数据在一起
> 每个分组是否有属于自己的时间窗口。
> 
> 
> 数量窗口的逻辑是每个key都有一个属于自己key的数量窗口,
> 例如:设置一个数量为3的滚动窗口,输入1,2,3,4,不会触发窗口执行,但是继续输入两条1的数据,会输出三个1的数据。
> 
> 请问时间窗口可以实现类似数量窗口这样的逻辑吗。
> 
> public class Test {
>    public static void main(String[] args) throws Exception {
>        StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>        env.setParallelism(1);
>        DataStreamSource<String> dataSource = 
> env.socketTextStream("localhost", 7788);
>        SingleOutputStreamOperator<OrderItem> map = dataSource.map(new 
> MapFunction<String, OrderItem>() {
>            @Override
>            public OrderItem map(String s) throws Exception {
>                String[] split = s.split(",");
>                return new OrderItem(split[0].trim(), 
> Double.parseDouble(split[1].trim()), Long.parseLong(split[2].trim()));
>            }
>        });
> 
>        // 时间窗口测试代码
>        SingleOutputStreamOperator<OrderItem> warter = 
> map.assignTimestampsAndWatermarks(
>                
> WatermarkStrategy.<OrderItem>forBoundedOutOfOrderness(Duration.ofSeconds(0))
>                        .withTimestampAssigner((event, timestamp) -> 
> event.getTimeStamp()));
>        SingleOutputStreamOperator<String> timeWindow = warter.keyBy(data -> 
> data.getOrderId())
>                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
>                .process(new ProcessWindowFunction<OrderItem, String, String, 
> TimeWindow>() {
>                    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd 
> HH:mm:ss");
> 
>                    @Override
>                    public void process(String key,
>                                        ProcessWindowFunction<OrderItem, 
> String, String, TimeWindow>.Context context,
>                                        Iterable<OrderItem> iterable,
>                                        Collector<String> collector) throws 
> Exception {
>                        Iterator<OrderItem> iterator = iterable.iterator();
>                        StringBuilder sb = new StringBuilder();
>                        sb.append("key -> " + key + "窗口开始时间:" + sdf.format(new 
> Date(context.window().getStart())) + "\t\n");
>                        while (iterator.hasNext()) {
>                            OrderItem next = iterator.next();
>                            sb.append(next + "\t\n");
>                        }
>                        sb.append("窗口结束时间:" + sdf.format(new 
> Date(context.window().getEnd())));
>                        collector.collect(sb.toString());
>                    }
>                });
> 
>        timeWindow.print();
>        env.execute();
> 
>    }
> }

回复