You can adjust the trigger in the windowing transform if your sink can
handle being written to multiple times for the same window. For example, if
the sink appends to the output when it receives new data in a window, you
could add something like:

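    Window.into(...)
        .withAllowedLateness(...)
        .triggering(
            AfterWatermark.pastEndOfWindow()
                // Early pane: 5 s of processing time after the first
                // element arrives in the pane.
                .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                    .withDelayOf(Duration.standardSeconds(5)))
                // Late pane: one firing per late element.
                .withLateFirings(AfterPane.elementCountAtLeast(1)))
        // Each fired pane is dropped, so no element is emitted twice.
        .discardingFiredPanes();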

With this trigger, each pane is output about 5 seconds (in processing time)
after its first element is received from Kafka, even if Kafka does not
produce any new elements. Because fired panes are discarded, the GroupByKey
outputs each element only once.
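To make the behavior of that trigger concrete, here is a toy, self-contained Java model of a single window's pane with a 5-second processing-time early firing in discarding mode. It is not Beam code: the class and method names are hypothetical, and it ignores the watermark and late firings entirely. It only shows why the sink receives a pane without any new Kafka input, and why no element is emitted twice.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (not Beam): one window's pane buffer with an early-firing
// trigger that fires DELAY_MS of processing time after the first element
// lands in the current pane, in discarding mode.
public class EarlyFiringSketch {
    static final long DELAY_MS = 5_000;

    private final List<String> pane = new ArrayList<>();
    private long firstElementAt = -1; // processing time of the pane's first element, -1 if empty
    final List<List<String>> emitted = new ArrayList<>();

    void onElement(String element, long processingTimeMs) {
        if (pane.isEmpty()) {
            firstElementAt = processingTimeMs; // start the delay for a new pane
        }
        pane.add(element);
    }

    // Called as processing time advances; fires the pane once its delay has elapsed.
    void onProcessingTime(long nowMs) {
        if (!pane.isEmpty() && nowMs >= firstElementAt + DELAY_MS) {
            emitted.add(new ArrayList<>(pane)); // output the pane to the "sink"
            pane.clear();                        // discarding mode: forget fired elements
            firstElementAt = -1;
        }
    }

    public static void main(String[] args) {
        EarlyFiringSketch w = new EarlyFiringSketch();
        w.onElement("a", 0);
        w.onElement("b", 1_000);
        w.onProcessingTime(4_999);  // too early: nothing fires
        w.onProcessingTime(5_000);  // fires [a, b] with no new input arriving
        w.onElement("c", 6_000);
        w.onProcessingTime(11_000); // fires [c] only; a and b are not repeated
        System.out.println(w.emitted); // [[a, b], [c]]
    }
}
```

The same window can therefore produce several panes, which is why this only works if the sink tolerates multiple writes per window.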

We should still have a JIRA to improve KafkaIO watermark tracking in the
absence of new records.
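A rough sketch of the kind of improvement being discussed, written as plain Java rather than against KafkaIO: the class IdleAwareWatermark and its onRecord/onIdle hooks are hypothetical and do not correspond to KafkaIO internals. It assumes a single partition and a reliable "no records available" signal; a reader with several partitions would need to track idleness per partition. The 1-second slack mirrors the Now() - 1sec workaround suggested later in this thread.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical idle-aware watermark policy (not KafkaIO's actual code):
// while records are flowing, the watermark follows the last record's
// timestamp; when a poll returns nothing, it advances to (now - slack).
// It never moves backwards.
public class IdleAwareWatermark {
    private final Duration slack;
    private Instant watermark = Instant.EPOCH;

    IdleAwareWatermark(Duration slack) {
        this.slack = slack;
    }

    // Called for every record read from Kafka.
    void onRecord(Instant recordTimestamp) {
        advanceTo(recordTimestamp);
    }

    // Called when a poll returns no records.
    void onIdle(Instant now) {
        advanceTo(now.minus(slack));
    }

    private void advanceTo(Instant candidate) {
        if (candidate.isAfter(watermark)) {
            watermark = candidate; // keep the watermark monotonic
        }
    }

    Instant getWatermark() {
        return watermark;
    }

    public static void main(String[] args) {
        IdleAwareWatermark wm = new IdleAwareWatermark(Duration.ofSeconds(1));
        Instant t0 = Instant.parse("2016-08-24T21:00:03Z");
        wm.onRecord(t0);
        // Without onIdle, the watermark would stay at 21:00:03 and the
        // 10-second window [21:00:00, 21:00:10) would never close.
        wm.onIdle(t0.plusSeconds(30));
        System.out.println(wm.getWatermark()); // 2016-08-24T21:00:32Z
    }
}
```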

On Thu, Aug 25, 2016 at 10:29 AM, Chawla,Sumit <sumitkcha...@gmail.com>
wrote:

> Thanks Raghu.
>
> I don't have much control over changing KafkaIO properties. I added the
> KafkaIO code only to complete the example. Are there any changes that can
> be made to the windowing to achieve the same behavior?
>
> Regards
> Sumit Chawla
>
>
> On Wed, Aug 24, 2016 at 5:06 PM, Raghu Angadi <rang...@google.com.invalid>
> wrote:
>
> > The default implementation returns the processing timestamp of the last
> > record (in effect; more accurately, it returns the same value as
> > getTimestamp(), which might be overridden by the user).
> >
> > As a workaround, yes, you can provide your own watermarkFn that
> > essentially returns Now() or Now() - 1 sec (see usage in the javadoc:
> > <https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L138>).
> >
> > I think the default watermark should be smarter: it should advance to the
> > current time if there aren't any records to read from Kafka. Could you
> > file a JIRA?
> >
> > thanks,
> > Raghu.
> >
> > On Wed, Aug 24, 2016 at 2:10 PM, Chawla,Sumit <sumitkcha...@gmail.com>
> > wrote:
> >
> > > Hi All
> > >
> > >
> > > I am trying to do some simple batch processing on KafkaIO records. My
> > > Beam pipeline looks like the following:
> > >
> > > pipeline
> > >     .apply(KafkaIO.read()
> > >         .withTopics(ImmutableList.of("mytopic"))
> > >         .withBootstrapServers("localhost:9200"))
> > >     // ExtractKVMessage emits a KV<String, String>
> > >     .apply("ExtractMessage", ParDo.of(new ExtractKVMessage()))
> > >     .apply("WindowBy10Sec", Window.<KV<String, String>>into(
> > >             FixedWindows.of(Duration.standardSeconds(10)))
> > >         .withAllowedLateness(Duration.standardSeconds(1)))
> > >     .apply("GroupByKey", GroupByKey.create())
> > >     .apply("Sink", ParDo.of(new MySink()));
> > >
> > >
> > > My Kafka source already has 1000+ messages, and new messages arrive
> > > every few minutes.
> > >
> > > When I start my pipeline, I can see that it reads all 1000+ messages
> > > from Kafka. However, the window does not fire until a new message
> > > arrives in Kafka, and the sink does not receive any message until that
> > > point. Do I need to override the watermarkFn here? Since I am not
> > > providing any timestampFn, I am assuming that timestamps will be
> > > assigned as messages arrive, i.e. ingestion time. What is the default
> > > watermarkFn implementation? Is the window not supposed to fire based on
> > > ingestion time?
> > >
> > > Regards
> > > Sumit Chawla
> > >
> >
>
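To see why the last window in the original question never fires, here is a toy Java model (a hypothetical class, not Beam) of 10-second fixed windows under the default trigger, which fires a window once the watermark passes the window's end. Allowed lateness is left out because it governs how late data is handled after that point, not when the on-time pane fires. Timestamps are milliseconds, and the values are made-up stand-ins for record timestamps.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Toy model of 10-second fixed windows: a window [start, start + SIZE_MS)
// is considered fired once the watermark reaches its end.
public class FixedWindowSketch {
    static final long SIZE_MS = 10_000;

    // Start of the fixed window that contains timestamp t (t >= 0).
    static long windowStart(long t) {
        return t - (t % SIZE_MS);
    }

    // Starts of the windows that hold data and whose end is at or before the watermark.
    static List<Long> firedWindows(long[] timestamps, long watermark) {
        TreeSet<Long> starts = new TreeSet<>();
        for (long t : timestamps) {
            starts.add(windowStart(t));
        }
        List<Long> fired = new ArrayList<>();
        for (long start : starts) {
            if (start + SIZE_MS <= watermark) {
                fired.add(start);
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        long[] ts = {1_000, 4_000, 12_000, 23_500};
        // Watermark stuck at the last record's timestamp: the last window stays open.
        System.out.println(firedWindows(ts, 23_500)); // [0, 10000]
        // Watermark advanced to 30 s: every window with data has fired.
        System.out.println(firedWindows(ts, 30_000)); // [0, 10000, 20000]
    }
}
```

With the watermark held at the timestamp of the last record (23.5 s), the window starting at 20 s stays open; it fires only once something moves the watermark to 30 s or later, such as a new record or a watermarkFn based on the current time.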