各位大神,你们好: 最近有一个问题一直困扰着我:我设置的会话窗口,会在非活动状态10s后结束 窗口,发现它会在下次窗口生成时才发送本窗口处理完的数据,而我想在本次窗口结束 时发送这个数据,应该如何处理?万分感激
// 这里配置了kafka的信息,并进行数据流的输入 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); FlinkKafkaConsumer010<RfidRawData> kafkaSource = new FlinkKafkaConsumer010<>("rfid-input-topic", new RfidRawDataSchema(), props); kafkaSource.assignTimestampsAndWatermarks(new CarTimestamp()); DataStream<RfidRawData> dataStream = env.addSource(kafkaSource); // 会话窗口:如果用户处于非活动状态长达10s,则认为会话结束。Reduce中写 的是窗口融合的方法 DataStream<RfidRawData> outputStream = dataStream.keyBy("uniqueId") .window(EventTimeSessionWindows.withGap(Time.seconds(10))).reduce(new RfidReduceFunction()); //通过kafka数据流的输出 outputStream.addSink(new FlinkKafkaProducer010<>("rfid-output-topic", new RfidRawDataSchema(), props)); try { env.execute("Flink add data source"); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); }