Hey Sumit,

What runner are you using? I set up a test with the same trigger
reading from an unbounded input using the DirectRunner, and I get the
expected output panes.

Just to clarify: the second half of the trigger ("when the first element
has been there for at least 30 seconds") simply never fires?

On Thu, Aug 25, 2016 at 2:38 PM, Chawla,Sumit <sumitkcha...@gmail.com>
wrote:

> Hi Thomas
>
> That did not work.
>
> I tried the following instead:
>
> .triggering(
>         Repeatedly.forever(
>                 AfterFirst.of(
>                         AfterProcessingTime.pastFirstElementInPane()
>                                 .plusDelayOf(Duration.standardSeconds(30)),
>                         AfterPane.elementCountAtLeast(100))))
> .discardingFiredPanes()
>
> What I am trying to do here is make sure that follow-up operations
> receive batches of records:
>
> 1. Fire when a pane has 100+ elements.
>
> 2. Or fire when the first element has been in the pane for at least 30
> seconds.
>
> However, point 2 does not seem to work. For example, I have 540 records in
> Kafka. The first 500 records are available immediately, but the remaining
> 40 don't pass through. I was expecting the second trigger to help here.
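
The expected behaviour of this trigger can be checked outside Beam. Below is a plain-Java simulation (not Beam code; the class and method names are hypothetical) of the trigger above for a single key in a single window. It assumes all 540 records are read at the same instant and that processing-time timers fire on schedule. AfterFirst fires when either branch fires, Repeatedly.forever re-arms it after each firing, and discardingFiredPanes means each pane starts empty.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation, not Beam code: models
//   Repeatedly.forever(AfterFirst.of(
//       AfterProcessingTime.pastFirstElementInPane().plusDelayOf(30 s),
//       AfterPane.elementCountAtLeast(100)))
// with discardingFiredPanes(), for a single key in a single window.
public class CountOrDelayTrigger {
    static final int MAX_COUNT = 100;
    static final long DELAY_MILLIS = 30_000L;

    // Returns the sizes of the panes that fire, given ascending processing-time
    // arrival times (ms) and the processing time (ms) at which we stop observing.
    static List<Integer> paneSizes(long[] arrivalMillis, long stopMillis) {
        List<Integer> panes = new ArrayList<>();
        int count = 0;           // elements buffered in the current pane
        long firstArrival = 0L;  // arrival time of the first buffered element
        for (long t : arrivalMillis) {
            // The delay timer of a partial pane may expire before this element arrives.
            if (count > 0 && t >= firstArrival + DELAY_MILLIS) {
                panes.add(count);
                count = 0;
            }
            if (count == 0) {
                firstArrival = t;  // a new pane starts with this element
            }
            count++;
            if (count >= MAX_COUNT) {  // element-count branch of AfterFirst
                panes.add(count);
                count = 0;
            }
        }
        // Delay branch of AfterFirst: a partial pane fires once its timer expires.
        if (count > 0 && stopMillis >= firstArrival + DELAY_MILLIS) {
            panes.add(count);
        }
        return panes;
    }

    public static void main(String[] args) {
        long[] arrivals = new long[540];  // all 540 records read at t = 0 ms
        System.out.println(paneSizes(arrivals, 60_000L));  // [100, 100, 100, 100, 100, 40]
        System.out.println(paneSizes(arrivals, 10_000L));  // [100, 100, 100, 100, 100]
    }
}
```

Under those assumptions the last 40 records arrive in a sixth pane once 30 seconds of processing time have passed. If they never appear in the real pipeline, it is worth checking whether the runner in use fires processing-time timers in this setup.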
>
> Regards
> Sumit Chawla
>
>
> On Thu, Aug 25, 2016 at 1:13 PM, Thomas Groh <tg...@google.com.invalid>
> wrote:
>
> > You can adjust the trigger in the windowing transform if your sink can
> > handle being written to multiple times for the same window. For example, if
> > the sink appends to the output when it receives new data in a window, you
> > could add something like:
> >
> > Window.into(...)
> >     .withAllowedLateness(...)
> >     .triggering(AfterWatermark.pastEndOfWindow()
> >         .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
> >             .plusDelayOf(Duration.standardSeconds(5)))
> >         .withLateFirings(AfterPane.elementCountAtLeast(1)))
> >     .discardingFiredPanes();
> >
> > This will cause elements to be output some amount of time after they are
> > first received from Kafka, even if Kafka does not have any new elements.
> > Elements will only be output by the GroupByKey once.
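
The key point is that the early firing is driven by processing time, so it does not depend on the watermark advancing. A minimal plain-Java model of that (not Beam code; the class, method and parameter names are made up for illustration) computes when the first pane containing an element is emitted, assuming timers fire on time and ignoring the late firings:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

// Plain-Java model, not Beam code: when is an element first emitted under
// AfterWatermark.pastEndOfWindow(), optionally with a processing-time early firing
// 5 seconds after the first element in the pane?
public class EarlyFiringSketch {
    static final Duration EARLY_DELAY = Duration.ofSeconds(5);

    // firstArrival: processing time at which the first element of the pane arrived.
    // watermarkPassesEnd: processing time at which the watermark passes the end of
    //   the window, or empty if it never does (e.g. no new Kafka records arrive).
    static Optional<Instant> firstEmission(Instant firstArrival,
                                           Optional<Instant> watermarkPassesEnd,
                                           boolean earlyFirings) {
        if (!earlyFirings) {
            return watermarkPassesEnd;  // only the on-time pane can fire
        }
        Instant early = firstArrival.plus(EARLY_DELAY);
        // Whichever happens first: the on-time firing or the early firing.
        return Optional.of(watermarkPassesEnd
                .filter(onTime -> onTime.isBefore(early))
                .orElse(early));
    }

    public static void main(String[] args) {
        Instant arrival = Instant.ofEpochSecond(100);
        // Watermark stuck, no early firings: nothing is ever emitted.
        System.out.println(firstEmission(arrival, Optional.empty(), false));  // Optional.empty
        // Watermark stuck, early firings enabled: emitted 5 seconds after arrival.
        System.out.println(firstEmission(arrival, Optional.empty(), true));
    }
}
```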
> >
> > We should still have a JIRA to improve the KafkaIO watermark tracking in
> > the absence of new records.
> >
> > On Thu, Aug 25, 2016 at 10:29 AM, Chawla,Sumit <sumitkcha...@gmail.com>
> > wrote:
> >
> > > Thanks Raghu.
> > >
> > > I don't have much control over changing KafkaIO properties; I added the
> > > KafkaIO code only to complete the example. Are there any changes that can
> > > be made to the windowing to achieve the same behavior?
> > >
> > > Regards
> > > Sumit Chawla
> > >
> > >
> > > On Wed, Aug 24, 2016 at 5:06 PM, Raghu Angadi <rang...@google.com.invalid>
> > > wrote:
> > >
> > > > The default implementation returns the processing timestamp of the last
> > > > record (in effect; more accurately, it returns the same value as
> > > > getTimestamp(), which might be overridden by the user).
> > > >
> > > > As a workaround, yes, you can provide your own watermarkFn that
> > > > essentially returns Now() or Now() - 1 sec (see the usage in the javadoc:
> > > > <https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L138>).
> > > >
> > > > I think the default watermark should be smarter: it should advance to the
> > > > current time if there aren't any records to read from Kafka. Could you
> > > > file a JIRA?
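
To make the difference concrete, here is a plain-Java comparison of the default, the Now() - 1 sec workaround, and the proposed idle-aware default, applied to a 10-second window that contains the newest record. It is not the KafkaIO API: in the real pipeline the hook is the watermarkFn mentioned above, and the class and method names below are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

// Plain-Java comparison, not the KafkaIO API: three ways of computing a watermark
// for a source whose newest record has timestamp lastRecordTs, evaluated at time now.
public class WatermarkPolicies {
    // Default behaviour as described: the watermark stays at the last record's timestamp.
    static Instant defaultWatermark(Instant lastRecordTs, Instant now) {
        return lastRecordTs;
    }

    // Workaround: a watermark function that ignores the record and returns now minus 1 second.
    static Instant nowMinusOneSecond(Instant lastRecordTs, Instant now) {
        return now.minus(Duration.ofSeconds(1));
    }

    // Proposed improvement: advance to the current time when nothing is left to read.
    static Instant idleAwareWatermark(Instant lastRecordTs, Instant now, boolean nothingToRead) {
        return nothingToRead ? now : lastRecordTs;
    }

    // A window [start, end) can produce its on-time pane once the watermark reaches end.
    static boolean windowCanFire(Instant watermark, Instant windowEnd) {
        return !watermark.isBefore(windowEnd);
    }

    public static void main(String[] args) {
        Instant lastRecordTs = Instant.ofEpochSecond(12);  // newest record, in window [10 s, 20 s)
        Instant windowEnd = Instant.ofEpochSecond(20);
        Instant now = Instant.ofEpochSecond(60);           // 48 s later, no new records
        System.out.println(windowCanFire(defaultWatermark(lastRecordTs, now), windowEnd));         // false
        System.out.println(windowCanFire(nowMinusOneSecond(lastRecordTs, now), windowEnd));        // true
        System.out.println(windowCanFire(idleAwareWatermark(lastRecordTs, now, true), windowEnd)); // true
    }
}
```

One trade-off of the workaround: any record whose timestamp is earlier than Now() - 1 sec when it is read would be treated as late data. With the default timestamps (the processing time at which each record is read) that should not normally happen.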
> > > >
> > > > thanks,
> > > > Raghu.
> > > >
> > > > On Wed, Aug 24, 2016 at 2:10 PM, Chawla,Sumit <sumitkcha...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi All
> > > > >
> > > > >
> > > > > I am trying to do some simple batch processing on KafkaIO records. My
> > > > > Beam pipeline looks like the following:
> > > > >
> > > > > pipeline.apply(KafkaIO.read()
> > > > >         .withTopics(ImmutableList.of("mytopic"))
> > > > >         .withBootstrapServers("localhost:9200"))
> > > > >     // Emits a KV<String, JSONObject>
> > > > >     .apply("ExtractMessage", ParDo.of(new ExtractKVMessage()))
> > > > >     .apply("WindowBy10Sec", Window.<KV<String, JSONObject>>into(
> > > > >             FixedWindows.of(Duration.standardSeconds(10)))
> > > > >         .withAllowedLateness(Duration.standardSeconds(1)))
> > > > >     .apply("GroupByKey", GroupByKey.create())
> > > > >     .apply("Sink", ParDo.of(new MySink()));
> > > > >
> > > > >
> > > > > My Kafka source already has 1000+ messages, and new messages arrive
> > > > > every few minutes.
> > > > >
> > > > > When I start my pipeline, I can see that it reads all the 1000+ messages
> > > > > from Kafka. However, the window does not fire until a new message
> > > > > arrives in Kafka, and the sink does not receive any messages until that
> > > > > point. Do I need to override the watermarkFn here? Since I am not
> > > > > providing any timestampFn, I am assuming that timestamps will be
> > > > > assigned as and when messages arrive, i.e. ingestion time. What is the
> > > > > default watermarkFn implementation? Is the window not supposed to be
> > > > > fired based on ingestion time?
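
A small model helps explain the symptom. Assuming the watermark is held at the newest record's timestamp (the default described further up the thread), every fixed window that ends at or before that timestamp can fire, but the window containing the newest record cannot until a later record advances the watermark. The plain-Java sketch below is not Beam code; the names are hypothetical, and windows are assumed to be aligned to the epoch with zero offset.

```java
import java.time.Duration;
import java.time.Instant;

// Plain-Java sketch, not Beam code: assigns a timestamp to a 10-second fixed window
// (zero offset, aligned to the epoch) and checks whether that window's on-time pane
// can fire for a given watermark.
public class FixedWindowSketch {
    static final Duration SIZE = Duration.ofSeconds(10);

    // Start of the window that contains the timestamp.
    static Instant windowStart(Instant timestamp) {
        long sizeMillis = SIZE.toMillis();
        long ms = timestamp.toEpochMilli();
        return Instant.ofEpochMilli(ms - Math.floorMod(ms, sizeMillis));
    }

    // Exclusive end of the window that contains the timestamp.
    static Instant windowEnd(Instant timestamp) {
        return windowStart(timestamp).plus(SIZE);
    }

    // The on-time pane fires once the watermark reaches the end of the window.
    static boolean canFire(Instant timestamp, Instant watermark) {
        return !watermark.isBefore(windowEnd(timestamp));
    }

    public static void main(String[] args) {
        Instant older = Instant.ofEpochMilli(14_000);   // window [10 s, 20 s)
        Instant newest = Instant.ofEpochMilli(25_300);  // window [20 s, 30 s)
        Instant watermark = newest;  // watermark held at the newest record's timestamp
        System.out.println(canFire(older, watermark));   // true
        System.out.println(canFire(newest, watermark));  // false
    }
}
```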
> > > > >
> > > > > Regards
> > > > > Sumit Chawla
> > > > >
> > > >
> > >
> >
>
