Thanks Raghu.

I don't have much control over the KafkaIO properties; I included the
KafkaIO code only to complete the example. Are there any changes I can
make to the windowing to achieve the same behavior?

Regards
Sumit Chawla


On Wed, Aug 24, 2016 at 5:06 PM, Raghu Angadi <rang...@google.com.invalid>
wrote:

> The default implementation returns the processing timestamp of the last record
> (in effect; more accurately, it returns the same value as getTimestamp(), which
> might be overridden by the user).
>
> As a workaround, yes, you can provide your own watermarkFn that
> essentially returns Now() or Now() - 1 sec (usage in the javadoc:
> <https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L138>).
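To make the workaround concrete, here is a minimal, self-contained Java model of the two watermark policies. It assumes nothing from Beam: WatermarkModel, Rec, lastRecordTimestamp, nowMinusOneSecond and windowFires are hypothetical names, and the real hook is the watermark function described in the KafkaIO javadoc. It contrasts a watermark pinned to the last record's timestamp with one that reports the reader's clock minus one second.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical, Beam-free model of the two watermark policies discussed above.
final class WatermarkModel {

  // The last record seen by the reader (key, value and assigned timestamp).
  static final class Rec {
    final String key;
    final String value;
    final Instant timestamp;

    Rec(String key, String value, Instant timestamp) {
      this.key = key;
      this.value = value;
      this.timestamp = timestamp;
    }
  }

  // Default-like policy: the watermark is the last record's timestamp,
  // so it stays put when no new records arrive.
  static Function<Rec, Instant> lastRecordTimestamp() {
    return rec -> rec.timestamp;
  }

  // Workaround policy: ignore the record and report the reader's clock
  // minus one second. The clock is injected so the behavior is testable.
  static Function<Rec, Instant> nowMinusOneSecond(Supplier<Instant> clock) {
    return rec -> clock.get().minus(Duration.ofSeconds(1));
  }

  // A window ending at windowEnd (exclusive) fires once the watermark reaches it.
  static boolean windowFires(Instant windowEnd, Instant watermark) {
    return !watermark.isBefore(windowEnd);
  }

  private WatermarkModel() {}
}
```

With the clock-based policy, a 10-second window ending at 14:00:10 fires once the reader's clock reaches 14:00:11, even if no new record arrives; the 1-second margin is presumably a safety buffer for records whose timestamps lag the clock slightly.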
>
> I think the default watermark should be smarter: it should advance to the
> current time if there aren't any records to read from Kafka. Could you file a JIRA?
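The smarter default suggested here could look roughly like the sketch below. It is a hypothetical, Beam-free model (IdleAwareWatermark, idleTimeout and margin are names invented for illustration, not KafkaIO code): while records keep arriving, the watermark follows their timestamps, and once the reader has seen no record for idleTimeout it advances to the current time minus a margin. It never moves backwards, since a watermark must be monotonic.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical sketch of an idle-aware watermark (not actual KafkaIO code).
final class IdleAwareWatermark {
  private final Duration idleTimeout; // how long without records counts as idle
  private final Duration margin;      // safety margin subtracted from "now"
  private Instant maxRecordTimestamp = Instant.MIN;
  private Instant lastArrival = null;  // processing time of the last record read
  private Instant watermark = Instant.MIN;

  IdleAwareWatermark(Duration idleTimeout, Duration margin) {
    this.idleTimeout = idleTimeout;
    this.margin = margin;
  }

  // Called for every record read from the source.
  void onRecord(Instant recordTimestamp, Instant arrivalTime) {
    if (recordTimestamp.isAfter(maxRecordTimestamp)) {
      maxRecordTimestamp = recordTimestamp;
    }
    lastArrival = arrivalTime;
  }

  // Returns the watermark as of processing time `now`.
  Instant currentWatermark(Instant now) {
    boolean idle = lastArrival == null
        || Duration.between(lastArrival, now).compareTo(idleTimeout) >= 0;
    // Follow the records while they flow; fall back to the clock when idle.
    Instant candidate = idle ? now.minus(margin) : maxRecordTimestamp;
    // A watermark must never move backwards.
    if (candidate.isAfter(watermark)) {
      watermark = candidate;
    }
    return watermark;
  }
}
```

The idle timeout keeps the watermark from racing ahead of records that are merely slow to arrive; choosing its value is a trade-off between latency and late data.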
>
> thanks,
> Raghu.
>
> On Wed, Aug 24, 2016 at 2:10 PM, Chawla,Sumit <sumitkcha...@gmail.com>
> wrote:
>
> > Hi All
> >
> >
> > I am trying to do some simple batch processing on KafkaIO records. My Beam
> > pipeline looks like the following:
> >
> > pipeline
> >     .apply(KafkaIO.read()
> >         .withTopics(ImmutableList.of("mytopic"))
> >         .withBootstrapServers("localhost:9200")) // note: Kafka's default broker port is 9092
> >     .apply("ExtractMessage", ParDo.of(new ExtractKVMessage())) // emits KV<String, String>
> >     .apply("WindowBy10Sec", Window.<KV<String, String>>into(
> >             FixedWindows.of(Duration.standardSeconds(10)))
> >         .withAllowedLateness(Duration.standardSeconds(1)))
> >     .apply("GroupByKey", GroupByKey.create())
> >     .apply("Sink", ParDo.of(new MySink()));
> >
> >
> > My Kafka source already has 1000+ messages, and new messages arrive
> > every few minutes.
> >
> > When I start my pipeline, I can see that it reads all 1000+ messages
> > from Kafka. However, the window does not fire until a new message arrives
> > in Kafka, and the sink does not receive anything until that point. Do I
> > need to override the WatermarkFn here? Since I am not providing any
> > TimestampFn, I am assuming that timestamps will be assigned as and when
> > messages arrive, i.e. ingestion time. What is the default WatermarkFn
> > implementation? Isn't the window supposed to fire based on ingestion time?
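The behavior described above can be reproduced with a small Beam-free simulation, assuming, as stated in the reply, that the default timestamp is the processing time at which each record is read and the default watermark is the timestamp of the last record. FixedWindowSim and its methods are hypothetical names; windows are aligned to the epoch, as FixedWindows does with no offset, and a window is treated as fired once the watermark reaches its end.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical simulation of 10-second fixed windows under the default
// watermark (the timestamp of the last record read). Not Beam code.
final class FixedWindowSim {
  static final long WINDOW_MILLIS = Duration.ofSeconds(10).toMillis();

  // Start of the epoch-aligned fixed window that contains ts.
  static Instant windowStart(Instant ts) {
    long ms = ts.toEpochMilli();
    return Instant.ofEpochMilli(ms - Math.floorMod(ms, WINDOW_MILLIS));
  }

  // Given record timestamps in read order (ingestion time, so non-decreasing),
  // returns the start of every window whose end the watermark has reached.
  static SortedSet<Instant> firedWindows(List<Instant> recordTimestamps) {
    SortedSet<Instant> fired = new TreeSet<>();
    if (recordTimestamps.isEmpty()) {
      return fired; // no records: the default watermark never moves
    }
    Instant watermark = recordTimestamps.get(recordTimestamps.size() - 1);
    for (Instant ts : recordTimestamps) {
      Instant start = windowStart(ts);
      if (!watermark.isBefore(start.plusMillis(WINDOW_MILLIS))) {
        fired.add(start);
      }
    }
    return fired;
  }

  private FixedWindowSim() {}
}
```

For example, a backlog read between 14:00:03 and 14:00:04 lands in the window [14:00:00, 14:00:10); the watermark stops at 14:00:04, so nothing fires until a record read at 14:03:00 pushes the watermark past 14:00:10.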
> >
> >
> >
> >
> > Regards
> > Sumit Chawla
> >
>
