The default implementation returns the processing timestamp of the last record (in effect; more accurately, it returns the same value as getTimestamp(), which might be overridden by the user).
As a workaround, yes, you can provide your own watermarkFn that essentially returns Now() or Now() minus one second (see the usage in the javadoc <https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L138>).

I think the default watermark should be smarter: it should advance to the current time if there aren't any records to read from Kafka. Could you file a JIRA?

Thanks,
Raghu.

On Wed, Aug 24, 2016 at 2:10 PM, Chawla,Sumit <sumitkcha...@gmail.com> wrote:

> Hi All,
>
> I am trying to do some simple batch processing on KafkaIO records. My Beam
> pipeline looks like the following:
>
> pipeline.apply(KafkaIO.read()
>         .withTopics(ImmutableList.of("mytopic"))
>         .withBootstrapServers("localhost:9200"))
>     .apply("ExtractMessage", ParDo.of(new ExtractKVMessage())) // Emits a KV<String,String>
>     .apply("WindowBy10Sec", Window.<KV<String, JSONObject>>into(
>             FixedWindows.of(Duration.standardSeconds(10)))
>         .withAllowedLateness(Duration.standardSeconds(1)))
>     .apply("GroupByKey", GroupByKey.create())
>     .apply("Sink", ParDo.of(new MySink()));
>
> My Kafka source already has 1000+ messages, and new messages arrive
> every few minutes.
>
> When I start my pipeline, I can see that it reads all the 1000+ messages
> from Kafka. However, the window does not fire until a new message arrives in
> Kafka, and the sink does not receive any message until that point. Do I need
> to override the WatermarkFn here? Since I am not providing any timestampFn,
> I am assuming that timestamps will be assigned as and when messages arrive,
> i.e. ingestion time. What is the default WatermarkFn implementation? Is
> the window not supposed to fire based on ingestion time?
>
> Regards,
> Sumit Chawla
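To make the stall and the workaround from the reply concrete, here is a standalone Java sketch that does not depend on Beam. It assumes, as in the question, that each record's timestamp is its ingestion (processing) time, and it models the default watermark as the last record's timestamp, as described in the reply. `WatermarkSketch`, `Record`, and the helper methods are hypothetical names for illustration only. The trigger model (a window fires once the watermark reaches its end, allowed lateness ignored) is a simplification of Beam's default trigger. As far as I know, the real KafkaIO option of this era takes a `SerializableFunction` from a record to an `Instant`; check the javadoc linked in the reply for the exact signature in your version.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical, Beam-free model of the watermark behaviour discussed above.
public class WatermarkSketch {

    /** Hypothetical stand-in for a Kafka record carrying its ingestion timestamp. */
    static final class Record {
        final String key;
        final String value;
        final Instant timestamp;

        Record(String key, String value, Instant timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Default behaviour per the reply: the watermark is the last record's timestamp. */
    static Function<Record, Instant> defaultWatermarkFn() {
        return r -> r.timestamp;
    }

    /** Workaround per the reply: ignore the record, return the clock's time minus a lag. */
    static Function<Record, Instant> wallClockWatermarkFn(Supplier<Instant> clock, Duration lag) {
        return r -> clock.get().minus(lag);
    }

    /** End of the epoch-aligned fixed window that contains ts. */
    static Instant windowEnd(Instant ts, Duration size) {
        long sizeMs = size.toMillis();
        long startMs = Math.floorDiv(ts.toEpochMilli(), sizeMs) * sizeMs;
        return Instant.ofEpochMilli(startMs + sizeMs);
    }

    /** Simplified on-time trigger: fires once the watermark reaches the window end. */
    static boolean windowFires(Instant watermark, Instant windowEnd) {
        return !watermark.isBefore(windowEnd);
    }

    public static void main(String[] args) {
        Duration windowSize = Duration.ofSeconds(10);
        // Last backlog record was read at t = 1003 s; nothing new arrives afterwards.
        Record last = new Record("k", "v", Instant.ofEpochSecond(1003));
        Instant end = windowEnd(last.timestamp, windowSize);

        // The source keeps being polled; the wall clock has moved on to t = 1060 s.
        Supplier<Instant> clock = () -> Instant.ofEpochSecond(1060);

        Instant stuck = defaultWatermarkFn().apply(last);
        Instant moving = wallClockWatermarkFn(clock, Duration.ofSeconds(1)).apply(last);

        System.out.println("window end:           " + end);
        System.out.println("default watermark:    " + stuck + " fires=" + windowFires(stuck, end));
        System.out.println("wall-clock watermark: " + moving + " fires=" + windowFires(moving, end));
    }
}
```

With the last backlog record at t = 1003 s, its 10-second window ends at 1010 s. The default watermark stays at 1003 s for as long as no new record arrives, so that window never fires; the wall-clock function returns 1059 s, so it does. The one-second lag mirrors the "Now() minus one second" suggestion and leaves a little room for records still in flight.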