Hello all, I need advice on whether session windowing followed by CoGroupByKey is the correct way to solve my use case, and if so, whether I am using it correctly. Please note that I am using the Java SDK 2.19 on Google Dataflow.
I have two streams of data coming from two different Kafka topics, and I need to correlate them using a common key present in both. I expect all the logs for a given key to arrive within 90 seconds on both topics, so I decided to use session windows.

1. Read data from each Kafka topic as follows (step names made unique per topic, since Dataflow requires distinct transform names):

PCollection<KV<MyKey, POJO1>> collection1 = p
    .apply("Read_From_Kafka_Topic1", KafkaIO.<String, byte[]>read()
        .withBootstrapServers(servers)
        .withTopics(Arrays.asList("topic1"))
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(ByteArrayDeserializer.class)
        .withConsumerConfigUpdates(kafkaConsumerProperties)
        .withConsumerFactoryFn(consumerFactoryObj)
        .commitOffsetsInFinalize())
    .apply("Applying_Fixed_Window_Topic1",
        Window.<KafkaRecord<String, byte[]>>into(FixedWindows.of(Duration.standardSeconds(10)))
            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
            .withAllowedLateness(Duration.standardSeconds(360))
            .discardingFiredPanes())
    .apply("Convert_KafkaRecord_To_POJO1", ParDo.of(new ParseLogsPOJO1()));

PCollection<KV<MyKey, POJO2>> collection2 = p
    .apply("Read_From_Kafka_Topic2", KafkaIO.<String, byte[]>read()
        .withBootstrapServers(servers)
        .withTopics(Arrays.asList("topic2"))
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(ByteArrayDeserializer.class)
        .withConsumerConfigUpdates(kafkaConsumerProperties)
        .withConsumerFactoryFn(consumerFactoryObj)
        .commitOffsetsInFinalize())
    .apply("Applying_Fixed_Window_Topic2",
        Window.<KafkaRecord<String, byte[]>>into(FixedWindows.of(Duration.standardSeconds(10)))
            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
            .withAllowedLateness(Duration.standardSeconds(360))
            .discardingFiredPanes())
    .apply("Convert_KafkaRecord_To_POJO2", ParDo.of(new ParseLogsPOJO2()));
2. Put each of the above collections into a session window with a 90-second gap:

PCollection<KV<MyKey, POJO1>> sessionWindowedPOJO1 = collection1
    .apply("Applying_Session_Window_1",
        Window.<KV<MyKey, POJO1>>into(Sessions.withGapDuration(Duration.standardSeconds(90)))
            .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
            .withAllowedLateness(Duration.ZERO)
            .discardingFiredPanes());

PCollection<KV<MyKey, POJO2>> sessionWindowedPOJO2 = collection2
    .apply("Applying_Session_Window_2",
        Window.<KV<MyKey, POJO2>>into(Sessions.withGapDuration(Duration.standardSeconds(90)))
            .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
            .withAllowedLateness(Duration.ZERO)
            .discardingFiredPanes());

3. CoGroupByKey and get the correlated logs. Note that KeyedPCollectionTuple takes TupleTag objects rather than plain strings:

static final TupleTag<POJO1> TAG1 = new TupleTag<>();
static final TupleTag<POJO2> TAG2 = new TupleTag<>();

PCollection<CorrelatedPOJO> coGbkLogs = KeyedPCollectionTuple.of(TAG1, sessionWindowedPOJO1)
    .and(TAG2, sessionWindowedPOJO2)
    .apply("CoGroupByMyKey", CoGroupByKey.create())
    .apply("Correlate_Logs_PerLogID", ParDo.of(new Correlate()));

Is this a correct way to solve my use case?

Looking forward to hearing from someone soon.

Thanks and regards,
Mohil
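For context, the Correlate DoFn is intended to work roughly like this. This is a simplified sketch rather than my exact code: it assumes the TupleTags for the two sides are static final fields of the pipeline class, and that CorrelatedPOJO has a constructor taking the key and one log from each side.

```java
// Sketch of the Correlate DoFn consuming the CoGroupByKey result.
// Assumes static final TupleTag<POJO1>/TupleTag<POJO2> fields (here TAG1/TAG2)
// and a hypothetical CorrelatedPOJO(MyKey, POJO1, POJO2) constructor.
static class Correlate extends DoFn<KV<MyKey, CoGbkResult>, CorrelatedPOJO> {

    @ProcessElement
    public void processElement(@Element KV<MyKey, CoGbkResult> element,
                               OutputReceiver<CorrelatedPOJO> out) {
        // getAll returns every value grouped under this key for the given tag
        Iterable<POJO1> logs1 = element.getValue().getAll(TAG1);
        Iterable<POJO2> logs2 = element.getValue().getAll(TAG2);

        // Emit one correlated record per pairing of logs from the two topics
        for (POJO1 l1 : logs1) {
            for (POJO2 l2 : logs2) {
                out.output(new CorrelatedPOJO(element.getKey(), l1, l2));
            }
        }
    }
}
```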