Thanks Aljoscha/Thomas, I will explore the option to upgrade. Meanwhile, here is what I observed with the above code on my local Flink cluster:
1. To start, there are 0 records in Kafka.
2. Deploy the pipeline. Two records are received in Kafka at 10:00:00 AM.
3. The pane with 100 records would not fire because the expected data is not there. I would expect the 30-second trigger to fire and downstream to receive the records around 10:00:30 AM.
4. No new records are arriving.

The downstream received the above records about 10 minutes later, around 10:10:00 AM. I am not sure what is actually triggering the window firing here (it does not look like the 30-second trigger).

Regards
Sumit Chawla

On Wed, Aug 31, 2016 at 11:14 PM, Aljoscha Krettek <aljos...@apache.org> wrote:

Ah I see, the Flink Runner had quite some updates in 0.2.0-incubating and even more for the upcoming 0.3.0-incubating.

On Thu, 1 Sep 2016 at 04:09 Thomas Groh <tg...@google.com.invalid> wrote:

In 0.2.0-incubating and beyond we've replaced the DirectPipelineRunner with the DirectRunner (formerly InProcessPipelineRunner), which is capable of handling unbounded pipelines. Is it possible for you to upgrade?

On Wed, Aug 31, 2016 at 5:17 PM, Chawla,Sumit <sumitkcha...@gmail.com> wrote:

@Aljoscha, my assumption here is that at least one trigger should fire: either the 100 elements or the 30 seconds since the first element (whichever happens first).

@Thomas, here is the error I get (I am using 0.1.0-incubating):

    java.lang.IllegalStateException: no evaluator registered for Read(UnboundedKafkaSource)
        at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitPrimitiveTransform(DirectPipelineRunner.java:890)
        at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:225)
        at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:220)
        at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:220)
        ...

Regards
Sumit Chawla

On Wed, Aug 31, 2016 at 10:19 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

Hi,
could the reason for the second part of the trigger never firing be that there are never at least 100 elements per key? The trigger would only fire if it saw 100 elements, and with only 540 elements that seems unlikely if you have more than 6 keys.

Cheers,
Aljoscha

On Wed, 31 Aug 2016 at 17:47 Thomas Groh <tg...@google.com.invalid> wrote:

KafkaIO is implemented using the UnboundedRead API, which is supported by the DirectRunner. You should be able to run without the withMaxNumRecords; if you can't, I'd be very interested to see the stack trace that you get when you try to run the Pipeline.

On Tue, Aug 30, 2016 at 11:24 PM, Chawla,Sumit <sumitkcha...@gmail.com> wrote:

Yes.
I added it only for the DirectRunner, as it cannot translate Read(UnboundedSourceOfKafka).

Regards
Sumit Chawla

On Tue, Aug 30, 2016 at 11:18 PM, Aljoscha Krettek <aljos...@apache.org> wrote:

Ah ok, this might be a stupid question, but did you remove this line when running it with Flink:

    .withMaxNumRecords(500)

Cheers,
Aljoscha

On Wed, 31 Aug 2016 at 08:14 Chawla,Sumit <sumitkcha...@gmail.com> wrote:

Hi Aljoscha

The code is not different when running on Flink. I have only removed business-specific transformations.

Regards
Sumit Chawla

On Tue, Aug 30, 2016 at 4:49 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

Hi,
could you maybe also post the complete code that you're using with the FlinkRunner? I could have a look into it.

Cheers,
Aljoscha

On Tue, 30 Aug 2016 at 09:01 Chawla,Sumit <sumitkcha...@gmail.com> wrote:

Hi Thomas

Sorry, I tried with the DirectRunner but ran into some Kafka issues.
Following is the snippet I am working on. I will post more details once I get it working (as of now I am unable to read messages from Kafka using the DirectRunner).

    PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
    pipelineOptions.setRunner(DirectPipelineRunner.class);
    Pipeline pipeline = Pipeline.create(pipelineOptions);
    pipeline.apply(KafkaIO.read()
            // Bounds the read to 500 records (added only for the DirectRunner).
            .withMaxNumRecords(500)
            .withTopics(ImmutableList.of("mytopic"))
            .withBootstrapServers("localhost:9092")
            .updateConsumerProperties(ImmutableMap.of(
                ConsumerConfig.GROUP_ID_CONFIG, "test1",
                ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true",
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")))
        // Decode the raw Kafka key/value bytes into a KV<String, String>.
        .apply(ParDo.of(new DoFn<KafkaRecord<byte[], byte[]>, KV<String, String>>() {
            @Override
            public void processElement(ProcessContext c) throws Exception {
                KV<byte[], byte[]> record = c.element().getKV();
                c.output(KV.of(new String(record.getKey()), new String(record.getValue())));
            }
        }))
        // 10-second fixed windows; each pane fires on whichever comes first:
        // 30 s of processing time after its first element, or 100 elements.
        .apply("WindowByMinute", Window.<KV<String, String>>into(FixedWindows.of(Duration.standardSeconds(10)))
            .withAllowedLateness(Duration.standardSeconds(1))
            .triggering(
                Repeatedly.forever(
                    AfterFirst.of(
                        AfterProcessingTime.pastFirstElementInPane()
                            .plusDelayOf(Duration.standardSeconds(30)),
                        AfterPane.elementCountAtLeast(100))))
            .discardingFiredPanes())
        .apply("GroupByTenant", GroupByKey.create())
        // Print how many values were grouped under each key in the fired pane.
        .apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, Void>() {
            @Override
            public void processElement(ProcessContext c) throws Exception {
                KV<String, Iterable<String>> element = c.element();
                Iterator<String> iterator = element.getValue().iterator();
                int count = 0;
                while (iterator.hasNext()) {
                    iterator.next();
                    count++;
                }
                System.out.println(String.format("Key %s Value Count %d", element.getKey(), count));
            }
        }));
    pipeline.run();

Regards
Sumit Chawla

On Fri, Aug 26, 2016 at 9:46 AM, Thomas Groh <tg...@google.com.invalid> wrote:

If you use the DirectRunner, do you observe the same behavior?
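Aljoscha's point about per-key counts applies to the snippet above: a count condition such as `elementCountAtLeast(100)` is tracked separately for each key and window once the data reaches `GroupByKey`, and here each window is only 10 seconds long. The plain-Java sketch below is a hypothetical model, not Beam code (the class name `PaneSizes`, the `tenant-N` keys and the timestamps are all made up for illustration). It groups `(key, timestamp)` pairs into epoch-aligned 10-second fixed windows per key and reports the largest pane; with 540 elements spread evenly over 6 keys inside one window, no pane reaches 100.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical model (not Beam code): counts how many elements land in each
 * (key, fixed window) pane, the granularity at which a count trigger is evaluated.
 */
final class PaneSizes {
    private PaneSizes() {}

    /** Start of the fixed window containing timestampMs (windows aligned to epoch 0). */
    static long windowStart(long timestampMs, long windowSizeMs) {
        return Math.floorDiv(timestampMs, windowSizeMs) * windowSizeMs;
    }

    /** Maps "key@windowStart" to the number of elements in that pane. */
    static Map<String, Integer> countPerPane(List<String> keys, List<Long> timestampsMs, long windowSizeMs) {
        if (keys.size() != timestampsMs.size()) {
            throw new IllegalArgumentException("keys and timestamps must have the same length");
        }
        Map<String, Integer> counts = new HashMap<>();
        for (int i = 0; i < keys.size(); i++) {
            String pane = keys.get(i) + "@" + windowStart(timestampsMs.get(i), windowSizeMs);
            counts.merge(pane, 1, Integer::sum);
        }
        return counts;
    }

    /** Size of the largest pane, i.e. the best chance any pane has of reaching the threshold. */
    static int largestPane(List<String> keys, List<Long> timestampsMs, long windowSizeMs) {
        int max = 0;
        for (int n : countPerPane(keys, timestampsMs, windowSizeMs).values()) {
            max = Math.max(max, n);
        }
        return max;
    }

    public static void main(String[] args) {
        // 540 hypothetical records over 6 keys, all inside the first 10-second window.
        List<String> keys = new ArrayList<>();
        List<Long> timestamps = new ArrayList<>();
        for (int i = 0; i < 540; i++) {
            keys.add("tenant-" + (i % 6));
            timestamps.add((long) (i * 10)); // 0 .. 5390 ms
        }
        System.out.println(largestPane(keys, timestamps, 10_000L)); // 90, below the threshold of 100
    }
}
```

With an uneven key distribution one key could still reach 100, which is why the thread treats this as "unlikely" rather than impossible.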
On Thu, Aug 25, 2016 at 4:32 PM, Chawla,Sumit <sumitkcha...@gmail.com> wrote:

Hi Thomas

I am using the FlinkRunner. Yes, the second part of the trigger never fires for me.

Regards
Sumit Chawla

On Thu, Aug 25, 2016 at 4:18 PM, Thomas Groh <tg...@google.com.invalid> wrote:

Hey Sumit;

What runner are you using? I can set up a test with the same trigger reading from an unbounded input using the DirectRunner and I get the expected output panes.

Just to clarify, the second half of the trigger ('when the first element has been there for at least 30+ seconds') simply never fires?

On Thu, Aug 25, 2016 at 2:38 PM, Chawla,Sumit <sumitkcha...@gmail.com> wrote:

Hi Thomas

That did not work.
I tried the following instead:

    .triggering(
        Repeatedly.forever(
            AfterFirst.of(
                AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(Duration.standardSeconds(30)),
                AfterPane.elementCountAtLeast(100))))
    .discardingFiredPanes()

What I am trying to do here is make sure that follow-up operations receive batches of records:

1. Fire when the pane has 100+ elements.
2. Or fire when the first element has been there for at least 30 seconds.

However, point 2 does not seem to work. For example, I have 540 records in Kafka. The first 500 records are available immediately, but the remaining 40 don't pass through. I was expecting the second trigger to help here.
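The two conditions listed above can be modelled for a single pane in plain Java. This is a minimal sketch of the intended `AfterFirst` semantics under stated assumptions, not Beam's trigger implementation: it assumes a runner that delivers processing-time timers on time, ignores the watermark, and covers only the first firing of a pane (under `Repeatedly.forever` the next pane starts the model over). `AfterFirstModel` and its parameters are hypothetical names. Under these assumptions, two records arriving at 10:00:00 should fire at 10:00:30, matching the expectation in the 10:00:00 example at the top of this thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical model (not Beam code) of one pane under
 * AfterFirst.of(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(delay),
 *               AfterPane.elementCountAtLeast(threshold)).
 */
final class AfterFirstModel {
    private AfterFirstModel() {}

    /**
     * Returns the processing time (ms) at which the pane first fires.
     *
     * @param arrivalsMs processing-time arrival of each element, sorted ascending
     * @param threshold  element count that fires the pane
     * @param delayMs    delay after the first element that fires the pane
     */
    static long firstFiringMs(List<Long> arrivalsMs, int threshold, long delayMs) {
        if (arrivalsMs.isEmpty()) {
            throw new IllegalArgumentException("an empty pane never fires");
        }
        long timerMs = arrivalsMs.get(0) + delayMs; // timer armed by the first element
        for (int i = 0; i < arrivalsMs.size(); i++) {
            long arrival = arrivalsMs.get(i);
            if (arrival >= timerMs) {
                break; // the timer fires before this element is counted
            }
            if (i + 1 >= threshold) {
                return arrival; // the count condition is reached first
            }
        }
        return timerMs; // the processing-time condition wins
    }

    public static void main(String[] args) {
        long tenAm = 10 * 3_600_000L; // 10:00:00 as ms since midnight
        long fired = firstFiringMs(Arrays.asList(tenAm, tenAm), 100, 30_000L);
        System.out.println((fired - tenAm) / 1000 + " s after 10:00:00"); // 30 s after 10:00:00

        List<Long> burst = new ArrayList<>();
        for (long i = 0; i < 150; i++) {
            burst.add(tenAm + i); // 150 records within 150 ms
        }
        System.out.println(firstFiringMs(burst, 100, 30_000L) - tenAm + " ms"); // 99 ms
    }
}
```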
Regards
Sumit Chawla

On Thu, Aug 25, 2016 at 1:13 PM, Thomas Groh <tg...@google.com.invalid> wrote:

You can adjust the trigger in the windowing transform if your sink can handle being written to multiple times for the same window. For example, if the sink appends to the output when it receives new data in a window, you could add something like

    Window.into(...)
        .withAllowedLateness(...)
        .triggering(AfterWatermark.pastEndOfWindow()
            .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(5)))
            .withLateFirings(AfterPane.elementCountAtLeast(1)))
        .discardingFiredPanes();

This will cause elements to be output some amount of time after they are first received from Kafka, even if Kafka does not have any new elements. Elements will only be output by the GroupByKey once.

We should still have a JIRA to improve the KafkaIO watermark tracking in the absence of new records.

On Thu, Aug 25, 2016 at 10:29 AM, Chawla,Sumit <sumitkcha...@gmail.com> wrote:

Thanks Raghu.

I don't have much control over changing KafkaIO properties; I added the KafkaIO code to complete the example. Are there any changes that can be made to the windowing to achieve the same behavior?
Regards
Sumit Chawla

On Wed, Aug 24, 2016 at 5:06 PM, Raghu Angadi <rang...@google.com.invalid> wrote:

The default implementation returns the processing timestamp of the last record (in effect; more accurately, it returns the same as getTimestamp(), which might be overridden by the user).

As a workaround, yes, you can provide your own watermarkFn that essentially returns Now() or Now() - 1 sec (usage in the javadoc: <https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L138>).

I think the default watermark should be smarter: it should advance to the current time if there aren't any records to read from Kafka. Could you file a JIRA?
thanks,
Raghu.

On Wed, Aug 24, 2016 at 2:10 PM, Chawla,Sumit <sumitkcha...@gmail.com> wrote:

Hi All

I am trying to do some simple batch processing on KafkaIO records. My Beam pipeline looks like the following:

    pipeline.apply(KafkaIO.read()
            .withTopics(ImmutableList.of("mytopic"))
            .withBootstrapServers("localhost:9200"))
        .apply("ExtractMessage", ParDo.of(new ExtractKVMessage())) // Emits a KV<String,String>
        .apply("WindowBy10Sec", Window.<KV<String, String>>into(
                FixedWindows.of(Duration.standardSeconds(10)))
            .withAllowedLateness(Duration.standardSeconds(1)))
        .apply("GroupByKey", GroupByKey.create())
        .apply("Sink", ParDo.of(new MySink()));

My Kafka source already has 1000+ messages, and new messages arrive every few minutes.

When I start my pipeline, I can see that it reads all 1000+ messages from Kafka. However, the window does not fire until a new message arrives in Kafka, and the sink does not receive any message until that point. Do I need to override the WaterMarkFn here? Since I am not providing any timestampFn, I am assuming that timestamps will be assigned as each message arrives, i.e. ingestion time. What is the default WaterMarkFn implementation?
Is the window not supposed to be fired based on ingestion time?

Regards
Sumit Chawla
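Raghu's answer above (the default watermark follows the timestamp of the last record read) accounts for the stall described here. The plain-Java sketch below is a hypothetical model, not KafkaIO or Beam code: `WatermarkModel`, its two watermark functions and the timestamps are illustrative assumptions. It compares a watermark pinned to the last record with one returning "now minus one second", as Raghu suggests, and checks whether a 10-second fixed window can fire under the default trigger, assuming the on-time pane fires once the watermark reaches the window's exclusive end.

```java
import java.util.function.LongBinaryOperator;

/**
 * Hypothetical model (not KafkaIO or Beam code) of why a fixed window stays open
 * when the source watermark is pinned to the last record's timestamp.
 */
final class WatermarkModel {
    private WatermarkModel() {}

    /** (lastRecordTsMs, nowMs) -> watermark: pinned to the last record read. */
    static final LongBinaryOperator LAST_RECORD = (lastRecordTsMs, nowMs) -> lastRecordTsMs;

    /** (lastRecordTsMs, nowMs) -> watermark: the "now minus one second" workaround. */
    static final LongBinaryOperator NOW_MINUS_ONE_SECOND = (lastRecordTsMs, nowMs) -> nowMs - 1_000L;

    /** Assumed rule: the on-time pane fires once the watermark reaches the exclusive window end. */
    static boolean windowCanFire(long windowEndMs, long watermarkMs) {
        return watermarkMs >= windowEndMs;
    }

    public static void main(String[] args) {
        long lastRecordTs = 5_000L; // last record read at t = 5 s, inside window [0 s, 10 s)
        long windowEnd = 10_000L;
        long now = 600_000L;        // ten minutes later, no new records
        System.out.println(windowCanFire(windowEnd, LAST_RECORD.applyAsLong(lastRecordTs, now)));          // false
        System.out.println(windowCanFire(windowEnd, NOW_MINUS_ONE_SECOND.applyAsLong(lastRecordTs, now))); // true
    }
}
```

Advancing the watermark to the current time is only safe while record timestamps are themselves close to ingestion time, as in this pipeline without a timestampFn; records carrying older timestamps would be treated as late.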