Ideally my pipeline requires no shuffling, I just saw that introducing a 
windowing operation improves performance of BigTable insert.
I don't know how to measure time spent in PubSub. I took the time when I 
message is published, fill the timestamp metadata and than confront that value 
with the timestamp of the first DoFn after the message is read.
I can't find any way to measure time spent in Pub/Sub or in 
PubsubIO.readMessagesWithAttributes().
If you can suggest a way I'm happy to run my test measuring that time.
I understand that we can obtain 100-150ms but 1.5s seems too much. Do you have 
an idea of the minimum latency we can achieve?
I also saw that performance improve with time. On 30000 messages I saw that 
average latency improve from 1988ms (first 1000) to 1465 (last 1000) with a 
minimum of 300 ms.

I understand that probably this tool is built for large number, but still I 
would like to verify if we can use a serverless architecture with low load and 
low latency.

On 2019/05/30 15:32:11, Reuven Lax <re...@google.com> wrote: 
> Do you have any way of knowing how much of this time is being spent in
> Pub/Sub and how much in the Beam pipeline?
> 
> If you are using the Dataflow runner and doing any shuffling, 100-150ms is
> currently not attainable. Writes to shuffle are batched for up to 100ms at
> a time to keep operational costs down, and this parameter is not tunable.
> 
> Reuven
> 
> On Thu, May 30, 2019 at 5:54 AM pasquale.bon...@gmail.com <
> pasquale.bon...@gmail.com> wrote:
> 
> > I'm measuring latency as the difference  between the timestamp of the
> > column on BigTable and the one I associate to the message when I publish it
> > to the topic.
> > I also do intermediate measurement after the message is read from PubSub
> > topic and before inserting into BigTable.
> > All timestamps are written into BigTable and I have a procedure to read
> > and analyse  data.
> >
> > What surprise me more is than with such a smaller load and 4 worker it
> > requires 1s for the message to be processed (timestamp taken after the
> > message is read from the topic).
> >
> > We need to lower latency to 100-150 ms.
> >
> >
> >
> > On 2019/05/30 11:41:45, Reuven Lax <re...@google.com> wrote:
> > > How are you measuring latency?
> > >
> > > On Thu, May 30, 2019 at 3:08 AM pasquale.bon...@gmail.com <
> > > pasquale.bon...@gmail.com> wrote:
> > >
> > > > This was my first option but I'm using google dataflow as runner and
> > it's
> > > > not clear if it supports stateful DoFn.
> > > > However my problem is latency, I've been trying different solution but
> > it
> > > > seems difficult to bring latency under 1s when consuming message (150/s
> > > > )from PubSub with beam/dataflow.
> > > > Is some benchmark or example available to understand if we can
> > effectively
> > > > achieve low latency pr we should look at different solutions?
> > > >
> > > >
> > > >
> > > > On 2019/05/29 17:12:37, Pablo Estrada <pabl...@google.com> wrote:
> > > > > If you add a stateful DoFn to your pipeline, you'll force Beam to
> > shuffle
> > > > > data to their corresponding worker per key. I am not sure what is the
> > > > > latency cost of doing this (as the messages still need to be
> > shuffled).
> > > > But
> > > > > it may help you accomplish this without adding windowing+triggering.
> > > > >
> > > > > -P.
> > > > >
> > > > > On Wed, May 29, 2019 at 5:16 AM pasquale.bon...@gmail.com <
> > > > > pasquale.bon...@gmail.com> wrote:
> > > > >
> > > > > > Hi Reza,
> > > > > > with GlobalWindow with triggering I was able to reduce hotspot
> > issues
> > > > > > gaining satisfying performance for BigTable update. Unfortunately
> > > > latency
> > > > > > when getting messages from PubSub remains around 1.5s that it's too
> > > > much
> > > > > > considering our NFR.
> > > > > >
> > > > > > This is the code I use to get the messages:
> > > > > > PCollectionTuple rawTransactions = p //
> > > > > >                                 .apply("GetMessages",
> > > > > >
> > > > > >
> > > >
> > PubsubIO.readMessagesWithAttributes().withIdAttribute(TRANSACTION_MESSAGE_ID_FIELD_NAME)
> > > > > >
> > > > > >
> > > >
> > .withTimestampAttribute(TRANSACTION_MESSAGE_TIMESTAMP_FIELD_NAME).fromTopic(topic))
> > > > > >
> > > >  .apply(Window.<PubsubMessage>configure()
> > > > > >                                 .triggering(Repeatedly
> > > > > >
> > > > > > .forever(AfterWatermark.pastEndOfWindow()
> > > > > >                                             .withEarlyFirings(
> > > > > >
> >  AfterProcessingTime
> > > > > >
> > > > > > .pastFirstElementInPane()
> > > > > >
> > > > > > .plusDelayOf(Duration.millis(1)))
> > > > > >                                                             //
> > Fire on
> > > > any
> > > > > > late data
> > > > > >
> > > > > > .withLateFirings(AfterPane.elementCountAtLeast(1))))
> > > > > >                                     .discardingFiredPanes())
> > > > > >
> > > > > > Messages are produced with a different dataflow:
> > > > > >  Pipeline p = Pipeline.create(options);
> > > > > >     p.apply(
> > > > > >             "ReadFile",
> > > > > >             TextIO.read()
> > > > > >                 .from(options.getInputLocation() + "/*.csv")
> > > > > >                 .watchForNewFiles(
> > > > > >                     // Check for new files every 1 seconds
> > > > > >                     Duration.millis(600),
> > > > > >                     // Never stop checking for new files
> > > > > >                     Watch.Growth.<String>never()))
> > > > > >         .apply(
> > > > > >             "create message",
> > > > > >             ParDo.of(
> > > > > >                 new DoFn<String, PubsubMessage>() {
> > > > > >                   @ProcessElement
> > > > > >                   public void processElement(ProcessContext
> > context) {
> > > > > >                     String line = context.element();
> > > > > >
> > > > > >                     String payload = convertRow(line);
> > > > > >                     long now =
> > > > > >
> > > >
> > LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
> > > > > >                     context.output(
> > > > > >                         new PubsubMessage(
> > > > > >                                         payload.getBytes(),
> > > > > > ImmutableMap.of(TRANSACTION_MESSAGE_ID_FIELD_NAME,
> > > > > > payload.split(",")[6],TRANSACTION_MESSAGE_TIMESTAMP_FIELD_NAME,
> > > > > > Long.toString(now))));
> > > > > >                   }
> > > > > >                 }))
> > > > > >         .apply("publish message",
> > PubsubIO.writeMessages().to(topic));
> > > > > >
> > > > > > I'm uploading a file containing 100 rows every 600 ms.
> > > > > >
> > > > > > I found different threads on satckoverflow around this latency
> > issue,
> > > > but
> > > > > > none has a solution.
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > On 2019/05/24 07:19:02, Reza Rokni <r...@google.com> wrote:
> > > > > > > PS You can also make use of the GlobalWindow with a stateful
> > DoFn.
> > > > > > >
> > > > > > > On Fri, 24 May 2019 at 15:13, Reza Rokni <r...@google.com> wrote:
> > > > > > >
> > > > > > > > Hi,
> > > > > > > >
> > > > > > > > Have you explored the use of triggers with your use case?
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > >
> > > >
> > https://beam.apache.org/releases/javadoc/2.12.0/org/apache/beam/sdk/transforms/windowing/Trigger.html
> > > > > > > >
> > > > > > > > Cheers
> > > > > > > >
> > > > > > > > Reza
> > > > > > > >
> > > > > > > > On Fri, 24 May 2019 at 14:14, pasquale.bon...@gmail.com <
> > > > > > > > pasquale.bon...@gmail.com> wrote:
> > > > > > > >
> > > > > > > >> Hi Reuven,
> > > > > > > >> I would like to know if is possible to guarantee that record
> > are
> > > > > > > >> processed by the same thread/task based on a key, as probably
> > > > happens
> > > > > > in a
> > > > > > > >> combine/stateful operation, without adding the delay of a
> > windows.
> > > > > > > >> This could increase efficiency of caching and reduce same
> > racing
> > > > > > > >> condition when writing data.
> > > > > > > >> I understand that workers are not part of programming model
> > so I
> > > > would
> > > > > > > >> like to know if it's possible to achieve this behaviour
> > reducing
> > > > at
> > > > > > minimum
> > > > > > > >> the delay of windowing. We don't need any combine or state we
> > just
> > > > > > want the
> > > > > > > >> all record with a given key are sent to same thread,
> > > > > > > >>
> > > > > > > >> Thanks
> > > > > > > >>
> > > > > > > >>
> > > > > > > >> On 2019/05/24 03:20:13, Reuven Lax <re...@google.com> wrote:
> > > > > > > >> > Can you explain what you mean by worker? While every runner
> > has
> > > > > > workers
> > > > > > > >> of
> > > > > > > >> > course, workers are not part of the programming model.
> > > > > > > >> >
> > > > > > > >> > On Thu, May 23, 2019 at 8:13 PM pasquale.bon...@gmail.com <
> > > > > > > >> > pasquale.bon...@gmail.com> wrote:
> > > > > > > >> >
> > > > > > > >> > > Hi all,
> > > > > > > >> > > I would like to know if Apache Beam has a functionality
> > > > similar to
> > > > > > > >> > > fieldsGrouping in Storm that allows to send records to a
> > > > specific
> > > > > > > >> > > task/worker based on a key.
> > > > > > > >> > > I know that we can achieve that with a combine/grouByKey
> > > > > > operation but
> > > > > > > >> > > that implies to add a windowing in our pipeline that we
> > don't
> > > > > > want.
> > > > > > > >> > > I have also tried using a stateful transformation.
> > > > > > > >> > > I think that also in that case we should use a windowing,
> > but
> > > > I
> > > > > > see
> > > > > > > >> that a
> > > > > > > >> > > job with a stateful ParDo operation can be submitted  on
> > > > Google
> > > > > > > >> Dataflow
> > > > > > > >> > > with windowing. I don't know if this depends  by lacking
> > of
> > > > > > support
> > > > > > > >> for
> > > > > > > >> > > stateful processing on Dataflow and if I can effetely
> > achieve
> > > > my
> > > > > > goal
> > > > > > > >> with
> > > > > > > >> > > this solution.
> > > > > > > >> > >
> > > > > > > >> > >
> > > > > > > >> > > Thanks in advance for your help
> > > > > > > >> > >
> > > > > > > >> > >
> > > > > > > >> >
> > > > > > > >>
> > > > > > > >
> > > > > > > >
> > > > > > > > --
> > > > > > > >
> > > > > > > > This email may be confidential and privileged. If you received
> > this
> > > > > > > > communication by mistake, please don't forward it to anyone
> > else,
> > > > > > please
> > > > > > > > erase all copies and attachments, and please let me know that
> > it
> > > > has
> > > > > > gone
> > > > > > > > to the wrong person.
> > > > > > > >
> > > > > > > > The above terms reflect a potential business arrangement, are
> > > > provided
> > > > > > > > solely as a basis for further discussion, and are not intended
> > to
> > > > be
> > > > > > and do
> > > > > > > > not constitute a legally binding obligation. No legally binding
> > > > > > obligations
> > > > > > > > will be created, implied, or inferred until an agreement in
> > final
> > > > form
> > > > > > is
> > > > > > > > executed in writing by all parties involved.
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > >
> > > > > > > This email may be confidential and privileged. If you received
> > this
> > > > > > > communication by mistake, please don't forward it to anyone else,
> > > > please
> > > > > > > erase all copies and attachments, and please let me know that it
> > has
> > > > gone
> > > > > > > to the wrong person.
> > > > > > >
> > > > > > > The above terms reflect a potential business arrangement, are
> > > > provided
> > > > > > > solely as a basis for further discussion, and are not intended
> > to be
> > > > and
> > > > > > do
> > > > > > > not constitute a legally binding obligation. No legally binding
> > > > > > obligations
> > > > > > > will be created, implied, or inferred until an agreement in final
> > > > form is
> > > > > > > executed in writing by all parties involved.
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> 

Reply via email to