Hello Pramod,

Thanks for the comments. I adjusted the title of the JIRA. Here is what I was 
thinking for the worker pool implementation.

- The main reason (which I forgot to mention in the design points below) is 
that the Java embedded engine allows only the thread that created the 
instance to execute the Python logic. This is a consequence of the JNI 
specification itself. Some hints here 
https://stackoverflow.com/questions/18056347/jni-calling-java-from-c-with-multiple-threads
and here 
http://journals.ecs.soton.ac.uk/java/tutorial/native1.1/implementing/sync.html
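
To illustrate the constraint, a minimal sketch (assuming the JEP 3.x API; 
InterpreterThread is my name for it, not the actual class in the PR):

    import jep.Jep;
    import jep.JepException;

    // The Jep instance is created, used and closed on one and the same thread.
    class InterpreterThread extends Thread
    {
      @Override
      public void run()
      {
        try {
          Jep jep = new Jep();            // created on this thread
          try {
            jep.eval("x = 40 + 2");       // only ever invoked from this thread
            System.out.println(jep.getValue("x"));
          } finally {
            jep.close();
          }
        } catch (JepException e) {
          throw new RuntimeException(e);
        }
      }
    }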

- This essentially means that the main operator thread would have to execute 
the Python code itself if the design were otherwise.

- Since the end user can choose to write any kind of logic, including 
blocking I/O, as part of the implementation, I did not want to stall the 
operator thread for any usage pattern.

- In fact, there is only one overall interpreter in the JVM process space; 
each interpreter thread is just a JNI wrapper around it to account for the 
JNI limitations above.

- It is for the very same reason that there is an API in the implementation 
to support registering shared modules across all of the interpreter threads. 
Use cases for this arise when the underlying Python library holds global 
state and loading it multiple times can cause issues; hence the API to 
register a shared module that can be used by all of the interpreter threads.
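
For illustration, a sketch of how a worker would build its interpreter 
(assuming the JepConfig#addSharedModules API in JEP; the factory class name 
is mine):

    import jep.Jep;
    import jep.JepConfig;
    import jep.JepException;

    // Mark numpy as a shared module so its global state is initialised once
    // and reused by every interpreter thread.
    public class SharedModuleFactory
    {
      public static Jep newInterpreter() throws JepException
      {
        JepConfig config = new JepConfig();
        config.addSharedModules("numpy");
        return new Jep(config);
      }
    }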

- The operator submits to a request queue and consumes from a response queue 
for each interpreter thread; there is one request queue and one response 
queue per interpreter thread.
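
Roughly, per worker (the class and field names below are illustrative 
placeholders, not the actual code):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    // Placeholder request/response types for the sketch.
    class PythonRequest { String expression; }
    class PythonResponse { Object result; }

    // One pair of queues per interpreter thread; the operator thread only
    // ever touches the queues, never the worker's Jep instance itself.
    class PythonWorker
    {
      final BlockingQueue<PythonRequest> requests = new LinkedBlockingQueue<>();
      final BlockingQueue<PythonResponse> responses = new LinkedBlockingQueue<>();
    }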

- Stragglers, i.e. responses to previously submitted requests, get drained 
from the response queue later.

- The other reason why I chose to implement it this way is some of the use 
cases that I foresee in the ML scoring scenarios. In fraud systems, if I 
have a strict SLA to score a model, the main thread in the operator does not 
help me implement this pattern at all. The caller to the Apex application 
will need to proceed if the scoring gets delayed for whatever reason. 
However, the scoring can continue on the interpreter thread and can be 
drained later (it is just that the caller did not make use of this result, 
but it can still be persisted by operators consuming from the straggler 
port).
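
The pattern is roughly the following, reusing the placeholder types from the 
sketch above (scoreWithSla is an illustrative name, not the actual method):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;

    // Wait at most the SLA budget for a response; on timeout the caller moves
    // on, and the late result surfaces later as a straggler.
    PythonResponse scoreWithSla(BlockingQueue<PythonRequest> requests,
        BlockingQueue<PythonResponse> responses,
        PythonRequest request, long slaMillis) throws InterruptedException
    {
      requests.put(request);
      return responses.poll(slaMillis, TimeUnit.MILLISECONDS); // null = timed out
    }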

- There are 3 output ports on this operator: the default output port, a 
stragglersPort and an errorPort.
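
Inside the operator class that would look something like the following 
(field names are illustrative and reuse the placeholder types above):

    import com.datatorrent.api.DefaultOutputPort;

    // Normal results, late results, and failed requests respectively.
    public final transient DefaultOutputPort<PythonResponse> outputPort =
        new DefaultOutputPort<>();
    public final transient DefaultOutputPort<PythonResponse> stragglersPort =
        new DefaultOutputPort<>();
    public final transient DefaultOutputPort<PythonRequest> errorPort =
        new DefaultOutputPort<>();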

- Some libraries like TensorFlow can become really heavy. TensorFlow models 
can execute a TensorFlow DAG as part of a model scoring implementation, and 
hence I wanted to take the approach of a worker pool. Yes, your point is 
valid if we wait for the stragglers to complete in a given window. The 
current implementation does not force a wait for all of the stragglers to 
complete. The stragglers are emitted only when there is a new tuple being 
processed, i.e. when a new tuple arrives for scoring, the straggler response 
queue is checked for entries and, if there are any, the responses are 
emitted on the stragglersPort. This essentially means that there are 
situations when the stragglersPort is emitting the result for a request 
submitted in a previous window. This also implies that idempotency cannot be 
guaranteed across runs of the same input data. In fact, don't all threaded 
implementations have this issue, as the ordering of results is not 
guaranteed to be unique even within a given window?
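
Sketched with the illustrative names from the earlier snippets (worker and 
slaMillis are assumed to be fields of the operator):

    // Per incoming tuple: first emit any stragglers that completed since the
    // last call, then score the new tuple within the SLA budget.
    void process(PythonRequest request) throws InterruptedException
    {
      PythonResponse late;
      while ((late = worker.responses.poll()) != null) {
        stragglersPort.emit(late);   // may belong to a previous window
      }
      PythonResponse fresh = scoreWithSla(worker.requests, worker.responses,
          request, slaMillis);
      if (fresh != null) {
        outputPort.emit(fresh);
      }
    }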

I can enforce a block/drain at the end of the window to force completion, 
based on the feedback.
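
That would amount to something like the following in endWindow() (again a 
sketch; inFlight would be a counter of requests submitted but not yet 
answered):

    @Override
    public void endWindow()
    {
      // Block until every outstanding request has a response, so that a
      // checkpoint never cuts across an in-flight python call.
      try {
        while (inFlight > 0) {
          stragglersPort.emit(worker.responses.take());  // blocking take
          inFlight--;
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }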


Regards,
Ananth

> On 15 Dec 2017, at 4:21 am, Pramod Immaneni <pra...@datatorrent.com> wrote:
> 
> Hi Ananth,
> 
> Sounds interesting and looks like you have put quite a bit of work on it.
> Might I suggest changing the title of 2260 to better fit your proposal and
> implementation, mainly so that there is differentiation from 2261.
> 
> I wanted to discuss the proposal to use multiple threads in an operator
> instance. Unless the execution threads are blocking for some sort of I/O,
> why would it result in a noticeable performance difference compared to
> processing in the operator thread and running multiple partitions of the
> operator in container local? By running the processing in a separate thread
> from the operator lifecycle thread you still don't get away from matching
> the incoming data throughput. The checkpoint will act as a time where your
> backpressure will start to materialize, when the operator would have to wait
> for your background processing to complete to guarantee all data till the
> checkpoint is processed.
> 
> Thanks
> 
> 
> On Thu, Dec 14, 2017 at 2:20 AM, Ananth G <ananthg.a...@gmail.com> wrote:
> 
>> Hello All,
>> 
>> I would like to submit the design for the Python execution operator before
>> I raise the pull request so that I can refine the implementation based on
>> feedback. Could you please provide feedback on the design if any and I will
>> raise the PR accordingly.
>> 
>> - This operator is for the JIRA ticket raised here:
>> https://issues.apache.org/jira/browse/APEXMALHAR-2260
>> - The operator embeds a python interpreter in the operator JVM process
>> space and is not external to the JVM.
>> - The implementation is proposing the use of Java Embedded Python (JEP)
>> given here https://github.com/ninia/jep
>> - The JEP engine is under the zlib/libpng license. Since this is an
>> approved license under
>> https://www.apache.org/legal/resolved.html#category-a I am assuming it is
>> ok for the community to approve the inclusion of this library.
>> - Python integration is a messy piece due to the nature of dynamic
>> libraries. All Python libraries need to be natively installed. This also
>> means we will not be able to bundle Python libraries and dependencies as
>> part of the build into the target JVM container. Hence this operator has
>> the current limitation that the Python binaries must be installed through
>> an external process on all of the YARN nodes for now.
>> - The JEP maven dependency jar in the POM is a JNI wrapper around the
>> dynamic library that is installed externally to the Apex installation
>> process on all of the YARN nodes.
>> - Hope to take up https://issues.apache.org/jira/browse/APEXCORE-796 to
>> solve this issue in the future.
>> - The python operator implementation can be extended to a Py4J based
>> implementation (as opposed to an in-memory model like JEP) in the future
>> if need be. JEP is the implementation based on an in-memory design pattern.
>> - The python operator allows for 4 major API patterns
>>    - Execute a method call by accepting parameters to pass to the
>> interpreter
>>    - Execute a python script as given in a file path
>>    - Evaluate an expression, allowing for passing of variables between
>> the java code and the python in-memory interpreter bridge
>>    - A handy method wherein a series of instructions can be passed in one
>> single java call ( executed as a sequence of python eval instructions under
>> the hood )
>> - Automatic garbage collection of the variables that are passed from java
>> code to the in memory python interpreter
>> - Support for all major python libraries: Tensorflow, Keras, Scikit,
>> xgboost. Preliminary tests for these libraries seem to work as per the
>> code here:
>> https://github.com/ananthc/sampleapps/tree/master/apache-apex/apexjvmpython
>> - The implementation allows for an SLA based execution model, i.e. the
>> operator is given a chance to execute the python code and, if it does not
>> complete within a timeout, the operator code returns null.
>> - A tuple that has become a straggler as per previous point will
>> automatically be drained off to a different port so that downstream
>> operators can still consume the straggler if they want to when the results
>> arrive.
>> - Because Python is an interpreter, if a previous tuple is still being
>> processed there is a chance of a back pressure pattern building up very
>> quickly. Hence this operator works on the concept of a worker pool. The
>> Python operator uses a configurable number of worker threads, each of
>> which embeds the Python interpreter within its processing space, i.e. it
>> is in fact a collection of Python in-memory interpreters inside the
>> Python operator implementation.
>> - The operator chooses one of the threads at runtime based on their busy
>> state, thus allowing for back-pressure issues to be resolved automatically.
>> - There is first class support for NumPy in JEP. Java arrays are
>> convertible to Python NumPy arrays and vice versa, and share the same
>> memory addresses for efficiency reasons.
>> - The base operator implements dynamic partitioning based on a thread
>> starvation policy. At each checkpoint, it checks what percentage of the
>> requests resulted in starved threads, and if the starvation exceeds a
>> configured percentage, a new instance of the operator is provisioned for
>> every such starved instance of the operator.
>> - The operator provides the notion of a worker execution mode. There are
>> two worker modes, ALL or ANY, that are passed in each of the above calls
>> from the user. Because the Python interpreter is a stateful engine, a
>> newly dynamically partitioned operator might not be in the exact state of
>> the remaining operators. Hence the operator has this notion of a worker
>> execution mode. Any call (any of the 4 calls mentioned above) made with
>> the ALL execution mode will be executed on all the workers of the worker
>> thread pool, as well as on a dynamically partitioned instance whenever
>> such an instance is provisioned.
>> - The base operator implementation has a method that can be overridden to
>> implement the logic that needs to be executed for each tuple. The base
>> operator default implementation is a simple NO-OP.
>> - The operator automatically picks the least busy of the thread pool
>> workers, each of which has JEP embedded in it, to execute the call.
>> - The JEP based installation will not support non-CPython modules. All of
>> the major Python libraries are CPython based and hence I believe this is
>> of lesser concern. If we hit a roadblock when a new Python library that is
>> not CPython based needs to be run, then we could implement the
>> ApexPythonEngine interface against something like Py4J, which involves
>> interprocess communication.
>> - The python operator requires the user to set the library path
>> (java.library.path) for the operator to make use of the dynamic libraries
>> of the corresponding platform. This has to be passed in as JVM options.
>> Failing to do so will result in the operator failing to load the
>> interpreter properly.
>> - The supported Python versions are 2.7, 3.3, 3.4, 3.5 and 3.6. NumPy >=
>> 1.7 is supported.
>> - There is no support for virtual environments yet. In case of multiple
>> python versions on the node, to include the right python version for the
>> apex operator, ensure that the environment variables and the dynamic
>> library path are set appropriately. This is a workaround and I hope
>> APEXCORE-796 will solve this issue as well.
>> 
>> 
>> Regards,
>> Ananth
>> 
>> 
