Hi all,

I am trying to do some simple batch processing on KafkaIO records. My Beam pipeline looks like the following:

    pipeline
        .apply(KafkaIO.<String, String>read()
            .withTopics(ImmutableList.of("mytopic"))
            .withBootstrapServers("localhost:9200")
            .withKeyDeserializer(StringDeserializer.class)     // assuming String keys/values
            .withValueDeserializer(StringDeserializer.class))
        .apply("ExtractMessage", ParDo.of(new ExtractKVMessage()))  // emits KV<String, JSONObject>
        .apply("WindowBy10Sec", Window.<KV<String, JSONObject>>into(FixedWindows.of(Duration.standardSeconds(10)))
            .withAllowedLateness(Duration.standardSeconds(1)))
        .apply("GroupByKey", GroupByKey.create())
        .apply("Sink", ParDo.of(new MySink()));

My Kafka source already holds 1000+ messages, and new messages arrive every few minutes. When I start the pipeline, I can see that it reads all 1000+ messages from Kafka. However, the window does not fire until a new message arrives in Kafka, and the sink receives nothing until then.

Do I need to override the WatermarkFn here? Since I am not providing a TimestampFn, I am assuming that timestamps are assigned as and when messages arrive, i.e. ingestion time. What is the default WatermarkFn implementation? Shouldn't the window fire based on ingestion time?

Regards,
Sumit Chawla
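P.S. To check my understanding, here is a toy model in plain Java of what I think is happening. It is not Beam code: `WatermarkSketch` and `firedWindows` are names I made up. It assumes that the source watermark simply equals the timestamp of the most recently read record, and that the default trigger emits a 10-second window once the watermark reaches the window's end. Under that assumption, a backlog read within a few seconds never moves the watermark past the end of its window; only a later record does.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.TreeSet;

// Toy model, not the Beam API: fixed windows fired by an assumed watermark
// that only advances when a new record is read.
public class WatermarkSketch {
    static final long WINDOW_MS = 10_000; // like FixedWindows.of(10 seconds)

    // Returns the start times (ms) of windows that would have fired.
    static List<Long> firedWindows(long[] timestampsMs) {
        TreeSet<Long> pending = new TreeSet<>(); // window starts holding buffered data
        List<Long> fired = new ArrayList<>();
        long watermark = Long.MIN_VALUE;
        for (long ts : timestampsMs) {
            pending.add(ts - ts % WINDOW_MS);     // assign record to its fixed window
            watermark = Math.max(watermark, ts);  // assumed: watermark = last record's timestamp
            Iterator<Long> it = pending.iterator();
            while (it.hasNext()) {
                long start = it.next();
                if (start + WINDOW_MS <= watermark) { // watermark reached the window end
                    fired.add(start);
                    it.remove();
                } else {
                    break; // windows are sorted, so later ones cannot have closed either
                }
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // Backlog read within ~2.5 s, each record stamped with its read time
        long[] backlog = {1_000, 1_500, 2_000, 3_500};
        System.out.println(firedWindows(backlog)); // prints []
        // A new message minutes later finally pushes the watermark past 10 s
        long[] withNewMessage = {1_000, 1_500, 2_000, 3_500, 180_000};
        System.out.println(firedWindows(withNewMessage)); // prints [0]
    }
}
```

If this model matches what KafkaIO actually does, the stall comes from the watermark rather than from the windowing itself, which is why I am asking whether a custom WatermarkFn is needed.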