Sweet, filed here: https://issues.apache.org/jira/browse/SPARK-8632

On Thu, Jun 25, 2015 at 3:05 AM Davies Liu <dav...@databricks.com> wrote:

> I'm thinking that the batched synchronous version will be too slow
> (with a small batch size) or prone to OOM (with a large batch size). If
> it's not that hard, you can give it a try.
>
> On Wed, Jun 24, 2015 at 4:39 PM, Justin Uang <justin.u...@gmail.com>
> wrote:
> > Correct, I was running with a batch size of about 100 when I did the
> > tests, because I was worried about deadlocks. Do you have any concerns
> > regarding the batched synchronous version of communication between the
> > Java and Python processes, and if not, should I file a ticket and start
> > writing it?
> >
> > On Wed, Jun 24, 2015 at 7:27 PM Davies Liu <dav...@databricks.com>
> wrote:
> >>
> >> From your comment, the 2x improvement only happens when you have the
> >> batch size as 1, right?
> >>
> >> On Wed, Jun 24, 2015 at 12:11 PM, Justin Uang <justin.u...@gmail.com>
> >> wrote:
> >> > FYI, just submitted a PR to Pyrolite to remove their StopException.
> >> > https://github.com/irmen/Pyrolite/pull/30
> >> >
> >> > With my benchmark, removing it basically made it about 2x faster.
> >> >
> >> > On Wed, Jun 24, 2015 at 8:33 AM Punyashloka Biswal
> >> > <punya.bis...@gmail.com>
> >> > wrote:
> >> >>
> >> >> Hi Davies,
> >> >>
> >> >> In general, do we expect people to use CPython only for "heavyweight"
> >> >> UDFs that invoke an external library? Are there any examples of using
> >> >> Jython, especially performance comparisons to Java/Scala and CPython?
> >> >> When using Jython, do you expect the driver to send code to the
> >> >> executor as a string, or is there a good way to serialize Jython
> >> >> lambdas?
> >> >>
> >> >> (For context, I was unable to serialize Nashorn lambdas when I tried
> >> >> to use them in Spark.)
> >> >>
> >> >> Punya
> >> >> On Wed, Jun 24, 2015 at 2:26 AM Davies Liu <dav...@databricks.com>
> >> >> wrote:
> >> >>>
> >> >>> Fair points, I also like simpler solutions.
> >> >>>
> >> >>> The overhead of a Python task could be a few milliseconds, which
> >> >>> means we should also eval them in batches (one Python task per
> >> >>> batch).
> >> >>>
> >> >>> Decreasing the batch size for UDF sounds reasonable to me, together
> >> >>> with other tricks to reduce the data in socket/pipe buffer.
> >> >>>
> >> >>> BTW, what does your UDF look like? How about using Jython to run
> >> >>> simple Python UDFs (those without external libraries)?
> >> >>>
> >> >>> On Tue, Jun 23, 2015 at 8:21 PM, Justin Uang <justin.u...@gmail.com>
> >> >>> wrote:
> >> >>> > // + punya
> >> >>> >
> >> >>> > Thanks for your quick response!
> >> >>> >
> >> >>> > I'm not sure that using an unbounded buffer is a good solution to
> >> >>> > the locking problem. For example, in the situation where I had 500
> >> >>> > columns, I am in fact storing 499 extra columns on the Java side,
> >> >>> > which might make me OOM if I have to store many rows. In addition,
> >> >>> > if I am using an AutoBatchedSerializer, the Java side might have to
> >> >>> > write 1 << 16 == 65536 rows before Python starts outputting
> >> >>> > elements, in which case the Java side has to buffer 65536 complete
> >> >>> > rows. In general it seems fragile to rely on blocking behavior in
> >> >>> > the Python coprocess. By contrast, it's very easy to verify the
> >> >>> > correctness and performance characteristics of the synchronous
> >> >>> > blocking solution.
> >> >>> >
> >> >>> >
> >> >>> > On Tue, Jun 23, 2015 at 7:21 PM Davies Liu <dav...@databricks.com>
> >> >>> > wrote:
> >> >>> >>
> >> >>> >> Thanks for looking into it, I like the idea of having a
> >> >>> >> ForkingIterator. If we have an unlimited buffer in it, then we
> >> >>> >> will not have the problem of deadlock, I think. The writing thread
> >> >>> >> will be blocked by the Python process, so not many rows will be
> >> >>> >> buffered (though this could still be a reason to OOM). At least,
> >> >>> >> this approach is better than the current one.
> >> >>> >>
> >> >>> >> Could you create a JIRA and send out the PR?
> >> >>> >>
> >> >>> >> On Tue, Jun 23, 2015 at 3:27 PM, Justin Uang
> >> >>> >> <justin.u...@gmail.com>
> >> >>> >> wrote:
> >> >>> >> > BLUF: BatchPythonEvaluation's implementation is unusable at large
> >> >>> >> > scale, but I have a proof-of-concept implementation that avoids
> >> >>> >> > caching the entire dataset.
> >> >>> >> >
> >> >>> >> > Hi,
> >> >>> >> >
> >> >>> >> > We have been running into performance problems using Python UDFs
> >> >>> >> > with DataFrames at large scale.
> >> >>> >> >
> >> >>> >> > From the implementation of BatchPythonEvaluation, it looks like
> >> >>> >> > the goal was to reuse the PythonRDD code. It caches the entire
> >> >>> >> > child RDD so that it can do two passes over the data: one to give
> >> >>> >> > to the PythonRDD, then one to join the Python lambda results with
> >> >>> >> > the original row (which may have Java objects that should be
> >> >>> >> > passed through).
> >> >>> >> >
> >> >>> >> > In addition, it caches all the columns, even the ones that don't
> >> >>> >> > need to be processed by the Python UDF. In the case I was working
> >> >>> >> > with, I had a 500-column table, and I wanted to use a Python UDF
> >> >>> >> > for one column, and it ended up caching all 500 columns.
> >> >>> >> >
> >> >>> >> > I have a working solution over here that does it in one pass over
> >> >>> >> > the data, avoiding caching
> >> >>> >> > (https://github.com/justinuang/spark/commit/c1a415a18d31226ac580f1a9df7985571d03199b).
> >> >>> >> > With this patch, I go from a job that takes 20 minutes then OOMs,
> >> >>> >> > to a job that finishes completely in 3 minutes. It is indeed quite
> >> >>> >> > hacky and prone to deadlocks since there is buffering in many
> >> >>> >> > locations:
> >> >>> >> >
> >> >>> >> >     - NEW: the ForkingIterator LinkedBlockingDeque
> >> >>> >> >     - batching the rows before pickling them
> >> >>> >> >     - os buffers on both sides
> >> >>> >> >     - pyspark.serializers.BatchedSerializer
> >> >>> >> >
> >> >>> >> > We can avoid deadlock by being very disciplined. For example, we
> >> >>> >> > can have the ForkingIterator always check whether the
> >> >>> >> > LinkedBlockingDeque is full and, if so:
> >> >>> >> >
> >> >>> >> > Java
> >> >>> >> >     - flush the java pickling buffer
> >> >>> >> >     - send a flush command to the python process
> >> >>> >> >     - os.flush the java side
> >> >>> >> >
> >> >>> >> > Python
> >> >>> >> >     - flush BatchedSerializer
> >> >>> >> >     - os.flush()
> >> >>> >> >
> >> >>> >> > I haven't added this yet. This is getting very complex, however.
> >> >>> >> > Another model would just be to change the protocol between the
> >> >>> >> > Java side and the worker to be a synchronous request/response.
> >> >>> >> > This has the disadvantage that the CPU isn't doing anything when
> >> >>> >> > the batch is being sent across, but it has the huge advantage of
> >> >>> >> > simplicity. In addition, I imagine that the actual IO between the
> >> >>> >> > processes isn't the slow part; rather, it is the serialization of
> >> >>> >> > Java objects into pickled bytes, and the
> >> >>> >> > deserialization/serialization + Python loops on the Python side.
> >> >>> >> > Another advantage is that we won't be taking more than 100% CPU,
> >> >>> >> > since only one thread is doing CPU work at a time between the
> >> >>> >> > executor and the Python interpreter.
> >> >>> >> >
> >> >>> >> > Any thoughts would be much appreciated =)
> >> >>> >> >
> >> >>> >> > Other improvements:
> >> >>> >> >     - extract some code of the worker out of PythonRDD so that we
> >> >>> >> >       can do a mapPartitions directly in BatchPythonEvaluation
> >> >>> >> >       without resorting to the hackery in ForkedRDD.compute(),
> >> >>> >> >       which uses a cache to ensure that the other RDD can get a
> >> >>> >> >       handle to the same iterator.
> >> >>> >> >     - read elements and use a size estimator to create the
> >> >>> >> >       BlockingQueue to make sure that we don't store too many
> >> >>> >> >       things in memory when batching
> >> >>> >> >     - patch Unpickler to not use StopException for control flow,
> >> >>> >> >       which is slowing down the Java side
> >> >>> >> >
> >> >>> >> >
>
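
For readers following the thread, here is a minimal sketch of the "synchronous request/response" model discussed above, written in Python only. It is not the code from the linked commit or from Spark: the names `eval_udf_sync`, `worker_loop`, `send_msg` and `recv_msg` are hypothetical, the worker is a thread on a socket pair standing in for the Python coprocess, and the length-prefixed pickle framing is an assumption made for illustration. The point it tries to show is that the sender holds at most one batch of rows while it waits for the reply, so there is no unbounded buffering and no cross-buffer deadlock to reason about.

```python
import pickle
import socket
import struct
import threading
from itertools import islice


def send_msg(sock, obj):
    # Frame each message as a 4-byte big-endian length followed by pickled bytes.
    payload = pickle.dumps(obj)
    sock.sendall(struct.pack(">I", len(payload)) + payload)


def recv_exact(sock, n):
    # Read exactly n bytes, or raise EOFError if the peer closed the socket.
    chunks = []
    while n > 0:
        chunk = sock.recv(n)
        if not chunk:
            raise EOFError("peer closed the connection")
        chunks.append(chunk)
        n -= len(chunk)
    return b"".join(chunks)


def recv_msg(sock):
    (length,) = struct.unpack(">I", recv_exact(sock, 4))
    return pickle.loads(recv_exact(sock, length))


def worker_loop(sock, udf):
    # Stand-in for the Python worker: exactly one reply per request.
    while True:
        try:
            batch = recv_msg(sock)
        except EOFError:
            return  # the driver side closed the socket: no more batches
        send_msg(sock, [udf(value) for value in batch])


def eval_udf_sync(rows, udf_col, udf, batch_size=100):
    """Yield each row with the UDF result appended, one batch at a time.

    Only the UDF's input column is sent across; the full rows of the
    current batch stay on the sending side until the reply arrives.
    """
    parent, child = socket.socketpair()
    worker = threading.Thread(target=worker_loop, args=(child, udf))
    worker.start()
    try:
        it = iter(rows)
        while True:
            batch = list(islice(it, batch_size))
            if not batch:
                break
            send_msg(parent, [row[udf_col] for row in batch])  # request
            results = recv_msg(parent)                         # blocking reply
            for row, result in zip(batch, results):
                yield row + (result,)
    finally:
        parent.close()  # signals end-of-input to the worker
        worker.join()
        child.close()


if __name__ == "__main__":
    rows = [(i, i * 10) for i in range(5)]
    for out in eval_udf_sync(rows, 0, lambda x: x + 1, batch_size=2):
        print(out)
```

The trade-off raised in the thread is visible here: while one side serializes or computes, the other side is idle, and `batch_size` bounds both the memory held per round trip and the number of round trips needed.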
