Hello All,

I would like advice on whether session windowing followed by CoGroupByKey is
the right way to solve my use case, and if so, whether I am using it
correctly.
Please note that I am using the Java SDK 2.19 on Google Dataflow.

I have two streams of data coming from two different Kafka topics, and I
need to correlate them using a common key that is present in both. I expect
all the logs for a given key to arrive in both topics within 90 seconds, so
I decided to use session windows.

1. Read data from each Kafka topic, as follows:

PCollection<KV<MyKey, POJO1>> collection1 = p
    .apply("Read_From_Kafka_Topic1", KafkaIO.<String, byte[]>read()
        .withBootstrapServers(servers)
        .withTopics(Arrays.asList("topic1"))
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(ByteArrayDeserializer.class)
        .withConsumerConfigUpdates(kafkaConsumerProperties)
        .withConsumerFactoryFn(consumerFactoryObj)
        .commitOffsetsInFinalize())
    .apply("Applying_Fixed_Window_Topic1",
        Window.<KafkaRecord<String, byte[]>>into(FixedWindows.of(Duration.standardSeconds(10)))
            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
            .withAllowedLateness(Duration.standardSeconds(360))
            .discardingFiredPanes())
    .apply("Convert_KafkaRecord_To_POJO1", ParDo.of(new ParseLogsPOJO1()));


PCollection<KV<MyKey, POJO2>> collection2 = p
    .apply("Read_From_Kafka_Topic2", KafkaIO.<String, byte[]>read()
        .withBootstrapServers(servers)
        .withTopics(Arrays.asList("topic2"))
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(ByteArrayDeserializer.class)
        .withConsumerConfigUpdates(kafkaConsumerProperties)
        .withConsumerFactoryFn(consumerFactoryObj)
        .commitOffsetsInFinalize())
    .apply("Applying_Fixed_Window_Topic2",
        Window.<KafkaRecord<String, byte[]>>into(FixedWindows.of(Duration.standardSeconds(10)))
            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
            .withAllowedLateness(Duration.standardSeconds(360))
            .discardingFiredPanes())
    .apply("Convert_KafkaRecord_To_POJO2", ParDo.of(new ParseLogsPOJO2()));
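
For reference, the parse DoFns just deserialize the payload and key the
output by the common key. ParseLogsPOJO1 is roughly the following (the
deserialization itself is simplified here; the POJO2 version is analogous):

static class ParseLogsPOJO1 extends DoFn<KafkaRecord<String, byte[]>, KV<MyKey, POJO1>> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    KafkaRecord<String, byte[]> record = c.element();
    // Deserialize the byte[] payload into the POJO (simplified placeholder).
    POJO1 pojo = deserializePOJO1(record.getKV().getValue());
    // Key by the field that is common to both topics.
    c.output(KV.of(new MyKey(record.getKV().getKey()), pojo));
  }
}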


2. Put each of the above collections into a session window with a gap
duration of 90 seconds:


PCollection<KV<MyKey, POJO1>> sessionWindowedPOJO1 = collection1
    .apply("Applying_Session_Window_POJO1",
        Window.<KV<MyKey, POJO1>>into(Sessions.withGapDuration(Duration.standardSeconds(90)))
            .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
            .withAllowedLateness(Duration.ZERO)
            .discardingFiredPanes());


PCollection<KV<MyKey, POJO2>> sessionWindowedPOJO2 = collection2
    .apply("Applying_Session_Window_POJO2",
        Window.<KV<MyKey, POJO2>>into(Sessions.withGapDuration(Duration.standardSeconds(90)))
            .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
            .withAllowedLateness(Duration.ZERO)
            .discardingFiredPanes());
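
For completeness, MyKey is a simple value class. Since it is used as a
grouping key, it has equals()/hashCode() and a deterministic coder; roughly
like this (the field name is illustrative):

@DefaultCoder(AvroCoder.class)
static class MyKey implements Serializable {
  String logId; // the field common to both topics (illustrative name)

  MyKey() {} // no-arg constructor required by AvroCoder
  MyKey(String logId) { this.logId = logId; }

  // equals()/hashCode() so grouping by this key behaves correctly
  @Override public boolean equals(Object o) {
    return o instanceof MyKey && ((MyKey) o).logId.equals(logId);
  }
  @Override public int hashCode() { return logId.hashCode(); }
}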


3. CoGroupByKey the two session-windowed collections and get the correlated logs:


PCollection<CorrelatedPOJO> coGbkLogs =
    KeyedPCollectionTuple.of("tag1", sessionWindowedPOJO1)
        .and("tag2", sessionWindowedPOJO2)
        .apply("CoGroupByMyKey", CoGroupByKey.create())
        .apply("Correlate_Logs_PerLogID", ParDo.of(new Correlate()));



   Is this a correct way to solve my use case?


Looking forward to hearing from someone soon.


Thanks and Regards

Mohil
