Hi John,

The fact that a runner caches a fn per thread is an internal implementation detail. The only guarantee is that a fn instance is active on at most one thread at a time (as with stateless beans or any object pool). It is not guaranteed to stay on the same thread, so your fn can fail.
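To make the distinction concrete, here is a toy, plain-Java sketch of pool-style reuse. It does not use Beam, and `Fn`, `run` and the single-slot `ArrayBlockingQueue` pool are hypothetical names, not how any particular runner is built. One instance is handed from thread to thread, so it changes threads, while the `active` counter checks that it is never entered by two threads at once.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class PooledFnDemo {

    // Hypothetical stand-in for a DoFn (plain Java, not the Beam API).
    static final class Fn {
        private final AtomicInteger active = new AtomicInteger();
        private Thread lastThread;   // safely published by the pool hand-off
        int threadChanges;

        void process(String element) {
            // Pool contract: never more than one thread inside at once.
            if (active.incrementAndGet() > 1) {
                throw new IllegalStateException("concurrent use of one instance");
            }
            try {
                Thread current = Thread.currentThread();
                if (lastThread != null && lastThread != current) {
                    threadChanges++;  // same instance, different thread: allowed
                }
                lastThread = current;
            } finally {
                active.decrementAndGet();
            }
        }
    }

    // Processes each element on a brand-new thread, borrowing the single
    // pooled instance each time; returns how often the instance changed threads.
    static int run(String... elements) throws InterruptedException {
        BlockingQueue<Fn> pool = new ArrayBlockingQueue<>(1);
        Fn shared = new Fn();
        pool.put(shared);
        for (String element : elements) {
            Thread worker = new Thread(() -> {
                try {
                    Fn fn = pool.take();   // blocks while another thread holds it
                    try {
                        fn.process(element);
                    } finally {
                        pool.put(fn);      // hand back so any thread can reuse it
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            worker.start();
            worker.join();                 // join makes threadChanges visible here
        }
        return shared.threadChanges;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("thread changes: " + run("a", "b", "c"));  // prints "thread changes: 2"
    }
}
```

With three elements, each processed on a fresh thread, the single instance moves twice and the concurrent-use check never fires, which is exactly the "one thread at a time, not one thread forever" behaviour described above.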
Romain Manni-Bucau
@rmannibucau <https://twitter.com/rmannibucau> | Blog <https://rmannibucau.metawerx.net/> | Old Blog <http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> | LinkedIn <https://www.linkedin.com/in/rmannibucau> | Book <https://www.packtpub.com/application-development/java-ee-8-high-performance>

2018-04-26 14:37 GMT+02:00 John MacMillan <[email protected]>:

> I'm trying to understand how restrictive this sentence from
> 4.3.2 of the Programming Guide
> <https://beam.apache.org/documentation/programming-guide/#user-code-thread-compatibility>
> is for our runner:
>
> Each instance of your function object is accessed by a single thread on a
> worker instance, unless you explicitly create your own threads.
>
> Does this imply that a runner may not ever use a DoFn instance on more
> than one thread (ie. it is not allowed to move the instance from one thread
> to another but must create a separate instance for each thread), or only
> that an instance would only ever be used by a single thread at a time?
>
> Or put another way, would the following DoFn (or something like it, if
> I've messed up) ever be allowed to throw on a properly implemented runner?
>
> public class MotionDetector<T> extends DoFn<T,T> {
>     private static transient ThreadLocal<Long> id;
>     @ProcessElement
>     public void processElement(ProcessContext c) {
>         if (id == null) {
>             id = new ThreadLocal<Long>() {
>                 @Override
>                 protected Long initialValue() {
>                     return Thread.currentThread().getId();
>                 }
>             };
>         } else if (!id.get().equals(Thread.currentThread().getId())) {
>             throw new RuntimeException("Moved!");
>         }
>         c.output(c.element());
>     }
> }
>
> Thanks,
> John
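Note that the check in the quoted MotionDetector cannot actually fire: `id` is a static `ThreadLocal` whose `initialValue()` returns the calling thread's id, so every thread ends up comparing its own id with itself. Detecting a move needs the owning thread to be remembered per instance. The plain-Java sketch below (no Beam dependency; `ThreadLocalCheck`, `InstanceCheck` and `secondThreadFails` are hypothetical names) runs a simplified version of the quoted check and a per-instance check on two threads in turn.

```java
import java.util.concurrent.atomic.AtomicReference;

public class MoveDetectorDemo {

    // Simplified mirror of the quoted check: a static ThreadLocal seeded
    // with the calling thread's id.
    static final class ThreadLocalCheck {
        private static final ThreadLocal<Long> ID =
            ThreadLocal.withInitial(() -> Thread.currentThread().getId());

        void process() {
            // Each thread only ever sees its own id, so this is never true.
            if (!ID.get().equals(Thread.currentThread().getId())) {
                throw new RuntimeException("Moved!");
            }
        }
    }

    // Per-instance check: the first caller becomes the owner, any other thread fails.
    static final class InstanceCheck {
        private final AtomicReference<Thread> owner = new AtomicReference<>();

        void process() {
            Thread current = Thread.currentThread();
            owner.compareAndSet(null, current);
            if (owner.get() != current) {
                throw new IllegalStateException("Moved!");
            }
        }
    }

    // Calls `body` once on each of two fresh threads, one after the other,
    // and reports whether either call threw.
    static boolean secondThreadFails(Runnable body) throws InterruptedException {
        AtomicReference<Throwable> failure = new AtomicReference<>();
        for (int i = 0; i < 2; i++) {
            Thread t = new Thread(() -> {
                try {
                    body.run();
                } catch (RuntimeException e) {
                    failure.set(e);
                }
            });
            t.start();
            t.join();
        }
        return failure.get() != null;
    }

    public static void main(String[] args) throws InterruptedException {
        // prints "ThreadLocal check fails: false"
        System.out.println("ThreadLocal check fails: "
            + secondThreadFails(new ThreadLocalCheck()::process));
        // prints "per-instance check fails: true"
        System.out.println("per-instance check fails: "
            + secondThreadFails(new InstanceCheck()::process));
    }
}
```

In a real DoFn the owner would be an instance field rather than a static one, and under the one-thread-at-a-time contract that per-instance version is the one that can legitimately throw.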
