-Removed dev@

Operators can implement idle Time Handler.
https://www.datatorrent.com/docs/apidocs/com/datatorrent/api/Operator.IdleTimeHandler.html

On Tue, Feb 21, 2017 at 11:33 AM Sunil Parmar <[email protected]>
wrote:

> Ram,
> Thanks for the prompt response. If we use the approach you suggested we’re
> dependent on main thread’s process call I.e. Tuples in the thread safe
> queue gets only processed when main thread is processing incoming tuples.
> How can we explicitly call the process from polling of delay queue ?
>
> Just for reference here’s the sample code snippet for our operator.
>
> public class MyOperator extends BaseOperator implements
>
>         Operator.ActivationListener<Context.OperatorContext> {
> …..
>
> @InputPortFieldAnnotation
>
>     public transient DefaultInputPort<String> kafkaStreamInput =
>
>             new DefaultInputPort<String>() {
>
>         List<String> errors = new ArrayList<String>();
>
>         @Override
>
>         public void process(String consumerRecord) {
>
> //Code for normal tuple process
>
> //Code to poll thread safe queue
>
> }
>
> *————————————**—————————————————————————*
> *From: *Munagala Ramanath <[email protected]>
> *To: *[email protected]
> *CC: *"[email protected]" <[email protected]>, Allan De Leon <
> [email protected]>, Tim Zhu <[email protected]>
> *Subject: *Re: Occasional Out of order tuples when emitting from a thread
> *Date: *2017-02-21 10:08 (-0800)
> *List: *[email protected]
> <https://lists.apache.org/[email protected]>
>
> Please note that tuples should not be emitted by any thread other than the
> main operator thread.
>
> A common pattern is to use a thread-safe queue and have worker threads
> enqueue
> tuples there; the main operator thread then pulls tuples from the queue and
> emits them.
>
> Ram
>
> _______________________________________________________
>
> Munagala V. Ramanath
>
> Software Engineer
>
> E: [email protected] | M: (408) 331-5034 | Twitter: @UnknownRam
> www.datatorrent.com  |  apex.apache.org
>
>
> From: Sunil Parmar <[email protected]>
> Date: Tuesday, February 21, 2017 at 10:05 AM
> To: "[email protected]" <[email protected]>, "[email protected]"
> <[email protected]>
> Cc: Allan De Leon <[email protected]>, Tim Zhu <
> [email protected]>
> Subject: Occasional Out of order tuples when emitting from a thread
>
> Hi there,
> We have the following setup:
>
>    - we have a generic operator that’s processing tuples in its input port
>    - in the input port’s process method, we check for a condition, and:
>       - if the condition is met, the tuple is emitted to the next
>       operator right away (in the process method)
>       - Otherwise, if the condition is not met, we store the tuple  in
>       some cache and we use some threads that periodically check the 
> condition to
>       become true. Once the condition is true, the threads call the emit 
> method
>       on the stored tuples.
>
> With this setup, we occasionally encounter the following error:
> 2017-02-15 17:29:09,364 ERROR com.datatorrent.stram.engine.GenericNode:
> Catastrophic Error: Out of sequence BEGIN_WINDOW tuple 58a4046100003b7f on
> port transformedJSON while expecting 58a4046100003b7e
>
> Is there a way to make the above work correctly?
> If not, can you recommend a better way of doing this?
> How can we ensure window assignment is done synchronously before emitting
> tuples ?
>
> Thanks very much in advance…
> -allan
>
-- 
*Join us at Apex Big Data World-San Jose
<http://www.apexbigdata.com/san-jose.html>, April 4, 2017!*
[image: http://www.apexbigdata.com/san-jose-register.html]
<http://www.apexbigdata.com/san-jose-register.html>

Reply via email to