From your comment, the 2x improvement only happens when the batch size is 1, right?

On Wed, Jun 24, 2015 at 12:11 PM, Justin Uang <justin.u...@gmail.com> wrote:
> FYI, just submitted a PR to Pyrolite to remove their StopException.
> https://github.com/irmen/Pyrolite/pull/30
>
> With my benchmark, removing it made it about 2x faster.
>
> On Wed, Jun 24, 2015 at 8:33 AM Punyashloka Biswal <punya.bis...@gmail.com> wrote:
>>
>> Hi Davies,
>>
>> In general, do we expect people to use CPython only for "heavyweight"
>> UDFs that invoke an external library? Are there any examples of using
>> Jython, especially performance comparisons to Java/Scala and CPython?
>> When using Jython, do you expect the driver to send code to the
>> executor as a string, or is there a good way to serialize Jython
>> lambdas?
>>
>> (For context, I was unable to serialize Nashorn lambdas when I tried
>> to use them in Spark.)
>>
>> Punya
>>
>> On Wed, Jun 24, 2015 at 2:26 AM Davies Liu <dav...@databricks.com> wrote:
>>>
>>> Fair points; I also like simpler solutions.
>>>
>>> The overhead of a Python task can be a few milliseconds, which means
>>> we should also evaluate them in batches (one Python task per batch).
>>>
>>> Decreasing the batch size for UDFs sounds reasonable to me, together
>>> with other tricks to reduce the data sitting in the socket/pipe
>>> buffers.
>>>
>>> BTW, what does your UDF look like? How about using Jython to run
>>> simple Python UDFs (those without external libraries)?
>>>
>>> On Tue, Jun 23, 2015 at 8:21 PM, Justin Uang <justin.u...@gmail.com> wrote:
>>> > // + punya
>>> >
>>> > Thanks for your quick response!
>>> >
>>> > I'm not sure that using an unbounded buffer is a good solution to
>>> > the locking problem. For example, in the situation where I had 500
>>> > columns, I am in fact storing 499 extra columns on the Java side,
>>> > which might make me OOM if I have to store many rows. In addition,
>>> > if I am using an AutoBatchedSerializer, the Java side might have to
>>> > write 1 << 16 == 65536 rows before Python starts outputting
>>> > elements, in which case the Java side has to buffer 65536 complete
>>> > rows. In general it seems fragile to rely on blocking behavior in
>>> > the Python coprocess. By contrast, it's very easy to verify the
>>> > correctness and performance characteristics of the synchronous
>>> > blocking solution.
>>> >
>>> > On Tue, Jun 23, 2015 at 7:21 PM Davies Liu <dav...@databricks.com> wrote:
>>> >>
>>> >> Thanks for looking into it. I like the idea of having a
>>> >> ForkingIterator. If we have an unlimited buffer in it, then we will
>>> >> not have the deadlock problem, I think. The writing thread will be
>>> >> blocked by the Python process, so there will not be many rows
>>> >> buffered (though that could still be a reason to OOM). At least,
>>> >> this approach is better than the current one.
>>> >>
>>> >> Could you create a JIRA and send out the PR?
>>> >>
>>> >> On Tue, Jun 23, 2015 at 3:27 PM, Justin Uang <justin.u...@gmail.com> wrote:
>>> >> > BLUF: BatchPythonEvaluation's implementation is unusable at large
>>> >> > scale, but I have a proof-of-concept implementation that avoids
>>> >> > caching the entire dataset.
>>> >> >
>>> >> > Hi,
>>> >> >
>>> >> > We have been running into performance problems using Python UDFs
>>> >> > with DataFrames at large scale.
>>> >> >
>>> >> > From the implementation of BatchPythonEvaluation, it looks like
>>> >> > the goal was to reuse the PythonRDD code. It caches the entire
>>> >> > child RDD so that it can do two passes over the data: one to feed
>>> >> > the PythonRDD, and one to join the Python lambda results with the
>>> >> > original rows (which may contain Java objects that should be
>>> >> > passed through).
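
(To make sure I'm reading that right, the two-pass shape being described is
roughly the following. This is my own sketch, not the actual
BatchPythonEvaluation code; runPythonUDF and projectUdfColumns are
placeholders for the real PythonRDD machinery and column projection.)

    import org.apache.spark.rdd.RDD
    import scala.reflect.ClassTag

    // Sketch only: cache the child, feed one pass to Python, then take a
    // second pass over the cached rows to zip the results back on.
    def twoPassEval[Row, In: ClassTag, Out: ClassTag](
        childRdd: RDD[Row],
        projectUdfColumns: Row => In,
        runPythonUDF: RDD[In] => RDD[Out]): RDD[(Row, Out)] = {
      val cached = childRdd.cache()                               // the whole child RDD gets materialized
      val results = runPythonUDF(cached.map(projectUdfColumns))   // pass 1: only the UDF inputs go to Python
      cached.zip(results)                                         // pass 2: re-read the cached rows to join results
    }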
>>> >> > In addition, it caches all the columns, even the ones that don't
>>> >> > need to be processed by the Python UDF. In the case I was working
>>> >> > with, I had a 500-column table, I wanted to use a Python UDF for
>>> >> > one column, and it ended up caching all 500 columns.
>>> >> >
>>> >> > I have a working solution over here that does it in one pass over
>>> >> > the data, avoiding the caching
>>> >> > (https://github.com/justinuang/spark/commit/c1a415a18d31226ac580f1a9df7985571d03199b).
>>> >> > With this patch, I go from a job that takes 20 minutes and then
>>> >> > OOMs to a job that finishes completely in 3 minutes. It is indeed
>>> >> > quite hacky and prone to deadlocks, since there is buffering in
>>> >> > many locations:
>>> >> >
>>> >> >   - NEW: the ForkingIterator LinkedBlockingDeque
>>> >> >   - batching the rows before pickling them
>>> >> >   - OS buffers on both sides
>>> >> >   - pyspark.serializers.BatchedSerializer
>>> >> >
>>> >> > We can avoid deadlock by being very disciplined. For example, we
>>> >> > can have the ForkingIterator always check whether the
>>> >> > LinkedBlockingDeque is full and, if so:
>>> >> >
>>> >> > Java
>>> >> >   - flush the Java pickling buffer
>>> >> >   - send a flush command to the Python process
>>> >> >   - os.flush the Java side
>>> >> >
>>> >> > Python
>>> >> >   - flush BatchedSerializer
>>> >> >   - os.flush()
>>> >> >
>>> >> > I haven't added this yet, and it is getting very complex. Another
>>> >> > model would be to change the protocol between the Java side and
>>> >> > the worker to a synchronous request/response. This has the
>>> >> > disadvantage that the CPU isn't doing anything while the batch is
>>> >> > being sent across, but it has the huge advantage of simplicity.
>>> >> > In addition, I imagine that the actual IO between the processes
>>> >> > isn't that slow; the cost is rather the serialization of Java
>>> >> > objects into pickled bytes, and the deserialization/serialization
>>> >> > plus the Python loops on the Python side. Another advantage is
>>> >> > that we won't be taking more than 100% CPU, since only one thread
>>> >> > is doing CPU work at a time between the executor and the Python
>>> >> > interpreter.
>>> >> >
>>> >> > Any thoughts would be much appreciated =)
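
(The request/response model described above could look roughly like this on
the JVM side. A sketch only: the length-prefixed framing and the
pickle/unpickle helpers are placeholders of mine, not the real PythonRDD
wire protocol.)

    import java.io.{DataInputStream, DataOutputStream}

    // Write one batch, flush, then block on the complete reply before
    // producing the next batch; only one side does CPU work at a time.
    def evalBatchSync(batch: Seq[Any],
                      pickle: Seq[Any] => Array[Byte],
                      unpickle: Array[Byte] => Seq[Any],
                      out: DataOutputStream,
                      in: DataInputStream): Seq[Any] = {
      val request = pickle(batch)
      out.writeInt(request.length)     // length-prefixed request
      out.write(request)
      out.flush()                      // make sure the whole batch reaches the worker
      val length = in.readInt()        // block until the worker has answered in full
      val response = new Array[Byte](length)
      in.readFully(response)
      unpickle(response)               // results for exactly this batch, in order
    }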
>>> >> > Other improvements:
>>> >> >   - extract some of the worker code out of PythonRDD so that we
>>> >> >     can do a mapPartitions directly in BatchPythonEvaluation
>>> >> >     without resorting to the hackery in ForkedRDD.compute(),
>>> >> >     which uses a cache to ensure that the other RDD can get a
>>> >> >     handle to the same iterator
>>> >> >   - read elements and use a size estimator when creating the
>>> >> >     BlockingQueue, to make sure that we don't store too many
>>> >> >     things in memory when batching
>>> >> >   - patch Unpickler to not use StopException for control flow,
>>> >> >     which is slowing down the Java side
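
(For completeness, the flush discipline Justin outlines above could be
coordinated roughly like this. The class and the three callbacks are
illustrative placeholders, not Spark's actual code.)

    import java.util.concurrent.LinkedBlockingDeque

    // A bounded deque between the thread writing rows toward Python and the
    // thread reading results back. Before blocking on a full deque, force
    // every upstream buffer onto the wire so the worker can make progress
    // and eventually drain us.
    class ForkingBuffer[T](capacity: Int,
                           flushJavaPickleBuffer: () => Unit,
                           sendFlushCommandToWorker: () => Unit,
                           flushSocket: () => Unit) {
      private val queue = new LinkedBlockingDeque[T](capacity)

      def put(row: T): Unit = {
        if (queue.remainingCapacity() == 0) {
          flushJavaPickleBuffer()      // rows batched on the JVM side but not yet written
          sendFlushCommandToWorker()   // ask the worker to flush its BatchedSerializer
          flushSocket()                // flush the OS-level socket buffer on the JVM side
        }
        queue.put(row)                 // may still block until results drain the queue
      }

      def take(): T = queue.take()
    }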