各位大神,你们好:

       最近有一个问题一直困扰着我:我设置的会话窗口,会在非活动状态10s后结束
窗口,发现它会在下次窗口生成时才发送本窗口处理完的数据,而我想在本次窗口结束
时发送这个数据,应该如何处理?万分感激

 

// 这里配置了kafka的信息,并进行数据流的输入

 

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

 

      FlinkKafkaConsumer010<RfidRawData> kafkaSource = new
FlinkKafkaConsumer010<>("rfid-input-topic",

            new RfidRawDataSchema(), props);

      kafkaSource.assignTimestampsAndWatermarks(new CarTimestamp());

 

      DataStream<RfidRawData> dataStream = env.addSource(kafkaSource);

 

      // 会话窗口:如果用户处于非活动状态长达10s,则认为会话结束。Reduce中写
的是窗口融合的方法

      DataStream<RfidRawData> outputStream = dataStream.keyBy("uniqueId")

      .window(EventTimeSessionWindows.withGap(Time.seconds(10))).reduce(new

      RfidReduceFunction());

   

      //通过kafka数据流的输出

outputStream.addSink(new FlinkKafkaProducer010<>("rfid-output-topic", new
RfidRawDataSchema(), props));

 

      try {

         env.execute("Flink add data source");

      } catch (Exception e) {

         // TODO Auto-generated catch block

         e.printStackTrace();

      }

回复