Re: [SparkSQL 1.4]Could not use concat with UDF in where clause

2015-06-23 Thread Michael Armbrust
Can you file a JIRA please?

On Tue, Jun 23, 2015 at 1:42 AM, StanZhai m...@zhaishidan.cn wrote:

 Hi all,

 After upgrading the cluster from Spark 1.3.1 to 1.4.0 (rc4), I encountered the
 following exception when using concat with a UDF in a WHERE clause:

 ===Exception
 org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to
 dataType on unresolved object, tree:
 'concat(HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFYear(date#1776),年)
 at org.apache.spark.sql.catalyst.analysis.UnresolvedFunction.dataType(unresolved.scala:82)
 at org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion$InConversion$$anonfun$apply$5$$anonfun$applyOrElse$15.apply(HiveTypeCoercion.scala:299)
 at org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion$InConversion$$anonfun$apply$5$$anonfun$applyOrElse$15.apply(HiveTypeCoercion.scala:299)
 at scala.collection.LinearSeqOptimized$class.exists(LinearSeqOptimized.scala:80)
 at scala.collection.immutable.List.exists(List.scala:84)
 at org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion$InConversion$$anonfun$apply$5.applyOrElse(HiveTypeCoercion.scala:299)
 at org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion$InConversion$$anonfun$apply$5.applyOrElse(HiveTypeCoercion.scala:298)
 at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:222)
 at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:222)
 at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
 at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:221)
 at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$transformExpressionDown$1(QueryPlan.scala:75)
 at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:85)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
 at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
 at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
 at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
 at scala.collection.AbstractIterator.to(Iterator.scala:1157)
 at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
 at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
 at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
 at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
 at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsDown(QueryPlan.scala:94)
 at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressions(QueryPlan.scala:64)
 at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformAllExpressions$1.applyOrElse(QueryPlan.scala:136)
 at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformAllExpressions$1.applyOrElse(QueryPlan.scala:135)
 at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:222)
 at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:222)
 at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
 at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:221)
 at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:242)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
 at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
 at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
 at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
 at scala.collection.AbstractIterator.to(Iterator.scala:1157)
 at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
 at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
 at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
 at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
 at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:272)
 at

Re: Calculating tuple count /input rate with time

2015-06-23 Thread Tathagata Das
This should give an accurate count for each batch. To get the rate, though,
you have to make sure that your streaming app is stable, that is, batches
are processed as fast as they are received (the scheduling delay in the Spark
Streaming UI is approximately 0).
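
As a rough illustration of this point, a per-batch count only becomes a rate once it is divided by the batch interval, and the figure is only trustworthy while batches are not queuing up. The sketch below is plain Python, not Spark API; `BatchStat`, `input_rate`, `is_stable`, the 2-second interval, and the 10% delay tolerance are all assumptions made for illustration.

```python
from dataclasses import dataclass


@dataclass
class BatchStat:
    count: int                  # records in the batch (what rdd.count() reports)
    scheduling_delay_s: float   # time the batch waited before processing began


def input_rate(stat, batch_interval_s):
    """Records per second for a single batch."""
    return stat.count / batch_interval_s


def is_stable(stats, batch_interval_s, tolerance=0.1):
    """Treat the app as stable when no batch waited longer than a small
    fraction of the batch interval (the threshold is an assumed value)."""
    return all(s.scheduling_delay_s <= tolerance * batch_interval_s for s in stats)


# Hypothetical numbers for three consecutive 2-second batches.
stats = [BatchStat(5000, 0.01), BatchStat(5200, 0.02), BatchStat(4800, 0.0)]
rates = [input_rate(s, 2.0) for s in stats]
stable = is_stable(stats, 2.0)
print(rates, stable)
```

If `is_stable` were false, the counts would still be correct per batch, but the derived rate would reflect processing backlog rather than the true arrival rate.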

TD

On Tue, Jun 23, 2015 at 2:49 AM, anshu shukla anshushuk...@gmail.com
wrote:

 I am calculating the input rate using the following logic.

 I think this foreachRDD always runs on the driver (the println output is
 seen on the driver).

 1- Is there any other way to do this at lower cost?

 2- Will this give me the correct count for the rate?


 //code -

 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.function.Function;

 inputStream.foreachRDD(new Function<JavaRDD<String>, Void>() {
     @Override
     public Void call(JavaRDD<String> stringJavaRDD) throws Exception {
         // Prints "<timestamp ms>,spoutstringJavaRDD,<record count>" for each batch.
         System.out.println(System.currentTimeMillis() + ",spoutstringJavaRDD,"
                 + stringJavaRDD.count());
         return null;
     }
 });



 --
 Thanks & Regards,
 Anshu Shukla



Re: how can I write a language wrapper?

2015-06-23 Thread Shivaram Venkataraman
Every language has its own quirks / features -- so I don't think there
exists a document on how to go about doing this for a new language. The
most related write up I know of is the wiki page on PySpark internals
https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals written
by Josh Rosen -- It covers some of the issues like closure capture,
serialization, JVM communication that you'll need to handle for a new
language.

Thanks
Shivaram

On Tue, Jun 23, 2015 at 1:35 PM, Vasili I. Galchin vigalc...@gmail.com
wrote:

 Hello,

   I want to add support for another language (other than
 Scala, Java, et al.). Where is the documentation that explains how to provide
 support for a new language?

 Thank you,

 Vasili



Re: how can I write a language wrapper?

2015-06-23 Thread Matei Zaharia
Just FYI, it would be easiest to follow SparkR's example and add the DataFrame 
API first. Other APIs will be designed to work on DataFrames (most notably 
machine learning pipelines), and the surface of this API is much smaller than 
that of the RDD API. This API will also give you great performance as we continue to 
optimize Spark SQL.

Matei

 On Jun 23, 2015, at 1:46 PM, Shivaram Venkataraman 
 shiva...@eecs.berkeley.edu wrote:
 
 Every language has its own quirks / features -- so I don't think there exists 
 a document on how to go about doing this for a new language. The most related 
 write up I know of is the wiki page on PySpark internals 
 https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals written 
 by Josh Rosen -- It covers some of the issues like closure capture, 
 serialization, JVM communication that you'll need to handle for a new 
 language. 
 
 Thanks
 Shivaram
 
 On Tue, Jun 23, 2015 at 1:35 PM, Vasili I. Galchin vigalc...@gmail.com wrote:
 Hello,
 
   I want to add support for another language (other than Scala, 
 Java, et al.). Where is the documentation that explains how to provide support for a 
 new language?
 
 Thank you,
 
 Vasili
 



how can I write a language wrapper?

2015-06-23 Thread Vasili I. Galchin
Hello,

  I want to add support for another language (other than Scala,
Java, et al.). Where is the documentation that explains how to provide support for
a new language?

Thank you,

Vasili


Re: Python UDF performance at large scale

2015-06-23 Thread Justin Uang
// + punya

Thanks for your quick response!

I'm not sure that using an unbounded buffer is a good solution to the
locking problem. For example, in the situation where I had 500 columns, I
am in fact storing 499 extra columns on the java side, which might make me
OOM if I have to store many rows. In addition, if I am using an
AutoBatchedSerializer, the java side might have to write 1 << 16 == 65536
rows before python starts outputting elements, in which case, the Java side
has to buffer 65536 complete rows. In general it seems fragile to rely on
blocking behavior in the Python coprocess. By contrast, it's very easy to
verify the correctness and performance characteristics of the synchronous
blocking solution.
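
To put rough numbers on the buffering concern: the 500 columns and the 1 << 16 rows come from the message above, while the 8 bytes per value is purely an assumed placeholder (real rows with strings or nested objects could be much larger). This is back-of-the-envelope arithmetic, not a measurement.

```python
ROWS_BEFORE_OUTPUT = 1 << 16    # 65536 rows, as stated in the message
COLUMNS = 500                   # width of the table in the example
ASSUMED_BYTES_PER_VALUE = 8     # assumption for illustration only


def buffered_bytes(rows, columns, bytes_per_value):
    """Bytes held if every buffered row keeps `columns` values."""
    return rows * columns * bytes_per_value


full_rows = buffered_bytes(ROWS_BEFORE_OUTPUT, COLUMNS, ASSUMED_BYTES_PER_VALUE)
udf_only = buffered_bytes(ROWS_BEFORE_OUTPUT, 1, ASSUMED_BYTES_PER_VALUE)

print(full_rows / 2**20, "MiB when complete rows are buffered")
print(udf_only / 2**20, "MiB when only the UDF input column is buffered")
```

Under these assumptions the gap is a factor of 500, which is the cost of carrying the 499 pass-through columns in the buffer.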


On Tue, Jun 23, 2015 at 7:21 PM Davies Liu dav...@databricks.com wrote:

 Thanks for looking into it. I like the idea of having a
 ForkingIterator. If it has an unlimited buffer, then I think we will not have
 the deadlock problem. The writing thread will be blocked
 by the Python process, so not many rows will be buffered (though this could
 still be a cause of OOM). At least this approach is better than the current one.

 Could you create a JIRA and send out a PR?

 On Tue, Jun 23, 2015 at 3:27 PM, Justin Uang justin.u...@gmail.com
 wrote:
  BLUF: BatchPythonEvaluation's implementation is unusable at large scale, but
  I have a proof-of-concept implementation that avoids caching the entire
  dataset.
 
  Hi,
 
  We have been running into performance problems using Python UDFs with
  DataFrames at large scale.
 
  From the implementation of BatchPythonEvaluation, it looks like the goal was
  to reuse the PythonRDD code. It caches the entire child RDD so that it can
  do two passes over the data. One to give to the PythonRDD, then one to join
  the python lambda results with the original row (which may have java objects
  that should be passed through).
 
  In addition, it caches all the columns, even the ones that don't need to be
  processed by the Python UDF. In the cases I was working with, I had a 500
  column table, and I wanted to use a python UDF for one column, and it ended
  up caching all 500 columns.
 
  I have a working solution over here that does it in one pass over the data,
  avoiding caching
  (https://github.com/justinuang/spark/commit/c1a415a18d31226ac580f1a9df7985571d03199b).
  With this patch, I go from a job that takes 20 minutes then OOMs, to a job
  that finishes completely in 3 minutes. It is indeed quite hacky and prone to
  deadlocks since there is buffering in many locations:
 
  - NEW: the ForkingIterator LinkedBlockingDeque
  - batching the rows before pickling them
  - os buffers on both sides
  - pyspark.serializers.BatchedSerializer
 
  We can avoid deadlock by being very disciplined. For example, we can have
  the ForkingIterator instead always do a check of whether the
  LinkedBlockingDeque is full and if so:
 
  Java
  - flush the java pickling buffer
  - send a flush command to the python process
  - os.flush the java side
 
  Python
  - flush BatchedSerializer
  - os.flush()
 
  I haven't added this yet. This is getting very complex however. Another
  model would just be to change the protocol between the java side and the
  worker to be a synchronous request/response. This has the disadvantage that
  the CPU isn't doing anything when the batch is being sent across, but it has
  the huge advantage of simplicity. In addition, I imagine that the actual IO
  between the processes isn't that slow, but rather the serialization of java
  objects into pickled bytes, and the deserialization/serialization + python
  loops on the python side. Another advantage is that we won't be taking more
  than 100% CPU since only one thread is doing CPU work at a time between the
  executor and the python interpreter.
 
  Any thoughts would be much appreciated =)
 
  Other improvements:
  - extract some code of the worker out of PythonRDD so that we can do a
  mapPartitions directly in BatchPythonEvaluation without resorting to the
  hackery in ForkedRDD.compute(), which uses a cache to ensure that the other
  RDD can get a handle to the same iterator.
  - read elements and use a size estimator to create the BlockingQueue to
  make sure that we don't store too many things in memory when batching
  - patch Unpickler to not use StopException for control flow, which is
  slowing down the java side
 



[VOTE] Release Apache Spark 1.4.1

2015-06-23 Thread Patrick Wendell
Please vote on releasing the following candidate as Apache Spark version 1.4.1!

This release fixes a handful of known issues in Spark 1.4.0, listed here:
http://s.apache.org/spark-1.4.1

The tag to be voted on is v1.4.1-rc1 (commit 60e08e5):
https://git-wip-us.apache.org/repos/asf?p=spark.git;a=commit;h=60e08e50751fe3929156de956d62faea79f5b801

The release files, including signatures, digests, etc. can be found at:
http://people.apache.org/~pwendell/spark-releases/spark-1.4.1-rc1-bin/

Release artifacts are signed with the following key:
https://people.apache.org/keys/committer/pwendell.asc

The staging repository for this release can be found at:
[published as version: 1.4.1]
https://repository.apache.org/content/repositories/orgapachespark-1118/
[published as version: 1.4.1-rc1]
https://repository.apache.org/content/repositories/orgapachespark-1119/

The documentation corresponding to this release can be found at:
http://people.apache.org/~pwendell/spark-releases/spark-1.4.1-rc1-docs/

Please vote on releasing this package as Apache Spark 1.4.1!

The vote is open until Saturday, June 27, at 06:32 UTC and passes
if a majority of at least 3 +1 PMC votes are cast.

[ ] +1 Release this package as Apache Spark 1.4.1
[ ] -1 Do not release this package because ...

To learn more about Apache Spark, please see
http://spark.apache.org/

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



custom REST port from spark-defaults.conf

2015-06-23 Thread Niranda Perera
Hi,

Is there a configuration setting to set a custom port number for the master
REST URL? Can it be included in spark-defaults.conf?

cheers
-- 
Niranda
@n1r44 https://twitter.com/N1R44
https://pythagoreanscript.wordpress.com/


[SparkSQL 1.4]Could not use concat with UDF in where clause

2015-06-23 Thread StanZhai
Hi all,

After upgrading the cluster from Spark 1.3.1 to 1.4.0 (rc4), I encountered the
following exception when using concat with a UDF in a WHERE clause:

===Exception
org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to
dataType on unresolved object, tree:
'concat(HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFYear(date#1776),年)
at org.apache.spark.sql.catalyst.analysis.UnresolvedFunction.dataType(unresolved.scala:82)
at org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion$InConversion$$anonfun$apply$5$$anonfun$applyOrElse$15.apply(HiveTypeCoercion.scala:299)
at org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion$InConversion$$anonfun$apply$5$$anonfun$applyOrElse$15.apply(HiveTypeCoercion.scala:299)
at scala.collection.LinearSeqOptimized$class.exists(LinearSeqOptimized.scala:80)
at scala.collection.immutable.List.exists(List.scala:84)
at org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion$InConversion$$anonfun$apply$5.applyOrElse(HiveTypeCoercion.scala:299)
at org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion$InConversion$$anonfun$apply$5.applyOrElse(HiveTypeCoercion.scala:298)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:222)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:222)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:221)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$transformExpressionDown$1(QueryPlan.scala:75)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:85)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsDown(QueryPlan.scala:94)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressions(QueryPlan.scala:64)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformAllExpressions$1.applyOrElse(QueryPlan.scala:136)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformAllExpressions$1.applyOrElse(QueryPlan.scala:135)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:222)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:222)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:221)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:242)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:227)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:242)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at

Calculating tuple count /input rate with time

2015-06-23 Thread anshu shukla
I am calculating the input rate using the following logic.

I think this foreachRDD always runs on the driver (the println output is
seen on the driver).

1- Is there any other way to do this at lower cost?

2- Will this give me the correct count for the rate?


//code -

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;

inputStream.foreachRDD(new Function<JavaRDD<String>, Void>() {
    @Override
    public Void call(JavaRDD<String> stringJavaRDD) throws Exception {
        // Prints "<timestamp ms>,spoutstringJavaRDD,<record count>" for each batch.
        System.out.println(System.currentTimeMillis() + ",spoutstringJavaRDD,"
                + stringJavaRDD.count());
        return null;
    }
});



-- 
Thanks & Regards,
Anshu Shukla


HyperLogLogUDT

2015-06-23 Thread Nick Pentreath
Hey Spark devs

I've been looking at DF UDFs and UDAFs. The approx distinct is using
HyperLogLog, but there is only an option to return the count as a Long.

It can be useful to be able to return and store the actual data structure
(i.e. the serialized HLL). This effectively allows one to do aggregation /
rollups over columns while still preserving the ability to get distinct
counts.

For example, one can store daily aggregates of events, grouped by various
columns, while storing for each grouping the HLL of say unique users. So
you can get the uniques per day directly but could also very easily do
arbitrary aggregates (say monthly, annually) and still be able to get a
unique count for that period by merging the daily HLLs.
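
To make the rollup idea concrete, here is a minimal, self-contained HyperLogLog sketch in plain Python. It is not the implementation Spark or the Hive UDAF uses; the class name, the SHA-1 hashing, the default precision `p=12`, and the byte layout are all assumptions chosen for illustration. It shows that merging two daily sketches (register-wise maximum) gives the same registers as sketching the union of the raw events, which is why storing the serialized structure preserves the ability to compute distinct counts at coarser granularity.

```python
import hashlib
import math


class HyperLogLog:
    """Toy HyperLogLog with 2**p one-byte registers (illustrative only)."""

    def __init__(self, p=12):
        if not 7 <= p <= 16:
            raise ValueError("p must be between 7 and 16")
        self.p = p
        self.m = 1 << p
        self.registers = bytearray(self.m)

    def add(self, item):
        digest = hashlib.sha1(str(item).encode("utf-8")).digest()
        h = int.from_bytes(digest[:8], "big")       # 64-bit hash value
        idx = h >> (64 - self.p)                     # top p bits pick a register
        rest = h & ((1 << (64 - self.p)) - 1)        # remaining 64 - p bits
        # 1-based position of the leftmost 1-bit within the remaining bits.
        rank = (64 - self.p) - rest.bit_length() + 1
        if rank > self.registers[idx]:
            self.registers[idx] = rank

    def merge(self, other):
        """Return a new sketch equivalent to sketching both inputs together."""
        if self.p != other.p:
            raise ValueError("cannot merge sketches with different precision")
        merged = HyperLogLog(self.p)
        merged.registers = bytearray(
            max(a, b) for a, b in zip(self.registers, other.registers))
        return merged

    def count(self):
        m = self.m
        alpha = 0.7213 / (1 + 1.079 / m)
        raw = alpha * m * m / sum(2.0 ** -r for r in self.registers)
        zeros = self.registers.count(0)
        if raw <= 2.5 * m and zeros:
            return round(m * math.log(m / zeros))    # small-range correction
        return round(raw)

    def to_bytes(self):
        return bytes(self.registers)

    @classmethod
    def from_bytes(cls, data):
        sketch = cls(len(data).bit_length() - 1)
        if sketch.m != len(data):
            raise ValueError("length is not a power of two")
        sketch.registers = bytearray(data)
        return sketch


# Two hypothetical days of user IDs with 500 users in common.
day1, day2, union = HyperLogLog(), HyperLogLog(), HyperLogLog()
for user in range(0, 1000):
    day1.add(user)
    union.add(user)
for user in range(500, 1500):
    day2.add(user)
    union.add(user)

merged = day1.merge(day2)
print(day1.count(), day2.count(), merged.count())  # approximate distinct counts
```

The `to_bytes`/`from_bytes` pair stands in for the "binary field containing the serialized HLL" mentioned below; a real implementation would also need to store the precision and a format version.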

I did this a while back as a Hive UDAF (https://github.com/MLnick/hive-udf)
which returns a Struct field containing a cardinality field and a
binary field containing the serialized HLL.

I was wondering if there would be interest in something like this? I am not
so clear on how UDTs work with regards to SerDe - so could one adapt the
HyperLogLogUDT to be a Struct with the serialized HLL as a field as well as
count as a field? Then I assume this would automatically play nicely with
DataFrame I/O etc. The gotcha is one needs to then call
approx_count_field.count (or is there a concept of a default field for
a Struct?).

Also, being able to provide the bitsize parameter may be useful...

The same thinking would apply potentially to other approximate (and
mergeable) data structures like T-Digest and maybe CMS.

Nick


OK to add committers active on JIRA to JIRA admin role?

2015-06-23 Thread Sean Owen
There are some committers who are active on JIRA and sometimes need to
do things that require JIRA admin access -- in particular thinking of
adding a new person as Contributor in order to assign them a JIRA.
We can't change what roles can do what (think that INFRA ticket is
dead) but can add to the Admin role. Would anyone object to making a
few more committers JIRA Admins for this purpose?




Python UDF performance at large scale

2015-06-23 Thread Justin Uang
BLUF: BatchPythonEvaluation's implementation is unusable at large scale,
but I have a proof-of-concept implementation that avoids caching the entire
dataset.

Hi,

We have been running into performance problems using Python UDFs with
DataFrames at large scale.

From the implementation of BatchPythonEvaluation, it looks like the goal
was to reuse the PythonRDD code. It caches the entire child RDD so that it
can do two passes over the data. One to give to the PythonRDD, then one to
join the python lambda results with the original row (which may have java
objects that should be passed through).

In addition, it caches all the columns, even the ones that don't need to be
processed by the Python UDF. In the cases I was working with, I had a 500
column table, and I wanted to use a python UDF for one column, and it ended
up caching all 500 columns.

I have a working solution over here that does it in one pass over the data,
avoiding caching (
https://github.com/justinuang/spark/commit/c1a415a18d31226ac580f1a9df7985571d03199b).
With this patch, I go from a job that takes 20 minutes then OOMs, to a job
that finishes completely in 3 minutes. It is indeed quite hacky and prone
to deadlocks since there is buffering in many locations:

- NEW: the ForkingIterator LinkedBlockingDeque
- batching the rows before pickling them
- os buffers on both sides
- pyspark.serializers.BatchedSerializer

We can avoid deadlock by being very disciplined. For example, we can have
the ForkingIterator instead always do a check of whether the
LinkedBlockingDeque is full and if so:

Java
- flush the java pickling buffer
- send a flush command to the python process
- os.flush the java side

Python
- flush BatchedSerializer
- os.flush()

I haven't added this yet. This is getting very complex however. Another
model would just be to change the protocol between the java side and the
worker to be a synchronous request/response. This has the disadvantage that
the CPU isn't doing anything when the batch is being sent across, but it
has the huge advantage of simplicity. In addition, I imagine that the
actual IO between the processes isn't that slow, but rather the
serialization of java objects into pickled bytes, and the
deserialization/serialization + python loops on the python side. Another
advantage is that we won't be taking more than 100% CPU since only one
thread is doing CPU work at a time between the executor and the python
interpreter.
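
A minimal sketch of the synchronous request/response model described above, in plain Python rather than the actual executor/worker socket code. The names `in_batches`, `evaluate_udf_sync`, and `fake_worker` are hypothetical, and the `worker` callable merely stands in for the Python worker process. The point it illustrates is that only one batch of rows is held at a time: the UDF input column is sent, the caller blocks for the reply, and the results are joined back onto the original rows in a single pass.

```python
from itertools import islice


def in_batches(rows, batch_size):
    """Yield lists of at most batch_size rows, consuming the input lazily."""
    it = iter(rows)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch


def evaluate_udf_sync(rows, udf_column, worker, batch_size=100):
    """One pass over rows: send one column per batch, wait, then join."""
    for batch in in_batches(rows, batch_size):
        request = [row[udf_column] for row in batch]   # only the UDF input
        response = worker(request)                     # blocks until the reply
        if len(response) != len(batch):
            raise ValueError("worker returned a different number of results")
        for row, result in zip(batch, response):
            yield row + (result,)                      # original row + UDF output


seen_batch_sizes = []


def fake_worker(values):
    """Stand-in for the Python worker: doubles each value."""
    seen_batch_sizes.append(len(values))
    return [v * 2 for v in values]


rows = [(i, "payload-%d" % i) for i in range(5)]
joined = list(evaluate_udf_sync(rows, udf_column=0, worker=fake_worker,
                                batch_size=2))
print(joined)
print(seen_batch_sizes)
```

Because each request waits for its response, memory use is bounded by the batch size and there is no cross-process buffering to reason about, at the cost of the idle time noted above.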

Any thoughts would be much appreciated =)

Other improvements:
- extract some code of the worker out of PythonRDD so that we can do a
mapPartitions directly in BatchPythonEvaluation without resorting to the
hackery in ForkedRDD.compute(), which uses a cache to ensure that the other
RDD can get a handle to the same iterator.
- read elements and use a size estimator to create the BlockingQueue to
make sure that we don't store too many things in memory when batching
- patch Unpickler to not use StopException for control flow, which is
slowing down the java side


Re: Python UDF performance at large scale

2015-06-23 Thread Davies Liu
Thanks for looking into it. I like the idea of having a
ForkingIterator. If it has an unlimited buffer, then I think we will not have
the deadlock problem. The writing thread will be blocked
by the Python process, so not many rows will be buffered (though this could
still be a cause of OOM). At least this approach is better than the current one.

Could you create a JIRA and send out a PR?

On Tue, Jun 23, 2015 at 3:27 PM, Justin Uang justin.u...@gmail.com wrote:
 BLUF: BatchPythonEvaluation's implementation is unusable at large scale, but
 I have a proof-of-concept implementation that avoids caching the entire
 dataset.

 Hi,

 We have been running into performance problems using Python UDFs with
 DataFrames at large scale.

 From the implementation of BatchPythonEvaluation, it looks like the goal was
 to reuse the PythonRDD code. It caches the entire child RDD so that it can
 do two passes over the data. One to give to the PythonRDD, then one to join
 the python lambda results with the original row (which may have java objects
 that should be passed through).

 In addition, it caches all the columns, even the ones that don't need to be
 processed by the Python UDF. In the cases I was working with, I had a 500
 column table, and I wanted to use a python UDF for one column, and it ended
 up caching all 500 columns.

 I have a working solution over here that does it in one pass over the data,
 avoiding caching
 (https://github.com/justinuang/spark/commit/c1a415a18d31226ac580f1a9df7985571d03199b).
 With this patch, I go from a job that takes 20 minutes then OOMs, to a job
 that finishes completely in 3 minutes. It is indeed quite hacky and prone to
 deadlocks since there is buffering in many locations:

 - NEW: the ForkingIterator LinkedBlockingDeque
 - batching the rows before pickling them
 - os buffers on both sides
 - pyspark.serializers.BatchedSerializer

 We can avoid deadlock by being very disciplined. For example, we can have
 the ForkingIterator instead always do a check of whether the
 LinkedBlockingDeque is full and if so:

 Java
 - flush the java pickling buffer
 - send a flush command to the python process
 - os.flush the java side

 Python
 - flush BatchedSerializer
 - os.flush()

 I haven't added this yet. This is getting very complex however. Another
 model would just be to change the protocol between the java side and the
 worker to be a synchronous request/response. This has the disadvantage that
 the CPU isn't doing anything when the batch is being sent across, but it has
 the huge advantage of simplicity. In addition, I imagine that the actual IO
 between the processes isn't that slow, but rather the serialization of java
 objects into pickled bytes, and the deserialization/serialization + python
 loops on the python side. Another advantage is that we won't be taking more
 than 100% CPU since only one thread is doing CPU work at a time between the
 executor and the python interpreter.

 Any thoughts would be much appreciated =)

 Other improvements:
 - extract some code of the worker out of PythonRDD so that we can do a
 mapPartitions directly in BatchPythonEvaluation without resorting to the
 hackery in ForkedRDD.compute(), which uses a cache to ensure that the other
 RDD can get a handle to the same iterator.
 - read elements and use a size estimator to create the BlockingQueue to
 make sure that we don't store too many things in memory when batching
 - patch Unpickler to not use StopException for control flow, which is
 slowing down the java side


