Hi Vijay,

Could you check whether the watermark for the aggregate operator advances? You should be able to see that in the Flink WebUI. Could it be that the watermark does not advance for some of the upstream operators? The watermark of an operator is the minimum of the watermarks received from all of its upstream operators. Therefore, if any of them does not produce watermarks, the resulting watermark will not advance.
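To make the "minimum of the upstream watermarks" point concrete, here is a toy sketch in plain Java. It is not Flink code: the class name, the method name and the three "channels" are made up for illustration, and it assumes that an input which has not emitted any watermark yet counts as Long.MIN_VALUE.

```java
import java.util.Arrays;

/** Toy model (not Flink API) of how an operator combines its input watermarks. */
public class MinWatermarkSketch {

    /** The operator's watermark is the minimum over all of its input channels. */
    public static long combinedWatermark(long[] channelWatermarks) {
        return Arrays.stream(channelWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        // Three hypothetical upstream subtasks; none has emitted a watermark yet.
        long[] channels = {Long.MIN_VALUE, Long.MIN_VALUE, Long.MIN_VALUE};

        // Two subtasks receive records and emit watermarks, the third stays silent.
        channels[0] = 1_000L;
        channels[1] = 2_000L;
        System.out.println("silent input holds the watermark back: "
                + (combinedWatermark(channels) == Long.MIN_VALUE));

        // Only once the third subtask also emits does the combined watermark move.
        channels[2] = 1_500L;
        System.out.println("combined watermark: " + combinedWatermark(channels));
    }
}
```

As long as one input stays silent, the combined value stays at Long.MIN_VALUE, so no event-time window downstream can fire, however far the other inputs have advanced.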
Best,
Dawdi

On 11/10/2019 21:37, Vijay Balakrishnan wrote:
> Hi,
> Here is my issue with *Event Processing* with the *add() method of
> MGroupingWindowAggregate not being called* even though a new watermark
> is fired
> 1. *Ingest data from Kinesis (works fine)*
> 2. *Deserialize* in MonitoringMapKinesisSchema(*works fine* and get
>    json back)
> 3. I do *assign MonitoringTSWAssigner*(code below) to the source with
>    bound of 10(have tried 3000, 30000). *It fires a new WaterMark* with each
>    incoming record but the *windowStream.aggregate method doesn't seem to
>    fire* and I *don't see the add() method of MGroupingWindowAggregate
>    called* ???? I *can see the newWaterMark being emitted in
>    TimestampsAndPunctuatedWatermarksOperator.processElement*
> 4. I have tried with timeWindow of 1m and 15s
>
> *Main* code:
>
> final StreamExecutionEnvironment env =
>     StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
> //Setup Kinesis Consumer
> Properties kinesisConsumerConfig = new Properties();
> ..
> kinesisConsumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
>     ConsumerConfigConstants.InitialPosition.LATEST.name());//LATEST
> FlinkKinesisConsumer<Map<String, Object>> kinesisConsumer = new
>     FlinkKinesisConsumer<>(
>         "kinesisTopicRead", new MonitoringMapKinesisSchema(true),
>         kinesisConsumerConfig);
>
> DataStream<Map<String, Object>> kinesisStream;
> RichSinkFunction<InfluxDBPoint> influxSink;
>
> DataStreamSource<Map<String, Object>> monitoringDataStreamSource =
>     env.addSource(kinesisConsumer);
> kinesisStream = monitoringDataStreamSource
>     .assignTimestampsAndWatermarks(new MonitoringTSWAssigner(bound));
> influxSink = pms.createInfluxMonitoringSink(....);
> ......
> ...timeWindow = Time.seconds(timeIntervalL);//tried with timeIntervalL=15s, 1m
>
> KeyedStream<Map<String, Object>, MonitoringTuple> monitoringTupleKeyedStream =
>     kinesisStream.keyBy(new MapTupleKeySelector(groupBySet, metric));
> final WindowedStream<Map<String, Object>, MonitoringTuple, TimeWindow>
>     windowStream = monitoringTupleKeyedStream.timeWindow(timeWindow);
> DataStream<InfluxDBPoint> enrichedMGStream =
>     windowStream.aggregate(//<===== never reaches here ?????
>         new MGroupingWindowAggregate(interval),
>         new MGroupingAggregateWindowProcessing(interval, ruleImpl, rule))
>     .map(new MonitoringGroupingToInfluxDBPoint(rule));
> enrichedMGStream.addSink(influxSink);
> env.execute("Aggregation of Map data");
>
> *MonitoringTSWAssigner* code:
> public class MonitoringTSWAssigner implements
>         AssignerWithPunctuatedWatermarks<Map<String, Object>> {
>     private long bound = 5 * (long) 1000;//5 secs out of order bound in millisecs
>     private long maxTimestamp = Long.MIN_VALUE;
>
>     public MonitoringTSWAssigner() {
>     }
>
>     public MonitoringTSWAssigner(long bound) {
>         this.bound = bound;
>     }
>
>     public long extractTimestamp(Map<String, Object> monitoring, long previousTS) {
>         long extractedTS = getExtractedTS(monitoring);
>         if (extractedTS > maxTimestamp) {
>             maxTimestamp = extractedTS;
>         }
>         return extractedTS;//return System.currentTimeMillis();
>     }
>
>     public long getExtractedTS(Map<String, Object> monitoring) {
>         final String eventTimestamp =
>             monitoring.get(Utils.EVENT_TIMESTAMP) != null ?
>                 (String) monitoring.get(Utils.EVENT_TIMESTAMP) : "";
>         return Utils.getLongFromDateStr(eventTimestamp);
>     }
>
>     @Override
>     public Watermark checkAndGetNextWatermark(Map<String, Object> monitoring,
>             long extractedTimestamp) {
>         long extractedTS = getExtractedTS(monitoring);
>         long nextWatermark = maxTimestamp - bound;
>         return new Watermark(nextWatermark);
>     }
> }
>
> *MGroupingWindowAggregate*:
> public class MGroupingWindowAggregate implements
>         AggregateFunction<Map<String, Object>, Map<String, Object>,
>             Map<String, Object>> {
>     private final String interval;
>     public MGroupingWindowAggregate(String interval) {
>         this.interval = interval;
>     }
>     public Map<String, Object> createAccumulator() {
>         return new ConcurrentHashMap<>();
>     }
>
>     public Map<String, Object> add(Map<String, Object> monitoring,
>             Map<String, Object> timedMap) {
>         .....
>     }
>
>     .....
>
> }
>
> TIA,
>