[ 
https://issues.apache.org/jira/browse/APEXCORE-510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15425917#comment-15425917
 ] 

Sanjay M Pujare commented on APEXCORE-510:
------------------------------------------

One way to do this is as follows:

- In DefaultOutputPort, add a field 'operatorThread' that holds a Thread 
object.
- In DefaultOutputPort.emit(), verify that Thread.currentThread() == 
operatorThread and throw an exception otherwise. Either make emit() final or 
split it into two methods: a final one that performs the check and then calls 
a second, overridable one.
- In the engine, in Node.setup(), add a call to operator.setupThreadObject() 
after operator.setup().
- Operator.setupThreadObject() uses Java reflection to find all fields of type 
DefaultOutputPort in the current class and sets operatorThread in each of 
them to Thread.currentThread().
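
The steps above could be sketched roughly as follows. This is a standalone 
illustration, not Apex code: OutputPort and MyOperator are simplified stand-ins 
for DefaultOutputPort and a user operator, doEmit() is a hypothetical name for 
the overridable half of the split emit(), and setupThreadObject() is written as 
a static helper instead of a method on Operator. It also assumes the ports are 
declared directly in the operator's own class, since getDeclaredFields() does 
not see inherited fields.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

class ThreadAffinitySketch {

    /** Simplified stand-in for DefaultOutputPort (not the real Apex class). */
    static class OutputPort<T> {
        // Thread allowed to emit; stays null until setupThreadObject() runs,
        // in which case every emit() is rejected.
        private volatile Thread operatorThread;
        final List<T> sink = new ArrayList<>();

        void setOperatorThread(Thread thread) {
            operatorThread = thread;
        }

        /** Final entry point that carries the thread-affinity check. */
        public final void emit(T tuple) {
            if (Thread.currentThread() != operatorThread) {
                throw new IllegalStateException(
                    "emit() called on " + Thread.currentThread().getName()
                        + ", not on the operator thread");
            }
            doEmit(tuple);
        }

        /** Overridable half of the split (hypothetical name). */
        protected void doEmit(T tuple) {
            sink.add(tuple);
        }
    }

    /** Stand-in for a user operator with two output ports. */
    static class MyOperator {
        final OutputPort<String> out = new OutputPort<>();
        final OutputPort<Integer> counts = new OutputPort<>();
    }

    /**
     * Sketch of the proposed setupThreadObject(): finds every field of the
     * port type declared in the operator's class and binds it to the
     * calling thread.
     */
    static void setupThreadObject(Object operator) throws IllegalAccessException {
        for (Field field : operator.getClass().getDeclaredFields()) {
            if (OutputPort.class.isAssignableFrom(field.getType())) {
                field.setAccessible(true);
                OutputPort<?> port = (OutputPort<?>) field.get(operator);
                if (port != null) {
                    port.setOperatorThread(Thread.currentThread());
                }
            }
        }
    }

    /** Returns true if an emit() attempted from a worker thread is rejected. */
    static boolean workerEmitRejected(MyOperator op) throws InterruptedException {
        final boolean[] rejected = {false};
        Thread worker = new Thread(() -> {
            try {
                op.out.emit("from worker");
            } catch (IllegalStateException e) {
                rejected[0] = true;
            }
        });
        worker.start();
        worker.join(); // join() makes the worker's write visible here
        return rejected[0];
    }

    public static void main(String[] args) throws Exception {
        MyOperator op = new MyOperator();
        setupThreadObject(op);              // engine would call this after setup()
        op.out.emit("from operator thread"); // allowed: same thread
        System.out.println("worker emit rejected: " + workerEmitRejected(op));
    }
}
```

Making emit() final and delegating to a separate overridable method keeps the 
check in place even when a subclass customizes how tuples are delivered.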

> Enforce DefaultOutputPort.emit() or Sink.put() thread affinity
> --------------------------------------------------------------
>
>                 Key: APEXCORE-510
>                 URL: https://issues.apache.org/jira/browse/APEXCORE-510
>             Project: Apache Apex Core
>          Issue Type: Improvement
>            Reporter: Vlad Rozov
>
> The Apex platform assumes that an operator interacts with the platform using 
> the dedicated operator thread. Currently, operators may create worker threads 
> and emit tuples from a worker thread. This leads to undefined behavior and 
> hard-to-find bugs, so it should be possible to enforce that 
> DefaultOutputPort.emit() and/or Sink.put() are called on the operator thread.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
