The one workaround I can suggest is to parallelize the work by keying the
data, if that's at all possible. This requires modifying the pipeline: the
first ParDo produces elements for different keys, and is followed by a GBK,
so the downstream ParDo gets a thread for every key. If those threads all
compute something and you need to combine those results in a single place,
you may need to produce elements which re-key the data, followed by another
GBK and a combiner.

Something sort of like this, if I understand correctly.

ParDo
GBK
ParDo
GBK
Combiner
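
To make that concrete, here is a rough, untested sketch of that shape in
Java (imports omitted; the element types, the key function computeKey, and
the per-key work expensiveWork are all made up for illustration):

pipeline
    .apply(Create.of(inputData))
    // 1. ParDo: key the data so the runner can parallelize per key.
    .apply("KeyTheData", ParDo.of(new DoFn<String, KV<String, String>>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        // computeKey is a placeholder; pick a key that spreads the work out.
        c.output(KV.of(computeKey(c.element()), c.element()));
      }
    }))
    // 2. GBK: fan the work out, one group of values per key.
    .apply(GroupByKey.create())
    // 3. ParDo: do the expensive work per key, then re-key everything onto
    //    a single key so the partial results can be brought back together.
    .apply("WorkPerKey", ParDo.of(
        new DoFn<KV<String, Iterable<String>>, KV<String, Long>>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            long partial = 0;
            for (String value : c.element().getValue()) {
              partial += expensiveWork(value);  // placeholder
            }
            c.output(KV.of("all", partial));
          }
        }))
    // 4. GBK + 5. Combiner: collect all partial results and sum them.
    .apply(GroupByKey.create())
    .apply(Combine.groupedValues(Sum.ofLongs()));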

But this workaround may not work for all problems, and the metrics are
designed to be aggregated within a single UDF (a ParDo or Combiner). So if
you needed the counters to be aggregated across all of these operations as
well, then this may not work.

The baked-in assumption of the thread-local setup is that parallelism is
typically handled by Beam, rather than by introducing a separate threading
model. Though perhaps breaking out of this threading model is more common
than we initially thought.
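
And for the map-based approach Robert describes below, a rough sketch of
what the DoFn could look like (the ConcurrentHashMap/AtomicLong bookkeeping
is just one way to do it, the counter name is made up, and imports are
omitted):

new DoFn<KV<String, String>, Void>() {
  // Accumulate increments here from the user-spawned threads; only the
  // runner-managed thread talks to the Metrics API.
  private final ConcurrentMap<String, AtomicLong> pending =
      new ConcurrentHashMap<>();

  @ProcessElement
  public void processElement(ProcessContext c) throws Exception {
    Thread thread = new Thread(() -> {
      // Don't call Metrics here; just record the increment locally.
      pending.computeIfAbsent("counter2", k -> new AtomicLong())
             .incrementAndGet();
    }, "a user-defined thread");
    thread.start();
    thread.join();  // or track the threads and join them in @FinishBundle

    // Flush on the DoFn's own thread, where the MetricsContainer is set.
    pending.forEach((name, count) ->
        Metrics.counter("test", name).inc(count.getAndSet(0)));
  }
}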

I hope that's helpful; sorry we don't have an easy fix.

On Fri, Jan 17, 2020 at 11:39 AM Robert Bradshaw <rober...@google.com>
wrote:

> Yes, this is an issue with how counters are implemented, and there's
> no good workaround. (We could use inheritable thread locals in Java,
> but that assumes the lifetime of the thread does not outlive the
> lifetime of the DoFn, and would probably work poorly with
> threadpools). In the meantime, one can update (say) a Map in the
> spawned threads and let the main thread in processElement (and likely
> finishBundle) increment the metrics in a threadsafe way based on the
> contents of the map.
>
> On Fri, Jan 17, 2020 at 11:29 AM Yixing Zhang <tommykuaid...@gmail.com>
> wrote:
> >
> > Hi Beam Committers,
> >
> > I am a developer on the Beam Samza runner. Currently, we are seeing
> some issues where our users fail to update Metrics from their own
> threads. I am wondering if anyone has suggestions on this issue.
> >
> > Problem:
> > The MetricsContainer is a ThreadLocal in MetricsEnvironment. Whenever
> DelegatingCounter.inc() is called, it tries to find the MetricsContainer
> for the current thread and update the corresponding CounterCell. For the
> Samza runner, we have a FnWithMetricsWrapper to set the MetricsContainer
> for the current thread before each DoFn is run. However, if users create
> their own threads inside a ParDo function and try to update the Metrics
> from those threads, they fail to update the Metrics and get the error log
> "Unable to update metrics on the current thread....".
> >
> > Example:
> >
> > pipeline
> >     .apply(Create.of(inputData))
> >     .apply(ParDo.of(new DoFn<KV<String, String>, Void>() {
> >   @ProcessElement
> >   public void processElement(ProcessContext context) {
> >     Metrics.counter("test", "counter1").inc();
> >     Thread thread = new Thread(() -> {
> >       Metrics.counter("test", "counter2").inc();
> >     }, "a user-defined thread");
> >     thread.start();
> >   }
> > }));
> >
> > In this case, counter1 can be updated but counter2 cannot, because the
> MetricsContainer has not been set on the user-created thread.
> >
> > We don't have any control over user-defined threads, so it seems
> impossible for our runner to set the MetricsContainer for those threads.
> Can someone give me some suggestions, either from a developer's
> perspective or from a user's perspective, about how to make this use case
> work?
> >
> > Thanks,
> > Yixing
> >
>
