Bryan,

"If you add a processor to the flow and never increase the concurrent tasks
attribute, do you still expect multiple threads to access the processor 
concurrently?"

The answer here is "sort of."

In this case, you'll never have more than one thread in the 'onTrigger'
method at the same time. However, there is a thread pool that is shared by
all Processors, so you may have 'Thread 1' execute onTrigger, then
'Thread 2' execute onTrigger, then 'Thread 3', and then 'Thread 1' again.
So while they won't be executing concurrently, the Processor must still be
thread-safe, because you will have different threads accessing the same
member variables.
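
As a rough illustration of the point above, here is a minimal sketch in plain
Java. It does not use the NiFi API: CountingProcessor, its onTrigger method,
and runSequentially are hypothetical stand-ins, and the three-thread pool is
just an assumption to mimic a shared pool. Each invocation finishes before the
next starts, yet different pool threads may run them, so the state lives in an
AtomicLong and a volatile field. The sketch only shows the shape of the
pattern; it does not try to provoke a visibility failure.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

public class SharedPoolSketch {

    // Hypothetical stand-in for a processor instance; not the NiFi API.
    static class CountingProcessor {
        // Atomic type: safe to update from whichever thread runs onTrigger.
        private final AtomicLong triggerCount = new AtomicLong();
        // volatile: a write by one thread is visible to the next thread.
        private volatile String lastThreadName = "";

        void onTrigger() {
            triggerCount.incrementAndGet();
            lastThreadName = Thread.currentThread().getName();
        }

        long getTriggerCount() {
            return triggerCount.get();
        }

        String getLastThreadName() {
            return lastThreadName;
        }
    }

    // Runs onTrigger one invocation at a time on a small shared pool.
    static long runSequentially(int invocations) throws Exception {
        CountingProcessor processor = new CountingProcessor();
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            Runnable task = processor::onTrigger;
            for (int i = 0; i < invocations; i++) {
                // Wait for completion before submitting again: never
                // concurrent, but the executing thread may differ each time.
                pool.submit(task).get();
            }
        } finally {
            pool.shutdown();
        }
        return processor.getTriggerCount();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runSequentially(9)); // prints 9
    }
}
```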

Additionally, if you have methods annotated with @OnScheduled,
@OnUnscheduled, etc., then these will be executed in a separate thread,
potentially while onTrigger is being executed.
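
One way to cope with that, sketched in plain Java under stated assumptions:
LifecycleSketch and its endpoint field are hypothetical, the methods are
ordinary methods named after the annotations rather than real annotated NiFi
callbacks, and "example-endpoint" is a made-up value. The idea is that
onTrigger reads the shared field once into a local variable, so a lifecycle
call on another thread cannot change the value halfway through an invocation.

```java
public class LifecycleSketch {

    // Written by the lifecycle thread, read by whichever thread runs
    // onTrigger; volatile makes each write visible to later reads.
    private volatile String endpoint;

    // Stand-in for a method that would carry @OnScheduled.
    void onScheduled(String configuredEndpoint) {
        endpoint = configuredEndpoint;
    }

    // Stand-in for a method that would carry @OnUnscheduled.
    void onUnscheduled() {
        endpoint = null;
    }

    String onTrigger() {
        // Read the field exactly once; use only the local copy afterwards.
        final String current = endpoint;
        if (current == null) {
            return "skipped";
        }
        return "polled " + current;
    }

    public static void main(String[] args) {
        LifecycleSketch processor = new LifecycleSketch();
        System.out.println(processor.onTrigger()); // prints "skipped"
        processor.onScheduled("example-endpoint");
        System.out.println(processor.onTrigger()); // prints "polled example-endpoint"
        processor.onUnscheduled();
        System.out.println(processor.onTrigger()); // prints "skipped"
    }
}
```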

You'll also want to be careful if any of your member variables are mutable
objects. For example, marking a List as volatile won't make things
thread-safe: volatile only covers the reference to the List, not the
contents of the List itself.
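
To make the List case concrete, here is a small sketch in plain Java. The
class and method names are hypothetical, and CopyOnWriteArrayList is only one
possible choice (a synchronized wrapper or explicit locking would also work,
depending on the access pattern). The volatile ArrayList field is shown only
for contrast and is deliberately never mutated from several threads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class VolatileListSketch {

    // For contrast only: volatile guards the reference, but concurrent
    // calls to add() on the ArrayList itself would still be unsafe.
    private volatile List<String> riskyItems = new ArrayList<>();

    // A list implementation that is itself safe for concurrent mutation.
    private final List<String> safeItems = new CopyOnWriteArrayList<>();

    void record(String item) {
        safeItems.add(item);
    }

    int size() {
        return safeItems.size();
    }

    // Adds perThread items from each of several threads and returns the
    // final size of the thread-safe list.
    static int addFromManyThreads(int threads, int perThread)
            throws InterruptedException {
        VolatileListSketch sketch = new VolatileListSketch();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    sketch.record("item");
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        return sketch.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(addFromManyThreads(4, 500)); // prints 2000
    }
}
```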

If it makes sense to only allow your Processor to be run by one thread at a
time, though, you can add the @TriggerSerially annotation to the Processor
to let the framework know that the "Concurrent Tasks" setting shouldn't
even be exposed to the user and should always be 1.

Thanks
-Mark


> Date: Wed, 14 Jan 2015 22:50:06 -0500
> Subject: Re: Stateful Processors
> From: [email protected]
> To: [email protected]
> 
> Bryan,
> 
> "It seems as though any processor member variables that might
> be changed during onTrigger should be volatile, or of an atomic type, to
> ensure the current thread is accessing the proper value."
> 
> "I'm assuming each processor you drag on to the flow is an
> instance (with the unique uuid Joe mentioned) and then the concurrent tasks
> equate to threads that execute tasks for that instance."
> 
> Both of those statements/expectations are absolutely correct.  Sounds like
> you're pretty locked in here.
> 
> Thanks
> Joe
> 
> On Wed, Jan 14, 2015 at 9:46 PM, Bryan Bende <[email protected]> wrote:
> 
> > Thanks Joe G. for reviving this thread, and thanks Joe W. and Adam for the
> > info.  I see what you are talking about with the GetHttp processor, that
> > makes sense to me now. I will take a look at the distributed cache as well.
> >
> > Somewhat related, I think I am also trying to make sure I understand how
> > threads and processors interact, and how to make sure a processor is
> > thread-safe. It seems as though any processor member variables that might
> > be changed during onTrigger should be volatile, or of an atomic type, to
> > ensure the current thread is accessing the proper value.
> >
> > If you add a processor to the flow and never increase the concurrent tasks
> > attribute, do you still expect multiple threads to access the processor
> > concurrently? I'm assuming each processor you drag on to the flow is an
> > instance (with the unique uuid Joe mentioned) and then the concurrent tasks
> > equate to threads that execute tasks for that instance. Just trying to make
> > sure I understand how this is working.
> >
> > Thanks,
> >
> > Bryan
> >
> > On Wed, Jan 14, 2015 at 11:00 AM, Adam Taft <[email protected]> wrote:
> >
> > > Also of note, the distributed cache service is probably the closest to a
> > > cluster-wide framework state management service.  It currently uses our
> > own
> > > persistence backend, but it's conceivable to adapt the distributed cache
> > to
> > > use a database, jndi resource, or a true cache engine, like ehcache.
> > >
> > > Adam
> > >
> > >
> > > On Wed, Jan 14, 2015 at 7:12 AM, Joe Witt <[email protected]> wrote:
> > >
> > > > Joe - thanks for bumping this.
> > > >
> > > > Bryan,
> > > >
> > > > "What are the best practices for implementing a processor that needs to
> > > > maintain some kind of state?
> > > >
> > > > I'm thinking of a processor that executes on a timer and pulls data
> > from
> > > > somewhere, but needs to know where it left off for the next execution,
> > > and
> > > > I was hoping to not involve an external data store here."
> > > >
> > > > The only managed state the framework provides is through the use of
> > > > Flow File objects and the passing of them between processors.  To
> > > > keep persistent accounting for a given processor of some state of
> > > > what it's doing that exists outside of that, you do need to
> > > > implement some state persistence mechanism (to a file, to a
> > > > database, etc.).
> > > >
> > > > One example of a processor that does this is the GetHttp processor.  It
> > > > interacts with web services and in so doing needs to keep track of any
> > > > cache/E-Tag information it receives so it can be smart about pulling
> > > > the same resource or not depending on whether the server indicates it
> > > > has changed.  This processor does this by saving off a file in
> > > > 'conf/.httpCache-<<processor uuid>>'.  This use of the processor uuid
> > > > in the name avoids conflicts with other processors of the same type
> > > > and makes referencing it on startup very easy.  If it is there, use it
> > > > to recover state, and if not, start a new one.
> > > >
> > > > That said it is clearly desirable for the framework to offer some sort
> > of
> > > > managed state mechanism for such simple cases.  We've talked about this
> > > > many times over the years but just never pulled the trigger because
> > there
> > > > was always some aspect of our design ideas we didn't like.  So for
> > right
> > > > now you'll need to implement state persistence like this outside the
> > > > framework.  But I've also kicked off a Jira for doing something about
> > > this
> > > > here: https://issues.apache.org/jira/browse/NIFI-259
> > > >
> > > > What you were seeing in GetKafka and GetJMS processors was management
> > > > of state that involves interaction with their specific resources
> > > > (Kafka, JMS).  In the case of JMS it was a connection pooling type
> > > > mechanism and in the case of Kafka it was part of Kafka's stream
> > > > iterator.  That is a different thing than this managed persistent
> > > > state you're asking about.
> > > >
> > > > This is an important topic for us to communicate very well on.  Please
> > > feel
> > > > free to keep firing away until we've answered it fully.
> > > >
> > > > Thanks
> > > > Joe
> > > >
> > > > On Wed, Jan 14, 2015 at 5:06 AM, Joe Gresock <[email protected]>
> > wrote:
> > > >
> > > > > I'm also interested in the answers to Bryan's questions, if anyone
> > has
> > > > some
> > > > > input.
> > > > >
> > > > > Thanks,
> > > > > Joe
> > > > >
> > > > > On Fri, Jan 9, 2015 at 3:50 PM, Bryan Bende <[email protected]>
> > wrote:
> > > > >
> > > > > > What are the best practices for implementing a processor that needs
> > > to
> > > > > > maintain some kind of state?
> > > > > >
> > > > > > I'm thinking of a processor that executes on a timer and pulls data
> > > > from
> > > > > > somewhere, but needs to know where it left off for the next
> > > execution,
> > > > > and
> > > > > > I was hoping to not involve an external data store here.
> > > > > >
> > > > > > From looking at processors like GetJMS and GetKafka, I noticed the
> > > use
> > > > of
> > > > > > BlockingQueue<> where poll() is called at the beginning of
> > > onTrigger(),
> > > > > and
> > > > > > then the object is put back in the queue in a finally block.
> > > > > >
> > > > > > As far as I could tell it looks like the intent was to only have
> > one
> > > > > object
> > > > > > in the queue, and use the queue as the mechanism for synchronizing
> > > > access
> > > > > > to the shared object, so that if another thread called onTrigger it
> > > > would
> > > > > > block on poll() until the previous execution put the object back in
> > > the
> > > > > > queue.
> > > > > >
> > > > > > Is that the general approach?
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Bryan
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > I know what it is to be in need, and I know what it is to have
> > > plenty.  I
> > > > > have learned the secret of being content in any and every situation,
> > > > > whether well fed or hungry, whether living in plenty or in want.  I
> > can
> > > > do
> > > > > all this through him who gives me strength.    *-Philippians 4:12-13*
> > > > >
> > > >
> > >
> >
                                          
