Hi,
An AllWindow operator requires an AllWindowFunction instead of a 
WindowFunction. In your case, the keyBy() seems to be in the wrong place. To 
get a keyed window, you have to write something akin to:

inputStream
  .keyBy(…)
  .window(…)
  .apply(…) // or reduce()

In your code, you key the stream, but the keying is “lost” again because you 
apply a flatMap(), which returns a regular, non-keyed DataStream. Calling 
countWindowAll() on that gives you an all-window and not a keyed window.
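
To make the difference concrete, here is a small plain-Java simulation of the two behaviours. This is only a sketch and does not use the Flink API: the Event class is a hypothetical stand-in for your TopicPojo, and countWindowAll()/keyedCountWindow() are illustrative helpers that mimic what a count window of size 2 does without and with keyBy().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class CountWindowSketch {

    /** Hypothetical stand-in for TopicPojo: the source topic plus the event time used as key. */
    static final class Event {
        final String topic;
        final long eventTime;

        Event(String topic, long eventTime) {
            this.topic = topic;
            this.eventTime = eventTime;
        }

        @Override
        public String toString() {
            return topic + "@" + eventTime;
        }
    }

    /** Non-keyed: every `size` consecutive elements form a window, whatever their key. */
    static List<List<Event>> countWindowAll(List<Event> input, int size) {
        List<List<Event>> fired = new ArrayList<>();
        List<Event> buffer = new ArrayList<>();
        for (Event e : input) {
            buffer.add(e);
            if (buffer.size() == size) {
                fired.add(buffer);
                buffer = new ArrayList<>();
            }
        }
        return fired;
    }

    /** Keyed: one buffer per eventTime; a window fires once that key has `size` elements. */
    static List<List<Event>> keyedCountWindow(List<Event> input, int size) {
        List<List<Event>> fired = new ArrayList<>();
        Map<Long, List<Event>> buffers = new HashMap<>();
        for (Event e : input) {
            List<Event> buffer = buffers.computeIfAbsent(e.eventTime, k -> new ArrayList<>());
            buffer.add(e);
            if (buffer.size() == size) {
                fired.add(buffer);
                buffers.remove(e.eventTime);
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // Two records per topic, interleaved so that matching event times are not adjacent.
        List<Event> input = Arrays.asList(
                new Event("A", 1L), new Event("A", 2L),
                new Event("B", 1L), new Event("B", 2L));

        System.out.println(countWindowAll(input, 2));   // [[A@1, A@2], [B@1, B@2]]
        System.out.println(keyedCountWindow(input, 2)); // [[A@1, B@1], [A@2, B@2]]
    }
}
```

Only the keyed variant puts the Topic-A and Topic-B records that share an eventTime into the same window, which is what you need for the merge. In Flink itself, the keyed variant takes a WindowFunction whose key type must match the type returned by the key selector in keyBy().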

Best,
Aljoscha
> On 2. May 2017, at 09:20, G.S.Vijay Raajaa <gsvijayraa...@gmail.com> wrote:
> 
> Hi,
> 
> I am trying to combine two Kafka topics using a single Kafka consumer on 
> a list of topics, and then convert the JSON strings in the stream to POJOs. 
> Then I want to join them via keyBy (on the event time field) and merge them 
> into a single fat JSON. I was planning to use a windowed stream and apply a 
> window function to it. The assumption is that Topic-A and Topic-B can be 
> joined on event time and that only one pair (Topic A (JSON), Topic B (JSON)) 
> will be present with the same eventTime. Hence I was planning to use 
> countWindow(2) after keyBy on eventTime.
> 
> I have a couple of questions about this:
> 
> 1. Is the approach fine for merging topics and creating a single JSON?
> 2. The window function on the AllWindowedStream doesn't seem to work; any 
> pointers will be greatly appreciated.
> 
> Code Snippet : 
> StreamExecutionEnvironment env =
>     StreamExecutionEnvironment.getExecutionEnvironment();
> 
> logger.info("Flink Stream Window Charger has started");
> 
> Properties properties = new Properties();
> properties.setProperty("bootstrap.servers", "127.0.0.1:1030");
> properties.setProperty("zookeeper.connect", "127.0.0.1:2181/service-kafka");
> properties.setProperty("group.id", "group-0011");
> properties.setProperty("auto.offset.reset", "smallest");
> 
> List<String> names = new ArrayList<>();
> names.add("Topic-A");
> names.add("Topic-B");
> 
> DataStream<String> stream = env.addSource(
>     new FlinkKafkaConsumer08<>(names, new SimpleStringSchema(), properties));
> 
> DataStream<TopicPojo> pojo = stream.map(new Deserializer())
>     .keyBy((eventTime) -> TopicPojo.getEventTime());
> 
> List<String> where = new ArrayList<String>();
> 
> AllWindowedStream<String, GlobalWindow> data_window =
>     pojo.flatMap(new Tokenizer()).countWindowAll(2);
> 
> DataStream<String> data_charging = data_window.apply(new MyWindowFunction());
> 
> data_charging.addSink(new SinkFunction<String>() {
>   public void invoke(String value) throws Exception {
>     // Yet to be implemented - Merge two POJO into one
>   }
> });
> 
> try {
>   env.execute();
> } catch (Exception e) {
>   return;
> }
> 
> }
> 
> }
> 
> class Tokenizer implements FlatMapFunction<TopicPojo, String> {
>   private static final long serialVersionUID = 1L;
> 
>   @Override
>   public void flatMap(TopicPojo value, Collector<String> out) throws Exception {
>     ObjectMapper mapper = new ObjectMapper();
>     out.collect(mapper.writeValueAsString(value));
>   }
> }
> 
> class MyWindowFunction
>     implements WindowFunction<TopicPojo, String, String, GlobalWindow> {
>   @Override
>   public void apply(String key, GlobalWindow window, Iterable<TopicPojo> arg2,
>       Collector<String> out) throws Exception {
>     int count = 0;
>     for (TopicPojo in : arg2) {
>       count++;
>     }
>     // Test Result - TO be modified
>     out.collect("Window: " + window + "count: " + count);
>   }
> }
> 
> class Deserializer implements MapFunction<String, TopicPojo> {
>   private static final long serialVersionUID = 1L;
> 
>   @Override
>   public TopicPojo map(String value) throws IOException {
>     // TODO Auto-generated method stub
>     ObjectMapper mapper = new ObjectMapper();
>     TopicPojo obj = null;
>     try {
>       System.out.println(value);
>       obj = mapper.readValue(value, TopicPojo.class);
>     } catch (JsonParseException e) {
>       // TODO Auto-generated catch block
>       throw new IOException("Failed to deserialize JSON object.");
>     } catch (JsonMappingException e) {
>       // TODO Auto-generated catch block
>       throw new IOException("Failed to deserialize JSON object.");
>     } catch (IOException e) {
>       // TODO Auto-generated catch block
>       throw new IOException("Failed to deserialize JSON object.");
>     }
>     return obj;
>   }
> }
> 
> 
> I am getting the following error: The method 
> apply(AllWindowFunction<String,R,GlobalWindow>) in the type 
> AllWindowedStream<String,GlobalWindow> is not applicable for the arguments 
> (MyWindowFunction).
> 
> Kindly give your input.
> 
> Regards,
> Vijay Raajaa GS 
> 
