Hi,

I am exploring session windowing in Apache Beam. I have created a pipeline
to find the window start time and window end time of the elements emitted
from GroupByKey, which groups elements of a PCollection to which a Sessions
window was applied.
I got this exception:

    Exception in thread "main" java.lang.IllegalArgumentException: public void
    main.SlidingWindowX$2.processElement(org.apache.beam.sdk.transforms.DoFn$ProcessContext,org.apache.beam.sdk.transforms.windowing.IntervalWindow)
    unable to provide window -- expected window type from parameter
    (org.apache.beam.sdk.transforms.windowing.IntervalWindow) is not a
    supertype of actual window type assigned by windowing (W)

I came to know that when GroupByKey is applied to a PCollection with a
Sessions window, the output PCollection emitted from GroupByKey doesn't
have a valid WindowingStrategy any more. Is this correct?

What should the window type of such elements be? How can I know the start
time and end time of the Session window (the merged windows)?

If I apply a GroupByKey again on the result of the 1st GroupByKey, I get
this exception:

    Exception in thread "main" java.lang.IllegalStateException:
    GroupByKey must have a valid Window merge function.  Invalid because:
    WindowFn has already been consumed by previous GroupByKey

So, should a new window be applied before I do this?
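For concreteness, this is the kind of re-windowing I have in mind before the
second GroupByKey (FixedWindows and the 5-second duration are just
illustrative choices on my part, not necessarily the right ones):

```java
        // Hypothetical sketch: re-apply a WindowFn before the second
        // GroupByKey, since the Sessions WindowFn was consumed by the first.
        PCollection<KV<String, Iterable<String>>> rewindowed = pcollectAfterGroupBy
                .apply(Window.<KV<String, Iterable<String>>>into(
                        FixedWindows.of(Duration.standardSeconds(5))));
        PCollection<KV<String, Iterable<Iterable<String>>>> cascaded =
                rewindowed.apply(GroupByKey.<String, Iterable<String>>create());
```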

I am obviously missing a crucial point about the output PCollection emitted
from aggregation transforms. Can you point me to any documentation or code
that explains this behaviour?

After going through the code, I understood that when a Sessions window is
applied, each element is assigned an IntervalWindow with start_time =
element_timestamp and end_time = element_timestamp + gap. When an
aggregation transform is called, a MergeContext is created for each key and
the windows are merged. Is it possible to get the MergeContext after
applying aggregation transforms?
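To check my understanding of that merge step, here is a plain-Java sketch
(not Beam code; the class name, the long timestamps, and the
merge-on-overlap condition are my own illustration of how I picture per-key
interval merging):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Plain-Java sketch of my understanding of Sessions merging (not Beam code):
// each element gets an interval [timestamp, timestamp + gap); for one key,
// overlapping intervals are merged into a single session window.
public class SessionMergeSketch {
    /** Returns merged {start, end} pairs, end exclusive, sorted by start. */
    public static List<long[]> mergeSessions(List<Long> timestamps, long gap) {
        List<long[]> windows = new ArrayList<>();
        for (long t : timestamps) {
            windows.add(new long[] { t, t + gap });
        }
        windows.sort(Comparator.comparingLong(w -> w[0]));
        List<long[]> merged = new ArrayList<>();
        for (long[] w : windows) {
            long[] last = merged.isEmpty() ? null : merged.get(merged.size() - 1);
            if (last != null && w[0] < last[1]) {
                // Overlapping proto-windows collapse into one session.
                last[1] = Math.max(last[1], w[1]);
            } else {
                merged.add(w);
            }
        }
        return merged;
    }
}
```

With timestamps 0, 3 and 10 and a gap of 5, this yields [0, 8) and
[10, 15), which is what I expect the merged IntervalWindows to look like.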

Can you point me to the design doc for this behaviour?

Pipeline Steps:

1) Read from Kafka using KafkaIO
        PCollection<KafkaRecord<String, String>> pcollectIO = p.apply(
                KafkaIO.<String, String>read()
                        .withBootstrapServers("localhost:9092")
                        .withTopic("sessionWindowtopic")
                        .withKeyDeserializer(StringDeserializer.class)
                        .withValueDeserializer(StringDeserializer.class));

2) Convert KafkaRecord to KV
        PCollection<KV<String, String>> pcollectKV = pcollectIO
                .apply(ParDo.of(new DoFn<KafkaRecord<String, String>, KV<String, String>>() {
                    @ProcessElement
                    public void processElement(@Element KafkaRecord<String, String> record,
                            OutputReceiver<KV<String, String>> r) {
                        r.output(record.getKV());
                    }
                }));

3) Apply sessions window
        PCollection<KV<String, String>> pcollectW2 = pcollectKV
                .apply(Window.<KV<String, String>>into(
                        Sessions.withGapDuration(Duration.standardSeconds(5))));

4) 1st GroupByKey
        PCollection<KV<String, Iterable<String>>> pcollectAfterGroupBy =
                pcollectW2.apply(GroupByKey.<String, String>create());

5) ParDo to output the window start time and the window end time of each
element of the PCollection emitted from GroupByKey
        // Declaring the parameter as IntervalWindow here is what triggers
        // the IllegalArgumentException quoted above.
        pcollectAfterGroupBy.apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, Void>() {
            @ProcessElement
            public void processElement(ProcessContext c, IntervalWindow b) {
                System.out.println(b.start());
                System.out.println(b.end());
            }
        }));
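If it matters, a variant of step 5 I am considering, on the assumption that
the windows assigned by Sessions really are IntervalWindows, declares the
parameter as BoundedWindow and casts:

```java
        // Hypothetical variant of step 5: take the window as BoundedWindow
        // and cast, instead of declaring IntervalWindow directly.
        pcollectAfterGroupBy.apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, Void>() {
            @ProcessElement
            public void processElement(ProcessContext c, BoundedWindow window) {
                IntervalWindow iw = (IntervalWindow) window;
                System.out.println(iw.start());
                System.out.println(iw.end());
            }
        }));
```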

6) 2nd GroupByKey (used just for exploring sessions window)
        PCollection<KV<String, Iterable<Iterable<String>>>> pcollectCascaded =
                pcollectAfterGroupBy.apply(GroupByKey.<String, Iterable<String>>create());


Thanks,
Rahul
