Hi Vijay, 

Maybe a stupid question, but according to your comments, the code works fine up 
to a "flatMap" operation. It seems that this flatMap is directly followed by 
a filter function in the method 
createAggregatedMonitoringGroupingWindowStream1. Is it maybe filtering out all 
events? Or is the filter function itself not even called? (Your comments seem 
to suggest the latter.) 
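To check this hypothesis without running the job, the groupBy filter from createAggregatedMonitoringGroupingWindowStream1 can be exercised in plain Java against a sample event. This is a minimal sketch: the class name, the sample fields and KEY_DELIMITER = "," are assumptions (the real delimiter and tag values are not shown in the thread). It illustrates that inputMap.get(groupBy) looks up the whole GROUP_BY string, so if that string is empty or lists several keys joined by the delimiter, every event would be filtered out before keyBy.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

// Offline check of the groupBy filter. KEY_DELIMITER and the field names are hypothetical.
final class GroupByFilterCheck {
    static final String KEY_DELIMITER = ",";

    // Same logic as the FilterFunction in the thread: keep the event only if the
    // map has a non-null value under the *entire* groupBy string.
    static Predicate<Map<String, Object>> groupByFilter(String groupBy) {
        return inputMap -> inputMap.get(groupBy) != null;
    }

    // Alternative for comparison: keep the event only if every individual key is present.
    static Predicate<Map<String, Object>> perKeyFilter(String groupBy) {
        String[] keys = groupBy.split(KEY_DELIMITER);
        return inputMap -> {
            for (String key : keys) {
                if (inputMap.get(key) == null) {
                    return false;
                }
            }
            return true;
        };
    }

    public static void main(String[] args) {
        Map<String, Object> event = new HashMap<>();
        event.put("host", "h1");
        event.put("region", "us-east-1");

        System.out.println(groupByFilter("host").test(event));        // true
        System.out.println(groupByFilter("host,region").test(event)); // false
        System.out.println(groupByFilter("").test(event));            // false (no GROUP_BY tag)
        System.out.println(perKeyFilter("host,region").test(event));  // true
    }
}
```

Which behavior is intended depends on what MapTupleKeySelector expects, which is not shown here.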

Best regards 
Theo 


From: "Vijay Balakrishnan" <bvija...@gmail.com> 
To: "Dawid Wysakowicz" <dwysakow...@apache.org> 
CC: "user" <user@flink.apache.org> 
Sent: Tuesday, October 15, 2019 02:01:05 
Subject: Re: add() method of AggregateFunction not called even though new 
watermark is emitted 

Hi, 
Thanks for the replies, Congxian and Dawid. 
Watermarks are advancing, but I am not sure how to check whether each newly 
generated watermark reaches the end of the window. 

I did check the Flink UI for the currentInputWatermark and it is increasing 
monotonically. 

I narrowed the problem down to windowStream.aggregate never being called. 
I also added checkpointing to see whether its absence was causing the issue; 
that didn't seem to help. 
Most of the code is reached only while the ExecutionGraph is being built at the 
start of the program. 

I generate an incrementing sequence of timestamps (5000 ms apart from one record 
to the next) with a producer writing to Kinesis, and a new watermark is emitted 
as soon as the job starts receiving the input records. 
My window size is 15 s. 
I see that a WindowedStream is created with windowAssigner 
TumblingEventTimeWindows(15000) and trigger EventTimeTrigger, 
but the code never gets into EventTimeTrigger.onElement() or onEventTime() 
to fire the trigger. 
It does get into TimestampsAndPunctuatedWatermarksOperator and emitWatermark(). 
I even tried ProcessingTime, but that didn't help either. 
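The timing described above can be checked with a little arithmetic. The sketch below is plain Java, not Flink code; the class name and the start timestamp are illustrative. It assumes a zero window offset, so windows are aligned to multiples of the window size, and that an EventTimeTrigger fires once the watermark reaches the window's last timestamp (end - 1). The watermark follows MonitoringTSWAssigner: the largest timestamp seen so far minus the bound (10 ms here).

```java
// Plain-Java sketch of tumbling event-time window arithmetic (no Flink dependency).
final class TumblingWindowMath {
    // Window start for a zero offset: timestamps are aligned down to a multiple of size.
    static long windowStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    // Last timestamp that still belongs to the window [start, start + size).
    static long windowMaxTimestamp(long start, long size) {
        return start + size - 1;
    }

    // Index of the first element whose watermark (maxSeen - bound) reaches the
    // max timestamp of the first element's window, or -1 if none does.
    static int firstFiringIndex(long[] timestamps, long size, long bound) {
        long maxSeen = Long.MIN_VALUE;
        long target = windowMaxTimestamp(windowStart(timestamps[0], size), size);
        for (int i = 0; i < timestamps.length; i++) {
            maxSeen = Math.max(maxSeen, timestamps[i]);
            long watermark = maxSeen - bound;
            if (watermark >= target) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        long size = 15_000L;
        long t0 = 1_571_097_600_000L; // illustrative start, a multiple of 15 000 ms
        long[] ts = new long[8];
        for (int i = 0; i < ts.length; i++) {
            ts[i] = t0 + i * 5_000L;  // one record every 5 s
        }
        System.out.println(windowStart(ts[0], size) == t0);    // true
        System.out.println(firstFiringIndex(ts, size, 10L));   // 4
        System.out.println(firstFiringIndex(ts, size, 30_000L)); // -1
    }
}
```

With records 5 s apart, the first 15 s window holds three records and can only fire when the fifth record (event time t0 + 20 s) arrives; with a bound of 30000 ms none of the eight records in this example fires it. This assumes a single watermark source; with several parallel inputs, the minimum of their watermarks applies.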


//code to create kinesis consumer successfully...... 
for (Rule rule : rules.getRules()) { 
    // gets in here fine 
    final SingleOutputStreamOperator<Map<String, Object>> filteredKinesisStream = 
        kinesisStream.filter(mon -> { 
            boolean result; 
            String eventName = mon.get(MEASUREMENT) != null ? (String) mon.get(MEASUREMENT) : ""; 
            InputMetricSelector inputMetricSelector = rule.getInputMetricSelector(); 
            String measurement = inputMetricSelector != null 
                ? inputMetricSelector.getMeasurement() : ""; 
            result = eventName.equals(measurement); 
            if (result) { 
                Map<String, String> inputTags = mon.get(TAGS) != null 
                    ? (Map<String, String>) mon.get(TAGS) : new HashMap<>(); 
                Map<String, String> ruleTags = inputMetricSelector != null 
                    ? inputMetricSelector.getTags() : new HashMap<>(); 
                result = matchTags(inputTags, ruleTags); 
            } 
            return result; // <== this is true 
        }) 
        .flatMap((FlatMapFunction<Map<String, Object>, Map<String, Object>>) (input, out) -> { 
            out.collect(input); // <==== runs up till here fine 
        }) 
        .returns(new TypeHint<Map<String, Object>>() { 
        }); 
    // doesn't do anything beyond this point at runtime 
    DataStream<InfluxDBPoint> enrichedMGStream = 
        pms.createAggregatedMonitoringGroupingWindowStream1( 
            filteredKinesisStream, ruleFactory, rule, parallelProcess); 
    enrichedMGStream.addSink(influxSink) 
        .setParallelism(nbrSinks); 
} 

private DataStream<InfluxDBPoint> createAggregatedMonitoringGroupingWindowStream1( 
        DataStream<Map<String, Object>> kinesisStream, RuleFactory ruleFactory, 
        Rule rule, int parallelProcess) { 
    DataStream<InfluxDBPoint> enrichedComponentInstanceStream1; 
    RuleConfig ruleConfig = rule.getRuleConfig(); 
    String ruleType = ruleConfig != null ? ruleConfig.getRuleType() : ""; 
    RuleIF ruleImpl = ruleFactory.getRule(ruleType); 
    Map<String, Object> ruleProps = ruleConfig != null ? ruleConfig.getRuleProps() : new HashMap<>(); 
    Object intervalObj = ruleProps.get("rule_eval_window"); 
    String timeInterval = intervalObj != null ? (String) intervalObj : ""; 
    org.apache.flink.streaming.api.windowing.time.Time timeWindow = 
        getTimeWindowFromInterval(timeInterval); 

    Object windowTypeObj = ruleProps.get("window_type"); 
    String windowType = windowTypeObj != null ? (String) windowTypeObj : ""; 

    InputMetricSelector inputMetricSelector = rule.getInputMetricSelector(); 
    Map<String, String> tags = inputMetricSelector != null 
        ? inputMetricSelector.getTags() : new HashMap<>(); 
    String groupByObj = tags.get(GROUP_BY); 
    String groupBy = groupByObj != null ? groupByObj : ""; 
    kinesisStream = kinesisStream.filter((FilterFunction<Map<String, Object>>) inputMap -> { 
        Object groupByValueObj = inputMap.get(groupBy); 
        return groupByValueObj != null; 
    }); 
    Set<String> groupBySet = new HashSet<>(Arrays.asList(groupBy.split(KEY_DELIMITER))); 
    String metric = Objects.requireNonNull(inputMetricSelector).getMetric(); 
    // till here, it went through fine during creation of the ExecutionGraph 
    // <=== never gets into MapTupleKeySelector.getKey(); a similar class works in another project 
    KeyedStream<Map<String, Object>, MonitoringTuple> monitoringTupleKeyedStream = 
        kinesisStream.keyBy(new MapTupleKeySelector(groupBySet, metric)); 
    enrichedComponentInstanceStream1 = getMonitoringGroupDataStream1(monitoringTupleKeyedStream, 
        timeWindow, windowType, timeInterval, ruleImpl, rule, parallelProcess); 
    return enrichedComponentInstanceStream1; 
} 

private DataStream<InfluxDBPoint> getMonitoringGroupDataStream1( 
        KeyedStream<Map<String, Object>, MonitoringTuple> monitoringTupleKeyedStream, 
        org.apache.flink.streaming.api.windowing.time.Time timeWindow, 
        String windowType, 
        String interval, 
        RuleIF ruleImpl, Rule rule, int parallelProcess) { 
    long slide = 100; 
    final WindowedStream<Map<String, Object>, MonitoringTuple, TimeWindow> windowStream = 
        windowType.equalsIgnoreCase(SLIDING) 
            ? monitoringTupleKeyedStream.timeWindow(timeWindow, 
                org.apache.flink.streaming.api.windowing.time.Time.milliseconds(slide)) 
            : monitoringTupleKeyedStream.timeWindow(timeWindow); 
    return windowStream.aggregate( 
            new MGroupingWindowAggregate(interval), // <=== never gets into add() here 
            new MGroupingAggregateWindowProcessing(interval, ruleImpl, rule)) 
        .map(new MonitoringGroupingToInfluxDBPoint(rule)); 
} 
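With window_type SLIDING, the code above uses a fixed 100 ms slide. The plain-Java sketch below (hypothetical class name; zero offset assumed) mirrors how a sliding window assignment places one element into every window that contains it. It shows that each element lands in size / slide windows: 150 for a 15 s window and 600 for 1 m. This does not explain add() never being called, but once data flows, each record would cause that many add() calls and a result would be emitted per key every 100 ms.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of sliding-window assignment with a zero offset: an element
// belongs to every window [start, start + size) that contains its timestamp.
final class SlidingWindowMath {
    // Returns the start timestamps of all windows that contain ts, newest first.
    static List<Long> windowStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, slide);
        for (long start = lastStart; start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long ts = 1_571_097_607_000L;
        System.out.println(windowStarts(ts, 15_000L, 100L).size()); // 150
        System.out.println(windowStarts(ts, 60_000L, 100L).size()); // 600
    }
}
```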

On Mon, Oct 14, 2019 at 12:41 AM Dawid Wysakowicz <dwysakow...@apache.org> wrote: 

Hi Vijay, 

Could you check whether the watermark for the aggregate operator advances? You 
should be able to check that in the Flink WebUI. Could it be that the watermark 
does not advance for all of the upstream operators? The watermark of a 
particular operator is the minimum of the watermarks received from all of its 
upstream operators. Therefore, if some of them do not produce any watermarks, the 
resulting watermark will not advance. 
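This point can be illustrated with a simplified model of an operator with several input channels. It is a plain-Java sketch with a hypothetical class name, not Flink's internal implementation: the operator's watermark is the minimum of the latest watermark received on each channel, so a channel that never emits a watermark (for instance, an upstream parallel instance that receives no records and therefore never invokes the punctuated assigner) holds it at Long.MIN_VALUE.

```java
import java.util.Arrays;

// Simplified model: an operator's watermark is the minimum over the latest
// watermark seen on each input channel, and it never moves backwards.
final class MinWatermarkTracker {
    private final long[] channelWatermarks;
    private long current = Long.MIN_VALUE;

    MinWatermarkTracker(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE); // nothing received yet
    }

    // Records a watermark from one channel and returns the operator's watermark.
    long onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        current = Math.max(current, min);
        return current;
    }

    public static void main(String[] args) {
        MinWatermarkTracker tracker = new MinWatermarkTracker(2);
        System.out.println(tracker.onWatermark(0, 10_000L)); // Long.MIN_VALUE: channel 1 is silent
        System.out.println(tracker.onWatermark(0, 20_000L)); // still Long.MIN_VALUE
        System.out.println(tracker.onWatermark(1, 15_000L)); // 15000
    }
}
```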

Best, 

Dawid 
On 11/10/2019 21:37, Vijay Balakrishnan wrote: 

BQ_BEGIN

Hi, 
Here is my issue with event-time processing: the add() method of 
MGroupingWindowAggregate is not called even though a new watermark is emitted. 
1. Ingest data from Kinesis (works fine). 
2. Deserialize in MonitoringMapKinesisSchema (works fine; I get JSON back). 
3. I assign MonitoringTSWAssigner (code below) to the source with a bound of 
10 (I have also tried 3000 and 30000). It emits a new watermark with each 
incoming record, but the windowStream.aggregate method doesn't seem to fire and 
I don't see the add() method of MGroupingWindowAggregate being called. 
I can see the new watermark being emitted in 
TimestampsAndPunctuatedWatermarksOperator.processElement. 
4. I have tried with a timeWindow of 1m and 15s. 

Main code: 

final StreamExecutionEnvironment env = 
    StreamExecutionEnvironment.getExecutionEnvironment(); 
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 

// Setup Kinesis Consumer 
Properties kinesisConsumerConfig = new Properties(); 
.. 
kinesisConsumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, 
    ConsumerConfigConstants.InitialPosition.LATEST.name()); // LATEST 
FlinkKinesisConsumer<Map<String, Object>> kinesisConsumer = new FlinkKinesisConsumer<>( 
    "kinesisTopicRead", new MonitoringMapKinesisSchema(true), kinesisConsumerConfig); 

DataStream<Map<String, Object>> kinesisStream; 
RichSinkFunction<InfluxDBPoint> influxSink; 

DataStreamSource<Map<String, Object>> monitoringDataStreamSource = 
    env.addSource(kinesisConsumer); 
kinesisStream = monitoringDataStreamSource 
    .assignTimestampsAndWatermarks(new MonitoringTSWAssigner(bound)); 
influxSink = pms.createInfluxMonitoringSink(....); 
...... 
...timeWindow = Time.seconds(timeIntervalL); // tried with timeIntervalL = 15s, 1m 

KeyedStream<Map<String, Object>, MonitoringTuple> monitoringTupleKeyedStream = 
    kinesisStream.keyBy(new MapTupleKeySelector(groupBySet, metric)); 
final WindowedStream<Map<String, Object>, MonitoringTuple, TimeWindow> windowStream = 
    monitoringTupleKeyedStream.timeWindow(timeWindow); 
DataStream<InfluxDBPoint> enrichedMGStream = windowStream.aggregate( // <===== never reaches here 
        new MGroupingWindowAggregate(interval), 
        new MGroupingAggregateWindowProcessing(interval, ruleImpl, rule)) 
    .map(new MonitoringGroupingToInfluxDBPoint(rule)); 
enrichedMGStream.addSink(influxSink); 
env.execute("Aggregation of Map data"); 

MonitoringTSWAssigner code: 
public class MonitoringTSWAssigner implements AssignerWithPunctuatedWatermarks<Map<String, Object>> { 
    private long bound = 5 * (long) 1000; // 5 secs out-of-order bound in millisecs 
    private long maxTimestamp = Long.MIN_VALUE; 

    public MonitoringTSWAssigner() { 
    } 

    public MonitoringTSWAssigner(long bound) { 
        this.bound = bound; 
    } 

    @Override 
    public long extractTimestamp(Map<String, Object> monitoring, long previousTS) { 
        long extractedTS = getExtractedTS(monitoring); 
        if (extractedTS > maxTimestamp) { 
            maxTimestamp = extractedTS; 
        } 
        return extractedTS; // return System.currentTimeMillis(); 
    } 

    public long getExtractedTS(Map<String, Object> monitoring) { 
        final String eventTimestamp = monitoring.get(Utils.EVENT_TIMESTAMP) != null 
            ? (String) monitoring.get(Utils.EVENT_TIMESTAMP) : ""; 
        return Utils.getLongFromDateStr(eventTimestamp); 
    } 

    @Override 
    public Watermark checkAndGetNextWatermark(Map<String, Object> monitoring, long extractedTimestamp) { 
        long extractedTS = getExtractedTS(monitoring); 
        long nextWatermark = maxTimestamp - bound; 
        return new Watermark(nextWatermark); 
    } 
} 

MGroupingWindowAggregate: 
public class MGroupingWindowAggregate 
        implements AggregateFunction<Map<String, Object>, Map<String, Object>, Map<String, Object>> { 
    private final String interval; 

    public MGroupingWindowAggregate(String interval) { 
        this.interval = interval; 
    } 

    @Override 
    public Map<String, Object> createAccumulator() { 
        return new ConcurrentHashMap<>(); 
    } 

    @Override 
    public Map<String, Object> add(Map<String, Object> monitoring, Map<String, Object> timedMap) { 
        ..... 
    } 

    ..... 

} 
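With incremental aggregation, as far as I understand Flink's window operator for non-merging windows such as tumbling ones, add() is applied when each element reaches the window operator, not when the trigger fires; only getResult() waits for the watermark. The model below is plain Java with a hypothetical class name and a simple count standing in for the real accumulator; it is not Flink code. It also shows that an element arriving after the watermark has passed its window (with zero allowed lateness) is dropped without add() being called. If this model holds, add() never being called points to no on-time elements reaching the window operator, rather than to the watermark alone.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of a single tumbling window [start, end) with incremental
// aggregation and zero allowed lateness. Each event string is either
// "E:<ts>" (an element) or "W:<ts>" (a watermark).
final class WindowAggregationModel {
    static List<String> run(List<String> events, long start, long end) {
        List<String> log = new ArrayList<>();
        long watermark = Long.MIN_VALUE;
        long count = 0;           // plays the role of createAccumulator()
        boolean fired = false;
        for (String event : events) {
            long ts = Long.parseLong(event.substring(2));
            if (event.startsWith("E:")) {
                if (ts < start || ts >= end) {
                    continue;     // element belongs to a different window
                }
                if (end - 1 <= watermark) {
                    log.add("late, dropped without add(): " + ts);
                    continue;
                }
                count++;          // add() runs as soon as the element arrives
                log.add("add(" + ts + ")");
            } else {
                watermark = Math.max(watermark, ts);
                if (!fired && watermark >= end - 1) {
                    fired = true; // trigger fires; getResult() is called once
                    log.add("getResult() = " + count);
                }
            }
        }
        return log;
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList(
                "E:0", "W:-10", "E:5000", "W:4990", "W:15000", "E:10000");
        run(events, 0L, 15_000L).forEach(System.out::println);
        // add(0)
        // add(5000)
        // getResult() = 2
        // late, dropped without add(): 10000
    }
}
```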

TIA, 






BQ_END
