Hi,

An AllWindow operator requires an AllWindowFunction instead of a WindowFunction. In your case, the keyBy() seems to be in the wrong place. To get a keyed window you have to write something akin to:

inputStream
  .keyBy(…)
  .window(…)
  .apply(…) // or reduce()

In your case, you key the stream and then the keying is “lost” again because you apply a flatMap(). That’s why you have an all-window and not a keyed window.

Best,
Aljoscha

> On 2. May 2017, at 09:20, G.S.Vijay Raajaa <gsvijayraa...@gmail.com> wrote:
>
> Hi,
>
> I am trying to combine two Kafka topics by using a single Kafka consumer on a list of topics, and then convert the JSON strings in the stream to POJOs. I then want to join them via keyBy (on the event time field) and merge them into a single fat JSON. For that I was planning to use a window stream and apply a window function on it. The assumption is that Topic-A and Topic-B can be joined on event time, and that only one pair (Topic A (JSON), Topic B (JSON)) will be present with the same eventTime. Hence I was planning to use a countWindow(2) after keyBy on eventTime.
>
> I have a couple of questions about this:
>
> 1. Is the approach fine for merging topics and creating a single JSON?
> 2. The window function on the all-window stream doesn't seem to work; any pointers will be greatly appreciated.
>
> Code Snippet:
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> logger.info("Flink Stream Window Charger has started");
> Properties properties = new Properties();
> properties.setProperty("bootstrap.servers", "127.0.0.1:1030");
> properties.setProperty("zookeeper.connect", "127.0.0.1:2181/service-kafka");
> properties.setProperty("group.id", "group-0011");
> properties.setProperty("auto.offset.reset", "smallest");
>
> List<String> names = new ArrayList<>();
>
> names.add("Topic-A");
> names.add("Topic-B");
>
> DataStream<String> stream = env.addSource(new FlinkKafkaConsumer08<>(names, new SimpleStringSchema(), properties));
> DataStream<TopicPojo> pojo = stream.map(new Deserializer()).keyBy((eventTime) -> TopicPojo.getEventTime());
> List<String> where = new ArrayList<String>();
> AllWindowedStream<String, GlobalWindow> data_window = pojo.flatMap(new Tokenizer()).countWindowAll(2);
> DataStream<String> data_charging = data_window.apply(new MyWindowFunction());
> data_charging.addSink(new SinkFunction<String>() {
>
>     public void invoke(String value) throws Exception {
>         // Yet to be implemented - Merge two POJO into one
>     }
> });
>
> try {
>     env.execute();
> } catch (Exception e) {
>     return;
> }
> }
> }
>
> class Tokenizer implements FlatMapFunction<TopicPojo, String> {
>     private static final long serialVersionUID = 1L;
>
>     @Override
>     public void flatMap(TopicPojo value, Collector<String> out) throws Exception {
>         ObjectMapper mapper = new ObjectMapper();
>         out.collect(mapper.writeValueAsString(value));
>     }
> }
>
> class MyWindowFunction implements WindowFunction<TopicPojo, String, String, GlobalWindow> {
>     @Override
>     public void apply(String key, GlobalWindow window, Iterable<TopicPojo> arg2, Collector<String> out)
>             throws Exception {
>         int count = 0;
>         for (TopicPojo in : arg2) {
>             count++;
>         }
>         // Test Result - TO be modified
>         out.collect("Window: " + window + "count: " + count);
>     }
> }
>
> class Deserializer implements MapFunction<String, TopicPojo> {
>     private static final long serialVersionUID = 1L;
>
>     @Override
>     public TopicPojo map(String value) throws IOException {
>         // TODO Auto-generated method stub
>         ObjectMapper mapper = new ObjectMapper();
>         TopicPojo obj = null;
>         try {
>             System.out.println(value);
>             obj = mapper.readValue(value, TopicPojo.class);
>         } catch (JsonParseException e) {
>             // TODO Auto-generated catch block
>             throw new IOException("Failed to deserialize JSON object.");
>         } catch (JsonMappingException e) {
>             // TODO Auto-generated catch block
>             throw new IOException("Failed to deserialize JSON object.");
>         } catch (IOException e) {
>             // TODO Auto-generated catch block
>             throw new IOException("Failed to deserialize JSON object.");
>         }
>         return obj;
>     }
> }
>
> I am getting the following error:
>
> The method apply(AllWindowFunction<String,R,GlobalWindow>) in the type AllWindowedStream<String,GlobalWindow> is not applicable for the arguments (MyWindowFunction)
>
> Kindly give your input.
>
> Regards,
> Vijay Raajaa GS
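
As a rough illustration of the point made in the reply at the top of this thread, the following plain-Java sketch (no Flink dependency) contrasts how a keyed count window and an all-window of size 2 would group the same records. The Event class, the sample data, and the method names keyedCountWindow and countWindowAll are made up for this illustration. They only imitate when a count-based window fires and are not Flink API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class KeyedCountWindowSketch {

    /** Hypothetical stand-in for TopicPojo: only the topic name and the event time. */
    static final class Event {
        final String topic;
        final long eventTime;

        Event(String topic, long eventTime) {
            this.topic = topic;
            this.eventTime = eventTime;
        }

        @Override
        public String toString() {
            return topic + "@" + eventTime;
        }
    }

    /** Imitates keyBy(eventTime) followed by a count window: one buffer per key. */
    static List<String> keyedCountWindow(List<Event> input, int size) {
        Map<Long, List<Event>> buffers = new HashMap<>();
        List<String> fired = new ArrayList<>();
        for (Event e : input) {
            List<Event> buffer = buffers.computeIfAbsent(e.eventTime, k -> new ArrayList<>());
            buffer.add(e);
            if (buffer.size() == size) {
                // The window for this key is full: emit it and start over for that key.
                fired.add(e.eventTime + ":" + buffer);
                buffers.remove(e.eventTime);
            }
        }
        return fired;
    }

    /** Imitates an all-window of the same size: a single buffer, keys are ignored. */
    static List<String> countWindowAll(List<Event> input, int size) {
        List<Event> buffer = new ArrayList<>();
        List<String> fired = new ArrayList<>();
        for (Event e : input) {
            buffer.add(e);
            if (buffer.size() == size) {
                fired.add(buffer.toString());
                buffer = new ArrayList<>();
            }
        }
        return fired;
    }

    /** Made-up sample: two records per event time, interleaved across topics. */
    static List<Event> sampleEvents() {
        return Arrays.asList(
                new Event("A", 100), new Event("A", 200),
                new Event("B", 100), new Event("B", 200));
    }

    public static void main(String[] args) {
        System.out.println("keyed: " + keyedCountWindow(sampleEvents(), 2));
        System.out.println("all:   " + countWindowAll(sampleEvents(), 2));
    }
}
```

With this sample input the keyed variant pairs the Topic-A and Topic-B records that share an event time, while the all-window variant simply pairs whichever two records arrive next to each other. That is the behaviour the reply attributes to losing the keying before the window is applied.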