Hi,

Would you mind sharing your latency requirements? For example, is it < 1 sec
at the XX percentile?
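
To make the percentile question concrete, here is a minimal plain-Java sketch of one way to turn measured end-to-end latencies into a percentile figure using the nearest-rank method. The class name, method name, and sample values are made up for illustration; this is not part of Beam or Dataflow.

```java
import java.util.Arrays;

public class LatencyPercentile {

    /**
     * Nearest-rank percentile: the smallest sample such that at least
     * p percent of all samples are less than or equal to it.
     */
    public static long percentile(long[] samplesMillis, double p) {
        if (samplesMillis.length == 0) {
            throw new IllegalArgumentException("no samples");
        }
        if (p <= 0 || p > 100) {
            throw new IllegalArgumentException("p must be in (0, 100]");
        }
        long[] sorted = samplesMillis.clone();
        Arrays.sort(sorted);
        // The rank is 1-based; ceil rounds up to the next whole sample.
        int rank = (int) Math.ceil(p * sorted.length / 100.0);
        return sorted[rank - 1];
    }

    public static void main(String[] args) {
        // Hypothetical latencies in milliseconds (publish time to processing time).
        long[] latencies = {420, 510, 480, 1500, 630, 700, 390, 950, 560, 610};
        System.out.println("p50 = " + percentile(latencies, 50) + " ms");
        System.out.println("p90 = " + percentile(latencies, 90) + " ms");
        System.out.println("p99 = " + percentile(latencies, 99) + " ms");
    }
}
```

A requirement such as "under 1 s at p99" is much stricter than "under 1 s at p50", so knowing which percentile matters changes which options are worth trying.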

Regarding stateful DoFn: it is supported, with a few exceptions. See the capability matrix:
https://beam.apache.org/documentation/runners/capability-matrix/#cap-full-what
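
The property asked about earlier in this thread is that every record with a given key ends up on the same worker. As a rough plain-Java illustration of that idea only (this is not how Beam or Dataflow implements its shuffle, and the class, method, and record values are hypothetical), a key can be mapped to a worker index by hashing:

```java
import java.util.ArrayList;
import java.util.List;

public class KeyRouting {

    /** Maps a key to a worker index in [0, workerCount); equal keys always get the same index. */
    public static int workerFor(String key, int workerCount) {
        // floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(key.hashCode(), workerCount);
    }

    public static void main(String[] args) {
        int workerCount = 4;
        List<List<String>> perWorker = new ArrayList<>();
        for (int i = 0; i < workerCount; i++) {
            perWorker.add(new ArrayList<>());
        }

        // Hypothetical "key,value" records; the key is the first field.
        String[] records = {"acct-1,10", "acct-2,5", "acct-1,7", "acct-3,2", "acct-2,9"};
        for (String record : records) {
            String key = record.split(",")[0];
            perWorker.get(workerFor(key, workerCount)).add(record);
        }

        for (int i = 0; i < workerCount; i++) {
            System.out.println("worker " + i + ": " + perWorker.get(i));
        }
    }
}
```

In a real pipeline the runner decides where keys live and may move them between workers, so the sketch only shows why records sharing a key can be processed together, not where they will run.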

Cheers

Reza




On Thu, 30 May 2019 at 18:08, pasquale.bon...@gmail.com <
pasquale.bon...@gmail.com> wrote:

> This was my first option, but I'm using Google Dataflow as the runner and it's
> not clear whether it supports stateful DoFn.
> However, my problem is latency. I've been trying different solutions, but it
> seems difficult to bring latency under 1 s when consuming messages (150/s)
> from PubSub with Beam/Dataflow.
> Is any benchmark or example available to help us understand whether we can
> actually achieve low latency, or whether we should look at different solutions?
>
>
>
> On 2019/05/29 17:12:37, Pablo Estrada <pabl...@google.com> wrote:
> > If you add a stateful DoFn to your pipeline, you'll force Beam to shuffle
> > data to their corresponding worker per key. I am not sure what the latency
> > cost of doing this is (as the messages still need to be shuffled). But
> > it may help you accomplish this without adding windowing+triggering.
> >
> > -P.
> >
> > On Wed, May 29, 2019 at 5:16 AM pasquale.bon...@gmail.com <
> > pasquale.bon...@gmail.com> wrote:
> >
> > > Hi Reza,
> > > with GlobalWindow and triggering I was able to reduce the hotspot issues,
> > > gaining satisfactory performance for BigTable updates. Unfortunately, latency
> > > when getting messages from PubSub remains around 1.5 s, which is too much
> > > considering our NFR.
> > >
> > > This is the code I use to get the messages:
> > > PCollectionTuple rawTransactions = p //
> > >     .apply("GetMessages",
> > >         PubsubIO.readMessagesWithAttributes()
> > >             .withIdAttribute(TRANSACTION_MESSAGE_ID_FIELD_NAME)
> > >             .withTimestampAttribute(TRANSACTION_MESSAGE_TIMESTAMP_FIELD_NAME)
> > >             .fromTopic(topic))
> > >     .apply(Window.<PubsubMessage>configure()
> > >         .triggering(Repeatedly
> > >             .forever(AfterWatermark.pastEndOfWindow()
> > >                 .withEarlyFirings(
> > >                     AfterProcessingTime
> > >                         .pastFirstElementInPane()
> > >                         .plusDelayOf(Duration.millis(1)))
> > >                 // Fire on any late data
> > >                 .withLateFirings(AfterPane.elementCountAtLeast(1))))
> > >         .discardingFiredPanes())
> > >
> > > Messages are produced by a separate Dataflow pipeline:
> > > Pipeline p = Pipeline.create(options);
> > > p.apply(
> > >         "ReadFile",
> > >         TextIO.read()
> > >             .from(options.getInputLocation() + "/*.csv")
> > >             .watchForNewFiles(
> > >                 // Check for new files every 600 ms
> > >                 Duration.millis(600),
> > >                 // Never stop checking for new files
> > >                 Watch.Growth.<String>never()))
> > >     .apply(
> > >         "create message",
> > >         ParDo.of(
> > >             new DoFn<String, PubsubMessage>() {
> > >               @ProcessElement
> > >               public void processElement(ProcessContext context) {
> > >                 String line = context.element();
> > >
> > >                 String payload = convertRow(line);
> > >                 // Publish time in epoch milliseconds, used as the timestamp attribute
> > >                 long now =
> > >                     LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
> > >                 context.output(
> > >                     new PubsubMessage(
> > >                         payload.getBytes(),
> > >                         ImmutableMap.of(
> > >                             TRANSACTION_MESSAGE_ID_FIELD_NAME,
> > >                             payload.split(",")[6],
> > >                             TRANSACTION_MESSAGE_TIMESTAMP_FIELD_NAME,
> > >                             Long.toString(now))));
> > >               }
> > >             }))
> > >     .apply("publish message", PubsubIO.writeMessages().to(topic));
> > >
> > > I'm uploading a file containing 100 rows every 600 ms.
> > >
> > > I found several threads on Stack Overflow about this latency issue, but
> > > none has a solution.
> > >
> > >
> > >
> > >
> > > On 2019/05/24 07:19:02, Reza Rokni <r...@google.com> wrote:
> > > > PS You can also make use of the GlobalWindow with a stateful DoFn.
> > > >
> > > > On Fri, 24 May 2019 at 15:13, Reza Rokni <r...@google.com> wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > Have you explored the use of triggers with your use case?
> > > > >
> > > > >
> > > > > https://beam.apache.org/releases/javadoc/2.12.0/org/apache/beam/sdk/transforms/windowing/Trigger.html
> > > > >
> > > > > Cheers
> > > > >
> > > > > Reza
> > > > >
> > > > > On Fri, 24 May 2019 at 14:14, pasquale.bon...@gmail.com <
> > > > > pasquale.bon...@gmail.com> wrote:
> > > > >
> > > > >> Hi Reuven,
> > > > > >> I would like to know whether it is possible to guarantee that records are
> > > > > >> processed by the same thread/task based on a key, as probably happens in a
> > > > > >> combine/stateful operation, without adding the delay of a window.
> > > > > >> This could increase the efficiency of caching and reduce some race
> > > > > >> conditions when writing data.
> > > > > >> I understand that workers are not part of the programming model, so I would
> > > > > >> like to know whether it's possible to achieve this behaviour while keeping
> > > > > >> the windowing delay to a minimum. We don't need any combine or state; we just
> > > > > >> want all records with a given key to be sent to the same thread.
> > > > >>
> > > > >> Thanks
> > > > >>
> > > > >>
> > > > >> On 2019/05/24 03:20:13, Reuven Lax <re...@google.com> wrote:
> > > > > >> > Can you explain what you mean by worker? While every runner has workers,
> > > > > >> > of course, workers are not part of the programming model.
> > > > >> >
> > > > >> > On Thu, May 23, 2019 at 8:13 PM pasquale.bon...@gmail.com <
> > > > >> > pasquale.bon...@gmail.com> wrote:
> > > > >> >
> > > > >> > > Hi all,
> > > > > >> > > I would like to know whether Apache Beam has functionality similar to
> > > > > >> > > fieldsGrouping in Storm, which allows records to be sent to a specific
> > > > > >> > > task/worker based on a key.
> > > > > >> > > I know that we can achieve that with a Combine/GroupByKey operation, but
> > > > > >> > > that implies adding windowing to our pipeline, which we don't want.
> > > > > >> > > I have also tried using a stateful transformation.
> > > > > >> > > I think that in that case we should also use windowing, but I see that a
> > > > > >> > > job with a stateful ParDo operation can be submitted on Google Dataflow
> > > > > >> > > with windowing. I don't know whether this is due to a lack of support for
> > > > > >> > > stateful processing on Dataflow, or whether I can effectively achieve my
> > > > > >> > > goal with this solution.
> > > > >> > >
> > > > >> > >
> > > > >> > > Thanks in advance for your help
> > > > >> > >
> > > > >> > >
> > > > >> >
> > > > >>
> > > > >
> > > > >
> > > > > --
> > > > >
> > > > > This email may be confidential and privileged. If you received this
> > > > > communication by mistake, please don't forward it to anyone else, please
> > > > > erase all copies and attachments, and please let me know that it has gone
> > > > > to the wrong person.
> > > > >
> > > > > The above terms reflect a potential business arrangement, are provided
> > > > > solely as a basis for further discussion, and are not intended to be and do
> > > > > not constitute a legally binding obligation. No legally binding obligations
> > > > > will be created, implied, or inferred until an agreement in final form is
> > > > > executed in writing by all parties involved.
> > > > >
> > > >
> > > >
> > > >
> > >
> >
>


