[
https://issues.apache.org/jira/browse/APEXCORE-510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15425917#comment-15425917
]
Sanjay M Pujare commented on APEXCORE-510:
------------------------------------------
One way to do this is as follows:
- In DefaultOutputPort we add a data field 'operatorThread' to save a Thread
object.
- In DefaultOutputPort.emit() we verify that Thread.currentThread() ==
operatorThread and throw an exception otherwise. Either make emit() final, or
split emit() into two methods: a final one that performs the check and calls
an overridable one.
- In the engine, in Node.setup(), add a call to operator.setupThreadObject()
after operator.setup().
- Operator.setupThreadObject() uses Java reflection to find all fields of type
DefaultOutputPort in the current class and sets operatorThread in each of
those objects to Thread.currentThread().
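
The steps above could be sketched roughly as below. This is only an illustration of the idea, not the actual Apex code: CheckedOutputPort, SampleOperator, ThreadAffinity and doEmit() are hypothetical names standing in for DefaultOutputPort, an operator, and the engine-side hook. It takes the second option (a final emit() that delegates to an overridable method), and like the proposal it only looks at fields declared in the operator's own class, so ports inherited from a superclass would be missed.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for DefaultOutputPort; not the real Apex class.
class CheckedOutputPort<T> {
    // Thread that is allowed to emit; stays null until the engine binds it.
    private volatile Thread operatorThread;
    private final List<T> emitted = new ArrayList<>();

    void setOperatorThread(Thread thread) {
        this.operatorThread = thread;
    }

    // Final, so subclasses cannot bypass the thread affinity check.
    public final void emit(T tuple) {
        if (Thread.currentThread() != operatorThread) {
            throw new IllegalStateException("emit() called from thread "
                + Thread.currentThread().getName() + ", not the operator thread");
        }
        doEmit(tuple);
    }

    // Overridable part; here it just records the tuple.
    protected void doEmit(T tuple) {
        emitted.add(tuple);
    }

    List<T> emitted() {
        return emitted;
    }
}

// Hypothetical operator with a single output port.
class SampleOperator {
    final CheckedOutputPort<String> output = new CheckedOutputPort<>();
}

// Hypothetical helper playing the role of Operator.setupThreadObject().
final class ThreadAffinity {
    private ThreadAffinity() {
    }

    // Binds every port field declared in the operator's class to the
    // calling thread and returns the number of ports bound.
    static int setupThreadObject(Object operator) {
        int bound = 0;
        for (Field field : operator.getClass().getDeclaredFields()) {
            if (!CheckedOutputPort.class.isAssignableFrom(field.getType())) {
                continue;
            }
            try {
                field.setAccessible(true);
                CheckedOutputPort<?> port = (CheckedOutputPort<?>) field.get(operator);
                if (port != null) {
                    port.setOperatorThread(Thread.currentThread());
                    bound++;
                }
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("cannot access port field " + field.getName(), e);
            }
        }
        return bound;
    }
}
```

With this sketch, a port that was never bound rejects every emit(), since its operatorThread is still null. Whether that is desirable, or whether unbound ports should be allowed to emit, is a separate decision.
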
> Enforce DefaultOutputPort.emit() or Sink.put() thread affinity
> --------------------------------------------------------------
>
> Key: APEXCORE-510
> URL: https://issues.apache.org/jira/browse/APEXCORE-510
> Project: Apache Apex Core
> Issue Type: Improvement
> Reporter: Vlad Rozov
>
> The Apex platform assumes that an operator interacts with the platform using
> the dedicated operator thread. Currently, operators may create worker threads
> and emit tuples from a worker thread. This leads to undefined behavior and
> hard-to-find bugs, so it should be possible to enforce that
> DefaultOutputPort.emit() and/or Sink.put() are called on the operator thread.