你好,我理解你的意思了。 可以看下 flink-cep 相关内容, 利用模式匹配去实现
> 2022年1月8日 下午7:10,18765295...@163.com 写道: > > 您好: > 请教一个问题, > 例如:开启一个5秒钟的滚动窗口,当key001的两条数据进来时,没有满足时间触发,但是当key002的数据进来满足窗口触发条件,会将key001的两条数据输出出去。 > > 我想实现的是一个基于事件时间设置的滚动窗口,当key001的数据到来时,没有满足时间时,不会因为key002的数据到来触发key001的数据进行输出。 > 每个key都有一个属于自己的时间窗口,不会受其他分组key的影响,并且可以为每个key的时间窗口设置一个基于数量和时间的触发器,当满足数量时触发或者时间到了触发。 > > 经过测试发现,现在设置的时间窗口里面会有不同key的数据在一起 > 每个分组是否有属于自己的时间窗口。 > > > 数量窗口的逻辑是每个key都有一个属于自己key的数量窗口, > 例如:设置一个数量为3的滚动窗口,输入1,2,3,4,不会触发窗口执行,但是继续输入两条1的数据,会输出三个1的数据。 > > 请问时间窗口可以实现类似数量窗口这样的逻辑吗。 > > public class Test { > public static void main(String[] args) throws Exception { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setParallelism(1); > DataStreamSource<String> dataSource = > env.socketTextStream("localhost", 7788); > SingleOutputStreamOperator<OrderItem> map = dataSource.map(new > MapFunction<String, OrderItem>() { > @Override > public OrderItem map(String s) throws Exception { > String[] split = s.split(","); > return new OrderItem(split[0].trim(), > Double.parseDouble(split[1].trim()), Long.parseLong(split[2].trim())); > } > }); > > // 时间窗口测试代码 > SingleOutputStreamOperator<OrderItem> warter = > map.assignTimestampsAndWatermarks( > > WatermarkStrategy.<OrderItem>forBoundedOutOfOrderness(Duration.ofSeconds(0)) > .withTimestampAssigner((event, timestamp) -> > event.getTimeStamp())); > SingleOutputStreamOperator<String> timeWindow = warter.keyBy(data -> > data.getOrderId()) > .window(TumblingEventTimeWindows.of(Time.seconds(5))) > .process(new ProcessWindowFunction<OrderItem, String, String, > TimeWindow>() { > SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd > HH:mm:ss"); > > @Override > public void process(String key, > ProcessWindowFunction<OrderItem, > String, String, TimeWindow>.Context context, > Iterable<OrderItem> iterable, > Collector<String> collector) throws > Exception { > Iterator<OrderItem> iterator = iterable.iterator(); > StringBuilder sb = new StringBuilder(); > sb.append("key -> " + key + "窗口开始时间:" + sdf.format(new > Date(context.window().getStart())) + "\t\n"); > while (iterator.hasNext()) { > OrderItem next = iterator.next(); > sb.append(next + "\t\n"); > } > sb.append("窗口结束时间:" + sdf.format(new > Date(context.window().getEnd()))); > collector.collect(sb.toString()); > } > }); > > timeWindow.print(); > env.execute(); > > } > }