How are you measuring latency?

On Thu, May 30, 2019 at 3:08 AM pasquale.bon...@gmail.com <pasquale.bon...@gmail.com> wrote:
> This was my first option, but I'm using Google Dataflow as the runner and
> it's not clear whether it supports stateful DoFns. However, my problem is
> latency: I've been trying different solutions, but it seems difficult to
> bring latency under 1 s when consuming messages (150/s) from Pub/Sub with
> Beam/Dataflow. Is a benchmark or example available to understand whether
> we can effectively achieve low latency, or should we look at different
> solutions?
>
> On 2019/05/29 17:12:37, Pablo Estrada <pabl...@google.com> wrote:
> > If you add a stateful DoFn to your pipeline, you'll force Beam to
> > shuffle data to the corresponding worker per key. I am not sure what
> > the latency cost of doing this is (the messages still need to be
> > shuffled), but it may help you accomplish this without adding
> > windowing + triggering.
> >
> > -P.
> >
> > On Wed, May 29, 2019 at 5:16 AM pasquale.bon...@gmail.com
> > <pasquale.bon...@gmail.com> wrote:
> > > Hi Reza,
> > > With a GlobalWindow with triggering I was able to reduce the hotspot
> > > issues and got satisfying performance for the Bigtable update.
> > > Unfortunately, the latency when getting messages from Pub/Sub remains
> > > around 1.5 s, which is too much considering our NFR.
> > > This is the code I use to get the messages:
> > >
> > >   PCollectionTuple rawTransactions =
> > >       p.apply("GetMessages",
> > >               PubsubIO.readMessagesWithAttributes()
> > >                   .withIdAttribute(TRANSACTION_MESSAGE_ID_FIELD_NAME)
> > >                   .withTimestampAttribute(TRANSACTION_MESSAGE_TIMESTAMP_FIELD_NAME)
> > >                   .fromTopic(topic))
> > >        .apply(Window.<PubsubMessage>configure()
> > >            .triggering(Repeatedly.forever(
> > >                AfterWatermark.pastEndOfWindow()
> > >                    .withEarlyFirings(
> > >                        AfterProcessingTime.pastFirstElementInPane()
> > >                            .plusDelayOf(Duration.millis(1)))
> > >                    // Fire on any late data
> > >                    .withLateFirings(AfterPane.elementCountAtLeast(1))))
> > >            .discardingFiredPanes())
> > >
> > > Messages are produced with a different Dataflow job:
> > >
> > >   Pipeline p = Pipeline.create(options);
> > >   p.apply("ReadFile",
> > >           TextIO.read()
> > >               .from(options.getInputLocation() + "/*.csv")
> > >               .watchForNewFiles(
> > >                   // Check for new files every 600 ms
> > >                   Duration.millis(600),
> > >                   // Never stop checking for new files
> > >                   Watch.Growth.<String>never()))
> > >    .apply("create message",
> > >        ParDo.of(new DoFn<String, PubsubMessage>() {
> > >          @ProcessElement
> > >          public void processElement(ProcessContext context) {
> > >            String line = context.element();
> > >            String payload = convertRow(line);
> > >            long now = LocalDateTime.now()
> > >                .atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
> > >            context.output(new PubsubMessage(
> > >                payload.getBytes(),
> > >                ImmutableMap.of(
> > >                    TRANSACTION_MESSAGE_ID_FIELD_NAME, payload.split(",")[6],
> > >                    TRANSACTION_MESSAGE_TIMESTAMP_FIELD_NAME, Long.toString(now))));
> > >          }
> > >        }))
> > >    .apply("publish message", PubsubIO.writeMessages().to(topic));
> > >
> > > I'm uploading a file containing 100 rows every 600 ms.
> > >
> > > I found different threads on Stack Overflow about this latency issue,
> > > but none has a solution.
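[Editor's note: since the producer above stamps each message with an epoch-millis timestamp attribute, the "how are you measuring latency?" question at the top of the thread can be answered by computing now minus that attribute on the consumer side. A minimal plain-Java sketch of that arithmetic, with the Beam plumbing omitted and the attribute value invented for illustration:]

```java
import java.util.Map;

public class LatencyMeasure {
    // Same attribute name the producer pipeline in the thread uses;
    // the "msg_ts" value is an assumption for this sketch.
    static final String TRANSACTION_MESSAGE_TIMESTAMP_FIELD_NAME = "msg_ts";

    /** End-to-end latency in millis: processing time minus the publish
     *  timestamp carried in the message attributes. */
    static long latencyMillis(Map<String, String> attributes, long nowMillis) {
        long publishedAt =
            Long.parseLong(attributes.get(TRANSACTION_MESSAGE_TIMESTAMP_FIELD_NAME));
        return nowMillis - publishedAt;
    }

    public static void main(String[] args) {
        Map<String, String> attrs =
            Map.of(TRANSACTION_MESSAGE_TIMESTAMP_FIELD_NAME, "1559190000000");
        // Message published at t=1559190000000, observed 1250 ms later.
        System.out.println(latencyMillis(attrs, 1559190001250L)); // 1250
    }
}
```

In a real pipeline this calculation would sit inside a consumer-side DoFn, feeding a latency distribution metric rather than stdout.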
> > > On 2019/05/24 07:19:02, Reza Rokni <r...@google.com> wrote:
> > > > PS You can also make use of the GlobalWindow with a stateful DoFn.
> > > >
> > > > On Fri, 24 May 2019 at 15:13, Reza Rokni <r...@google.com> wrote:
> > > > > Hi,
> > > > >
> > > > > Have you explored the use of triggers with your use case?
> > > > >
> > > > > https://beam.apache.org/releases/javadoc/2.12.0/org/apache/beam/sdk/transforms/windowing/Trigger.html
> > > > >
> > > > > Cheers
> > > > > Reza
> > > > >
> > > > > On Fri, 24 May 2019 at 14:14, pasquale.bon...@gmail.com
> > > > > <pasquale.bon...@gmail.com> wrote:
> > > > >> Hi Reuven,
> > > > >> I would like to know if it is possible to guarantee that records
> > > > >> are processed by the same thread/task based on a key, as probably
> > > > >> happens in a combine/stateful operation, without adding the delay
> > > > >> of a window. This could increase the efficiency of caching and
> > > > >> reduce some race conditions when writing data.
> > > > >> I understand that workers are not part of the programming model,
> > > > >> so I would like to know if it's possible to achieve this
> > > > >> behaviour while keeping the delay of windowing to a minimum. We
> > > > >> don't need any combine or state; we just want all records with a
> > > > >> given key to be sent to the same thread.
> > > > >>
> > > > >> Thanks
> > > > >>
> > > > >> On 2019/05/24 03:20:13, Reuven Lax <re...@google.com> wrote:
> > > > >> > Can you explain what you mean by worker? While every runner
> > > > >> > has workers of course, workers are not part of the programming
> > > > >> > model.
> > > > >> > On Thu, May 23, 2019 at 8:13 PM pasquale.bon...@gmail.com
> > > > >> > <pasquale.bon...@gmail.com> wrote:
> > > > >> > > Hi all,
> > > > >> > > I would like to know if Apache Beam has functionality
> > > > >> > > similar to fieldsGrouping in Storm, which allows sending
> > > > >> > > records to a specific task/worker based on a key.
> > > > >> > > I know that we can achieve that with a combine/groupByKey
> > > > >> > > operation, but that implies adding a windowing to our
> > > > >> > > pipeline that we don't want.
> > > > >> > > I have also tried using a stateful transformation. I think
> > > > >> > > that in that case too we should use a windowing, but I see
> > > > >> > > that a job with a stateful ParDo operation can be submitted
> > > > >> > > on Google Dataflow without windowing. I don't know if this
> > > > >> > > depends on lacking support for stateful processing on
> > > > >> > > Dataflow, and whether I can effectively achieve my goal
> > > > >> > > with this solution.
> > > > >> > >
> > > > >> > > Thanks in advance for your help
> > > > >
> > > > > --
> > > > > This email may be confidential and privileged. If you received
> > > > > this communication by mistake, please don't forward it to anyone
> > > > > else, please erase all copies and attachments, and please let me
> > > > > know that it has gone to the wrong person.
> > > > >
> > > > > The above terms reflect a potential business arrangement, are
> > > > > provided solely as a basis for further discussion, and are not
> > > > > intended to be and do not constitute a legally binding
> > > > > obligation. No legally binding obligations will be created,
> > > > > implied, or inferred until an agreement in final form is
> > > > > executed in writing by all parties involved.
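[Editor's note: as a mental model for Reza's "GlobalWindow with a stateful DoFn" suggestion (not the Beam API itself): a stateful DoFn in the GlobalWindow amounts to keeping one state cell per key and updating it as each element for that key arrives, with the runner guaranteeing serial, keyed access. A toy plain-Java sketch of that model; the class name and the running-count state are invented for illustration:]

```java
import java.util.HashMap;
import java.util.Map;

public class PerKeyStateSketch {
    // One state cell per key, analogous to what a stateful DoFn in the
    // GlobalWindow keeps for the lifetime of the pipeline.
    private final Map<String, Long> countState = new HashMap<>();

    /** Processes one element for the given key: reads the per-key state,
     *  updates it, writes it back, and returns the running count. */
    long process(String key) {
        long updated = countState.getOrDefault(key, 0L) + 1;
        countState.put(key, updated);
        return updated;
    }

    public static void main(String[] args) {
        PerKeyStateSketch fn = new PerKeyStateSketch();
        System.out.println(fn.process("acct-1")); // 1
        System.out.println(fn.process("acct-1")); // 2
        System.out.println(fn.process("acct-2")); // 1
    }
}
```

The runner achieves this by shuffling elements so that all elements for a key reach the same worker, which is exactly the latency cost Pablo mentions above.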
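[Editor's note: the routing that Storm's fieldsGrouping provides, and that Beam's GroupByKey or a stateful DoFn effectively reproduces, is deterministic hash partitioning of keys over tasks. A toy plain-Java sketch of that idea, independent of any runner; the task count and keys are made up for illustration:]

```java
import java.util.List;

public class FieldsGroupingSketch {
    /** Deterministically maps a key to one of numTasks tasks, the way
     *  Storm's fieldsGrouping (or any keyed shuffle) does. */
    static int taskFor(String key, int numTasks) {
        // floorMod keeps the result non-negative even for negative hashes.
        return Math.floorMod(key.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 4;
        for (String key : List.of("acct-1", "acct-2", "acct-1")) {
            System.out.println(key + " -> task " + taskFor(key, numTasks));
        }
        // Both "acct-1" records always land on the same task; that is the
        // whole guarantee, regardless of which physical worker hosts it.
    }
}
```

The point of the sketch is the invariant, not the mechanism: same key, same destination. What Beam does not expose is which worker that destination is, which is why the answers above keep steering toward stateful DoFns rather than worker pinning.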