Hi Thomas,

Sorry, I tried with DirectRunner but ran into some Kafka issues. The snippet I am working on is below; I will post more details once I get it working (as of now I am unable to read messages from Kafka using DirectRunner).

PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
pipelineOptions.setRunner(DirectPipelineRunner.class);
Pipeline pipeline = Pipeline.create(pipelineOptions);

pipeline.apply(KafkaIO.read()
        .withMaxNumRecords(500)
        .withTopics(ImmutableList.of("mytopic"))
        .withBootstrapServers("localhost:9092")
        .updateConsumerProperties(ImmutableMap.of(
            ConsumerConfig.GROUP_ID_CONFIG, "test1",
            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true",
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"
        )))
    .apply(ParDo.of(new DoFn<KafkaRecord<byte[], byte[]>, KV<String, String>>() {
        @Override
        public void processElement(ProcessContext c) throws Exception {
            KV<byte[], byte[]> record = c.element().getKV();
            c.output(KV.of(new String(record.getKey()), new String(record.getValue())));
        }
    }))
    .apply("WindowByMinute", Window.<KV<String, String>>into(FixedWindows.of(Duration.standardSeconds(10)))
        .withAllowedLateness(Duration.standardSeconds(1))
        .triggering(
            Repeatedly.forever(
                AfterFirst.of(
                    AfterProcessingTime.pastFirstElementInPane()
                        .plusDelayOf(Duration.standardSeconds(30)),
                    AfterPane.elementCountAtLeast(100)
                )))
        .discardingFiredPanes())
    .apply("GroupByTenant", GroupByKey.create())
    .apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, Void>() {
        @Override
        public void processElement(ProcessContext c) throws Exception {
            KV<String, Iterable<String>> element = c.element();
            Iterator<String> iterator = element.getValue().iterator();
            int count = 0;
            while (iterator.hasNext()) {
                iterator.next();
                count++;
            }
            System.out.println(String.format("Key %s Value Count %d", element.getKey(), count));
        }
    }));

pipeline.run();

Regards
Sumit Chawla

On Fri, Aug 26, 2016 at 9:46 AM, Thomas Groh <tg...@google.com.invalid> wrote:

> If you use the DirectRunner, do you observe the same behavior?
>
> On Thu, Aug 25, 2016 at 4:32 PM, Chawla,Sumit <sumitkcha...@gmail.com> wrote:
>
> > Hi Thomas
> >
> > I am using FlinkRunner.
> > Yes, the second part of the trigger never fires for me.
> >
> > Regards
> > Sumit Chawla
> >
> > On Thu, Aug 25, 2016 at 4:18 PM, Thomas Groh <tg...@google.com.invalid> wrote:
> >
> > > Hey Sumit;
> > >
> > > What runner are you using? I can set up a test with the same trigger
> > > reading from an unbounded input using the DirectRunner and I get the
> > > expected output panes.
> > >
> > > Just to clarify, the second half of the trigger ('when the first element
> > > has been there for at least 30+ seconds') simply never fires?
> > >
> > > On Thu, Aug 25, 2016 at 2:38 PM, Chawla,Sumit <sumitkcha...@gmail.com> wrote:
> > >
> > > > Hi Thomas
> > > >
> > > > That did not work.
> > > >
> > > > I tried the following instead:
> > > >
> > > >     .triggering(
> > > >         Repeatedly.forever(
> > > >             AfterFirst.of(
> > > >                 AfterProcessingTime.pastFirstElementInPane()
> > > >                     .plusDelayOf(Duration.standardSeconds(30)),
> > > >                 AfterPane.elementCountAtLeast(100)
> > > >             )))
> > > >     .discardingFiredPanes()
> > > >
> > > > What I am trying to do here is make sure that follow-up
> > > > operations receive batches of records:
> > > >
> > > > 1. Fire when a pane has 100+ elements.
> > > >
> > > > 2. Or fire when the first element has been there for at least 30 seconds.
> > > >
> > > > However, point 2 does not seem to work. For example, I have 540 records in
> > > > Kafka. The first 500 records are available immediately,
> > > > but the remaining 40 don't pass through. I was expecting the second
> > > > trigger to help here.
> > > >
> > > > Regards
> > > > Sumit Chawla
> > > >
> > > > On Thu, Aug 25, 2016 at 1:13 PM, Thomas Groh <tg...@google.com.invalid> wrote:
> > > >
> > > > > You can adjust the trigger in the windowing transform if your sink can
> > > > > handle being written to multiple times for the same window.
> > > > > For example, if the sink appends to the output when it receives new
> > > > > data in a window, you could add something like
> > > > >
> > > > >     Window.into(...)
> > > > >         .withAllowedLateness(...)
> > > > >         .triggering(AfterWatermark.pastEndOfWindow()
> > > > >             .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
> > > > >                 .plusDelayOf(Duration.standardSeconds(5)))
> > > > >             .withLateFirings(AfterPane.elementCountAtLeast(1)))
> > > > >         .discardingFiredPanes();
> > > > >
> > > > > This will cause elements to be output some amount of time after they are
> > > > > first received from Kafka, even if Kafka does not have any new elements.
> > > > > Elements will only be output by the GroupByKey once.
> > > > >
> > > > > We should still have a JIRA to improve the KafkaIO watermark tracking in
> > > > > the absence of new records.
> > > > >
> > > > > On Thu, Aug 25, 2016 at 10:29 AM, Chawla,Sumit <sumitkcha...@gmail.com> wrote:
> > > > >
> > > > > > Thanks Raghu.
> > > > > >
> > > > > > I don't have much control over changing KafkaIO properties. I added
> > > > > > the KafkaIO code to complete the example. Are there any changes that can
> > > > > > be made to the windowing to achieve the same behavior?
> > > > > >
> > > > > > Regards
> > > > > > Sumit Chawla
> > > > > >
> > > > > > On Wed, Aug 24, 2016 at 5:06 PM, Raghu Angadi <rang...@google.com.invalid> wrote:
> > > > > >
> > > > > > > The default implementation returns the processing timestamp of the last
> > > > > > > record (in effect; more accurately, it returns the same as getTimestamp(),
> > > > > > > which might be overridden by the user).
> > > > > > >
> > > > > > > As a workaround, yes, you can provide your own watermarkFn that
> > > > > > > essentially returns Now() or Now()-1sec.
> > > > > > > (usage in javadoc
> > > > > > > <https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L138>)
> > > > > > >
> > > > > > > I think the default watermark should be smarter. It should advance to the
> > > > > > > current time if there aren't any records to read from Kafka. Could you
> > > > > > > file a JIRA?
> > > > > > >
> > > > > > > thanks,
> > > > > > > Raghu.
> > > > > > >
> > > > > > > On Wed, Aug 24, 2016 at 2:10 PM, Chawla,Sumit <sumitkcha...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi All
> > > > > > > >
> > > > > > > > I am trying to do some simple batch processing on KafkaIO records. My
> > > > > > > > Beam pipeline looks like the following:
> > > > > > > >
> > > > > > > >     pipeline.apply(KafkaIO.read()
> > > > > > > >             .withTopics(ImmutableList.of("mytopic"))
> > > > > > > >             .withBootstrapServers("localhost:9200"))
> > > > > > > >         .apply("ExtractMessage", ParDo.of(new ExtractKVMessage())) // Emits a KV<String,String>
> > > > > > > >         .apply("WindowBy10Sec", Window.<KV<String, JSONObject>>into(
> > > > > > > >                 FixedWindows.of(Duration.standardSeconds(10)))
> > > > > > > >             .withAllowedLateness(Duration.standardSeconds(1)))
> > > > > > > >         .apply("GroupByKey", GroupByKey.create())
> > > > > > > >         .apply("Sink", ParDo.of(new MySink()));
> > > > > > > >
> > > > > > > > My Kafka source already has 1000+ messages, and new messages arrive
> > > > > > > > every few minutes.
> > > > > > > >
> > > > > > > > When I start my pipeline, I can see that it reads all the 1000+ messages
> > > > > > > > from Kafka. However, the window does not fire until a new message arrives
> > > > > > > > in Kafka.
> > > > > > > > And the sink does not receive any message until that point. Do I need
> > > > > > > > to override the watermarkFn here? Since I am not providing any
> > > > > > > > timestampFn, I am assuming that timestamps will be assigned as and when
> > > > > > > > a message arrives, i.e. ingestion time. What is the default watermarkFn
> > > > > > > > implementation? Is the window not supposed to be fired based on
> > > > > > > > ingestion time?
> > > > > > > >
> > > > > > > > Regards
> > > > > > > > Sumit Chawla
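
The stall described in the first message of this thread can be sketched without Beam at all. The plain-Java snippet below is not KafkaIO code: the class and method names are hypothetical, and the rule "a window may fire once the watermark reaches its end" is a simplifying assumption about how the runner behaves. It only compares a watermark that follows the last record's timestamp with one that follows the wall clock (the Now()-1sec workaround).

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration only; none of these names come from Beam or KafkaIO.
final class WatermarkSketch {

    // Watermark that follows the timestamp of the last record read
    // (the default behaviour as described in this thread).
    static long lastRecordWatermark(List<Long> recordTimestampsMillis) {
        if (recordTimestampsMillis.isEmpty()) {
            return Long.MIN_VALUE; // nothing read yet, so no progress
        }
        return recordTimestampsMillis.get(recordTimestampsMillis.size() - 1);
    }

    // Workaround watermark: wall-clock time minus a small lag, e.g. Now() - 1 sec.
    static long wallClockWatermark(long nowMillis, long lagMillis) {
        return nowMillis - lagMillis;
    }

    // Simplified rule (an assumption): the window [start, start + size)
    // may fire once the watermark has reached its end.
    static boolean windowCanFire(long windowStartMillis, long windowSizeMillis, long watermarkMillis) {
        return watermarkMillis >= windowStartMillis + windowSizeMillis;
    }

    public static void main(String[] args) {
        // Three records ingested in the first few seconds, then Kafka goes quiet.
        List<Long> timestamps = Arrays.asList(1_000L, 2_000L, 3_000L);
        long now = 60_000L; // one minute in, still no new records

        long stalled = lastRecordWatermark(timestamps);
        long advancing = wallClockWatermark(now, 1_000L);

        System.out.println("last-record watermark fires 10s window: "
                + windowCanFire(0L, 10_000L, stalled));
        System.out.println("wall-clock watermark fires 10s window: "
                + windowCanFire(0L, 10_000L, advancing));
    }
}
```

Under these assumptions the last-record watermark leaves the 10-second window open until another record arrives, which matches the behaviour reported at the start of the thread, while the wall-clock watermark lets it close on its own.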