Hi, I am exploring session windowing in Apache Beam. I created a pipeline that prints the window start time and window end time of the elements emitted from a GroupByKey applied to a session-windowed PCollection, and I got this exception:

Exception in thread "main" java.lang.IllegalArgumentException: public void main.SlidingWindowX$2.processElement(org.apache.beam.sdk.transforms.DoFn$ProcessContext,org.apache.beam.sdk.transforms.windowing.IntervalWindow) unable to provide window -- expected window type from parameter (org.apache.beam.sdk.transforms.windowing.IntervalWindow) is not a supertype of actual window type assigned by windowing (W)
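One variant I am considering as a workaround (a sketch only, I have not confirmed it is the intended approach): declare the window parameter as the generic BoundedWindow, which any windowing strategy should be able to provide, and cast inside the body. Applied to the grouped PCollection from step 4 below:

```java
// Sketch: take the window as BoundedWindow (the supertype of all window
// types) instead of IntervalWindow, then cast -- this assumes session
// windows are in fact IntervalWindows at runtime.
pcollectAfterGroupBy.apply(ParDo.of(
    new DoFn<KV<String, Iterable<String>>, Void>() {
      @ProcessElement
      public void processElement(ProcessContext c, BoundedWindow w) {
        IntervalWindow window = (IntervalWindow) w;
        System.out.println(window.start());
        System.out.println(window.end());
      }
    }));
```

Is the cast from BoundedWindow the recommended way to read merged session bounds, or is there a cleaner API for this?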
I came to know that when GroupByKey is applied to a PCollection with session windowing, the output PCollection emitted from GroupByKey does not carry a usable WindowingStrategy. Is this correct? What should the window type of such elements be, and how can I find the start time and end time of the session window (the merged windows)?

If I apply a GroupByKey again on the result of the first GroupByKey, I get this exception:

Exception in thread "main" java.lang.IllegalStateException: GroupByKey must have a valid Window merge function. Invalid because: WindowFn has already been consumed by previous GroupByKey

So, should a new window be applied before I do this? I am obviously missing a crucial point about the output PCollection emitted from aggregating transforms. Can you point me to any documentation or code that explains this behaviour?

After going through the code, I understood that when a Sessions window is applied, each element is assigned an IntervalWindow with start_time = element_timestamp and end_time = element_timestamp + gap, and when an aggregating transform is applied, a MergeContext is created for each key and the windows are merged. Is it possible to get the MergeContext after applying an aggregating transform? Can you point me to the design doc for this behaviour?
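To check my understanding of that assign-then-merge step, here is a plain-Java sketch (no Beam types; mergeSessions and its interval layout are my own illustration, not Beam's implementation): each timestamp t gets a proto-window [t, t + gap), and overlapping windows for the same key are merged into one session.

```java
import java.util.ArrayList;
import java.util.List;

public class SessionMergeSketch {
  // Assign each timestamp a proto-window [t, t + gap), then merge
  // overlapping windows -- my reading of what Sessions + GroupByKey do.
  static long[][] mergeSessions(List<Long> timestamps, long gapMillis) {
    List<long[]> windows = new ArrayList<>();
    for (long t : timestamps) {
      windows.add(new long[] {t, t + gapMillis});
    }
    // Sort by start time, then fold each window into its predecessor
    // when they overlap (end-exclusive, so touching windows stay apart).
    windows.sort((a, b) -> Long.compare(a[0], b[0]));
    List<long[]> merged = new ArrayList<>();
    for (long[] w : windows) {
      if (!merged.isEmpty() && w[0] < merged.get(merged.size() - 1)[1]) {
        long[] last = merged.get(merged.size() - 1);
        last[1] = Math.max(last[1], w[1]);
      } else {
        merged.add(w);
      }
    }
    return merged.toArray(new long[0][]);
  }

  public static void main(String[] args) {
    // Events at 0s, 3s, 4s fall within a 5s gap of each other, so they
    // merge into one session [0s, 9s); the event at 20s starts its own
    // session [20s, 25s).
    for (long[] w : mergeSessions(List.of(0L, 3000L, 4000L, 20000L), 5000L)) {
      System.out.println(w[0] + " " + w[1]);
    }
  }
}
```

Does this match the merging that MergeContext performs per key?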
Pipeline Steps:

1) Read from Kafka using KafkaIO

    PCollection<KafkaRecord<String, String>> pcollectIO = p.apply(KafkaIO.<String, String>read()
        .withBootstrapServers("localhost:9092")
        .withTopic("sessionWindowtopic")
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class));

2) Convert each KafkaRecord to a KV

    PCollection<KV<String, String>> pcollectKV = pcollectIO
        .apply(ParDo.of(new DoFn<KafkaRecord<String, String>, KV<String, String>>() {
          @ProcessElement
          public void processElement(@Element KafkaRecord<String, String> record,
              OutputReceiver<KV<String, String>> r) {
            r.output(record.getKV());
          }
        }));

3) Apply the sessions window

    PCollection<KV<String, String>> pcollectW2 = pcollectKV
        .apply(Window.<KV<String, String>>into(
            Sessions.withGapDuration(Duration.standardSeconds(5))));

4) First GroupByKey

    PCollection<KV<String, Iterable<String>>> pcollectAfterGroupBy =
        pcollectW2.apply(GroupByKey.<String, String>create());

5) ParDo that prints the window start time and end time of each element emitted from GroupByKey (this is the step that throws)

    pcollectAfterGroupBy.apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, Void>() {
      @ProcessElement
      public void processElement(ProcessContext c, IntervalWindow b) {
        System.out.println(b.start());
        System.out.println(b.end());
      }
    }));

6) Second GroupByKey (used just for exploring the sessions window)

    PCollection<KV<String, Iterable<Iterable<String>>>> pcollectCascaded =
        pcollectAfterGroupBy.apply(GroupByKey.<String, Iterable<String>>create());

Thanks,
Rahul
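P.S. For step 6, my current guess (a sketch, not verified) is that applying a fresh Window.into(...) resets the WindowFn, after which a second GroupByKey is accepted:

```java
// Sketch: re-window the grouped output before the second GroupByKey.
// GlobalWindows is used only as an example of "some new WindowFn"; on an
// unbounded source like Kafka it would also need a trigger to ever fire.
PCollection<KV<String, Iterable<String>>> rewindowed =
    pcollectAfterGroupBy.apply(
        Window.<KV<String, Iterable<String>>>into(new GlobalWindows()));

PCollection<KV<String, Iterable<Iterable<String>>>> cascaded =
    rewindowed.apply(GroupByKey.<String, Iterable<String>>create());
```

Is re-windowing like this the expected pattern for cascading GroupByKeys after a merging WindowFn?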