Sure, I wasn't aware of the site's GitHub repository.
 
Thanks!
 
John
 
----- Original message -----
From: Romain Manni-Bucau <rmannibu...@gmail.com>
To: dev@beam.apache.org
Cc:
Subject: Re: DoFn thread constraint.
Date: Thu, Apr 26, 2018 9:18 AM
 
Do you want to give it a try by opening a PR on https://github.com/apache/beam-site? In general, it is better for an issue in that area to be fixed by the person who ran into it ;)
 

Romain Manni-Bucau
@rmannibucau | Blog | Old Blog | Github | LinkedIn | Book
 
2018-04-26 15:01 GMT+02:00 John MacMillan <john...@ca.ibm.com>:
Thank you for your clarification!
 
May I suggest that clarification also make its way into the doc? We had some internal disagreement on reading the current text. :-)
 
John
 
----- Original message -----
From: Romain Manni-Bucau <rmannibu...@gmail.com>
To: dev@beam.apache.org
Cc:
Subject: Re: DoFn thread constraint.
Date: Thu, Apr 26, 2018 8:53 AM
 
Hi John,
 
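The fact that a runner caches a fn per thread is an internal implementation detail. The actual guarantee is only that a fn is activated by at most one thread at a time (as with stateless beans or any object pool), so the same instance may be used by different threads over its lifetime.
This means your fn can fail.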
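To make the object-pool analogy concrete, here is a minimal sketch in plain Java with no Beam dependency. It is not how any particular runner works; `PooledFn`, `PoolDemo` and `runWithPool` are hypothetical names invented for illustration. A single instance is borrowed from a pool by several worker threads: calls never overlap, yet the instance is free to move between threads.

```java
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a DoFn instance; this is not a Beam class.
class PooledFn {
    private final AtomicInteger activeCallers = new AtomicInteger();
    private final Set<Long> threadsSeen = ConcurrentHashMap.newKeySet();
    private volatile boolean overlapped = false;

    void process() {
        // More than one active caller would mean two threads are inside at once.
        if (activeCallers.incrementAndGet() > 1) {
            overlapped = true;
        }
        threadsSeen.add(Thread.currentThread().getId());
        activeCallers.decrementAndGet();
    }

    boolean sawOverlap() { return overlapped; }
    int threadCount() { return threadsSeen.size(); }
}

class PoolDemo {
    // Runs `calls` invocations on `threads` worker threads sharing one pooled instance.
    static PooledFn runWithPool(int threads, int calls) {
        BlockingQueue<PooledFn> pool = new ArrayBlockingQueue<>(1);
        PooledFn fn = new PooledFn();
        pool.add(fn);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < calls; i++) {
            executor.submit(() -> {
                try {
                    // Borrowing gives this thread exclusive use until the instance is returned.
                    PooledFn borrowed = pool.take();
                    try {
                        borrowed.process();
                    } finally {
                        pool.put(borrowed);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        executor.shutdown();
        try {
            if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return fn;
    }

    public static void main(String[] args) {
        PooledFn fn = runWithPool(4, 1000);
        System.out.println("overlapping calls seen: " + fn.sawOverlap());
        System.out.println("distinct threads that used the instance: " + fn.threadCount());
    }
}
```

The overlap flag stays false because the pool hands the instance to one borrower at a time. The number of distinct threads depends on scheduling and is typically greater than one, which is exactly the situation in which a DoFn that remembers its first thread would throw.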
 

Romain Manni-Bucau
 
2018-04-26 14:37 GMT+02:00 John MacMillan <john...@ca.ibm.com>:
I'm trying to understand how restrictive this sentence from 4.3.2 of the Programming Guide is for our runner:
 
Each instance of your function object is accessed by a single thread on a worker instance, unless you explicitly create your own threads.
 
Does this imply that a runner may never use a DoFn instance on more than one thread (i.e., it is not allowed to move the instance from one thread to another but must create a separate instance for each thread), or only that an instance will be used by at most one thread at a time?
 
Or put another way, would the following DoFn (or something like it, if I've messed up) ever be allowed to throw on a properly implemented runner?
 
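import org.apache.beam.sdk.transforms.DoFn;

public class MotionDetector<T> extends DoFn<T, T> {
    // Per-instance and non-static: remembers the thread that first used this instance.
    // (A static ThreadLocal initialised from the current thread always matches the
    // current thread, so it could never detect a move.)
    private transient Long threadId;

    @ProcessElement
    public void processElement(ProcessContext c) {
        long current = Thread.currentThread().getId();
        if (threadId == null) {
            threadId = current;
        } else if (threadId != current) {
            // The instance is now running on a different thread than the first call.
            throw new RuntimeException("Moved!");
        }
        c.output(c.element());
    }
}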
 
Thanks,
John
 
 
