Do you have any way of knowing how much of this time is being spent in
Pub/Sub and how much in the Beam pipeline?

If you are using the Dataflow runner and doing any shuffling, 100-150ms is
currently not attainable. Writes to shuffle are batched for up to 100ms at
a time to keep operational costs down, and this parameter is not tunable.
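As a back-of-the-envelope check: every shuffle stage in the pipeline can hold an element for up to the full batching interval, so the shuffle-only latency floor is roughly stages × interval (a sketch under that simplifying assumption, ignoring all other costs):

```java
// Rough lower bound on the latency contributed by shuffle batching alone:
// each shuffle stage may buffer an element for up to the batching interval.
static long shuffleFloorMillis(int shuffleStages, long batchIntervalMillis) {
    return shuffleStages * batchIntervalMillis;
}
```

With a 100 ms interval, even a single shuffle consumes most of a 100-150 ms budget before Pub/Sub, the DoFns, and the BigTable write are counted.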

Reuven

On Thu, May 30, 2019 at 5:54 AM [email protected] <
[email protected]> wrote:

> I'm measuring latency as the difference between the timestamp of the
> column in BigTable and the one I associate with the message when I publish
> it to the topic.
> I also take intermediate measurements after the message is read from the
> PubSub topic and before it is inserted into BigTable.
> All timestamps are written into BigTable, and I have a procedure to read
> and analyse the data.
>
> What surprises me most is that with such a small load and 4 workers it
> takes 1 s for a message to be processed (timestamp taken after the
> message is read from the topic).
>
> We need to lower latency to 100-150 ms.
>
>
>
> On 2019/05/30 11:41:45, Reuven Lax <[email protected]> wrote:
> > How are you measuring latency?
> >
> > On Thu, May 30, 2019 at 3:08 AM [email protected] <
> > [email protected]> wrote:
> >
> > > This was my first option, but I'm using Google Dataflow as the runner
> > > and it's not clear whether it supports stateful DoFns.
> > > However, my real problem is latency: I've been trying different
> > > solutions, but it seems difficult to bring latency under 1 s when
> > > consuming messages (150/s) from PubSub with Beam/Dataflow.
> > > Is there a benchmark or example available to understand whether we can
> > > effectively achieve low latency, or whether we should look at
> > > different solutions?
> > >
> > >
> > >
> > > On 2019/05/29 17:12:37, Pablo Estrada <[email protected]> wrote:
> > > > If you add a stateful DoFn to your pipeline, you'll force Beam to
> > > > shuffle data to the corresponding worker per key. I am not sure what
> > > > the latency cost of doing this is (as the messages still need to be
> > > > shuffled), but it may help you accomplish this without adding
> > > > windowing + triggering.
> > > >
> > > > -P.
> > > >
> > > > On Wed, May 29, 2019 at 5:16 AM [email protected] <
> > > > [email protected]> wrote:
> > > >
> > > > > Hi Reza,
> > > > > using the GlobalWindow with triggering I was able to reduce the
> > > > > hotspot issues and get satisfactory performance for the BigTable
> > > > > updates. Unfortunately, the latency when getting messages from
> > > > > PubSub remains around 1.5 s, which is too much considering our NFR.
> > > > >
> > > > > This is the code I use to get the messages:
> > > > > PCollectionTuple rawTransactions = p
> > > > >     .apply("GetMessages",
> > > > >         PubsubIO.readMessagesWithAttributes()
> > > > >             .withIdAttribute(TRANSACTION_MESSAGE_ID_FIELD_NAME)
> > > > >             .withTimestampAttribute(TRANSACTION_MESSAGE_TIMESTAMP_FIELD_NAME)
> > > > >             .fromTopic(topic))
> > > > >     .apply(Window.<PubsubMessage>configure()
> > > > >         .triggering(Repeatedly.forever(
> > > > >             AfterWatermark.pastEndOfWindow()
> > > > >                 .withEarlyFirings(AfterProcessingTime
> > > > >                     .pastFirstElementInPane()
> > > > >                     .plusDelayOf(Duration.millis(1)))
> > > > >                 // Fire on any late data
> > > > >                 .withLateFirings(AfterPane.elementCountAtLeast(1))))
> > > > >         .discardingFiredPanes())
> > > > >
> > > > > Messages are produced by a different Dataflow pipeline:
> > > > > Pipeline p = Pipeline.create(options);
> > > > > p.apply(
> > > > >         "ReadFile",
> > > > >         TextIO.read()
> > > > >             .from(options.getInputLocation() + "/*.csv")
> > > > >             .watchForNewFiles(
> > > > >                 // Check for new files every 600 ms
> > > > >                 Duration.millis(600),
> > > > >                 // Never stop checking for new files
> > > > >                 Watch.Growth.<String>never()))
> > > > >     .apply(
> > > > >         "create message",
> > > > >         ParDo.of(
> > > > >             new DoFn<String, PubsubMessage>() {
> > > > >               @ProcessElement
> > > > >               public void processElement(ProcessContext context) {
> > > > >                 String line = context.element();
> > > > >                 String payload = convertRow(line);
> > > > >                 long now = LocalDateTime.now()
> > > > >                     .atZone(ZoneId.systemDefault())
> > > > >                     .toInstant()
> > > > >                     .toEpochMilli();
> > > > >                 context.output(
> > > > >                     new PubsubMessage(
> > > > >                         payload.getBytes(),
> > > > >                         ImmutableMap.of(
> > > > >                             TRANSACTION_MESSAGE_ID_FIELD_NAME,
> > > > >                             payload.split(",")[6],
> > > > >                             TRANSACTION_MESSAGE_TIMESTAMP_FIELD_NAME,
> > > > >                             Long.toString(now))));
> > > > >               }
> > > > >             }))
> > > > >     .apply("publish message", PubsubIO.writeMessages().to(topic));
> > > > >
> > > > > I'm uploading a file containing 100 rows every 600 ms.
> > > > >
> > > > > I found several threads on Stack Overflow about this latency
> > > > > issue, but none of them has a solution.
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On 2019/05/24 07:19:02, Reza Rokni <[email protected]> wrote:
> > > > > > PS You can also make use of the GlobalWindow with a stateful
> DoFn.
> > > > > >
> > > > > > On Fri, 24 May 2019 at 15:13, Reza Rokni <[email protected]> wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > >
> > > > > > > Have you explored the use of triggers with your use case?
> > > > > > >
> > > > > > >
> > > > > > >
> > > > >
> > >
> https://beam.apache.org/releases/javadoc/2.12.0/org/apache/beam/sdk/transforms/windowing/Trigger.html
> > > > > > >
> > > > > > > Cheers
> > > > > > >
> > > > > > > Reza
> > > > > > >
> > > > > > > On Fri, 24 May 2019 at 14:14, [email protected] <
> > > > > > > [email protected]> wrote:
> > > > > > >
> > > > > > >> Hi Reuven,
> > > > > > >> I would like to know if it is possible to guarantee that
> > > > > > >> records are processed by the same thread/task based on a key,
> > > > > > >> as probably happens in a combine/stateful operation, without
> > > > > > >> adding the delay of a window.
> > > > > > >> This could increase the efficiency of caching and reduce some
> > > > > > >> race conditions when writing data.
> > > > > > >> I understand that workers are not part of the programming
> > > > > > >> model, so I would like to know if it's possible to achieve this
> > > > > > >> behaviour while keeping the delay of windowing to a minimum. We
> > > > > > >> don't need any combine or state; we just want all records with
> > > > > > >> a given key to be sent to the same thread.
> > > > > > >>
> > > > > > >> Thanks
> > > > > > >>
> > > > > > >>
> > > > > > >> On 2019/05/24 03:20:13, Reuven Lax <[email protected]> wrote:
> > > > > > >> > Can you explain what you mean by worker? While every runner
> > > > > > >> > has workers, of course, workers are not part of the
> > > > > > >> > programming model.
> > > > > > >> >
> > > > > > >> > On Thu, May 23, 2019 at 8:13 PM [email protected] <
> > > > > > >> > [email protected]> wrote:
> > > > > > >> >
> > > > > > >> > > Hi all,
> > > > > > >> > > I would like to know if Apache Beam has functionality
> > > > > > >> > > similar to fieldsGrouping in Storm, which allows sending
> > > > > > >> > > records to a specific task/worker based on a key.
> > > > > > >> > > I know that we can achieve that with a combine/groupByKey
> > > > > > >> > > operation, but that implies adding windowing to our
> > > > > > >> > > pipeline, which we don't want.
> > > > > > >> > > I have also tried using a stateful transformation.
> > > > > > >> > > I think that in that case too we should use windowing, but
> > > > > > >> > > I see that a job with a stateful ParDo operation can be
> > > > > > >> > > submitted on Google Dataflow without windowing. I don't
> > > > > > >> > > know whether this depends on a lack of support for
> > > > > > >> > > stateful processing on Dataflow, or whether I can
> > > > > > >> > > effectively achieve my goal with this solution.
> > > > > > >> > >
> > > > > > >> > > Thanks in advance for your help
> > > > > > >> > >
> > > > > > >> > >
> > > > > > >> >
> > > > > > >>
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > >
> > > > > > > This email may be confidential and privileged. If you received
> > > > > > > this communication by mistake, please don't forward it to
> > > > > > > anyone else, please erase all copies and attachments, and
> > > > > > > please let me know that it has gone to the wrong person.
> > > > > > >
> > > > > > > The above terms reflect a potential business arrangement, are
> > > > > > > provided solely as a basis for further discussion, and are not
> > > > > > > intended to be and do not constitute a legally binding
> > > > > > > obligation. No legally binding obligations will be created,
> > > > > > > implied, or inferred until an agreement in final form is
> > > > > > > executed in writing by all parties involved.
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
