If you add a stateful DoFn to your pipeline, you'll force Beam to shuffle
each key's data to its corresponding worker. I'm not sure what the latency
cost of doing this is (the messages still need to be shuffled), but it may
help you accomplish this without adding windowing + triggering.
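Roughly, it could look like the sketch below (untested; it assumes `messages` is the `PCollection<PubsubMessage>` you read from PubsubIO, keyed by your TRANSACTION_MESSAGE_ID_FIELD_NAME attribute — adjust names to your pipeline):

```java
// Sketch: key each message, then route it through a stateful DoFn.
// Declaring a state cell is what makes Beam shuffle all elements
// with the same key to the same worker.
PCollection<KV<String, PubsubMessage>> keyed =
    messages.apply("KeyByTransactionId",
        WithKeys.of((PubsubMessage m) ->
                m.getAttribute(TRANSACTION_MESSAGE_ID_FIELD_NAME))
            .withKeyType(TypeDescriptors.strings()));

keyed.apply("ProcessPerKey", ParDo.of(
    new DoFn<KV<String, PubsubMessage>, PubsubMessage>() {
      // Any state cell turns this into a stateful DoFn; the counter
      // itself is a placeholder — what matters is the per-key routing.
      @StateId("count")
      private final StateSpec<ValueState<Long>> countSpec =
          StateSpecs.value(VarLongCoder.of());

      @ProcessElement
      public void processElement(ProcessContext c,
          @StateId("count") ValueState<Long> count) {
        Long prev = count.read();
        count.write((prev == null ? 0L : prev) + 1);
        c.output(c.element().getValue());
      }
    }));
```

Note that on an unbounded source a stateful DoFn still works in the GlobalWindow, so you shouldn't need extra windowing just to get the keying behaviour.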

-P.

On Wed, May 29, 2019 at 5:16 AM pasquale.bon...@gmail.com <
pasquale.bon...@gmail.com> wrote:

> Hi Reza,
> with the GlobalWindow with triggering I was able to reduce the hotspot
> issues and got satisfactory performance for the BigTable updates.
> Unfortunately, the latency when getting messages from PubSub remains
> around 1.5 s, which is too much considering our NFR.
>
> This is the code I use to get the messages:
> PCollectionTuple rawTransactions = p
>     .apply("GetMessages",
>         PubsubIO.readMessagesWithAttributes()
>             .withIdAttribute(TRANSACTION_MESSAGE_ID_FIELD_NAME)
>             .withTimestampAttribute(TRANSACTION_MESSAGE_TIMESTAMP_FIELD_NAME)
>             .fromTopic(topic))
>     .apply(Window.<PubsubMessage>configure()
>         .triggering(Repeatedly.forever(
>             AfterWatermark.pastEndOfWindow()
>                 .withEarlyFirings(AfterProcessingTime
>                     .pastFirstElementInPane()
>                     .plusDelayOf(Duration.millis(1)))
>                 // Fire on any late data
>                 .withLateFirings(AfterPane.elementCountAtLeast(1))))
>         .discardingFiredPanes())
>
> Messages are produced with a different dataflow:
> Pipeline p = Pipeline.create(options);
> p.apply(
>         "ReadFile",
>         TextIO.read()
>             .from(options.getInputLocation() + "/*.csv")
>             .watchForNewFiles(
>                 // Check for new files every 600 ms
>                 Duration.millis(600),
>                 // Never stop checking for new files
>                 Watch.Growth.<String>never()))
>     .apply(
>         "create message",
>         ParDo.of(
>             new DoFn<String, PubsubMessage>() {
>               @ProcessElement
>               public void processElement(ProcessContext context) {
>                 String line = context.element();
>                 String payload = convertRow(line);
>                 long now = LocalDateTime.now()
>                     .atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
>                 context.output(
>                     new PubsubMessage(
>                         payload.getBytes(),
>                         ImmutableMap.of(
>                             TRANSACTION_MESSAGE_ID_FIELD_NAME, payload.split(",")[6],
>                             TRANSACTION_MESSAGE_TIMESTAMP_FIELD_NAME, Long.toString(now))));
>               }
>             }))
>     .apply("publish message", PubsubIO.writeMessages().to(topic));
>
> I'm uploading a file containing 100 rows every 600 ms.
>
> I found several threads on Stack Overflow about this latency issue, but
> none offers a solution.
>
>
>
>
> On 2019/05/24 07:19:02, Reza Rokni <r...@google.com> wrote:
> > PS You can also make use of the GlobalWindow with a stateful DoFn.
> >
> > On Fri, 24 May 2019 at 15:13, Reza Rokni <r...@google.com> wrote:
> >
> > > Hi,
> > >
> > > Have you explored the use of triggers with your use case?
> > >
> > >
> > >
> > > https://beam.apache.org/releases/javadoc/2.12.0/org/apache/beam/sdk/transforms/windowing/Trigger.html
> > >
> > > Cheers
> > >
> > > Reza
> > >
> > > On Fri, 24 May 2019 at 14:14, pasquale.bon...@gmail.com <
> > > pasquale.bon...@gmail.com> wrote:
> > >
> > >> Hi Reuven,
> > >> I would like to know if it is possible to guarantee that records are
> > >> processed by the same thread/task based on a key, as probably happens
> > >> in a combine/stateful operation, without adding the delay of a window.
> > >> This could increase caching efficiency and reduce some race
> > >> conditions when writing data.
> > >> I understand that workers are not part of the programming model, so I
> > >> would like to know if it's possible to achieve this behaviour while
> > >> keeping the delay of windowing to a minimum. We don't need any combine
> > >> or state; we just want all records with a given key to be sent to the
> > >> same thread.
> > >>
> > >> Thanks
> > >>
> > >>
> > >> On 2019/05/24 03:20:13, Reuven Lax <re...@google.com> wrote:
> > >> > Can you explain what you mean by worker? While every runner has
> > >> > workers of course, workers are not part of the programming model.
> > >> >
> > >> > On Thu, May 23, 2019 at 8:13 PM pasquale.bon...@gmail.com <
> > >> > pasquale.bon...@gmail.com> wrote:
> > >> >
> > >> > > Hi all,
> > >> > > I would like to know if Apache Beam has a functionality similar to
> > >> > > fieldsGrouping in Storm, which allows sending records to a specific
> > >> > > task/worker based on a key.
> > >> > > I know that we can achieve that with a combine/groupByKey operation,
> > >> > > but that implies adding windowing to our pipeline, which we don't
> > >> > > want.
> > >> > > I have also tried using a stateful transformation.
> > >> > > I think that in that case we should also use windowing, but I see
> > >> > > that a job with a stateful ParDo operation can be submitted on
> > >> > > Google Dataflow without windowing. I don't know if this depends on a
> > >> > > lack of support for stateful processing on Dataflow, or if I can
> > >> > > effectively achieve my goal with this solution.
> > >> > >
> > >> > >
> > >> > > Thanks in advance for your help
> > >> > >
> > >> > >
> > >> >
> > >>
> > >
> > >
> > > --
> > >
> > > This email may be confidential and privileged. If you received this
> > > communication by mistake, please don't forward it to anyone else, please
> > > erase all copies and attachments, and please let me know that it has gone
> > > to the wrong person.
> > >
> > > The above terms reflect a potential business arrangement, are provided
> > > solely as a basis for further discussion, and are not intended to be and
> > > do not constitute a legally binding obligation. No legally binding
> > > obligations will be created, implied, or inferred until an agreement in
> > > final form is executed in writing by all parties involved.
> > >
> >
> >
>
