Sweet, filed here: https://issues.apache.org/jira/browse/SPARK-8632
On Thu, Jun 25, 2015 at 3:05 AM, Davies Liu <dav...@databricks.com> wrote:
> I'm thinking that the batched synchronous version will be too slow (with a
> small batch size) or easy to OOM with a large one. If it's not that hard,
> you can give it a try.

On Wed, Jun 24, 2015 at 4:39 PM, Justin Uang <justin.u...@gmail.com> wrote:
> Correct, I was running with a batch size of about 100 when I did the tests,
> because I was worried about deadlocks. Do you have any concerns regarding
> the batched synchronous version of communication between the Java and
> Python processes, and if not, should I file a ticket and start writing it?

On Wed, Jun 24, 2015 at 7:27 PM, Davies Liu <dav...@databricks.com> wrote:
> From your comment, the 2x improvement only happens when you have the batch
> size set to 1, right?

On Wed, Jun 24, 2015 at 12:11 PM, Justin Uang <justin.u...@gmail.com> wrote:
> FYI, I just submitted a PR to Pyrolite to remove their StopException:
> https://github.com/irmen/Pyrolite/pull/30
>
> With my benchmark, removing it basically made it about 2x faster.

On Wed, Jun 24, 2015 at 8:33 AM, Punyashloka Biswal <punya.bis...@gmail.com> wrote:
> Hi Davies,
>
> In general, do we expect people to use CPython only for "heavyweight" UDFs
> that invoke an external library? Are there any examples of using Jython,
> especially performance comparisons to Java/Scala and CPython? When using
> Jython, do you expect the driver to send code to the executor as a string,
> or is there a good way to serialize Jython lambdas?
>
> (For context, I was unable to serialize Nashorn lambdas when I tried to use
> them in Spark.)
>
> Punya

On Wed, Jun 24, 2015 at 2:26 AM, Davies Liu <dav...@databricks.com> wrote:
> Fair points; I also like simpler solutions.
>
> The overhead of a Python task could be a few milliseconds, which means we
> should also evaluate the UDFs as batches (one Python task per batch).
>
> Decreasing the batch size for UDFs sounds reasonable to me, together with
> other tricks to reduce the data sitting in the socket/pipe buffers.
>
> BTW, what does your UDF look like? How about using Jython to run simple
> Python UDFs (those without external libraries)?
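[Editor's note: the Jython suggestion above is not spelled out in the thread. As a rough
illustration, a simple Python UDF could in principle be evaluated in-process with Jython's
PythonInterpreter, avoiding the CPython worker and the pickling round-trip entirely. The
object name and the UDF below are made up for illustration and assume the Jython dependency
is on the classpath; this is a sketch of the idea, not Spark's actual UDF path.]

    import org.python.core.{PyInteger, PyObject}
    import org.python.util.PythonInterpreter

    // Illustrative only: run a trivial Python UDF inside the JVM with Jython,
    // with no CPython worker process and no pickling across a pipe.
    object JythonUdfSketch {
      def main(args: Array[String]): Unit = {
        val interp = new PythonInterpreter()
        // The UDF body would arrive from the driver as a string.
        interp.exec("def udf(x):\n    return x * 2 + 1")
        val udf: PyObject = interp.get("udf")

        val column = Seq(1, 2, 3, 4) // stand-in for one column of a partition
        val results = column.map(v => udf.__call__(new PyInteger(v)).asInt())
        println(results) // List(3, 5, 7, 9)
      }
    }

[This only helps for UDFs that do not depend on C-extension libraries such as numpy, which
is exactly the caveat raised above.]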
On Tue, Jun 23, 2015 at 8:21 PM, Justin Uang <justin.u...@gmail.com> wrote:
> // + punya
>
> Thanks for your quick response!
>
> I'm not sure that using an unbounded buffer is a good solution to the
> locking problem. For example, in the situation where I had 500 columns, I
> am in fact storing 499 extra columns on the Java side, which might make me
> OOM if I have to store many rows. In addition, if I am using an
> AutoBatchedSerializer, the Java side might have to write 1 << 16 == 65536
> rows before Python starts outputting elements, in which case the Java side
> has to buffer 65536 complete rows. In general it seems fragile to rely on
> blocking behavior in the Python coprocess. By contrast, it's very easy to
> verify the correctness and performance characteristics of the synchronous
> blocking solution.

On Tue, Jun 23, 2015 at 7:21 PM, Davies Liu <dav...@databricks.com> wrote:
> Thanks for looking into it. I like the idea of having a ForkingIterator. If
> we give it an unlimited buffer, then we will not have the deadlock problem,
> I think. The writing thread will be blocked by the Python process, so there
> will not be many rows buffered (though it could still be a cause of OOM).
> At least, this approach is better than the current one.
>
> Could you create a JIRA and send out the PR?
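[Editor's note: the ForkingIterator mentioned above is not shown in the thread. The sketch
below is one way the idea could work, under the assumption that it splits a single pass over
the child iterator into a stream fed to the Python worker plus a bounded LinkedBlockingDeque
of original rows kept for the later join. The class name, capacity, and API are illustrative
only, not the proof-of-concept code.]

    import java.util.concurrent.LinkedBlockingDeque

    // Illustrative sketch of the "forking" idea: one pass over the child iterator
    // that hands each row to the thread feeding the Python worker while parking a
    // copy in a deque, so the original row can be joined with the UDF result later
    // without caching the whole partition.
    class ForkingIterator[T](source: Iterator[T], capacity: Int = 1024) {
      private val parked = new LinkedBlockingDeque[T](capacity)

      // Consumed by the writer thread that pickles rows and sends them to Python.
      // putLast blocks when the deque is full, which is where the deadlock risk
      // discussed in the thread comes from; an unbounded deque trades that risk
      // for the OOM risk described above.
      val toPython: Iterator[T] = source.map { row =>
        parked.putLast(row)
        row
      }

      // Called once per UDF result that comes back, to recover the original row
      // it should be joined with.
      def nextOriginal(): T = parked.takeFirst()
    }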
On Tue, Jun 23, 2015 at 3:27 PM, Justin Uang <justin.u...@gmail.com> wrote:
> BLUF: BatchPythonEvaluation's implementation is unusable at large scale,
> but I have a proof-of-concept implementation that avoids caching the
> entire dataset.
>
> Hi,
>
> We have been running into performance problems using Python UDFs with
> DataFrames at large scale.
>
> From the implementation of BatchPythonEvaluation, it looks like the goal
> was to reuse the PythonRDD code. It caches the entire child RDD so that it
> can do two passes over the data: one to feed the PythonRDD, then one to
> join the Python lambda results with the original row (which may have Java
> objects that should be passed through).
>
> In addition, it caches all the columns, even the ones that don't need to
> be processed by the Python UDF. In the cases I was working with, I had a
> 500-column table, and I wanted to use a Python UDF for one column, and it
> ended up caching all 500 columns.
>
> I have a working solution over here that does it in one pass over the
> data, avoiding caching
> (https://github.com/justinuang/spark/commit/c1a415a18d31226ac580f1a9df7985571d03199b).
> With this patch, I go from a job that takes 20 minutes and then OOMs to a
> job that finishes completely in 3 minutes. It is indeed quite hacky and
> prone to deadlocks, since there is buffering in many locations:
>
>   - NEW: the ForkingIterator LinkedBlockingDeque
>   - batching the rows before pickling them
>   - OS buffers on both sides
>   - pyspark.serializers.BatchedSerializer
>
> We can avoid deadlock by being very disciplined. For example, we can have
> the ForkingIterator always check whether the LinkedBlockingDeque is full,
> and if so:
>
> Java
>   - flush the Java pickling buffer
>   - send a flush command to the Python process
>   - os.flush the Java side
>
> Python
>   - flush the BatchedSerializer
>   - os.flush()
>
> I haven't added this yet. This is getting very complex, however. Another
> model would be to change the protocol between the Java side and the worker
> to a synchronous request/response. This has the disadvantage that the CPU
> isn't doing anything while a batch is being sent across, but it has the
> huge advantage of simplicity. In addition, I imagine that the actual IO
> between the processes isn't that slow; the cost is rather the
> serialization of Java objects into pickled bytes, and the
> deserialization/serialization + Python loops on the Python side. Another
> advantage is that we won't be taking more than 100% CPU, since only one
> thread is doing CPU work at a time between the executor and the Python
> interpreter.
>
> Any thoughts would be much appreciated =)
>
> Other improvements:
>   - extract some of the worker code out of PythonRDD so that we can do a
>     mapPartitions directly in BatchedPythonEvaluation without resorting to
>     the hackery in ForkedRDD.compute(), which uses a cache to ensure that
>     the other RDD can get a handle to the same iterator
>   - read elements and use a size estimator when creating the BlockingQueue,
>     to make sure that we don't store too many things in memory when batching
>   - patch the Unpickler to not use StopException for control flow, which is
>     slowing down the Java side
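[Editor's note: for comparison, the synchronous request/response model proposed above (the
approach filed as SPARK-8632) could look roughly like the sketch below. The length-prefixed
framing, object and method names, and streams are assumptions made for illustration; the
actual PythonRDD worker protocol differs.]

    import java.io.{DataInputStream, DataOutputStream}

    // Illustrative sketch of a synchronous batched exchange: write one pickled
    // batch of UDF inputs, flush, and block for the pickled batch of results
    // before sending the next. At most one batch is buffered anywhere, and only
    // one side does CPU work at a time.
    object SyncBatchExchange {
      def evaluate(batches: Iterator[Array[Byte]],  // already-pickled input batches
                   out: DataOutputStream,           // connected to the worker's stdin
                   in: DataInputStream              // connected to the worker's stdout
                  ): Iterator[Array[Byte]] =
        batches.map { batch =>
          out.writeInt(batch.length)                // length-prefixed request
          out.write(batch)
          out.flush()                               // make sure the worker sees the batch

          val len = in.readInt()                    // block until the response arrives
          val result = new Array[Byte](len)
          in.readFully(result)
          result                                    // pickled UDF outputs for this batch
        }
    }

[The trade-off is exactly the one described above: the executor idles while a batch is in
flight, but neither deadlock nor unbounded buffering is possible.]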