-Removed dev@ Operators can implement idle Time Handler. https://www.datatorrent.com/docs/apidocs/com/datatorrent/api/Operator.IdleTimeHandler.html
On Tue, Feb 21, 2017 at 11:33 AM Sunil Parmar <[email protected]> wrote: > Ram, > Thanks for the prompt response. If we use the approach you suggested we’re > dependent on main thread’s process call I.e. Tuples in the thread safe > queue gets only processed when main thread is processing incoming tuples. > How can we explicitly call the process from polling of delay queue ? > > Just for reference here’s the sample code snippet for our operator. > > public class MyOperator extends BaseOperator implements > > Operator.ActivationListener<Context.OperatorContext> { > ….. > > @InputPortFieldAnnotation > > public transient DefaultInputPort<String> kafkaStreamInput = > > new DefaultInputPort<String>() { > > List<String> errors = new ArrayList<String>(); > > @Override > > public void process(String consumerRecord) { > > //Code for normal tuple process > > //Code to poll thread safe queue > > } > > *————————————**—————————————————————————* > *From: *Munagala Ramanath <[email protected]> > *To: *[email protected] > *CC: *"[email protected]" <[email protected]>, Allan De Leon < > [email protected]>, Tim Zhu <[email protected]> > *Subject: *Re: Occasional Out of order tuples when emitting from a thread > *Date: *2017-02-21 10:08 (-0800) > *List: *[email protected] > <https://lists.apache.org/[email protected]> > > Please note that tuples should not be emitted by any thread other than the > main operator thread. > > A common pattern is to use a thread-safe queue and have worker threads > enqueue > tuples there; the main operator thread then pulls tuples from the queue and > emits them. > > Ram > > _______________________________________________________ > > Munagala V. Ramanath > > Software Engineer > > E: [email protected] | M: (408) 331-5034 | Twitter: @UnknownRam > www.datatorrent.com | apex.apache.org > > > From: Sunil Parmar <[email protected]> > Date: Tuesday, February 21, 2017 at 10:05 AM > To: "[email protected]" <[email protected]>, "[email protected]" > <[email protected]> > Cc: Allan De Leon <[email protected]>, Tim Zhu < > [email protected]> > Subject: Occasional Out of order tuples when emitting from a thread > > Hi there, > We have the following setup: > > - we have a generic operator that’s processing tuples in its input port > - in the input port’s process method, we check for a condition, and: > - if the condition is met, the tuple is emitted to the next > operator right away (in the process method) > - Otherwise, if the condition is not met, we store the tuple in > some cache and we use some threads that periodically check the > condition to > become true. Once the condition is true, the threads call the emit > method > on the stored tuples. > > With this setup, we occasionally encounter the following error: > 2017-02-15 17:29:09,364 ERROR com.datatorrent.stram.engine.GenericNode: > Catastrophic Error: Out of sequence BEGIN_WINDOW tuple 58a4046100003b7f on > port transformedJSON while expecting 58a4046100003b7e > > Is there a way to make the above work correctly? > If not, can you recommend a better way of doing this? > How can we ensure window assignment is done synchronously before emitting > tuples ? > > Thanks very much in advance… > -allan > -- *Join us at Apex Big Data World-San Jose <http://www.apexbigdata.com/san-jose.html>, April 4, 2017!* [image: http://www.apexbigdata.com/san-jose-register.html] <http://www.apexbigdata.com/san-jose-register.html>
