Sure, I wasn't aware of the site's GitHub repository.
Thanks!
John
----- Original message -----
From: Romain Manni-Bucau <rmannibu...@gmail.com>
To: dev@beam.apache.org
Cc:
Subject: Re: DoFn thread constraint.
Date: Thu, Apr 26, 2018 9:18 AM
Do you want to try opening a PR against https://github.com/apache/beam-site? In that area, it is generally better for the person who hit the issue to fix it ;)

2018-04-26 15:01 GMT+02:00 John MacMillan <john...@ca.ibm.com>:

Thank you for your clarification!

May I suggest that the clarification also make its way into the doc? We had some internal disagreement on reading the current text. :-)

John

----- Original message -----
From: Romain Manni-Bucau <rmannibu...@gmail.com>
To: dev@beam.apache.org
Cc:
Subject: Re: DoFn thread constraint.
Date: Thu, Apr 26, 2018 8:53 AM
Hi John,

The fact that a runner caches a fn per thread is an internal implementation detail, but a fn will only be active on at most one thread at a time (as with stateless instances or any object pool).

This means your fn can fail.

2018-04-26 14:37 GMT+02:00 John MacMillan <john...@ca.ibm.com>:

I'm trying to understand how restrictive this sentence from 4.3.2 of the Programming Guide is for our runner:

    Each instance of your function object is accessed by a single thread on a worker instance, unless you explicitly create your own threads.

Does this imply that a runner may never use a DoFn instance on more than one thread (i.e. it is not allowed to move the instance from one thread to another but must create a separate instance for each thread), or only that an instance would only ever be used by a single thread at a time?

Or, put another way, would the following DoFn (or something like it, if I've messed up) ever be allowed to throw on a properly implemented runner?

public class MotionDetector<T> extends DoFn<T,T> {
  private static transient ThreadLocal<Long> id;

  @ProcessElement
  public void processElement(ProcessContext c) {
    if (id == null) {
      id = new ThreadLocal<Long>() {
        @Override
        protected Long initialValue() {
          return Thread.currentThread().getId();
        }
      };
    } else if (!id.get().equals(Thread.currentThread().getId())) {
      throw new RuntimeException("Moved!");
    }
    c.output(c.element());
  }
}

Thanks,
John
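
A note on the MotionDetector sketch quoted above: as written, the check probably cannot fire. The ThreadLocal is static, and initialValue() is evaluated separately on each thread, so id.get() always returns the calling thread's own id. A minimal sketch of what the question seems to intend is below, assuming the goal is to detect one instance being called from a second thread. It deliberately does not use the Beam API; MoveDetector, MoveDetectorDemo, process and detectsMove are hypothetical names chosen for illustration.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for a DoFn: remembers the first thread that calls it. */
class MoveDetector<T> {
    private static final long UNSET = -1L;

    // Per-instance (not static), so each detector tracks its own owner thread.
    private final AtomicLong ownerThreadId = new AtomicLong(UNSET);

    T process(T element) {
        long current = Thread.currentThread().getId();
        // The first caller claims ownership; every later caller must match it.
        if (!ownerThreadId.compareAndSet(UNSET, current)
                && ownerThreadId.get() != current) {
            throw new IllegalStateException("Moved!");
        }
        return element;
    }
}

class MoveDetectorDemo {
    /** Uses one instance from two threads in sequence; true if the second call throws. */
    static boolean detectsMove() {
        MoveDetector<String> detector = new MoveDetector<>();
        detector.process("first"); // claims the instance for the calling thread

        AtomicBoolean thrown = new AtomicBoolean(false);
        Thread other = new Thread(() -> {
            try {
                detector.process("second");
            } catch (IllegalStateException e) {
                thrown.set(true);
            }
        });
        other.start();
        try {
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return thrown.get();
    }

    public static void main(String[] args) {
        System.out.println(detectsMove());
    }
}
```

Given Romain's answer above (one thread at a time, but not necessarily the same thread for the lifetime of the instance), a check like this firing would not by itself indicate a misbehaving runner.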