How are you measuring latency?
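
One possible way to measure it, as a minimal sketch only: it assumes the publisher stamps each message with an epoch-millisecond timestamp attribute (as the producer pipeline quoted below does) and that the consumer records its own receive time. The names LatencyStats, latencyMillis and percentile are hypothetical, not part of Beam or Dataflow, and the result includes any clock skew between the publisher and consumer machines.

```java
import java.util.Arrays;

/** Hypothetical helper (not a Beam API): latency from publish timestamp to receive time. */
final class LatencyStats {

    /** Latency of one message in ms: receive time minus the publish time carried in the attribute. */
    static long latencyMillis(String publishTimestampAttribute, long receivedAtMillis) {
        return receivedAtMillis - Long.parseLong(publishTimestampAttribute);
    }

    /** Nearest-rank percentile of the samples, for p in (0, 100]. */
    static long percentile(long[] latencies, double p) {
        if (latencies.length == 0) {
            throw new IllegalArgumentException("no samples");
        }
        if (p <= 0 || p > 100) {
            throw new IllegalArgumentException("p must be in (0, 100]");
        }
        long[] sorted = latencies.clone();   // leave the caller's array untouched
        Arrays.sort(sorted);
        int rank = (int) Math.ceil(p / 100.0 * sorted.length);
        return sorted[Math.max(rank, 1) - 1];
    }

    public static void main(String[] args) {
        long[] samples = {1500, 900, 1200, 800, 2000};
        System.out.println("p50 = " + percentile(samples, 50) + " ms");
        System.out.println("p100 = " + percentile(samples, 100) + " ms");
    }
}
```

Reporting a percentile rather than an average shows whether the 1.5s figure is typical or driven by a slow tail.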

On Thu, May 30, 2019 at 3:08 AM pasquale.bon...@gmail.com <
pasquale.bon...@gmail.com> wrote:

> This was my first option, but I'm using Google Dataflow as the runner and
> it's not clear whether it supports stateful DoFn.
> However, my problem is latency. I've been trying different solutions, but it
> seems difficult to bring latency under 1s when consuming messages (150/s)
> from PubSub with Beam/Dataflow.
> Is there a benchmark or example available to help us understand whether we
> can achieve low latency, or whether we should look at different solutions?
>
>
>
> On 2019/05/29 17:12:37, Pablo Estrada <pabl...@google.com> wrote:
> > If you add a stateful DoFn to your pipeline, you'll force Beam to shuffle
> > data to their corresponding worker per key. I am not sure what the latency
> > cost of doing this is (as the messages still need to be shuffled). But it
> > may help you accomplish this without adding windowing+triggering.
> >
> > -P.
> >
> > On Wed, May 29, 2019 at 5:16 AM pasquale.bon...@gmail.com <
> > pasquale.bon...@gmail.com> wrote:
> >
> > > Hi Reza,
> > > with GlobalWindow plus triggering I was able to reduce the hotspot issues
> > > and get satisfactory performance for the BigTable updates. Unfortunately,
> > > latency when getting messages from PubSub remains around 1.5s, which is
> > > too much considering our NFR.
> > >
> > > This is the code I use to get the messages:
> > > PCollectionTuple rawTransactions = p //
> > >     .apply("GetMessages",
> > >         PubsubIO.readMessagesWithAttributes()
> > >             .withIdAttribute(TRANSACTION_MESSAGE_ID_FIELD_NAME)
> > >             .withTimestampAttribute(TRANSACTION_MESSAGE_TIMESTAMP_FIELD_NAME)
> > >             .fromTopic(topic))
> > >     .apply(Window.<PubsubMessage>configure()
> > >         .triggering(Repeatedly
> > >             .forever(AfterWatermark.pastEndOfWindow()
> > >                 .withEarlyFirings(
> > >                     AfterProcessingTime
> > >                         .pastFirstElementInPane()
> > >                         .plusDelayOf(Duration.millis(1)))
> > >                 // Fire on any late data
> > >                 .withLateFirings(AfterPane.elementCountAtLeast(1))))
> > >         .discardingFiredPanes())
> > >
> > > Messages are produced by a separate Dataflow pipeline:
> > >  Pipeline p = Pipeline.create(options);
> > >     p.apply(
> > >             "ReadFile",
> > >             TextIO.read()
> > >                 .from(options.getInputLocation() + "/*.csv")
> > >                 .watchForNewFiles(
> > >                     // Check for new files every 600 ms
> > >                     Duration.millis(600),
> > >                     // Never stop checking for new files
> > >                     Watch.Growth.<String>never()))
> > >         .apply(
> > >             "create message",
> > >             ParDo.of(
> > >                 new DoFn<String, PubsubMessage>() {
> > >                   @ProcessElement
> > >                   public void processElement(ProcessContext context) {
> > >                     String line = context.element();
> > >
> > >                     String payload = convertRow(line);
> > >                     long now =
> > >                         LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
> > >                     context.output(
> > >                         new PubsubMessage(
> > >                             payload.getBytes(),
> > >                             ImmutableMap.of(
> > >                                 TRANSACTION_MESSAGE_ID_FIELD_NAME, payload.split(",")[6],
> > >                                 TRANSACTION_MESSAGE_TIMESTAMP_FIELD_NAME, Long.toString(now))));
> > >                   }
> > >                 }))
> > >         .apply("publish message", PubsubIO.writeMessages().to(topic));
> > >
> > > I'm uploading a file containing 100 rows every 600 ms.
> > >
> > > I found several threads on Stack Overflow about this latency issue, but
> > > none has a solution.
> > >
> > >
> > >
> > >
> > > On 2019/05/24 07:19:02, Reza Rokni <r...@google.com> wrote:
> > > > PS You can also make use of the GlobalWindow with a stateful DoFn.
> > > >
> > > > On Fri, 24 May 2019 at 15:13, Reza Rokni <r...@google.com> wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > Have you explored the use of triggers with your use case?
> > > > >
> > > > >
> > > > >
> > > > > > https://beam.apache.org/releases/javadoc/2.12.0/org/apache/beam/sdk/transforms/windowing/Trigger.html
> > > > >
> > > > > Cheers
> > > > >
> > > > > Reza
> > > > >
> > > > > On Fri, 24 May 2019 at 14:14, pasquale.bon...@gmail.com <
> > > > > pasquale.bon...@gmail.com> wrote:
> > > > >
> > > > >> Hi Reuven,
> > > > > >> I would like to know if it is possible to guarantee that records are
> > > > > >> processed by the same thread/task based on a key, as probably happens
> > > > > >> in a combine/stateful operation, without adding the delay of a window.
> > > > > >> This could increase the efficiency of caching and reduce some race
> > > > > >> conditions when writing data.
> > > > > >> I understand that workers are not part of the programming model, so I
> > > > > >> would like to know if it's possible to achieve this behaviour while
> > > > > >> keeping the windowing delay to a minimum. We don't need any combine or
> > > > > >> state; we just want all records with a given key to be sent to the
> > > > > >> same thread.
> > > > >>
> > > > >> Thanks
> > > > >>
> > > > >>
> > > > >> On 2019/05/24 03:20:13, Reuven Lax <re...@google.com> wrote:
> > > > > >> > Can you explain what you mean by worker? While every runner has
> > > > > >> > workers of course, workers are not part of the programming model.
> > > > >> >
> > > > >> > On Thu, May 23, 2019 at 8:13 PM pasquale.bon...@gmail.com <
> > > > >> > pasquale.bon...@gmail.com> wrote:
> > > > >> >
> > > > >> > > Hi all,
> > > > > >> > > I would like to know if Apache Beam has a functionality similar
> > > > > >> > > to fieldsGrouping in Storm that allows sending records to a
> > > > > >> > > specific task/worker based on a key.
> > > > > >> > > I know that we can achieve that with a Combine/GroupByKey
> > > > > >> > > operation, but that implies adding windowing to our pipeline,
> > > > > >> > > which we don't want.
> > > > > >> > > I have also tried using a stateful transformation.
> > > > > >> > > I think that in that case we should also use windowing, but I see
> > > > > >> > > that a job with a stateful ParDo operation can be submitted on
> > > > > >> > > Google Dataflow with windowing. I don't know if this depends on a
> > > > > >> > > lack of support for stateful processing on Dataflow, or whether I
> > > > > >> > > can effectively achieve my goal with this solution.
> > > > >> > >
> > > > >> > >
> > > > >> > > Thanks in advance for your help
> > > > >> > >
> > > > >> > >
> > > > >> >
> > > > >>
> > > > >
> > > > >
> > > > > --
> > > > >
> > > > > > This email may be confidential and privileged. If you received this
> > > > > > communication by mistake, please don't forward it to anyone else,
> > > > > > please erase all copies and attachments, and please let me know that
> > > > > > it has gone to the wrong person.
> > > > > >
> > > > > > The above terms reflect a potential business arrangement, are provided
> > > > > > solely as a basis for further discussion, and are not intended to be
> > > > > > and do not constitute a legally binding obligation. No legally binding
> > > > > > obligations will be created, implied, or inferred until an agreement in
> > > > > > final form is executed in writing by all parties involved.
> > > > >
> > > >
> > > >
> > > >
> > >
> >
>
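
On the original fieldsGrouping question at the start of the thread, the idea of sending every record with a given key to the same task can be pictured as hash partitioning. The following is a conceptual sketch only: KeyRouter and taskFor are hypothetical names, this is plain Java rather than Beam or Storm code, and it does not describe how Dataflow or any other runner actually assigns keys to workers.

```java
/** Hypothetical illustration of key-based routing; not a Beam, Dataflow, or Storm API. */
final class KeyRouter {

    private final int numTasks;

    KeyRouter(int numTasks) {
        if (numTasks <= 0) {
            throw new IllegalArgumentException("numTasks must be positive");
        }
        this.numTasks = numTasks;
    }

    /** Maps a key to a task index in [0, numTasks); equal keys always map to the same index. */
    int taskFor(String key) {
        // floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(key.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        KeyRouter router = new KeyRouter(4);
        for (String key : new String[] {"tx-1", "tx-2", "tx-1"}) {
            System.out.println(key + " -> task " + router.taskFor(key));
        }
    }
}
```

The mapping is stable only while the number of tasks stays fixed, which is one reason a programming model may prefer not to expose workers directly.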
