Hi Reza,
Using a GlobalWindow with triggering, I was able to reduce the hotspot issues and get
satisfactory performance for BigTable updates. Unfortunately, the latency when getting
messages from PubSub remains around 1.5 s, which is too much given our NFRs.
This is the code I use to get the messages:
PCollectionTuple rawTransactions = p //
    .apply("GetMessages",
        PubsubIO.readMessagesWithAttributes()
            .withIdAttribute(TRANSACTION_MESSAGE_ID_FIELD_NAME)
            .withTimestampAttribute(TRANSACTION_MESSAGE_TIMESTAMP_FIELD_NAME)
            .fromTopic(topic))
    .apply(Window.<PubsubMessage>configure()
        .triggering(Repeatedly
            .forever(AfterWatermark.pastEndOfWindow()
                .withEarlyFirings(
                    AfterProcessingTime
                        .pastFirstElementInPane()
                        .plusDelayOf(Duration.millis(1)))
                // Fire on any late data
                .withLateFirings(AfterPane.elementCountAtLeast(1))))
        .discardingFiredPanes())
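One way to quantify the ~1.5 s figure is to compare each message's timestamp attribute (epoch millis written by the producer) with the consumer's clock and look at percentiles rather than single samples. The sketch below is plain Java, not Beam API; the attribute name `event_ts` is hypothetical and stands in for TRANSACTION_MESSAGE_TIMESTAMP_FIELD_NAME, and it assumes producer and consumer clocks are reasonably synchronized.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class PubsubLatencySketch {
  // Hypothetical attribute name standing in for TRANSACTION_MESSAGE_TIMESTAMP_FIELD_NAME.
  static final String TIMESTAMP_ATTR = "event_ts";

  /** Latency of one message: consumer clock minus the producer's epoch-millis attribute. */
  static long latencyMillis(Map<String, String> attributes, long nowMillis) {
    String ts = attributes.get(TIMESTAMP_ATTR);
    if (ts == null) {
      throw new IllegalArgumentException("missing attribute " + TIMESTAMP_ATTR);
    }
    return nowMillis - Long.parseLong(ts);
  }

  /** Nearest-rank percentile, p in (0, 100], over a non-empty list of latencies. */
  static long percentile(List<Long> latencies, double p) {
    if (latencies.isEmpty()) {
      throw new IllegalArgumentException("no latencies");
    }
    List<Long> sorted = new ArrayList<>(latencies);
    Collections.sort(sorted);
    int rank = (int) Math.ceil(p / 100.0 * sorted.size());
    return sorted.get(Math.max(rank, 1) - 1);
  }

  public static void main(String[] args) {
    long now = 1_000_000L;
    System.out.println(latencyMillis(Map.of(TIMESTAMP_ATTR, "998500"), now)); // prints 1500
  }
}
```

Any clock skew between producer and consumer workers adds directly to these numbers, so a p50/p99 over many messages is a more reliable signal than one measurement.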
Messages are produced by a different Dataflow pipeline:
Pipeline p = Pipeline.create(options);
p.apply(
        "ReadFile",
        TextIO.read()
            .from(options.getInputLocation() + "/*.csv")
            .watchForNewFiles(
                // Check for new files every 600 ms
                Duration.millis(600),
                // Never stop checking for new files
                Watch.Growth.<String>never()))
    .apply(
        "create message",
        ParDo.of(
            new DoFn<String, PubsubMessage>() {
              @ProcessElement
              public void processElement(ProcessContext context) {
                String line = context.element();
                String payload = convertRow(line);
                // Epoch millis straight from the system clock; no round trip through
                // local wall-clock time, which is ambiguous when DST clocks fall back.
                long now = System.currentTimeMillis();
                context.output(
                    new PubsubMessage(
                        // Explicit charset instead of the platform default
                        payload.getBytes(StandardCharsets.UTF_8),
                        ImmutableMap.of(
                            TRANSACTION_MESSAGE_ID_FIELD_NAME, payload.split(",")[6],
                            TRANSACTION_MESSAGE_TIMESTAMP_FIELD_NAME, Long.toString(now))));
              }
            }))
    .apply("publish message", PubsubIO.writeMessages().to(topic));
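Computing the timestamp attribute as `LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()` round-trips through local wall-clock time. For most instants this equals reading epoch millis directly, but during a DST fall-back the local time is ambiguous and `atZone` picks the earlier (summer) offset, shifting the result by an hour. The stdlib sketch below demonstrates this with fixed clocks; the America/New_York zone and the instants are chosen purely for illustration.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

public class TimestampAttributeSketch {
  // Round trip through local wall-clock time, as in the LocalDateTime-based expression.
  static long viaLocalDateTime(Clock clock) {
    return LocalDateTime.now(clock).atZone(clock.getZone()).toInstant().toEpochMilli();
  }

  // Epoch millis read directly from the clock; no time-zone conversion involved.
  static long viaInstant(Clock clock) {
    return Instant.now(clock).toEpochMilli();
  }

  public static void main(String[] args) {
    ZoneId zone = ZoneId.of("America/New_York");
    // An ordinary instant: both approaches agree.
    Clock normal = Clock.fixed(Instant.parse("2019-05-24T07:19:02Z"), zone);
    // 01:30 EST on 2019-11-03: the second occurrence of 01:30 local time after clocks fall back.
    Clock overlap = Clock.fixed(Instant.parse("2019-11-03T06:30:00Z"), zone);

    System.out.println(viaLocalDateTime(normal) - viaInstant(normal));   // prints 0
    System.out.println(viaLocalDateTime(overlap) - viaInstant(overlap)); // prints -3600000
  }
}
```

If the workers run in UTC the two expressions always agree; reading epoch millis directly simply removes the dependency on the worker's time zone.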
I'm uploading a file containing 100 rows every 600 ms.
I found several threads on Stack Overflow about this latency issue, but none of them
offers a solution.
On 2019/05/24 07:19:02, Reza Rokni <[email protected]> wrote:
> PS: You can also use the GlobalWindow with a stateful DoFn.
>
> On Fri, 24 May 2019 at 15:13, Reza Rokni <[email protected]> wrote:
>
> > Hi,
> >
> > Have you explored the use of triggers with your use case?
> >
> >
> > https://beam.apache.org/releases/javadoc/2.12.0/org/apache/beam/sdk/transforms/windowing/Trigger.html
> >
> > Cheers
> >
> > Reza
> >
> > On Fri, 24 May 2019 at 14:14, [email protected] <
> > [email protected]> wrote:
> >
> > >> Hi Reuven,
> > >> I would like to know if it is possible to guarantee that records are
> > >> processed by the same thread/task based on a key, as probably happens in a
> > >> combine/stateful operation, without adding the delay of a window.
> > >> This could increase caching efficiency and reduce some race
> > >> conditions when writing data.
> > >> I understand that workers are not part of the programming model, so I would
> > >> like to know if it's possible to achieve this behaviour while keeping the
> > >> windowing delay to a minimum. We don't need any combine or state; we just want
> > >> all records with a given key to be sent to the same thread.
> >>
> >> Thanks
> >>
> >>
> >> On 2019/05/24 03:20:13, Reuven Lax <[email protected]> wrote:
> > >> > Can you explain what you mean by worker? While every runner has workers of
> > >> > course, workers are not part of the programming model.
> >> >
> >> > On Thu, May 23, 2019 at 8:13 PM [email protected] <
> >> > [email protected]> wrote:
> >> >
> > >> > > Hi all,
> > >> > > I would like to know if Apache Beam has functionality similar to
> > >> > > fieldsGrouping in Storm, which sends records to a specific
> > >> > > task/worker based on a key.
> > >> > > I know that we can achieve that with a combine/GroupByKey operation, but
> > >> > > that implies adding windowing to our pipeline, which we don't want.
> > >> > > I have also tried using a stateful transformation.
> > >> > > I think that in that case we would also need windowing, but I see that a
> > >> > > job with a stateful ParDo operation can be submitted on Google Dataflow
> > >> > > without windowing. I don't know if this is due to a lack of support for
> > >> > > stateful processing on Dataflow, and whether I can effectively achieve my
> > >> > > goal with this solution.
> >> > >
> >> > >
> >> > > Thanks in advance for your help
> >> > >
> >> > >
> >> >
> >>
> >
> >
> > --
> >
> > This email may be confidential and privileged. If you received this
> > communication by mistake, please don't forward it to anyone else, please
> > erase all copies and attachments, and please let me know that it has gone
> > to the wrong person.
> >
> > The above terms reflect a potential business arrangement, are provided
> > solely as a basis for further discussion, and are not intended to be and do
> > not constitute a legally binding obligation. No legally binding obligations
> > will be created, implied, or inferred until an agreement in final form is
> > executed in writing by all parties involved.
> >
>