Hi Davies,

In general, do we expect people to use CPython only for "heavyweight" UDFs that invoke an external library? Are there any examples of using Jython, especially performance comparisons to Java/Scala and CPython? When using Jython, do you expect the driver to send code to the executor as a string, or is there a good way to serialize Jython lambdas? (For context, I was unable to serialize Nashorn lambdas when I tried to use them in Spark.)

Punya
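A minimal sketch of the "send the code to the executor as a string" idea, using Jython's embedded PythonInterpreter; the function name and wiring below are hypothetical, and this is standalone code rather than anything Spark does today:

    import org.python.util.PythonInterpreter
    import org.python.core.Py

    object JythonUdfStringSketch {
      def main(args: Array[String]): Unit = {
        // UDF shipped from the driver as plain source text and defined on the executor side
        val udfSource = "def double_it(x):\n    return x * 2\n"

        val interp = new PythonInterpreter()
        interp.exec(udfSource)            // define the function inside the embedded interpreter
        val fn = interp.get("double_it")  // look it up by name as a PyObject

        // Apply it to a JVM value; Py.java2py converts Java objects into PyObjects
        val result = fn.__call__(Py.java2py(Int.box(21)))
        println(result.__tojava__(classOf[Object]))  // prints the doubled value
      }
    }

The obvious tradeoff, as Davies notes below, is that this only works for UDFs that don't depend on CPython extension modules.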
On Wed, Jun 24, 2015 at 2:26 AM Davies Liu <dav...@databricks.com> wrote:
> Fair points, I also like simpler solutions.
>
> The overhead of a Python task could be a few milliseconds, which means we should also evaluate them in batches (one Python task per batch).
>
> Decreasing the batch size for UDFs sounds reasonable to me, together with other tricks to reduce the data in the socket/pipe buffers.
>
> BTW, what does your UDF look like? How about using Jython to run simple Python UDFs (ones without external libraries)?
>
> On Tue, Jun 23, 2015 at 8:21 PM, Justin Uang <justin.u...@gmail.com> wrote:
> > // + punya
> >
> > Thanks for your quick response!
> >
> > I'm not sure that using an unbounded buffer is a good solution to the locking problem. For example, in the situation where I had 500 columns, I am in fact storing 499 extra columns on the Java side, which might make me OOM if I have to store many rows. In addition, if I am using an AutoBatchedSerializer, the Java side might have to write 1 << 16 == 65536 rows before Python starts outputting elements, in which case the Java side has to buffer 65536 complete rows. In general it seems fragile to rely on blocking behavior in the Python coprocess. By contrast, it's very easy to verify the correctness and performance characteristics of the synchronous blocking solution.
> >
> > On Tue, Jun 23, 2015 at 7:21 PM Davies Liu <dav...@databricks.com> wrote:
> >> Thanks for looking into it. I like the idea of having a ForkingIterator. If we have an unlimited buffer in it, then we will not have the deadlock problem, I think. The writing thread will be blocked by the Python process, so there will not be many rows buffered (though that could still be a reason to OOM). At least, this approach is better than the current one.
> >>
> >> Could you create a JIRA and send out the PR?
> >>
> >> On Tue, Jun 23, 2015 at 3:27 PM, Justin Uang <justin.u...@gmail.com> wrote:
> >> > BLUF: BatchPythonEvaluation's implementation is unusable at large scale, but I have a proof-of-concept implementation that avoids caching the entire dataset.
> >> >
> >> > Hi,
> >> >
> >> > We have been running into performance problems using Python UDFs with DataFrames at large scale.
> >> >
> >> > From the implementation of BatchPythonEvaluation, it looks like the goal was to reuse the PythonRDD code. It caches the entire child RDD so that it can do two passes over the data: one to feed the PythonRDD, and one to join the Python lambda results with the original row (which may have Java objects that should be passed through).
> >> >
> >> > In addition, it caches all the columns, even the ones that don't need to be processed by the Python UDF. In the cases I was working with, I had a 500-column table, and I wanted to use a Python UDF for one column, and it ended up caching all 500 columns.
> >> >
> >> > I have a working solution over here that does it in one pass over the data, avoiding caching (https://github.com/justinuang/spark/commit/c1a415a18d31226ac580f1a9df7985571d03199b). With this patch, I go from a job that takes 20 minutes then OOMs, to a job that finishes completely in 3 minutes.
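For illustration, the one-pass shape described above might look roughly like the sketch below. Row, sendToWorker and readFromWorker are hypothetical stand-ins for the actual pickling/socket plumbing, and the sketch deliberately ignores the flushing and backpressure problems discussed next; the bounded queue plays the role of the ForkingIterator buffer listed below.

    import java.util.concurrent.LinkedBlockingQueue

    case class Row(values: Array[Any])

    class OnePassEval(sendToWorker: Any => Unit, readFromWorker: () => Any) {
      // Full rows wait here only until their UDF result comes back, instead of
      // caching the entire child RDD for a second pass.
      private val pending = new LinkedBlockingQueue[Row](1 << 10)

      def evaluate(rows: Iterator[Row], udfColumn: Int): Iterator[Row] = {
        // Writer thread: stream only the UDF input column to the Python worker,
        // stashing each full row so it can be joined with its result later.
        val writer = new Thread(() => rows.foreach { row =>
          pending.put(row)
          sendToWorker(row.values(udfColumn))
        })
        writer.setDaemon(true)
        writer.start()

        // Reader side: pair each result coming back with the row that produced it.
        new Iterator[Row] {
          def hasNext: Boolean = writer.isAlive || !pending.isEmpty
          def next(): Row = Row(pending.take().values :+ readFromWorker())
        }
      }
    }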
> >> > It is indeed quite hacky and prone to deadlocks, since there is buffering in many locations:
> >> >
> >> > - NEW: the ForkingIterator LinkedBlockingDeque
> >> > - batching the rows before pickling them
> >> > - OS buffers on both sides
> >> > - pyspark.serializers.BatchedSerializer
> >> >
> >> > We can avoid deadlock by being very disciplined. For example, we can have the ForkingIterator always check whether the LinkedBlockingDeque is full and, if so:
> >> >
> >> > Java
> >> > - flush the Java pickling buffer
> >> > - send a flush command to the Python process
> >> > - flush the OS buffer on the Java side
> >> >
> >> > Python
> >> > - flush the BatchedSerializer
> >> > - os.flush()
> >> >
> >> > I haven't added this yet, and it is getting very complex. Another model would be to change the protocol between the Java side and the worker to a synchronous request/response. This has the disadvantage that the CPU isn't doing anything while a batch is being sent across, but it has the huge advantage of simplicity. In addition, I imagine that the bottleneck isn't the actual IO between the processes, but rather the serialization of Java objects into pickled bytes, plus the deserialization/serialization and Python loops on the Python side. Another advantage is that we won't be taking more than 100% CPU, since only one thread is doing CPU work at a time between the executor and the Python interpreter.
> >> >
> >> > Any thoughts would be much appreciated =)
> >> >
> >> > Other improvements:
> >> > - extract some of the worker code out of PythonRDD so that we can do a mapPartitions directly in BatchPythonEvaluation without resorting to the hackery in ForkedRDD.compute(), which uses a cache to ensure that the other RDD can get a handle to the same iterator
> >> > - read elements and use a size estimator when creating the BlockingQueue, to make sure that we don't store too many things in memory when batching
> >> > - patch Unpickler to not use StopException for control flow, which is slowing down the Java side
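For comparison, a minimal sketch of the synchronous request/response model mentioned above; the framing and class name are hypothetical, and a real implementation would pickle rows and reuse PySpark's existing worker protocol.

    import java.io.{DataInputStream, DataOutputStream}

    class SyncBatchClient(out: DataOutputStream, in: DataInputStream) {
      def evalBatch(batch: Seq[Array[Byte]]): Seq[Array[Byte]] = {
        // Request: number of elements, then each pickled element length-prefixed.
        out.writeInt(batch.length)
        batch.foreach { bytes =>
          out.writeInt(bytes.length)
          out.write(bytes)
        }
        out.flush()  // nothing stays buffered across batches

        // Response: block until the worker sends back exactly one result per input element.
        Seq.fill(in.readInt()) {
          val result = new Array[Byte](in.readInt())
          in.readFully(result)
          result
        }
      }
    }

The flush after each request is what makes the buffering analysis trivial here: at most one batch is ever in flight, so there is no deadlock to reason about.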