The default implementation returns the processing timestamp of the last record
(in effect; more accurately, it returns the same as getTimestamp(), which might
be overridden by the user).
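
To see why the window never fires, here is a toy model in plain Java. It is not Beam or KafkaIO code, and the class and method names are hypothetical. It assumes each record is stamped with its processing time when read, and that the watermark is the last record's timestamp. Once the backlog is drained, the watermark stays put however much wall-clock time passes. The first 10-second window needs the watermark to reach its end, so it never closes.

```java
/**
 * Toy model (not Beam/KafkaIO code) of the default behaviour described above:
 * each record is stamped with the processing time at which it is read, and the
 * watermark is simply the timestamp of the last record read.
 */
class DefaultWatermarkModel {
    private long lastRecordTimestamp = Long.MIN_VALUE; // no records read yet

    /** Reads one record at processing time nowMillis; its timestamp is nowMillis. */
    long readRecord(long nowMillis) {
        lastRecordTimestamp = nowMillis;
        return lastRecordTimestamp;
    }

    /** Default watermark: the last record's timestamp; nowMillis is deliberately ignored. */
    long watermark(long nowMillis) {
        return lastRecordTimestamp;
    }

    public static void main(String[] args) {
        DefaultWatermarkModel source = new DefaultWatermarkModel();
        // Backlog of 1000 records drained within the first second (t = 0..999 ms).
        for (int i = 0; i < 1000; i++) {
            source.readRecord(i);
        }
        long windowEnd = 10_000; // end of the first 10-second fixed window
        long now = 180_000;      // three minutes later, no new Kafka records
        // prints: watermark=999 window fired=false
        System.out.println("watermark=" + source.watermark(now)
            + " window fired=" + (source.watermark(now) >= windowEnd));
    }
}
```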

As a workaround, yes, you can provide your own watermarkFn that essentially
returns Now() or Now() - 1 sec (see the usage in the javadoc:
<https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L138>).
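
The workaround can be modeled the same way. The sketch below is plain Java with a hypothetical `WATERMARK_FN`. In a real pipeline the function would instead be supplied through the KafkaIO watermarkFn setting in the linked javadoc. Because the watermark now follows wall-clock time minus one second, the window closes even when Kafka is idle.

```java
import java.util.function.LongUnaryOperator;

/**
 * Toy model (not the KafkaIO API) of the suggested workaround: a watermark
 * function that ignores record timestamps and returns "now minus one second".
 */
class NowMinusOneSecondWatermark {
    /** Hypothetical watermark function: maps current processing time to a watermark. */
    static final LongUnaryOperator WATERMARK_FN = nowMillis -> nowMillis - 1_000;

    /** With the default trigger, a window fires once the watermark reaches its end. */
    static boolean windowFires(long windowEndMillis, long watermarkMillis) {
        return watermarkMillis >= windowEndMillis;
    }

    public static void main(String[] args) {
        long windowEnd = 10_000; // first 10-second fixed window: [0 s, 10 s)
        long now = 180_000;      // three minutes later, no new Kafka records
        long watermark = WATERMARK_FN.applyAsLong(now);
        // prints: watermark=179000 window fired=true
        System.out.println("watermark=" + watermark
            + " window fired=" + windowFires(windowEnd, watermark));
    }
}
```

With the default processing-time timestamps, a record read at time t carries timestamp t while the watermark is t - 1 sec, so fresh records are not behind the watermark. If you switch to event-time timestamps, records lagging more than a second behind the wall clock could arrive behind the watermark and be handled as late data.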

I think the default watermark should be smarter: it should advance to the
current time if there aren't any records to read from Kafka. Could you file a
JIRA?
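
The smarter default proposed above could look roughly like the following. This is only a sketch of the idea in plain Java, not how KafkaIO implements it. `onEmptyPoll` is a hypothetical hook standing in for "the consumer has no records to return".

```java
/**
 * Sketch (hypothetical, not KafkaIO code) of the proposed smarter default:
 * track the last record's timestamp, but once the reader has caught up and
 * there is nothing left to read, let the watermark advance to current time.
 */
class IdleAwareWatermark {
    private long lastRecordTimestamp = Long.MIN_VALUE;
    private boolean caughtUp = false;

    /** A record was read; the watermark follows its timestamp again. */
    void onRecord(long timestampMillis) {
        lastRecordTimestamp = timestampMillis;
        caughtUp = false;
    }

    /** Hypothetical hook: a poll returned no records, i.e. the backlog is drained. */
    void onEmptyPoll() {
        caughtUp = true;
    }

    long watermark(long nowMillis) {
        // When idle, use current time; Math.max keeps the idle watermark
        // from falling below the last record's timestamp.
        return caughtUp ? Math.max(lastRecordTimestamp, nowMillis) : lastRecordTimestamp;
    }

    public static void main(String[] args) {
        IdleAwareWatermark wm = new IdleAwareWatermark();
        for (long t = 0; t < 1000; t++) {
            wm.onRecord(t); // drain the backlog
        }
        System.out.println("while reading: " + wm.watermark(1_000)); // prints: while reading: 999
        wm.onEmptyPoll();   // nothing more in Kafka
        System.out.println("when idle: " + wm.watermark(180_000));  // prints: when idle: 180000
    }
}
```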

thanks,
Raghu.

On Wed, Aug 24, 2016 at 2:10 PM, Chawla,Sumit <sumitkcha...@gmail.com>
wrote:

> Hi all,
>
> I am trying to do some simple batch processing on KafkaIO records. My Beam
> pipeline looks like the following:
>
> pipeline
>     .apply(KafkaIO.read()
>         .withTopics(ImmutableList.of("mytopic"))
>         .withBootstrapServers("localhost:9200"))
>     .apply("ExtractMessage", ParDo.of(new ExtractKVMessage())) // emits KV<String, String>
>     .apply("WindowBy10Sec", Window.<KV<String, String>>into(
>             FixedWindows.of(Duration.standardSeconds(10)))
>         .withAllowedLateness(Duration.standardSeconds(1)))
>     .apply("GroupByKey", GroupByKey.create())
>     .apply("Sink", ParDo.of(new MySink()));
>
> My Kafka source already has 1000+ messages, and new messages arrive every
> few minutes.
>
> When I start my pipeline, I can see that it reads all 1000+ messages from
> Kafka. However, the window does not fire until a new message arrives in
> Kafka, and the sink does not receive any message until that point. Do I
> need to override the watermarkFn here? Since I am not providing any
> timestampFn, I am assuming that timestamps will be assigned as and when
> messages arrive, i.e. ingestion time. What is the default watermarkFn
> implementation? Shouldn't the window fire based on ingestion time?
>
> Regards
> Sumit Chawla
>
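
On the last question: windows fire on the watermark, not on the record timestamps themselves. Ingestion-time timestamps decide which window a record lands in. With the default trigger, though, a window is only emitted once the watermark reaches its end. The arithmetic below models the quoted 10-second fixed windows with 1 second of allowed lateness in plain Java, not Beam code. It assumes the default trigger, a zero window offset and non-negative millisecond timestamps, and the names are hypothetical.

```java
/**
 * Plain-Java model of FixedWindows.of(10 s) with withAllowedLateness(1 s),
 * assuming the default trigger, zero offset and non-negative timestamps.
 */
class FixedWindowMath {
    static final long SIZE_MS = 10_000;
    static final long ALLOWED_LATENESS_MS = 1_000;

    /** Start of the fixed window containing a timestamp. */
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % SIZE_MS);
    }

    /** Exclusive end of that window. */
    static long windowEnd(long timestampMs) {
        return windowStart(timestampMs) + SIZE_MS;
    }

    /** The on-time pane is emitted once the watermark reaches the window end. */
    static boolean onTimePaneFires(long timestampMs, long watermarkMs) {
        return watermarkMs >= windowEnd(timestampMs);
    }

    /** Past end + allowed lateness the window is expired; later data for it is dropped. */
    static boolean windowExpired(long timestampMs, long watermarkMs) {
        return watermarkMs >= windowEnd(timestampMs) + ALLOWED_LATENESS_MS;
    }

    public static void main(String[] args) {
        long ts = 23_456; // a record ingested at 23.456 s
        System.out.println("window = [" + windowStart(ts) + ", " + windowEnd(ts) + ")");
        System.out.println("fires at watermark 29999? " + onTimePaneFires(ts, 29_999));
        System.out.println("fires at watermark 30000? " + onTimePaneFires(ts, 30_000));
        System.out.println("expired at watermark 31000? " + windowExpired(ts, 31_000));
    }
}
```

Run as-is, this prints the window [20000, 30000), shows the pane firing only once the watermark reaches 30000, and shows the window expiring at 31000. If the watermark is stuck below 30000, as with the default watermark and an idle topic, nothing reaches the sink.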
