Thanks for your input; I will try to incorporate it into my implementation.

Regards,
Vijay Raajaa G S

On Wed, May 3, 2017 at 3:28 PM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> The approach could work, but if it can happen that an event from stream A
> is not matched by an event in stream B you will have lingering state that
> never goes away. For such cases it might be better to write a custom
> CoProcessFunction as sketched here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/process_function.html
>
> The idea is to keep events from each side in state and emit a result when
> you get the event from the other side. You also set a cleanup timer in case
> no other event arrives to make sure that state eventually goes away.
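
The keep-in-state, emit-on-match, clean-up-on-timer idea described above can be modelled in plain Java. This is only a sketch of the logic, not Flink code: in an actual CoProcessFunction the two maps would presumably be keyed state, and onTimer would be driven by a registered timer rather than called by hand. The class name TwoSidedJoinBuffer, the ttlMillis parameter and the merged output format are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Plain-Java model of the buffering logic; NOT Flink API code. */
class TwoSidedJoinBuffer {

    /** A buffered payload plus the time after which it may be dropped. */
    private static final class Pending {
        final String payload;
        final long expiresAt;

        Pending(String payload, long expiresAt) {
            this.payload = payload;
            this.expiresAt = expiresAt;
        }
    }

    // Unmatched events from each side, indexed by the join key (here: event time).
    private final Map<Long, Pending> pendingA = new HashMap<>();
    private final Map<Long, Pending> pendingB = new HashMap<>();

    // Hypothetical retention period for unmatched events.
    private final long ttlMillis;

    TwoSidedJoinBuffer(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Event from side A: emit if B is already buffered, otherwise buffer A. */
    Optional<String> onEventA(long key, String payload, long now) {
        Pending other = pendingB.remove(key);
        if (other != null) {
            return Optional.of(merge(payload, other.payload));
        }
        pendingA.put(key, new Pending(payload, now + ttlMillis));
        return Optional.empty();
    }

    /** Event from side B: emit if A is already buffered, otherwise buffer B. */
    Optional<String> onEventB(long key, String payload, long now) {
        Pending other = pendingA.remove(key);
        if (other != null) {
            return Optional.of(merge(other.payload, payload));
        }
        pendingB.put(key, new Pending(payload, now + ttlMillis));
        return Optional.empty();
    }

    /** Stands in for the cleanup timer: drops entries whose deadline has passed. */
    int onTimer(long now) {
        int before = bufferedCount();
        pendingA.values().removeIf(p -> p.expiresAt <= now);
        pendingB.values().removeIf(p -> p.expiresAt <= now);
        return before - bufferedCount();
    }

    /** Number of events still waiting for their counterpart. */
    int bufferedCount() {
        return pendingA.size() + pendingB.size();
    }

    // Illustrative merge: wraps both JSON strings in one enclosing object.
    private static String merge(String a, String b) {
        return "{\"a\":" + a + ",\"b\":" + b + "}";
    }
}
```

An event whose counterpart never arrives stays buffered only until onTimer is called with a time at or past its deadline, which is what keeps the state from lingering.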
>
> Best,
> Aljoscha
>
> On 3. May 2017, at 11:47, G.S.Vijay Raajaa <gsvijayraa...@gmail.com>
> wrote:
>
> Sure. Thanks for the pointer, let me reorder the same. Any comments about
> the approach followed for merging topics and creating a single JSON?
>
> Regards,
> Vijay Raajaa G S
>
> On Wed, May 3, 2017 at 2:41 PM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Hi,
>> An AllWindow operator requires an AllWindowFunction instead of a
>> WindowFunction. In your case, the keyBy() seems to be in the wrong place.
>> To get a keyed window you have to write something akin to:
>>
>> inputStream
>>   .keyBy(…)
>>   .window(…)
>>   .apply(…) // or reduce()
>>
>> In your case, you key the stream and then the keying is “lost” again
>> because you apply a flatMap(). That’s why you have an all-window and not a
>> keyed window.
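
To make the difference concrete, here is a plain-Java model of how a count window of size 2 groups events with and without keying. It is not Flink API code and only approximates the semantics at parallelism 1; the class name CountWindowModel and the representation of an event as a {key, payload} string pair are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of count-window grouping; NOT Flink API code. */
class CountWindowModel {

    /** Non-keyed: every `size` consecutive events form a window, whatever their key. */
    static List<List<String>> allWindows(List<String[]> events, int size) {
        List<List<String>> fired = new ArrayList<>();
        List<String> current = new ArrayList<>();
        for (String[] event : events) {
            current.add(event[1]);
            if (current.size() == size) {
                fired.add(current);
                current = new ArrayList<>();
            }
        }
        return fired;
    }

    /** Keyed: events are bucketed per key; a window fires once a key has `size` events. */
    static List<List<String>> keyedWindows(List<String[]> events, int size) {
        List<List<String>> fired = new ArrayList<>();
        Map<String, List<String>> perKey = new HashMap<>();
        for (String[] event : events) {
            List<String> bucket = perKey.computeIfAbsent(event[0], k -> new ArrayList<>());
            bucket.add(event[1]);
            if (bucket.size() == size) {
                fired.add(bucket);
                perKey.remove(event[0]); // purge the bucket after firing
            }
        }
        return fired;
    }
}
```

For the input (t1, A1), (t2, A2), (t1, B1), (t2, B2), allWindows pairs A1 with A2 and B1 with B2, whereas keyedWindows pairs A1 with B1 and A2 with B2, which is the pairing a join on event time needs.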
>>
>> Best,
>> Aljoscha
>>
>> On 2. May 2017, at 09:20, G.S.Vijay Raajaa <gsvijayraa...@gmail.com>
>> wrote:
>>
>> Hi,
>>
>> I am trying to combine two Kafka topics using a single Kafka consumer
>> on a list of topics, and then convert the JSON strings in the stream to
>> POJOs. I then want to join them via keyBy (on the event-time field) and
>> merge them into a single fat JSON. For that, I was planning to use a
>> windowed stream and apply a window function to it. The assumption is that
>> Topic-A and Topic-B can be joined on event time and that only one pair
>> (Topic A (JSON), Topic B (JSON)) will be present with the same eventTime.
>> Hence I was planning to use countWindow(2) after keyBy on eventTime.
>>
>> I have a couple of questions about this:
>>
>> 1. Is the approach fine for merging topics and creating a single JSON?
>> 2. The window function on the all-window stream doesn't seem to work; any
>> pointers will be greatly appreciated.
>>
>> Code Snippet :
>>
>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>
>> logger.info("Flink Stream Window Charger has started");
>>
>> Properties properties = new Properties();
>> properties.setProperty("bootstrap.servers", "127.0.0.1:1030");
>> properties.setProperty("zookeeper.connect", "127.0.0.1:2181/service-kafka");
>> properties.setProperty("group.id", "group-0011");
>> properties.setProperty("auto.offset.reset", "smallest");
>>
>> List<String> names = new ArrayList<>();
>> names.add("Topic-A");
>> names.add("Topic-B");
>>
>> DataStream<String> stream = env.addSource(
>>     new FlinkKafkaConsumer08<>(names, new SimpleStringSchema(), properties));
>>
>> DataStream<TopicPojo> pojo = stream.map(new Deserializer())
>>     .keyBy((eventTime) -> TopicPojo.getEventTime());
>>
>> List<String> where = new ArrayList<String>();
>>
>> AllWindowedStream<String, GlobalWindow> data_window =
>>     pojo.flatMap(new Tokenizer()).countWindowAll(2);
>>
>> DataStream<String> data_charging = data_window.apply(new MyWindowFunction());
>>
>> data_charging.addSink(new SinkFunction<String>() {
>>   public void invoke(String value) throws Exception {
>>     // Yet to be implemented - Merge two POJO into one
>>   }
>> });
>>
>> try {
>>   env.execute();
>> } catch (Exception e) {
>>   return;
>> }
>>
>> }
>>
>> }
>>
>> class Tokenizer implements FlatMapFunction<TopicPojo, String> {
>>   private static final long serialVersionUID = 1L;
>>
>>   @Override
>>   public void flatMap(TopicPojo value, Collector<String> out) throws Exception {
>>     ObjectMapper mapper = new ObjectMapper();
>>     out.collect(mapper.writeValueAsString(value));
>>   }
>> }
>>
>> class MyWindowFunction implements WindowFunction<TopicPojo, String, String, GlobalWindow> {
>>   @Override
>>   public void apply(String key, GlobalWindow window, Iterable<TopicPojo> arg2,
>>       Collector<String> out) throws Exception {
>>     int count = 0;
>>     for (TopicPojo in : arg2) {
>>       count++;
>>     }
>>     // Test Result - TO be modified
>>     out.collect("Window: " + window + "count: " + count);
>>   }
>> }
>>
>> class Deserializer implements MapFunction<String, TopicPojo> {
>>   private static final long serialVersionUID = 1L;
>>
>>   @Override
>>   public TopicPojo map(String value) throws IOException {
>>     // TODO Auto-generated method stub
>>     ObjectMapper mapper = new ObjectMapper();
>>     TopicPojo obj = null;
>>     try {
>>       System.out.println(value);
>>       obj = mapper.readValue(value, TopicPojo.class);
>>     } catch (JsonParseException e) {
>>       // TODO Auto-generated catch block
>>       throw new IOException("Failed to deserialize JSON object.");
>>     } catch (JsonMappingException e) {
>>       // TODO Auto-generated catch block
>>       throw new IOException("Failed to deserialize JSON object.");
>>     } catch (IOException e) {
>>       // TODO Auto-generated catch block
>>       throw new IOException("Failed to deserialize JSON object.");
>>     }
>>     return obj;
>>   }
>> }
>>
>> I am getting - The method apply(AllWindowFunction<String,R,GlobalWindow>)
>> in the type AllWindowedStream<String,GlobalWindow> is not applicable for
>> the arguments (MyWindowFunction) error.
>>
>> Kindly give your input.
>>
>> Regards,
>> Vijay Raajaa GS
>>
>>
>>
>
>
