Do you mean that you want to emit some output even when you know that there are 
no counts at all for a given time window?

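This is not (easily) possible in Flink right now. A window is only created when 
an element arrives for it, so an empty window never fires. Emitting a default 
would therefore require an operation with parallelism one that determines that 
there is no data across all keys.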
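
To make the idea concrete, here is a rough plain-Java sketch (no Flink 
dependency) of what such a single, non-parallel operation would have to do: 
keep a fixed list of hashtags, count tweets as they arrive, and close the 
window on an external tick rather than on element arrival, so that zeros are 
reported too. The class name ZeroFillingWindowCounter and its methods are made 
up for this illustration and are not Flink API. How the tick would be driven 
inside a job (for example by a processing-time timer in an operator running 
with parallelism one) is an assumption and is not shown or tested here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical helper, not part of Flink: a single-instance counter that
 * reports a count for every known hashtag when a window closes, including 0.
 */
class ZeroFillingWindowCounter {
    private final List<String> hashtags;                          // keys we always report
    private final Map<String, Long> counts = new LinkedHashMap<>();

    ZeroFillingWindowCounter(List<String> hashtags) {
        this.hashtags = new ArrayList<>(hashtags);
        reset();
    }

    /** Record one tweet for a hashtag; hashtags outside the list are ignored. */
    void add(String hashtag) {
        counts.computeIfPresent(hashtag, (key, value) -> value + 1L);
    }

    /** Called on every window tick, whether or not any tweet arrived. */
    Map<String, Long> closeWindow() {
        Map<String, Long> result = new LinkedHashMap<>(counts);
        reset();
        return result;
    }

    /** Start the next window with an explicit 0 for every hashtag. */
    private void reset() {
        for (String hashtag : hashtags) {
            counts.put(hashtag, 0L);
        }
    }

    public static void main(String[] args) {
        ZeroFillingWindowCounter counter =
                new ZeroFillingWindowCounter(Arrays.asList("#flink", "#kafka"));
        counter.add("#flink");
        counter.add("#flink");
        System.out.println(counter.closeWindow()); // {#flink=2, #kafka=0}
        System.out.println(counter.closeWindow()); // {#flink=0, #kafka=0}
    }
}
```

The second call shows the case from the question: no tweets arrived, but a 
result with zeros is still produced because the tick, not the data, closes the 
window. This only works because one instance sees all hashtags, which is the 
parallelism-one requirement mentioned above.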

Best,
Aljoscha

> On 24. Jun 2017, at 18:22, G.S.Vijay Raajaa <gsvijayraa...@gmail.com> wrote:
> 
> Hi,
> 
> I am trying to implement a Flink job that uses Twitter as its source and 
> collects tweets for a list of hashtags. The job aggregates the volume of 
> tweets per hashtag over a given time frame. That part works, but if there 
> are no tweets across all the hashtags in a window, I need to send out a 
> default value of 0 for every hashtag. I am not sure how to implement this 
> functionality.
> 
> Code snippet:
> 
> env.addSource(source)
>     .flatMap(new ExtractHashTagsSymbols(tickers))
>     .keyBy(0)
>     .timeWindow(Time.seconds(Integer.parseInt(window_time)))
>     .sum(1)
>     .timeWindowAll(Time.seconds(Integer.parseInt(window_time)))
>     .apply(new GetVolume(tickerVolumeMap))
>     .addSink(new SinkFunction<JSONObject>() {
>         public void invoke(JSONObject value) throws Exception {
>             System.out.println("Twitter Volume:" + value.toString());
>             //JsonParser jsonParser = new JsonParser();
>             //JsonObject gsonObject = (JsonObject)jsonParser.parse(value.toString());
>             pushToSocket(value, socket_url);
>         }
>     });
> 
> The above code waits for the window_time interval, computes the tweet 
> volume, and sends out a JSON object. 
> 
> Regards,
> 
> Vijay Raajaa GS 
> 
