Hi Vijay,

Could you check whether the watermark of the aggregate (window) operator advances?
You should be able to see that in the Flink Web UI. Could it be that
the watermark does not advance for all of the upstream operators? The
watermark of a particular operator is the minimum of the watermarks received
from all of its input channels, i.e. from every parallel instance of the
upstream operators. Therefore, if some of them do not produce any
watermarks, the resulting watermark will not advance.
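
To illustrate the minimum rule, here is a toy model in plain Java. It is not Flink's internal implementation, and the class and method names are made up for this sketch. A channel that has never delivered a watermark is treated as being at Long.MIN_VALUE, so a single parallel source instance that receives no records (and therefore emits no watermarks) is enough to hold back the combined watermark:

```java
import java.util.Arrays;

/**
 * Toy model (hypothetical, not Flink code) of how an operator with several
 * input channels derives its own event-time watermark: it remembers the
 * latest watermark seen on each channel and forwards the minimum.
 */
public class CombinedWatermarkDemo {

    // Per-channel watermarks; Long.MIN_VALUE means "nothing received yet".
    private final long[] channelWatermarks;
    private long currentWatermark = Long.MIN_VALUE;

    public CombinedWatermarkDemo(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one channel and returns the combined watermark. */
    public long onWatermark(int channel, long watermark) {
        // In this model a channel's watermark never moves backwards.
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        // The operator's own watermark is monotonic as well.
        currentWatermark = Math.max(currentWatermark, min);
        return currentWatermark;
    }

    public static void main(String[] args) {
        // Three upstream source instances; channel 2 has not received any data yet.
        CombinedWatermarkDemo op = new CombinedWatermarkDemo(3);
        op.onWatermark(0, 60_000L);
        long wm = op.onWatermark(1, 75_000L);
        System.out.println(wm == Long.MIN_VALUE); // true: channel 2 holds it back
        wm = op.onWatermark(2, 50_000L);
        System.out.println(wm); // 50000
    }
}
```

In the real job the "channels" would be the parallel instances feeding the window operator, so each of them has to see records for the window's watermark to move.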

Best,

Dawdi

On 11/10/2019 21:37, Vijay Balakrishnan wrote:
> Hi,
> Here is my issue with *Event Processing*: the *add() method of
> MGroupingWindowAggregate is not being called* even though a new watermark
> is fired.
> 1. *Ingest data from Kinesis (works fine)*
> 2. *Deserialize* in MonitoringMapKinesisSchema (*works fine* and I get
> JSON back)
> 3. I *assign MonitoringTSWAssigner* (code below) to the source with a
> bound of 10 ms (have also tried 3000 and 30000). *It fires a new watermark*
> with each incoming record, but the *windowStream.aggregate method doesn't
> seem to fire* and I *don't see the add() method of MGroupingWindowAggregate
> called*. I *can see the new watermark being emitted in
> TimestampsAndPunctuatedWatermarksOperator.processElement*.
> 4. I have tried with a timeWindow of 1m and 15s.
>
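For reference on point 3: with event time, a tumbling window [start, start + size) fires once the watermark reaches start + size - 1, the window's last inclusive timestamp (this mirrors the default event-time trigger as I understand it). With a per-record watermark of maxTimestamp - bound, some record with a timestamp of at least start + size - 1 + bound has to arrive on every input channel before the window fires. A plain-Java sketch of that arithmetic, assuming a zero window offset; the class name and the timestamps are hypothetical:

```java
/**
 * Plain-Java sketch (no Flink dependency) of the event-time rule for a
 * tumbling window with zero offset: an element with timestamp ts belongs to
 * [start, start + size), and the window fires once the watermark reaches
 * start + size - 1.
 */
public class TumblingWindowFiring {

    // Start of the tumbling window (zero offset) that contains ts.
    // Math.floorMod keeps the result correct for negative timestamps too.
    static long windowStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    // Last timestamp that still belongs to the window; the end itself is exclusive.
    static long windowMaxTimestamp(long ts, long size) {
        return windowStart(ts, size) + size - 1;
    }

    // True once the watermark has reached the window's last timestamp.
    static boolean fires(long ts, long size, long watermark) {
        return watermark >= windowMaxTimestamp(ts, size);
    }

    public static void main(String[] args) {
        long size = 15_000L;          // 15 s tumbling window
        long bound = 10L;             // out-of-orderness bound, in ms
        long ts = 1_570_800_005_000L; // hypothetical event timestamp, epoch ms

        System.out.println(windowMaxTimestamp(ts, size)); // 1570800014999
        // The watermark produced by this record alone (ts - bound) is too early.
        System.out.println(fires(ts, size, ts - bound)); // false
        // A later record at 1570800015009 yields watermark 1570800014999.
        long laterTs = 1_570_800_015_009L;
        System.out.println(fires(ts, size, laterTs - bound)); // true
    }
}
```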
> *Main* code:
>
> final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
> //Setup Kinesis Consumer
> Properties kinesisConsumerConfig = new Properties();
> ..
> kinesisConsumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
>         ConsumerConfigConstants.InitialPosition.LATEST.name()); // LATEST
> FlinkKinesisConsumer<Map<String, Object>> kinesisConsumer = new FlinkKinesisConsumer<>(
>         "kinesisTopicRead", new MonitoringMapKinesisSchema(true), kinesisConsumerConfig);
>
> DataStream<Map<String, Object>> kinesisStream;
> RichSinkFunction<InfluxDBPoint> influxSink;
>
> DataStreamSource<Map<String, Object>> monitoringDataStreamSource =
>         env.addSource(kinesisConsumer);
> kinesisStream = monitoringDataStreamSource
>         .assignTimestampsAndWatermarks(new MonitoringTSWAssigner(bound));
> influxSink = pms.createInfluxMonitoringSink(....);
> ......
> ... timeWindow = Time.seconds(timeIntervalL); // tried with timeIntervalL = 15s, 1m
>
> KeyedStream<Map<String, Object>, MonitoringTuple> monitoringTupleKeyedStream =
>         kinesisStream.keyBy(new MapTupleKeySelector(groupBySet, metric));
> final WindowedStream<Map<String, Object>, MonitoringTuple, TimeWindow> windowStream =
>         monitoringTupleKeyedStream.timeWindow(timeWindow);
> DataStream<InfluxDBPoint> enrichedMGStream = windowStream.aggregate( // <===== never reaches here?
>         new MGroupingWindowAggregate(interval),
>         new MGroupingAggregateWindowProcessing(interval, ruleImpl, rule))
>         .map(new MonitoringGroupingToInfluxDBPoint(rule));
> enrichedMGStream.addSink(influxSink);
> env.execute("Aggregation of Map data");
>
> *MonitoringTSWAssigner* code:
> public class MonitoringTSWAssigner implements
>         AssignerWithPunctuatedWatermarks<Map<String, Object>> {
>     private long bound = 5 * (long) 1000; // 5 s out-of-order bound, in milliseconds
>     private long maxTimestamp = Long.MIN_VALUE;
>
>     public MonitoringTSWAssigner() {
>     }
>
>     public MonitoringTSWAssigner(long bound) {
>         this.bound = bound;
>     }
>
>     @Override
>     public long extractTimestamp(Map<String, Object> monitoring, long previousTS) {
>         long extractedTS = getExtractedTS(monitoring);
>         if (extractedTS > maxTimestamp) {
>             maxTimestamp = extractedTS;
>         }
>         return extractedTS; // return System.currentTimeMillis();
>     }
>
>     public long getExtractedTS(Map<String, Object> monitoring) {
>         final String eventTimestamp = monitoring.get(Utils.EVENT_TIMESTAMP) != null
>                 ? (String) monitoring.get(Utils.EVENT_TIMESTAMP) : "";
>         return Utils.getLongFromDateStr(eventTimestamp);
>     }
>
>     @Override
>     public Watermark checkAndGetNextWatermark(Map<String, Object> monitoring,
>             long extractedTimestamp) {
>         // Watermark trails the highest timestamp seen so far by `bound` ms.
>         long nextWatermark = maxTimestamp - bound;
>         return new Watermark(nextWatermark);
>     }
> }
>
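On the assigner above: `bound` is in milliseconds, like the extracted timestamps, so a bound of 10 means 10 ms. Also, `maxTimestamp` starts at Long.MIN_VALUE, so `maxTimestamp - bound` would underflow to a very large positive watermark if it were ever evaluated while no timestamp larger than Long.MIN_VALUE had been recorded. A plain-Java sketch of the same arithmetic with that edge case guarded; the class name BoundedWatermarkMath is hypothetical and not part of Flink:

```java
/**
 * Plain-Java sketch (no Flink dependency) of the arithmetic inside
 * MonitoringTSWAssigner: track the highest event timestamp seen and report
 * watermark = maxTimestamp - bound. All values are epoch milliseconds.
 */
public class BoundedWatermarkMath {

    private final long boundMillis;
    // Starting at Long.MIN_VALUE + bound means "maxTimestamp - bound" can
    // never underflow and wrap around to a huge positive watermark.
    private long maxTimestamp;

    public BoundedWatermarkMath(long boundMillis) {
        this.boundMillis = boundMillis;
        this.maxTimestamp = Long.MIN_VALUE + boundMillis;
    }

    // Called once per record, like extractTimestamp(...).
    public long onEvent(long eventTimestampMillis) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestampMillis);
        return eventTimestampMillis;
    }

    // Like checkAndGetNextWatermark(...): lags the highest timestamp by the bound.
    public long currentWatermark() {
        return maxTimestamp - boundMillis;
    }

    public static void main(String[] args) {
        BoundedWatermarkMath w = new BoundedWatermarkMath(10L);
        System.out.println(w.currentWatermark() == Long.MIN_VALUE); // true: no data yet
        w.onEvent(1_570_800_005_000L);
        w.onEvent(1_570_800_004_000L); // an out-of-order event does not lower the max
        System.out.println(w.currentWatermark()); // 1570800004990
    }
}
```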
> *MGroupingWindowAggregate*:
> public class MGroupingWindowAggregate implements
>         AggregateFunction<Map<String, Object>, Map<String, Object>, Map<String, Object>> {
>     private final String interval;
>
>     public MGroupingWindowAggregate(String interval) {
>         this.interval = interval;
>     }
>
>     public Map<String, Object> createAccumulator() {
>         return new ConcurrentHashMap<>();
>     }
>
>     public Map<String, Object> add(Map<String, Object> monitoring, Map<String, Object> timedMap) {
>         .....
>     }
>
>     .....
>
> }
>
> TIA,
>
>        
