In 0.2.0-incubating and beyond we've replaced the DirectPipelineRunner with
the DirectRunner (formerly InProcessPipelineRunner), which is capable of
handling Unbounded Pipelines. Is it possible for you to upgrade?
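
If upgrading is an option, the change itself is small. A minimal sketch,
assuming the 0.2.0-incubating direct runner module (beam-runners-direct-java)
is on your classpath; the package and class names below are my reading of
that release:

PipelineOptions options = PipelineOptionsFactory.create();
// DirectRunner replaces DirectPipelineRunner and can evaluate
// unbounded reads such as Read(UnboundedKafkaSource).
options.setRunner(org.apache.beam.runners.direct.DirectRunner.class);
Pipeline pipeline = Pipeline.create(options);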

On Wed, Aug 31, 2016 at 5:17 PM, Chawla,Sumit <sumitkcha...@gmail.com>
wrote:

> @Aljoscha,  My assumption here is that at least one trigger should fire:
> either the 100 elements, or the 30 seconds since the first element
> (whichever happens first).
>
> @Thomas - here is the error I get (I am using 0.1.0-incubating):
>
> java.lang.IllegalStateException: no evaluator registered for Read(UnboundedKafkaSource)
>
>   at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitPrimitiveTransform(DirectPipelineRunner.java:890)
>   at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:225)
>   at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:220)
>   at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:220)
>   ...
>
> Regards
> Sumit Chawla
>
>
> On Wed, Aug 31, 2016 at 10:19 AM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
> > Hi,
> > could the reason for the second part of the trigger never firing be that
> > there are never at least 100 elements per key? The trigger would only
> > fire if it saw 100 elements, and with only 540 elements that seems
> > unlikely if you have more than 6 keys.
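> > (For instance: 540 elements spread over 6 keys is only 90 per key on
> > average, which never reaches the 100-element count.)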
> >
> > Cheers,
> > Aljoscha
> >
> > On Wed, 31 Aug 2016 at 17:47 Thomas Groh <tg...@google.com.invalid>
> > wrote:
> >
> > > KafkaIO is implemented using the UnboundedRead API, which is supported
> > > by the DirectRunner. You should be able to run without the
> > > withMaxNumRecords; if you can't, I'd be very interested to see the
> > > stack trace that you get when you try to run the Pipeline.
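> > >
> > > Concretely, that would just mean dropping the limit from the read; a
> > > sketch, reusing the topic and broker from the snippet quoted below:
> > >
> > > pipeline.apply(KafkaIO.read()
> > >         // no .withMaxNumRecords(500): KafkaIO stays an unbounded source
> > >         .withTopics(ImmutableList.of("mytopic"))
> > >         .withBootstrapServers("localhost:9092"))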
> > >
> > > On Tue, Aug 30, 2016 at 11:24 PM, Chawla,Sumit <sumitkcha...@gmail.com>
> > > wrote:
> > >
> > > > Yes.  I added it only for DirectRunner as it cannot translate
> > > > Read(UnboundedSourceOfKafka)
> > > >
> > > > Regards
> > > > Sumit Chawla
> > > >
> > > >
> > > > On Tue, Aug 30, 2016 at 11:18 PM, Aljoscha Krettek <aljos...@apache.org>
> > > > wrote:
> > > >
> > > > > Ah ok, this might be a stupid question, but did you remove this
> > > > > line when running it with Flink:
> > > > >         .withMaxNumRecords(500)
> > > > >
> > > > > Cheers,
> > > > > Aljoscha
> > > > >
> > > > > On Wed, 31 Aug 2016 at 08:14 Chawla,Sumit <sumitkcha...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Aljoscha
> > > > > >
> > > > > > The code is no different when running on Flink; I have only
> > > > > > removed the business-specific transformations.
> > > > > >
> > > > > > Regards
> > > > > > Sumit Chawla
> > > > > >
> > > > > >
> > > > > > On Tue, Aug 30, 2016 at 4:49 AM, Aljoscha Krettek <aljos...@apache.org>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > > could you maybe also post the complete code that you're using
> > > > > > > with the FlinkRunner? I could have a look into it.
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Aljoscha
> > > > > > >
> > > > > > > On Tue, 30 Aug 2016 at 09:01 Chawla,Sumit <sumitkcha...@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Thomas
> > > > > > > >
> > > > > > > > Sorry, I tried with the DirectRunner but ran into some Kafka
> > > > > > > > issues. Following is the snippet I am working on; I will post
> > > > > > > > more details once I get it working (as of now I am unable to
> > > > > > > > read messages from Kafka using the DirectRunner).
> > > > > > > >
> > > > > > > >
> > > > > > > > PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
> > > > > > > > pipelineOptions.setRunner(DirectPipelineRunner.class);
> > > > > > > > Pipeline pipeline = Pipeline.create(pipelineOptions);
> > > > > > > > pipeline.apply(KafkaIO.read()
> > > > > > > >         .withMaxNumRecords(500)
> > > > > > > >         .withTopics(ImmutableList.of("mytopic"))
> > > > > > > >         .withBootstrapServers("localhost:9092")
> > > > > > > >         .updateConsumerProperties(ImmutableMap.of(
> > > > > > > >                 ConsumerConfig.GROUP_ID_CONFIG, "test1",
> > > > > > > >                 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true",
> > > > > > > >                 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"
> > > > > > > >         )))
> > > > > > > >         .apply(ParDo.of(new DoFn<KafkaRecord<byte[], byte[]>, KV<String, String>>() {
> > > > > > > >             @Override
> > > > > > > >             public void processElement(ProcessContext c) throws Exception {
> > > > > > > >                 KV<byte[], byte[]> record = c.element().getKV();
> > > > > > > >                 c.output(KV.of(new String(record.getKey()), new String(record.getValue())));
> > > > > > > >             }
> > > > > > > >         }))
> > > > > > > >         .apply("WindowByMinute", Window.<KV<String, String>>into(
> > > > > > > >                 FixedWindows.of(Duration.standardSeconds(10)))
> > > > > > > >                 .withAllowedLateness(Duration.standardSeconds(1))
> > > > > > > >                 .triggering(
> > > > > > > >                         Repeatedly.forever(
> > > > > > > >                                 AfterFirst.of(
> > > > > > > >                                         AfterProcessingTime.pastFirstElementInPane()
> > > > > > > >                                                 .plusDelayOf(Duration.standardSeconds(30)),
> > > > > > > >                                         AfterPane.elementCountAtLeast(100)
> > > > > > > >                                 )))
> > > > > > > >                 .discardingFiredPanes())
> > > > > > > >         .apply("GroupByTenant", GroupByKey.create())
> > > > > > > >         .apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, Void>() {
> > > > > > > >             @Override
> > > > > > > >             public void processElement(ProcessContext c) throws Exception {
> > > > > > > >                 KV<String, Iterable<String>> element = c.element();
> > > > > > > >                 Iterator<String> iterator = element.getValue().iterator();
> > > > > > > >                 int count = 0;
> > > > > > > >                 while (iterator.hasNext()) {
> > > > > > > >                     iterator.next();
> > > > > > > >                     count++;
> > > > > > > >                 }
> > > > > > > >                 System.out.println(String.format("Key %s Value Count %d", element.getKey(), count));
> > > > > > > >             }
> > > > > > > >         }));
> > > > > > > > pipeline.run();
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > Regards
> > > > > > > > Sumit Chawla
> > > > > > > >
> > > > > > > >
> > > > > > > > On Fri, Aug 26, 2016 at 9:46 AM, Thomas Groh <tg...@google.com.invalid>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > If you use the DirectRunner, do you observe the same
> > > > > > > > > behavior?
> > > > > > > > >
> > > > > > > > > On Thu, Aug 25, 2016 at 4:32 PM, Chawla,Sumit <sumitkcha...@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi Thomas
> > > > > > > > > >
> > > > > > > > > > I am using the FlinkRunner. Yes, the second part of the
> > > > > > > > > > trigger never fires for me.
> > > > > > > > > >
> > > > > > > > > > Regards
> > > > > > > > > > Sumit Chawla
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Thu, Aug 25, 2016 at 4:18 PM, Thomas Groh <tg...@google.com.invalid>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hey Sumit;
> > > > > > > > > > >
> > > > > > > > > > > What runner are you using? I can set up a test with the
> > > > > > > > > > > same trigger reading from an unbounded input using the
> > > > > > > > > > > DirectRunner, and I get the expected output panes.
> > > > > > > > > > >
> > > > > > > > > > > Just to clarify, the second half of the trigger ('when
> > > > > > > > > > > the first element has been there for at least 30+
> > > > > > > > > > > seconds') simply never fires?
> > > > > > > > > > >
> > > > > > > > > > > On Thu, Aug 25, 2016 at 2:38 PM, Chawla,Sumit <sumitkcha...@gmail.com>
> > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi Thomas
> > > > > > > > > > > >
> > > > > > > > > > > > That did not work.
> > > > > > > > > > > >
> > > > > > > > > > > > I tried following instead:
> > > > > > > > > > > >
> > > > > > > > > > > > .triggering(
> > > > > > > > > > > >         Repeatedly.forever(
> > > > > > > > > > > >                 AfterFirst.of(
> > > > > > > > > > > >                         AfterProcessingTime.pastFirstElementInPane()
> > > > > > > > > > > >                                 .plusDelayOf(Duration.standardSeconds(30)),
> > > > > > > > > > > >                         AfterPane.elementCountAtLeast(100)
> > > > > > > > > > > >                 )))
> > > > > > > > > > > > .discardingFiredPanes()
> > > > > > > > > > > >
> > > > > > > > > > > > What I am trying to do here is to make sure that
> > > > > > > > > > > > follow-up operations receive batches of records:
> > > > > > > > > > > >
> > > > > > > > > > > > 1.  Fire when a pane has 100+ elements, or
> > > > > > > > > > > >
> > > > > > > > > > > > 2.  Fire when the first element has been there for at
> > > > > > > > > > > > least 30 seconds.
> > > > > > > > > > > >
> > > > > > > > > > > > However, point 2 does not seem to work. E.g. I have 540
> > > > > > > > > > > > records in Kafka. The first 500 records are available
> > > > > > > > > > > > immediately, but the remaining 40 don't pass through. I
> > > > > > > > > > > > was expecting the 2nd trigger to help here.
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > Regards
> > > > > > > > > > > > Sumit Chawla
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > On Thu, Aug 25, 2016 at 1:13 PM, Thomas Groh <tg...@google.com.invalid>
> > > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > You can adjust the trigger in the windowing
> > > > > > > > > > > > > transform if your sink can handle being written to
> > > > > > > > > > > > > multiple times for the same window. For example, if
> > > > > > > > > > > > > the sink appends to the output when it receives new
> > > > > > > > > > > > > data in a window, you could add something like
> > > > > > > > > > > > >
> > > > > > > > > > > > > Window.into(...)
> > > > > > > > > > > > >     .withAllowedLateness(...)
> > > > > > > > > > > > >     .triggering(AfterWatermark.pastEndOfWindow()
> > > > > > > > > > > > >         .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
> > > > > > > > > > > > >             .plusDelayOf(Duration.standardSeconds(5)))
> > > > > > > > > > > > >         .withLateFirings(AfterPane.elementCountAtLeast(1)))
> > > > > > > > > > > > >     .discardingFiredPanes();
> > > > > > > > > > > > >
> > > > > > > > > > > > > This will cause elements to be output some amount of
> > > > > > > > > > > > > time after they are first received from Kafka, even
> > > > > > > > > > > > > if Kafka does not have any new elements. Elements
> > > > > > > > > > > > > will only be output by the GroupByKey once.
> > > > > > > > > > > > >
> > > > > > > > > > > > > We should still have a JIRA to improve the KafkaIO
> > > > > > > > > > > > > watermark tracking in the absence of new records.
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Thu, Aug 25, 2016 at 10:29 AM, Chawla,Sumit <sumitkcha...@gmail.com>
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks Raghu.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > I don't have much control over changing KafkaIO
> > > > > > > > > > > > > > properties. I added the KafkaIO code only to
> > > > > > > > > > > > > > complete the example. Are there any changes that
> > > > > > > > > > > > > > can be made to the windowing to achieve the same
> > > > > > > > > > > > > > behavior?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Regards
> > > > > > > > > > > > > > Sumit Chawla
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Wed, Aug 24, 2016 at 5:06 PM, Raghu Angadi <rang...@google.com.invalid>
> > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > The default implementation returns, in effect,
> > > > > > > > > > > > > > > the processing timestamp of the last record
> > > > > > > > > > > > > > > (more accurately, it returns the same as
> > > > > > > > > > > > > > > getTimestamp(), which might be overridden by the
> > > > > > > > > > > > > > > user).
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > As a workaround, yes, you can provide your own
> > > > > > > > > > > > > > > watermarkFn that essentially returns Now() or
> > > > > > > > > > > > > > > Now()-1sec (usage in the javadoc:
> > > > > > > > > > > > > > > https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L138).
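> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Roughly like this; a sketch only, where
> > > > > > > > > > > > > > > withWatermarkFn and the KV<byte[], byte[]> record
> > > > > > > > > > > > > > > type follow the javadoc linked above:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > KafkaIO.read()
> > > > > > > > > > > > > > >     .withTopics(ImmutableList.of("mytopic"))
> > > > > > > > > > > > > > >     .withBootstrapServers("localhost:9092")
> > > > > > > > > > > > > > >     .withWatermarkFn(new SerializableFunction<KV<byte[], byte[]>, Instant>() {
> > > > > > > > > > > > > > >         @Override
> > > > > > > > > > > > > > >         public Instant apply(KV<byte[], byte[]> record) {
> > > > > > > > > > > > > > >             // keep the watermark near wall-clock time even when the topic is idle
> > > > > > > > > > > > > > >             return Instant.now().minus(Duration.standardSeconds(1));
> > > > > > > > > > > > > > >         }
> > > > > > > > > > > > > > >     })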
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > I think the default watermark should be smarter:
> > > > > > > > > > > > > > > it should advance to the current time if there
> > > > > > > > > > > > > > > aren't any records to read from Kafka. Could you
> > > > > > > > > > > > > > > file a JIRA?
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > thanks,
> > > > > > > > > > > > > > > Raghu.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Wed, Aug 24, 2016 at 2:10 PM, Chawla,Sumit <sumitkcha...@gmail.com>
> > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Hi All
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > I am trying to do some simple batch processing
> > > > > > > > > > > > > > > > on KafkaIO records. My Beam pipeline looks like
> > > > > > > > > > > > > > > > the following:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > pipeline.apply(KafkaIO.read()
> > > > > > > > > > > > > > > >         .withTopics(ImmutableList.of("mytopic"))
> > > > > > > > > > > > > > > >         .withBootstrapServers("localhost:9200"))
> > > > > > > > > > > > > > > >     .apply("ExtractMessage", ParDo.of(new ExtractKVMessage())) // Emits a KV<String, String>
> > > > > > > > > > > > > > > >     .apply("WindowBy10Sec", Window.<KV<String, JSONObject>>into(FixedWindows.of(Duration.standardSeconds(10)))
> > > > > > > > > > > > > > > >             .withAllowedLateness(Duration.standardSeconds(1)))
> > > > > > > > > > > > > > > >     .apply("GroupByKey", GroupByKey.create())
> > > > > > > > > > > > > > > >     .apply("Sink", ParDo.of(new MySink()));
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > My Kafka source already has some messages
> > > > > > > > > > > > > > > > (1000+), and new messages arrive every few
> > > > > > > > > > > > > > > > minutes.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > When I start my pipeline, I can see that it
> > > > > > > > > > > > > > > > reads all the 1000+ messages from Kafka.
> > > > > > > > > > > > > > > > However, the Window does not fire until a new
> > > > > > > > > > > > > > > > message arrives in Kafka, and the Sink does not
> > > > > > > > > > > > > > > > receive any message until that point. Do I need
> > > > > > > > > > > > > > > > to override the WaterMarkFn here? Since I am
> > > > > > > > > > > > > > > > not providing any timeStampFn, I am assuming
> > > > > > > > > > > > > > > > that timestamps will be assigned as and when
> > > > > > > > > > > > > > > > messages arrive, i.e. ingestion time. What is
> > > > > > > > > > > > > > > > the default WaterMarkFn implementation? Is the
> > > > > > > > > > > > > > > > Window not supposed to be fired based on
> > > > > > > > > > > > > > > > ingestion time?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Regards
> > > > > > > > > > > > > > > > Sumit Chawla
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
