This was my first option, but I'm using Google Dataflow as the runner and it's not
clear if it supports stateful DoFn.
However, my main problem is latency: I've been trying different solutions, but it
seems difficult to bring latency under 1 s when consuming messages (150/s) from
PubSub with Beam/Dataflow.
Is there a benchmark or example available to understand whether we can effectively
achieve low latency, or whether we should look at different solutions?
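To make the semantics I'm after concrete, here is a plain-Java sketch (hypothetical names, not Beam API) of the per-key routing that a stateful DoFn or GroupByKey implies: records are hash-partitioned by key, so every record with a given key lands on the same worker, where per-key state can live without cross-worker synchronization.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch (not Beam API) of per-key routing:
// records with the same key always go to the same "worker",
// so per-key state needs no cross-worker coordination.
public class KeyRouting {
    static int workerFor(String key, int numWorkers) {
        // Stable hash partitioning: same key always maps to the same worker.
        return Math.floorMod(key.hashCode(), numWorkers);
    }

    // Per-worker state: how many records each worker has seen per key.
    static Map<Integer, Map<String, Long>> state = new HashMap<>();

    static void process(String key, int numWorkers) {
        int worker = workerFor(key, numWorkers);
        state.computeIfAbsent(worker, w -> new HashMap<>())
             .merge(key, 1L, Long::sum);
    }
}
```

This is essentially what fieldsGrouping in Storm does, and what I'd hope to get from a stateful ParDo without paying a windowing delay.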



On 2019/05/29 17:12:37, Pablo Estrada <[email protected]> wrote: 
> If you add a stateful DoFn to your pipeline, you'll force Beam to shuffle
> data to their corresponding worker per key. I am not sure what is the
> latency cost of doing this (as the messages still need to be shuffled). But
> it may help you accomplish this without adding windowing+triggering.
> 
> -P.
> 
> On Wed, May 29, 2019 at 5:16 AM [email protected] <
> [email protected]> wrote:
> 
> > Hi Reza,
> > with GlobalWindow with triggering I was able to reduce hotspot issues,
> > gaining satisfying performance for BigTable updates. Unfortunately the
> > latency when getting messages from PubSub remains around 1.5 s, which is too
> > much considering our NFR.
> >
> > This is the code I use to get the messages:
> > PCollectionTuple rawTransactions = p
> >     .apply("GetMessages",
> >         PubsubIO.readMessagesWithAttributes()
> >             .withIdAttribute(TRANSACTION_MESSAGE_ID_FIELD_NAME)
> >             .withTimestampAttribute(TRANSACTION_MESSAGE_TIMESTAMP_FIELD_NAME)
> >             .fromTopic(topic))
> >     .apply(Window.<PubsubMessage>configure()
> >         .triggering(Repeatedly.forever(
> >             AfterWatermark.pastEndOfWindow()
> >                 .withEarlyFirings(
> >                     AfterProcessingTime.pastFirstElementInPane()
> >                         .plusDelayOf(Duration.millis(1)))
> >                 // Fire on any late data
> >                 .withLateFirings(AfterPane.elementCountAtLeast(1))))
> >         .discardingFiredPanes())
> >
> > Messages are produced by a separate Dataflow pipeline:
> > Pipeline p = Pipeline.create(options);
> > p.apply(
> >         "ReadFile",
> >         TextIO.read()
> >             .from(options.getInputLocation() + "/*.csv")
> >             .watchForNewFiles(
> >                 // Check for new files every 600 ms
> >                 Duration.millis(600),
> >                 // Never stop checking for new files
> >                 Watch.Growth.<String>never()))
> >     .apply(
> >         "create message",
> >         ParDo.of(
> >             new DoFn<String, PubsubMessage>() {
> >               @ProcessElement
> >               public void processElement(ProcessContext context) {
> >                 String line = context.element();
> >                 String payload = convertRow(line);
> >                 long now = LocalDateTime.now().atZone(ZoneId.systemDefault())
> >                     .toInstant().toEpochMilli();
> >                 context.output(
> >                     new PubsubMessage(
> >                         payload.getBytes(),
> >                         ImmutableMap.of(
> >                             TRANSACTION_MESSAGE_ID_FIELD_NAME, payload.split(",")[6],
> >                             TRANSACTION_MESSAGE_TIMESTAMP_FIELD_NAME, Long.toString(now))));
> >               }
> >             }))
> >     .apply("publish message", PubsubIO.writeMessages().to(topic));
> >
> > I'm uploading a file containing 100 rows every 600 ms.
> >
> > I found several threads on Stack Overflow about this latency issue, but
> > none offers a solution.
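For reference, the way I measure that ~1.5 s figure: the producer stamps the publish time (epoch millis) into a message attribute, and the consumer compares it against its own clock. A minimal helper (hypothetical name, not part of Beam):

```java
import java.util.Map;

// Hypothetical helper: end-to-end latency of a message, computed from the
// publish-time attribute stamped by the producer pipeline versus the time
// the consumer processes the message.
public class LatencyCheck {
    static long latencyMillis(Map<String, String> attributes,
                              String timestampAttribute,
                              long nowMillis) {
        // The attribute holds the producer's epoch millis as a string.
        return nowMillis - Long.parseLong(attributes.get(timestampAttribute));
    }
}
```

Note this assumes the producer and consumer clocks are reasonably in sync; clock skew is included in the measured number.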
> >
> >
> >
> >
> > On 2019/05/24 07:19:02, Reza Rokni <[email protected]> wrote:
> > > PS You can also make use of the GlobalWindow with a stateful DoFn.
> > >
> > > On Fri, 24 May 2019 at 15:13, Reza Rokni <[email protected]> wrote:
> > >
> > > > Hi,
> > > >
> > > > Have you explored the use of triggers with your use case?
> > > >
> > > >
> > > >
> > https://beam.apache.org/releases/javadoc/2.12.0/org/apache/beam/sdk/transforms/windowing/Trigger.html
> > > >
> > > > Cheers
> > > >
> > > > Reza
> > > >
> > > > On Fri, 24 May 2019 at 14:14, [email protected] <
> > > > [email protected]> wrote:
> > > >
> > > >> Hi Reuven,
> > > >> I would like to know if it is possible to guarantee that records are
> > > >> processed by the same thread/task based on a key, as probably happens
> > > >> in a combine/stateful operation, without adding the delay of a window.
> > > >> This could increase the efficiency of caching and reduce some race
> > > >> conditions when writing data.
> > > >> I understand that workers are not part of the programming model, so I
> > > >> would like to know if it's possible to achieve this behaviour while
> > > >> keeping the delay of windowing to a minimum. We don't need any combine
> > > >> or state; we just want all records with a given key to be sent to the
> > > >> same thread.
> > > >>
> > > >> Thanks
> > > >>
> > > >>
> > > >> On 2019/05/24 03:20:13, Reuven Lax <[email protected]> wrote:
> > > >> > Can you explain what you mean by worker? While every runner has
> > > >> > workers, of course, workers are not part of the programming model.
> > > >> >
> > > >> > On Thu, May 23, 2019 at 8:13 PM [email protected] <
> > > >> > [email protected]> wrote:
> > > >> >
> > > >> > > Hi all,
> > > >> > > I would like to know if Apache Beam has functionality similar to
> > > >> > > fieldsGrouping in Storm, which allows sending records to a
> > > >> > > specific task/worker based on a key.
> > > >> > > I know that we can achieve that with a combine/groupByKey
> > > >> > > operation, but that implies adding windowing to our pipeline,
> > > >> > > which we don't want.
> > > >> > > I have also tried using a stateful transformation.
> > > >> > > I think that in that case we should also use windowing, but I see
> > > >> > > that a job with a stateful ParDo operation can be submitted on
> > > >> > > Google Dataflow without windowing. I don't know whether this is
> > > >> > > due to a lack of support for stateful processing on Dataflow, or
> > > >> > > whether I can effectively achieve my goal with this solution.
> > > >> > >
> > > >> > >
> > > >> > > Thanks in advance for your help
> > > >> > >
> > > >> >
> > > >>
> > > >
> > > >
> > > > --
> > > >
> > > > This email may be confidential and privileged. If you received this
> > > > communication by mistake, please don't forward it to anyone else,
> > please
> > > > erase all copies and attachments, and please let me know that it has
> > gone
> > > > to the wrong person.
> > > >
> > > > The above terms reflect a potential business arrangement, are provided
> > > > solely as a basis for further discussion, and are not intended to be
> > and do
> > > > not constitute a legally binding obligation. No legally binding
> > obligations
> > > > will be created, implied, or inferred until an agreement in final form
> > is
> > > > executed in writing by all parties involved.
> > > >
> > >
> >
> 
