Hi

Do you mean that `windowStream.aggregate` does not work for all records, or
just for some records? If it is only some records, can you confirm that the
assigned watermark is monotonically increasing? If it is all records, can you
confirm that the watermark has reached the end of the window?
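
To make those two checks concrete, here is a minimal sketch in plain Java (no
Flink dependency). It assumes the watermark is computed as the running maximum
timestamp minus the bound, as in your MonitoringTSWAssigner, and that the
window is a tumbling event-time window with zero offset, which fires once the
watermark reaches the window end minus 1 ms. The class name WatermarkCheck,
its helper methods, and the sample timestamps are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class WatermarkCheck {

    // Watermark after each record: running max timestamp minus the bound
    // (same arithmetic as the assigner quoted below; hypothetical helper).
    public static List<Long> watermarks(long[] eventTimestamps, long bound) {
        List<Long> result = new ArrayList<>();
        long maxTimestamp = Long.MIN_VALUE;
        for (long ts : eventTimestamps) {
            maxTimestamp = Math.max(maxTimestamp, ts);
            result.add(maxTimestamp - bound);
        }
        return result;
    }

    // True if no watermark is smaller than the one before it.
    public static boolean isNonDecreasing(List<Long> values) {
        for (int i = 1; i < values.size(); i++) {
            if (values.get(i) < values.get(i - 1)) {
                return false;
            }
        }
        return true;
    }

    // Exclusive end of the tumbling window that contains the timestamp.
    // Assumes zero window offset and a non-negative timestamp.
    public static long windowEnd(long timestamp, long windowSizeMs) {
        return timestamp - (timestamp % windowSizeMs) + windowSizeMs;
    }

    // An event-time window can fire once the watermark has reached the last
    // timestamp that belongs to the window, i.e. windowEnd - 1.
    public static boolean windowCanFire(long watermark, long windowEnd) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long[] timestamps = {1_000L, 4_000L, 3_000L, 16_000L, 31_000L}; // sample data
        long bound = 10L;          // out-of-order bound in ms
        long windowSize = 15_000L; // 15 s window

        List<Long> wms = watermarks(timestamps, bound);
        System.out.println("watermarks:     " + wms);
        System.out.println("non-decreasing: " + isNonDecreasing(wms));

        long end = windowEnd(timestamps[0], windowSize);
        for (int i = 0; i < wms.size(); i++) {
            System.out.println("after record " + i + ": window ending at " + end
                    + " can fire = " + windowCanFire(wms.get(i), end));
        }
    }
}
```

With these sample values, the 15 s window containing the first record can only
fire once a record with a timestamp of at least 15009 ms arrives (window end
15000, minus 1, plus the bound of 10).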

In other words, could you share how you determined that the
`windowStream.aggregate` method doesn't seem to fire?

Best,
Congxian


Vijay Balakrishnan <bvija...@gmail.com> wrote on Sat, Oct 12, 2019 at 3:37 AM:

> Hi,
> Here is my issue with *Event Processing* with the *add() method of
> MGroupingWindowAggregate not being called* even though a new watermark is
> fired
> 1. *Ingest data from Kinesis (works fine)*
> 2. *Deserialize* in MonitoringMapKinesisSchema (*works fine* and I get JSON
> back)
> 3. I do *assign MonitoringTSWAssigner* (code below) to the source with a
> bound of 10 (have tried 3000, 30000). *It fires a new Watermark* with each
> incoming record, but the *windowStream.aggregate method doesn't seem to
> fire* and I *don't see the add() method of MGroupingWindowAggregate
> called*. I *can see the new Watermark being emitted in
> TimestampsAndPunctuatedWatermarksOperator.processElement*.
> 4. I have tried with timeWindow of 1m and 15s
>
> *Main* code:
>
> final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
> //Setup Kinesis Consumer
> Properties kinesisConsumerConfig = new Properties();
> ..
> kinesisConsumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
> ConsumerConfigConstants.InitialPosition.LATEST.name());//LATEST
> FlinkKinesisConsumer<Map<String, Object>> kinesisConsumer = new
> FlinkKinesisConsumer<>(
>                 "kinesisTopicRead", new MonitoringMapKinesisSchema(true),
> kinesisConsumerConfig);
>
> DataStream<Map<String, Object>> kinesisStream;
> RichSinkFunction<InfluxDBPoint> influxSink;
>
> DataStreamSource<Map<String, Object>> monitoringDataStreamSource =
> env.addSource(kinesisConsumer);
> kinesisStream = monitoringDataStreamSource
>         .assignTimestampsAndWatermarks(new MonitoringTSWAssigner(bound));
> influxSink = pms.createInfluxMonitoringSink(....);
> ......
> ...timeWindow = Time.seconds(timeIntervalL); // tried with timeIntervalL = 15s, 1m
>
> KeyedStream<Map<String, Object>, MonitoringTuple>
> monitoringTupleKeyedStream =
>         kinesisStream.keyBy(new MapTupleKeySelector(groupBySet, metric));
> final WindowedStream<Map<String, Object>, MonitoringTuple, TimeWindow>
> windowStream = monitoringTupleKeyedStream.timeWindow(timeWindow);
> DataStream<InfluxDBPoint> enrichedMGStream =
>         windowStream.aggregate( // <===== never reaches here ?????
>         new MGroupingWindowAggregate(interval),
>         new MGroupingAggregateWindowProcessing(interval, ruleImpl, rule))
>         .map(new MonitoringGroupingToInfluxDBPoint(rule));
> enrichedMGStream.addSink(influxSink);
> env.execute("Aggregation of Map data");
>
> *MonitoringTSWAssigner* code:
> public class MonitoringTSWAssigner implements
> AssignerWithPunctuatedWatermarks<Map<String, Object>> {
>     private long bound = 5 * (long) 1000; // 5 secs out-of-order bound, in milliseconds
>     private long maxTimestamp = Long.MIN_VALUE;
>
>     public MonitoringTSWAssigner() {
>     }
>
>     public MonitoringTSWAssigner(long bound) {
>         this.bound = bound;
>     }
>
>     public long extractTimestamp(Map<String, Object> monitoring, long previousTS) {
>         long extractedTS = getExtractedTS(monitoring);
>         if (extractedTS > maxTimestamp) {
>             maxTimestamp = extractedTS;
>         }
>         return extractedTS; // return System.currentTimeMillis();
>     }
>
>     public long getExtractedTS(Map<String, Object> monitoring) {
>         final String eventTimestamp = monitoring.get(Utils.EVENT_TIMESTAMP) != null
>                 ? (String) monitoring.get(Utils.EVENT_TIMESTAMP)
>                 : "";
>         return Utils.getLongFromDateStr(eventTimestamp);
>     }
>
>     @Override
>     public Watermark checkAndGetNextWatermark(Map<String, Object> monitoring,
>             long extractedTimestamp) {
>         long extractedTS = getExtractedTS(monitoring);
>         long nextWatermark = maxTimestamp - bound;
>         return new Watermark(nextWatermark);
>     }
> }
>
> *MGroupingWindowAggregate*:
> public class MGroupingWindowAggregate implements
>         AggregateFunction<Map<String, Object>, Map<String, Object>, Map<String, Object>> {
>     private final String interval;
>     public MGroupingWindowAggregate(String interval) {
>         this.interval = interval;
>     }
>     public Map<String, Object> createAccumulator() {
>         return new ConcurrentHashMap<>();
>     }
>
>     public Map<String, Object> add(Map<String, Object> monitoring,
>             Map<String, Object> timedMap) {
> .....
> }
>
> .....
>
> }
>
> TIA,
>
>
>
