Correct, I was running with a batch size of about 100 when I did the tests, because I was worried about deadlocks. Do you have any concerns regarding the batched synchronous version of communication between the Java and Python processes, and if not, should I file a ticket and start writing it?
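To make the proposal concrete, here is a rough sketch of what the Java side of the batched synchronous exchange might look like. All names here are illustrative, not actual Spark APIs; the real change would live inside PythonRDD/BatchPythonEvaluation.

    import java.io.BufferedInputStream;
    import java.io.BufferedOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.net.Socket;

    // Hypothetical sketch of a synchronous request/response batch protocol.
    // The Java side sends one pickled batch, flushes, then blocks until the
    // Python worker returns the evaluated batch. Nothing is ever queued on
    // both sides at once, so there is no deadlock and no unbounded buffer.
    class SyncBatchExchange {
        private final DataOutputStream out;
        private final DataInputStream in;

        SyncBatchExchange(Socket worker) throws IOException {
            out = new DataOutputStream(new BufferedOutputStream(worker.getOutputStream()));
            in = new DataInputStream(new BufferedInputStream(worker.getInputStream()));
        }

        // One round trip: pickled rows out, pickled UDF results back.
        byte[] evalBatch(byte[] pickledRows) throws IOException {
            out.writeInt(pickledRows.length);
            out.write(pickledRows);
            out.flush();                // nothing lingers in JVM-side buffers
            int len = in.readInt();     // blocks until Python has replied
            byte[] results = new byte[len];
            in.readFully(results);
            return results;
        }
    }

The batch size (~100 rows in the tests above) then trades per-round-trip overhead against how much data is in flight at once.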
On Wed, Jun 24, 2015 at 7:27 PM Davies Liu <dav...@databricks.com> wrote:

> From your comment, the 2x improvement only happens when the batch
> size is 1, right?
>
> On Wed, Jun 24, 2015 at 12:11 PM, Justin Uang <justin.u...@gmail.com> wrote:
> > FYI, just submitted a PR to Pyrolite to remove their StopException.
> > https://github.com/irmen/Pyrolite/pull/30
> >
> > With my benchmark, removing it made it about 2x faster.
> >
> > On Wed, Jun 24, 2015 at 8:33 AM Punyashloka Biswal <punya.bis...@gmail.com> wrote:
> >> Hi Davies,
> >>
> >> In general, do we expect people to use CPython only for "heavyweight"
> >> UDFs that invoke an external library? Are there any examples of using
> >> Jython, especially performance comparisons to Java/Scala and CPython?
> >> When using Jython, do you expect the driver to send code to the
> >> executor as a string, or is there a good way to serialize Jython
> >> lambdas?
> >>
> >> (For context, I was unable to serialize Nashorn lambdas when I tried
> >> to use them in Spark.)
> >>
> >> Punya
> >>
> >> On Wed, Jun 24, 2015 at 2:26 AM Davies Liu <dav...@databricks.com> wrote:
> >>> Fair points, I also like simpler solutions.
> >>>
> >>> The overhead of a Python task could be a few milliseconds, which
> >>> means we should also evaluate them in batches (one Python task per
> >>> batch).
> >>>
> >>> Decreasing the batch size for UDFs sounds reasonable to me, together
> >>> with other tricks to reduce the data sitting in the socket/pipe
> >>> buffers.
> >>>
> >>> BTW, what do your UDFs look like? How about using Jython to run
> >>> simple Python UDFs (ones without external libraries)?
> >>>
> >>> On Tue, Jun 23, 2015 at 8:21 PM, Justin Uang <justin.u...@gmail.com> wrote:
> >>> > // + punya
> >>> >
> >>> > Thanks for your quick response!
> >>> >
> >>> > I'm not sure that using an unbounded buffer is a good solution to
> >>> > the locking problem. For example, in the situation where I had 500
> >>> > columns, I am in fact storing 499 extra columns on the Java side,
> >>> > which might make me OOM if I have to store many rows. In addition,
> >>> > if I am using an AutoBatchedSerializer, the Java side might have to
> >>> > write 1 << 16 == 65536 rows before Python starts outputting
> >>> > elements, in which case the Java side has to buffer 65536 complete
> >>> > rows. In general it seems fragile to rely on blocking behavior in
> >>> > the Python coprocess. By contrast, it's very easy to verify the
> >>> > correctness and performance characteristics of the synchronous
> >>> > blocking solution.
> >>> >
> >>> > On Tue, Jun 23, 2015 at 7:21 PM Davies Liu <dav...@databricks.com> wrote:
> >>> >> Thanks for looking into it. I like the idea of having a
> >>> >> ForkingIterator. If we have an unlimited buffer in it, then we will
> >>> >> not have the deadlock problem, I think. The writing thread will be
> >>> >> blocked by the Python process, so there will not be many rows
> >>> >> buffered (though that is still a potential OOM). At least, this
> >>> >> approach is better than the current one.
> >>> >>
> >>> >> Could you create a JIRA and send out the PR?
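For reference, a minimal sketch of the ForkingIterator idea under discussion, in its bounded form (illustrative only, not the linked commit):

    import java.util.Iterator;
    import java.util.concurrent.LinkedBlockingDeque;

    // Tees a source iterator: the primary consumer (feeding the Python
    // worker) drives iteration, and each element is also queued for a
    // second consumer (the join with the Python results). With a bounded
    // deque, put() blocks when the buffer is full -- which is exactly where
    // the deadlock risk discussed above comes from; an unbounded deque
    // avoids the deadlock but can OOM instead.
    class ForkingIterator<T> implements Iterator<T> {
        private final Iterator<T> source;
        private final LinkedBlockingDeque<T> forked;

        ForkingIterator(Iterator<T> source, int capacity) {
            this.source = source;
            this.forked = new LinkedBlockingDeque<>(capacity);
        }

        @Override public boolean hasNext() {
            return source.hasNext();
        }

        @Override public T next() {
            T elem = source.next();
            try {
                forked.put(elem); // blocks when full: backpressure or deadlock
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return elem;
        }

        // Drained by the thread pairing Python results with original rows.
        LinkedBlockingDeque<T> forkedQueue() {
            return forked;
        }
    }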
> >>> >>
> >>> >> On Tue, Jun 23, 2015 at 3:27 PM, Justin Uang <justin.u...@gmail.com> wrote:
> >>> >> > BLUF: BatchPythonEvaluation's implementation is unusable at large
> >>> >> > scale, but I have a proof-of-concept implementation that avoids
> >>> >> > caching the entire dataset.
> >>> >> >
> >>> >> > Hi,
> >>> >> >
> >>> >> > We have been running into performance problems using Python UDFs
> >>> >> > with DataFrames at large scale.
> >>> >> >
> >>> >> > From the implementation of BatchPythonEvaluation, it looks like
> >>> >> > the goal was to reuse the PythonRDD code. It caches the entire
> >>> >> > child RDD so that it can do two passes over the data: one to give
> >>> >> > to the PythonRDD, then one to join the Python lambda results with
> >>> >> > the original row (which may have Java objects that should be
> >>> >> > passed through).
> >>> >> >
> >>> >> > In addition, it caches all the columns, even the ones that don't
> >>> >> > need to be processed by the Python UDF. In the cases I was working
> >>> >> > with, I had a 500-column table, and I wanted to use a Python UDF
> >>> >> > for one column, and it ended up caching all 500 columns.
> >>> >> >
> >>> >> > I have a working solution over here that does it in one pass over
> >>> >> > the data, avoiding caching
> >>> >> > (https://github.com/justinuang/spark/commit/c1a415a18d31226ac580f1a9df7985571d03199b).
> >>> >> > With this patch, I go from a job that takes 20 minutes and then
> >>> >> > OOMs to a job that finishes completely in 3 minutes. It is indeed
> >>> >> > quite hacky and prone to deadlocks, since there is buffering in
> >>> >> > many locations:
> >>> >> >
> >>> >> >   - NEW: the ForkingIterator LinkedBlockingDeque
> >>> >> >   - batching the rows before pickling them
> >>> >> >   - OS buffers on both sides
> >>> >> >   - pyspark.serializers.BatchedSerializer
> >>> >> >
> >>> >> > We can avoid deadlock by being very disciplined. For example, we
> >>> >> > can have the ForkingIterator always check whether the
> >>> >> > LinkedBlockingDeque is full, and if so:
> >>> >> >
> >>> >> > Java
> >>> >> >   - flush the Java pickling buffer
> >>> >> >   - send a flush command to the Python process
> >>> >> >   - os.flush the Java side
> >>> >> >
> >>> >> > Python
> >>> >> >   - flush BatchedSerializer
> >>> >> >   - os.flush()
> >>> >> >
> >>> >> > I haven't added this yet.
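As a rough sketch, the Java half of that flush handshake could look like the following (the command value and names are invented for illustration):

    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.concurrent.LinkedBlockingDeque;

    // Hypothetical flush handshake: when the forked buffer fills up, push
    // all pending bytes to the Python process and ask it to flush its own
    // BatchedSerializer, so that rows can never be stranded in buffers on
    // both sides at once (the condition that produces the deadlock).
    final class FlushDiscipline {
        static final int FLUSH_COMMAND = -100; // illustrative sentinel value

        static void flushIfFull(LinkedBlockingDeque<?> forked,
                                DataOutputStream toPython) throws IOException {
            if (forked.remainingCapacity() == 0) {
                toPython.writeInt(FLUSH_COMMAND); // Python: flush + os.flush()
                toPython.flush();                 // drain JVM-side buffers too
            }
        }
    }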
> >>> >> > This is getting very complex, however. Another model would just
> >>> >> > be to change the protocol between the Java side and the worker to
> >>> >> > be a synchronous request/response. This has the disadvantage that
> >>> >> > the CPU isn't doing anything while a batch is being sent across,
> >>> >> > but it has the huge advantage of simplicity. In addition, I
> >>> >> > imagine that the bottleneck isn't the actual IO between the
> >>> >> > processes, but rather the serialization of Java objects into
> >>> >> > pickled bytes, and the deserialization/serialization plus the
> >>> >> > Python loops on the Python side. Another advantage is that we
> >>> >> > won't be using more than 100% CPU, since only one thread is doing
> >>> >> > CPU work at a time between the executor and the Python
> >>> >> > interpreter.
> >>> >> >
> >>> >> > Any thoughts would be much appreciated =)
> >>> >> >
> >>> >> > Other improvements:
> >>> >> >   - extract some of the worker code out of PythonRDD so that we
> >>> >> >     can do a mapPartitions directly in BatchedPythonEvaluation
> >>> >> >     without resorting to the hackery in ForkedRDD.compute(), which
> >>> >> >     uses a cache to ensure that the other RDD can get a handle to
> >>> >> >     the same iterator
> >>> >> >   - read elements and use a size estimator when creating the
> >>> >> >     BlockingQueue, to make sure that we don't store too many
> >>> >> >     things in memory when batching
> >>> >> >   - patch Unpickler to not use StopException for control flow,
> >>> >> >     which is slowing down the Java side
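On the last bullet: the cost is not the unpickling itself but using an exception for routine end-of-stream control flow, since each throw captures a stack trace on the JVM. A schematic comparison (not Pyrolite's actual code):

    import java.util.Iterator;

    // Exception-driven: a StopException is constructed and thrown for
    // every normal end-of-stream, paying for stack-trace capture and
    // unwinding on each call.
    final class StopException extends RuntimeException {}

    final class ControlFlowStyles {
        static final Object END = new Object(); // reserved end-of-stream marker

        static Object nextViaException(Iterator<Object> it) {
            if (!it.hasNext()) throw new StopException();
            return it.next();
        }

        // Sentinel-driven: signalling the end with a return value is free.
        static Object nextViaSentinel(Iterator<Object> it) {
            return it.hasNext() ? it.next() : END;
        }
    }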