If you add a stateful DoFn to your pipeline, you'll force Beam to shuffle each key's data to its corresponding worker. I'm not sure what the latency cost of doing this is (the messages still need to be shuffled), but it may let you accomplish this without adding windowing + triggering.
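For readers of the archive: the per-key routing discussed in this thread (every record for a key landing on the same worker, as with Storm's fieldsGrouping or Beam's keyed shuffle) comes down to deterministic hash partitioning. A minimal plain-Java sketch of the idea, illustrative only and not the Beam API:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KeyedRouting {
    // Deterministically map a key to one of numWorkers partitions, the way a
    // keyed shuffle (or Storm's fieldsGrouping) pins all records for a key to
    // one task. floorMod keeps the index non-negative for negative hash codes.
    static int workerFor(String key, int numWorkers) {
        return Math.floorMod(key.hashCode(), numWorkers);
    }

    public static void main(String[] args) {
        int numWorkers = 4;
        List<String> keys = List.of("acct-1", "acct-2", "acct-1", "acct-3", "acct-2");

        // Bucket records by their assigned worker to show the routing is stable.
        Map<Integer, List<String>> byWorker = new HashMap<>();
        for (String key : keys) {
            byWorker.computeIfAbsent(workerFor(key, numWorkers), w -> new ArrayList<>())
                    .add(key);
        }
        System.out.println(byWorker);

        // Repeated occurrences of the same key always map to the same worker.
        System.out.println(workerFor("acct-1", numWorkers) == workerFor("acct-1", numWorkers)); // prints true
    }
}
```

Because the assignment is a pure function of the key, no coordination is needed between producers; this is also why a runner can recompute the routing after failures.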
-P.

On Wed, May 29, 2019 at 5:16 AM pasquale.bon...@gmail.com
<pasquale.bon...@gmail.com> wrote:

> Hi Reza,
> with GlobalWindow with triggering I was able to reduce the hotspot issues,
> gaining satisfying performance for the Bigtable update. Unfortunately the
> latency when getting messages from Pub/Sub remains around 1.5 s, which is
> too much considering our NFRs.
>
> This is the code I use to get the messages:
>
>     PCollectionTuple rawTransactions = p
>         .apply("GetMessages",
>             PubsubIO.readMessagesWithAttributes()
>                 .withIdAttribute(TRANSACTION_MESSAGE_ID_FIELD_NAME)
>                 .withTimestampAttribute(TRANSACTION_MESSAGE_TIMESTAMP_FIELD_NAME)
>                 .fromTopic(topic))
>         .apply(Window.<PubsubMessage>configure()
>             .triggering(Repeatedly
>                 .forever(AfterWatermark.pastEndOfWindow()
>                     .withEarlyFirings(AfterProcessingTime
>                         .pastFirstElementInPane()
>                         .plusDelayOf(Duration.millis(1)))
>                     // Fire on any late data
>                     .withLateFirings(AfterPane.elementCountAtLeast(1))))
>             .discardingFiredPanes());
>
> Messages are produced with a different Dataflow pipeline:
>
>     Pipeline p = Pipeline.create(options);
>     p.apply("ReadFile",
>             TextIO.read()
>                 .from(options.getInputLocation() + "/*.csv")
>                 .watchForNewFiles(
>                     // Check for new files every 600 ms
>                     Duration.millis(600),
>                     // Never stop checking for new files
>                     Watch.Growth.<String>never()))
>         .apply("create message",
>             ParDo.of(new DoFn<String, PubsubMessage>() {
>               @ProcessElement
>               public void processElement(ProcessContext context) {
>                 String line = context.element();
>                 String payload = convertRow(line);
>                 long now = LocalDateTime.now()
>                     .atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
>                 context.output(new PubsubMessage(
>                     payload.getBytes(),
>                     ImmutableMap.of(
>                         TRANSACTION_MESSAGE_ID_FIELD_NAME, payload.split(",")[6],
>                         TRANSACTION_MESSAGE_TIMESTAMP_FIELD_NAME, Long.toString(now))));
>               }
>             }))
>         .apply("publish message", PubsubIO.writeMessages().to(topic));
>
> I'm uploading a file containing 100 rows every 600 ms.
>
> I found different threads on Stack Overflow around this latency issue,
> but none has a solution.
>
> On 2019/05/24 07:19:02, Reza Rokni <r...@google.com> wrote:
> > PS You can also make use of the GlobalWindow with a stateful DoFn.
> >
> > On Fri, 24 May 2019 at 15:13, Reza Rokni <r...@google.com> wrote:
> > > Hi,
> > >
> > > Have you explored the use of triggers with your use case?
> > >
> > > https://beam.apache.org/releases/javadoc/2.12.0/org/apache/beam/sdk/transforms/windowing/Trigger.html
> > >
> > > Cheers
> > > Reza
> > >
> > > On Fri, 24 May 2019 at 14:14, pasquale.bon...@gmail.com
> > > <pasquale.bon...@gmail.com> wrote:
> > >
> > >> Hi Reuven,
> > >> I would like to know if it is possible to guarantee that records are
> > >> processed by the same thread/task based on a key, as probably happens
> > >> in a combine/stateful operation, without adding the delay of a window.
> > >> This could increase the efficiency of caching and reduce some race
> > >> conditions when writing data.
> > >> I understand that workers are not part of the programming model, so I
> > >> would like to know if it's possible to achieve this behaviour while
> > >> keeping the windowing delay to a minimum. We don't need any combine or
> > >> state; we just want all records with a given key to be sent to the
> > >> same thread.
> > >>
> > >> Thanks
> > >>
> > >> On 2019/05/24 03:20:13, Reuven Lax <re...@google.com> wrote:
> > >> > Can you explain what you mean by worker? While every runner has
> > >> > workers, of course, workers are not part of the programming model.
> > >> >
> > >> > On Thu, May 23, 2019 at 8:13 PM pasquale.bon...@gmail.com
> > >> > <pasquale.bon...@gmail.com> wrote:
> > >> >
> > >> > > Hi all,
> > >> > > I would like to know if Apache Beam has a functionality similar to
> > >> > > fieldsGrouping in Storm, which allows records to be sent to a
> > >> > > specific task/worker based on a key.
> > >> > > I know that we can achieve that with a combine/GroupByKey operation,
> > >> > > but that implies adding windowing to our pipeline, which we don't
> > >> > > want.
> > >> > > I have also tried using a stateful transformation. I thought that in
> > >> > > that case we would also need windowing, but I see that a job with a
> > >> > > stateful ParDo operation can be submitted on Google Dataflow without
> > >> > > windowing. I don't know if this depends on a lack of support for
> > >> > > stateful processing on Dataflow, or if I can effectively achieve my
> > >> > > goal with this solution.
> > >> > >
> > >> > > Thanks in advance for your help
> > >
> > > --
> > > This email may be confidential and privileged. If you received this
> > > communication by mistake, please don't forward it to anyone else, please
> > > erase all copies and attachments, and please let me know that it has
> > > gone to the wrong person.
> > >
> > > The above terms reflect a potential business arrangement, are provided
> > > solely as a basis for further discussion, and are not intended to be and
> > > do not constitute a legally binding obligation. No legally binding
> > > obligations will be created, implied, or inferred until an agreement in
> > > final form is executed in writing by all parties involved.
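Archive note: the behaviour Pasquale is after, all records for a key handled serially against one piece of state, is what a stateful DoFn provides once the runner has shuffled by key. The per-key state semantics can be illustrated outside Beam with a plain-Java sketch (illustrative only; Beam's real API scopes state per key and window via `ValueState`/`BagState`):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

public class PerKeyState {
    // One state cell per key, analogous to Beam state scoped to a key.
    private final Map<String, LongAdder> counts = new ConcurrentHashMap<>();

    // Analogue of a stateful @ProcessElement: read and update state for the
    // element's key only; state belonging to other keys is untouched.
    void process(String key) {
        counts.computeIfAbsent(key, k -> new LongAdder()).increment();
    }

    long countFor(String key) {
        LongAdder a = counts.get(key);
        return a == null ? 0 : a.sum();
    }

    public static void main(String[] args) {
        PerKeyState state = new PerKeyState();
        for (String k : new String[] {"a", "b", "a", "a"}) {
            state.process(k);
        }
        System.out.println(state.countFor("a")); // prints 3
        System.out.println(state.countFor("b")); // prints 1
    }
}
```

Because state is keyed, a runner only needs to guarantee that one worker owns a key at a time, which is exactly why adding a stateful DoFn forces the keyed shuffle mentioned at the top of the thread.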