Purpose of broadcast timeout

2019-01-30 Thread Justin Uang
Hi all,

We have noticed a lot of broadcast timeouts on our pipelines, and from some
inspection, it seems that they happen when I have two threads trying to
save two different DataFrames. We use the FIFO scheduler, so if I launch a
job that needs all the executors, the second DataFrame's collect on the
broadcast side is guaranteed to take longer than 5 minutes, and will throw.

My question is why do we have a timeout on a collect when broadcasting? It
seems silly that we have a small default timeout on something that is
influenced by contention on the cluster. We are basically saying that all
broadcast jobs need to finish in 5 minutes, regardless of our scheduling
policy on the cluster.
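For illustration, the blunt workaround is to raise the limit cluster-wide; a
minimal sketch, assuming the limit in question is spark.sql.broadcastTimeout
(in seconds) and a SparkSession handle named spark:

# Assumed config name/value, for illustration only: lift the broadcast
# timeout well above the worst-case queueing delay under FIFO scheduling.
spark.conf.set("spark.sql.broadcastTimeout", 36000)  # default is 300 (5 minutes)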

I'm curious about the original intention of the broadcast timeout. Is the
broadcast timeout really meant to be a timeout on sparkContext.broadcast,
rather than on child.executeCollectIterator()? If so, would it make sense
to move the timeout so that it wraps only sparkContext.broadcast?

Best,

Justin


Re: Using UDFs in Java without registration

2017-07-26 Thread Justin Uang
I'd like to bring this back up for consideration. I'm open to adding
types for all the parameters, but it does seem onerous, and in the case of
Python we don't do that. Do you feel strongly about adding them?
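For reference, the Python path declares only the return type; a minimal
PySpark sketch (function, column, and DataFrame names are illustrative):

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# Only the return type is declared; argument types are not specified.
plus_five = udf(lambda x: x + 5, IntegerType())
df.select(plus_five(df["age"]))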

On Sat, May 30, 2015 at 8:04 PM Reynold Xin  wrote:

> We added all the typetags for arguments but haven't got around to use them
> yet. I think it'd make sense to have them and do the auto cast, but we can
> have rules in analysis to forbid certain casts (e.g. don't auto cast double
> to int).
>
>
> On Sat, May 30, 2015 at 7:12 AM, Justin Uang 
> wrote:
>
>> The idea of asking for both the argument and return class is interesting.
>> I don't think we do that for the scala APIs currently, right? In
>> functions.scala, we only use the TypeTag for RT.
>>
>>   def udf[RT: TypeTag, A1: TypeTag](f: Function1[A1, RT]): UserDefinedFunction = {
>>     UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType)
>>   }
>>
>> There would only be a small subset of conversions that would make sense
>> implicitly (e.g. int to double, the typical conversions in programming
>> languages), but something like (double => int) might be dangerous and
>> (timestamp => double) wouldn't really make sense. Perhaps it's better to be
>> explicit about casts?
>>
>> If we don't care about declaring the types of the arguments, perhaps we
>> can have all of the java UDF interfaces (UDF1, UDF2, etc) extend a generic
>> interface called UDF, then have
>>
>> def define(f: UDF, returnType: Class[_])
>>
>> to simplify the APIs.
>>
>>
>> On Sat, May 30, 2015 at 3:43 AM Reynold Xin  wrote:
>>
>>> I think you are right that there is no way to call Java UDF without
>>> registration right now. Adding another 20 methods to functions would be
>>> scary. Maybe the best way is to have a companion object
>>> for UserDefinedFunction, and define UDF there?
>>>
>>> e.g.
>>>
>>> object UserDefinedFunction {
>>>
>>>   def define(f: org.apache.spark.api.java.function.Function0,
>>> returnType: Class[_]): UserDefinedFunction
>>>
>>>   // ... define a few more - maybe up to 5 arguments?
>>> }
>>>
>>> Ideally, we should ask for both argument class and return class, so we
>>> can do the proper type conversion (e.g. if the UDF expects a string, but
>>> the input expression is an int, Catalyst can automatically add a cast).
>>> However, we haven't implemented those in UserDefinedFunction yet.
>>>
>>>
>>>
>>>
>>> On Fri, May 29, 2015 at 12:54 PM, Justin Uang 
>>> wrote:
>>>
>>>> I would like to define a UDF in Java via a closure and then use it
>>>> without registration. In Scala, I believe there are two ways to do this:
>>>>
>>>> myUdf = functions.udf({ _ + 5})
>>>> myDf.select(myUdf(myDf("age")))
>>>>
>>>> or
>>>>
>>>> myDf.select(functions.callUDF({_ + 5}, DataTypes.IntegerType,
>>>> myDf("age")))
>>>>
>>>> However, both of these don't work for Java UDF. The first one requires
>>>> TypeTags. For the second one, I was able to hack it by creating a scala
>>>> AbstractFunction1 and using callUDF, which requires declaring the catalyst
>>>> DataType instead of using TypeTags. However, it was still nasty because I
>>>> had to return a scala map instead of a java map.
>>>>
>>>> Is there first class support for creating
>>>> a org.apache.spark.sql.UserDefinedFunction that works with
>>>> the org.apache.spark.sql.api.java.UDF1? I'm fine with having to
>>>> declare the catalyst type when creating it.
>>>>
>>>> If it doesn't exist, I would be happy to work on it =)
>>>>
>>>> Justin
>>>>
>>>
>>>
>


Making BatchPythonEvaluation actually Batch

2016-01-31 Thread Justin Uang
Hey guys,

BLUF: sorry for the length of this email. I'm trying to figure out how to
batch Python UDF executions, and since this is my first time messing with
Catalyst, I'd appreciate any feedback.

My team is starting to use PySpark UDFs quite heavily, and performance is a
huge blocker. The extra roundtrip serialization from Java to Python is not
a huge concern if we only incur it ~once per column for most workflows,
since it'll be in the same order of magnitude as reading files from disk.
However, right now each Python UDF leads to its own roundtrip. There is
definitely a lot we can do about this:

(all the prototyping code is here:
https://github.com/justinuang/spark/commit/8176749f8a6e6dc5a49fbbb952735ff40fb309fc
)

1. We can't chain Python UDFs.

df.select(python_times_2(python_times_2("col1")))

throws an exception saying that the inner expression isn't evaluable. The
workaround is to do


df.select(python_times_2("col1").alias("tmp")).select(python_times_2("tmp"))

This can be solved in ExtractPythonUDFs by always extracting the innermost
Python UDF first.

 // Pick the UDF we are going to evaluate (TODO: Support evaluating multiple UDFs at a time)
 // If there is more than one, we will add another evaluation operator in a subsequent pass.
-udfs.find(_.resolved) match {
+udfs.find { udf =>
+  udf.resolved && udf.children.map { child: Expression =>
+    child.find { // really hacky way to find if a child of a udf has the PythonUDF node
+      case p: PythonUDF => true
+      case _ => false
+    }.isEmpty
+  }.reduce((x, y) => x && y)
+} match {
   case Some(udf) =>
     var evaluation: EvaluatePython = null

2. If we have a Python UDF applied to many different columns, where they
don't depend on each other, we can optimize them by collapsing them down
into a single python worker. Although we have to serialize and send the
same amount of data to the python interpreter, in the case where I am
applying the same function to 20 columns, the overhead/context switches of
having 20 interpreters running at the same time cause huge performance
hits. I have confirmed this by manually taking the 20 columns, converting
them to a struct, and then writing a UDF that processes the whole struct at
once, and the speed difference is 2x (a sketch of that struct workaround is
below). My approach to adding this to Catalyst is basically to write an
optimizer rule called CombinePython which joins adjacent EvaluatePython
nodes that don't depend on each other's variables, and then to have
BatchPythonEvaluation run multiple lambdas at once. I would also like to be
able to handle the case
df.select(python_times_2("col1").alias("col1x2")).select(F.col("col1x2"),
python_times_2("col1x2").alias("col1x4")). To get around that, I add a
PushDownPythonEvaluation optimizer rule that pushes the evaluation through
a select/project, so that the CombinePython rule can join the two.
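For reference, the manual struct workaround mentioned above looks roughly
like this (a sketch; the column names and times-two function are illustrative):

from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DoubleType

# Pack the 20 columns into one struct and run a single Python UDF over it,
# paying one Python round trip instead of twenty.
cols = ["c{}".format(i) for i in range(20)]
times_2_all = udf(lambda row: [float(v) * 2 for v in row], ArrayType(DoubleType()))
combined = df.select(times_2_all(F.struct(*cols)).alias("doubled"))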

3. I would like CombinePython to be able to handle UDFs that chain off of
each other.

df.select(python_times_2(python_times_2("col1")))

I haven't prototyped this yet, since it's a lot more complex. The way I'm
thinking about it is to still have a rule called CombinePython, except that
BatchPythonEvaluation will need to be smart enough to build up the DAG of
dependencies and then feed that information to the python interpreter, so
it can compute things in the right order and reuse the in-memory objects
that it has already computed. Does this seem right? Should the code mainly
live in BatchPythonEvaluation? In addition, we will need to change the
protocol between the Java and Python sides to support sending this
information. What is acceptable?

Any help would be much appreciated, especially w.r.t. the design choices,
so that the PR has a chance of being accepted.

Justin


Spark SQL: Avoid shuffles when data is already partitioned on disk

2016-01-21 Thread Justin Uang
Hi,

If I had a df and I wrote it out via partitionBy("id"), presumably, when I
load the df back in and do a groupBy("id"), a shuffle shouldn't be
necessary, right? Effectively, we can load in the dataframe with a hash
partitioner already set, since each task can simply read all the folders
id=<value> where hash(<value>) % reducer_count == reducer_id. Is this an
optimization that is on the radar? It would be a huge boon in terms of
reducing the number of shuffles necessary if we're always joining on the
same columns.
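A minimal sketch of the scenario, with illustrative paths and an assumed
sqlContext:

# Write partitioned by id: one directory per value, e.g. /data/out/id=42/part-*
df.write.partitionBy("id").parquet("/data/out")

# Today the groupBy below still plans an exchange; the idea is that the loaded
# DataFrame could carry hash partitioning on "id" and skip the shuffle, both
# here and for repeated joins on "id".
loaded = sqlContext.read.parquet("/data/out")
counts = loaded.groupBy("id").count()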

Best,

Justin


Re: Returning numpy types from udfs

2015-12-05 Thread Justin Uang
Filed here:

https://issues.apache.org/jira/browse/SPARK-12157

On Sat, Dec 5, 2015 at 3:08 PM Reynold Xin  wrote:

> Not aware of any jira ticket, but it does sound like a great idea.
>
>
> On Sat, Dec 5, 2015 at 11:03 PM, Justin Uang 
> wrote:
>
>> Hi,
>>
>> I have fallen into the trap of returning numpy types from udfs, such as
>> np.float64 and np.int. It's hard to find the issue because they behave
>> pretty much like regular pure Python floats and ints, so can we make
>> PySpark automatically translate them?
>>
>> If so, I'll create a Jira ticket.
>>
>> Justin
>>
>
>


Returning numpy types from udfs

2015-12-05 Thread Justin Uang
Hi,

I have fallen into the trap of returning numpy types from udfs, such as
np.float64 and np.int. It's hard to find the issue because they behave
pretty much like regular pure Python floats and ints, so can we make
PySpark automatically translate them?

If so, I'll create a Jira ticket.
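In the meantime, the workaround is to cast at the UDF boundary; a minimal
sketch (the UDF itself is illustrative):

import numpy as np
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# Cast the numpy scalar back to a plain Python float before it crosses into
# Spark, rather than relying on automatic translation.
mean_udf = udf(lambda xs: float(np.mean(xs)), DoubleType())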

Justin


Subtract implementation using broadcast

2015-11-27 Thread Justin Uang
Hi,

I have seen massive gains with the broadcast hint for joins with
DataFrames, and I was wondering if we have thought about allowing the
broadcast hint for the implementation of subtract and intersect.

Right now, when I try it, it says that there is no plan for the broadcast
hint.
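For reference, a minimal sketch of the join-side hint, and of what the same
hint on subtract might look like (PySpark form; DataFrame names are
illustrative, and the subtract line is exactly the part that currently has
no plan):

from pyspark.sql.functions import broadcast

# Existing hint: ship the small side to every executor, avoiding a shuffle
# of the large side.
joined = large_df.join(broadcast(small_df), "id")

# The ask: let set operations accept the same hint.
# deduped = large_df.subtract(broadcast(small_df))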

Justin


Info about Dataset

2015-11-03 Thread Justin Uang
Hi,

I was looking through some of the PRs slated for 1.6.0 and noticed
something called a Dataset, which looks like a new concept based on the
scaladoc for the class. Can anyone point me to references/design docs
regarding the decision to introduce it? I presume it probably has
something to do with performance optimizations?

Thanks!

Justin


Re: Off-heap storage and dynamic allocation

2015-11-03 Thread Justin Uang
Cool, thanks for the dev insight into what parts of the codebase are
worthwhile, and which are not =)

On Tue, Nov 3, 2015 at 10:25 PM Reynold Xin  wrote:

> It is quite a bit of work. Again, I think going through the file system
> API is more ideal in the long run. In the long run, I don't even think the
> current offheap API makes much sense, and we should consider just removing
> it to simplify things.
>
> On Tue, Nov 3, 2015 at 1:20 PM, Justin Uang  wrote:
>
>> Alright, we'll just stick with normal caching then.
>>
>> Just for future reference, how much work would it be to get it to retain
>> the partitions in tachyon. This is especially helpful in a multitenant
>> situation, where many users each have their own persistent spark contexts,
>> but where the notebooks can be idle for long periods of time while holding
>> onto cached rdds.
>>
>> On Tue, Nov 3, 2015 at 10:15 PM Reynold Xin  wrote:
>>
>>> It is lost unfortunately (although can be recomputed automatically).
>>>
>>>
>>> On Tue, Nov 3, 2015 at 1:13 PM, Justin Uang 
>>> wrote:
>>>
>>>> Thanks for your response. I was worried about #3, vs being able to use
>>>> the objects directly. #2 seems to be the dealbreaker for my use case right?
>>>> Even if I am using tachyon for caching, if an executor is lost, then
>>>> that partition is lost for the purposes of spark?
>>>>
>>>> On Tue, Nov 3, 2015 at 5:53 PM Reynold Xin  wrote:
>>>>
>>>>> I don't think there is any special handling w.r.t. Tachyon vs in-heap
>>>>> caching. As a matter of fact, I think the current offheap caching
>>>>> implementation is pretty bad, because:
>>>>>
>>>>> 1. There is no namespace sharing in offheap mode
>>>>> 2. Similar to 1, you cannot recover the offheap memory once Spark
>>>>> driver or executor crashes
>>>>> 3. It requires expensive serialization to go offheap
>>>>>
>>>>> It would've been simpler to just treat Tachyon as a normal file
>>>>> system, and use it that way to at least satisfy 1 and 2, and also
>>>>> substantially simplify the internals.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Nov 3, 2015 at 7:59 AM, Justin Uang 
>>>>> wrote:
>>>>>
>>>>>> Yup, but I'm wondering what happens when an executor does get
>>>>>> removed, but when we're using tachyon. Will the cached data still be
>>>>>> available, since we're using off-heap storage, so the data isn't stored 
>>>>>> in
>>>>>> the executor?
>>>>>>
>>>>>> On Tue, Nov 3, 2015 at 4:57 PM Ryan Williams <
>>>>>> ryan.blake.willi...@gmail.com> wrote:
>>>>>>
>>>>>>> fwiw, I think that having cached RDD partitions prevents executors
>>>>>>> from being removed under dynamic allocation by default; see
>>>>>>> SPARK-8958 <https://issues.apache.org/jira/browse/SPARK-8958>. The
>>>>>>> "spark.dynamicAllocation.cachedExecutorIdleTimeout" config
>>>>>>> <http://spark.apache.org/docs/latest/configuration.html#dynamic-allocation>
>>>>>>> controls this.
>>>>>>>
>>>>>>> On Fri, Oct 30, 2015 at 12:14 PM Justin Uang 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hey guys,
>>>>>>>>
>>>>>>>> According to the docs for 1.5.1, when an executor is removed for
>>>>>>>> dynamic allocation, the cached data is gone. If I use off-heap storage 
>>>>>>>> like
>>>>>>>> tachyon, conceptually there isn't this issue anymore, but is the cached
>>>>>>>> data still available in practice? This would be great because then we 
>>>>>>>> would
>>>>>>>> be able to set spark.dynamicAllocation.cachedExecutorIdleTimeout to be
>>>>>>>> quite small.
>>>>>>>>
>>>>>>>> ==
>>>>>>>> In addition to writing shuffle files, executors also cache data
>>>>>>>> either on disk or in memory. When an executor is removed, however, all
>>>>>>>> cached data will no longer be accessible. There is currently not yet a
>>>>>>>> solution for this in Spark 1.2. In future releases, the cached data 
>>>>>>>> may be
>>>>>>>> preserved through an off-heap storage similar in spirit to how shuffle
>>>>>>>> files are preserved through the external shuffle service.
>>>>>>>> ==
>>>>>>>>
>>>>>>>
>>>>>
>>>
>


Re: Off-heap storage and dynamic allocation

2015-11-03 Thread Justin Uang
Alright, we'll just stick with normal caching then.

Just for future reference, how much work would it be to get it to retain
the partitions in tachyon. This is especially helpful in a multitenant
situation, where many users each have their own persistent spark contexts,
but where the notebooks can be idle for long periods of time while holding
onto cached rdds.

On Tue, Nov 3, 2015 at 10:15 PM Reynold Xin  wrote:

> It is lost unfortunately (although can be recomputed automatically).
>
>
> On Tue, Nov 3, 2015 at 1:13 PM, Justin Uang  wrote:
>
>> Thanks for your response. I was worried about #3, vs being able to use
>> the objects directly. #2 seems to be the dealbreaker for my use case right?
>> Even if I am using tachyon for caching, if an executor is lost, then
>> that partition is lost for the purposes of spark?
>>
>> On Tue, Nov 3, 2015 at 5:53 PM Reynold Xin  wrote:
>>
>>> I don't think there is any special handling w.r.t. Tachyon vs in-heap
>>> caching. As a matter of fact, I think the current offheap caching
>>> implementation is pretty bad, because:
>>>
>>> 1. There is no namespace sharing in offheap mode
>>> 2. Similar to 1, you cannot recover the offheap memory once Spark driver
>>> or executor crashes
>>> 3. It requires expensive serialization to go offheap
>>>
>>> It would've been simpler to just treat Tachyon as a normal file system,
>>> and use it that way to at least satisfy 1 and 2, and also substantially
>>> simplify the internals.
>>>
>>>
>>>
>>>
>>> On Tue, Nov 3, 2015 at 7:59 AM, Justin Uang 
>>> wrote:
>>>
>>>> Yup, but I'm wondering what happens when an executor does get removed,
>>>> but when we're using tachyon. Will the cached data still be available,
>>>> since we're using off-heap storage, so the data isn't stored in the
>>>> executor?
>>>>
>>>> On Tue, Nov 3, 2015 at 4:57 PM Ryan Williams <
>>>> ryan.blake.willi...@gmail.com> wrote:
>>>>
>>>>> fwiw, I think that having cached RDD partitions prevents executors
>>>>> from being removed under dynamic allocation by default; see SPARK-8958
>>>>> <https://issues.apache.org/jira/browse/SPARK-8958>. The
>>>>> "spark.dynamicAllocation.cachedExecutorIdleTimeout" config
>>>>> <http://spark.apache.org/docs/latest/configuration.html#dynamic-allocation>
>>>>> controls this.
>>>>>
>>>>> On Fri, Oct 30, 2015 at 12:14 PM Justin Uang 
>>>>> wrote:
>>>>>
>>>>>> Hey guys,
>>>>>>
>>>>>> According to the docs for 1.5.1, when an executor is removed for
>>>>>> dynamic allocation, the cached data is gone. If I use off-heap storage 
>>>>>> like
>>>>>> tachyon, conceptually there isn't this issue anymore, but is the cached
>>>>>> data still available in practice? This would be great because then we 
>>>>>> would
>>>>>> be able to set spark.dynamicAllocation.cachedExecutorIdleTimeout to be
>>>>>> quite small.
>>>>>>
>>>>>> ==
>>>>>> In addition to writing shuffle files, executors also cache data
>>>>>> either on disk or in memory. When an executor is removed, however, all
>>>>>> cached data will no longer be accessible. There is currently not yet a
>>>>>> solution for this in Spark 1.2. In future releases, the cached data may 
>>>>>> be
>>>>>> preserved through an off-heap storage similar in spirit to how shuffle
>>>>>> files are preserved through the external shuffle service.
>>>>>> ==
>>>>>>
>>>>>
>>>
>


Re: Pickle Spark DataFrame

2015-11-03 Thread Justin Uang
Is the Manager a python multiprocessing manager? Why are you using
parallelism on python when theoretically most of the heavy lifting is done
via spark?

On Wed, Oct 28, 2015 at 4:27 PM agg212  wrote:

> I would just like to be able to put a Spark DataFrame in a manager.dict()
> and
> be able to get it out (manager.dict() calls pickle on the object being
> stored).  Ideally, I would just like to store a pointer to the DataFrame
> object so that it remains distributed within Spark (i.e., not materialize
> and then store).  Here is an example:
>
> data = sparkContext.jsonFile(data_file) #load file
> cache = Manager.dict() #thread-safe container
> cache['id'] = data #store reference to data, not materialized result
> new_data = cache['id'] #get reference to distributed spark dataframe
> new_data.show()
>
>
>
>
> --
> View this message in context:
> http://apache-spark-developers-list.1001551.n3.nabble.com/Pickle-Spark-DataFrame-tp14803p14825.html
> Sent from the Apache Spark Developers List mailing list archive at
> Nabble.com.
>
> -
> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
> For additional commands, e-mail: dev-h...@spark.apache.org
>
>


Re: Off-heap storage and dynamic allocation

2015-11-03 Thread Justin Uang
Thanks for your response. I was worried about #3, vs being able to use the
objects directly. #2 seems to be the dealbreaker for my use case right?
Even if I am using tachyon for caching, if an executor is lost, then
that partition is lost for the purposes of spark?

On Tue, Nov 3, 2015 at 5:53 PM Reynold Xin  wrote:

> I don't think there is any special handling w.r.t. Tachyon vs in-heap
> caching. As a matter of fact, I think the current offheap caching
> implementation is pretty bad, because:
>
> 1. There is no namespace sharing in offheap mode
> 2. Similar to 1, you cannot recover the offheap memory once Spark driver
> or executor crashes
> 3. It requires expensive serialization to go offheap
>
> It would've been simpler to just treat Tachyon as a normal file system,
> and use it that way to at least satisfy 1 and 2, and also substantially
> simplify the internals.
>
>
>
>
> On Tue, Nov 3, 2015 at 7:59 AM, Justin Uang  wrote:
>
>> Yup, but I'm wondering what happens when an executor does get removed,
>> but when we're using tachyon. Will the cached data still be available,
>> since we're using off-heap storage, so the data isn't stored in the
>> executor?
>>
>> On Tue, Nov 3, 2015 at 4:57 PM Ryan Williams <
>> ryan.blake.willi...@gmail.com> wrote:
>>
>>> fwiw, I think that having cached RDD partitions prevents executors from
>>> being removed under dynamic allocation by default; see SPARK-8958
>>> <https://issues.apache.org/jira/browse/SPARK-8958>. The
>>> "spark.dynamicAllocation.cachedExecutorIdleTimeout" config
>>> <http://spark.apache.org/docs/latest/configuration.html#dynamic-allocation>
>>> controls this.
>>>
>>> On Fri, Oct 30, 2015 at 12:14 PM Justin Uang 
>>> wrote:
>>>
>>>> Hey guys,
>>>>
>>>> According to the docs for 1.5.1, when an executor is removed for
>>>> dynamic allocation, the cached data is gone. If I use off-heap storage like
>>>> tachyon, conceptually there isn't this issue anymore, but is the cached
>>>> data still available in practice? This would be great because then we would
>>>> be able to set spark.dynamicAllocation.cachedExecutorIdleTimeout to be
>>>> quite small.
>>>>
>>>> ==
>>>> In addition to writing shuffle files, executors also cache data either
>>>> on disk or in memory. When an executor is removed, however, all cached data
>>>> will no longer be accessible. There is currently not yet a solution for
>>>> this in Spark 1.2. In future releases, the cached data may be preserved
>>>> through an off-heap storage similar in spirit to how shuffle files are
>>>> preserved through the external shuffle service.
>>>> ==
>>>>
>>>
>


Re: Off-heap storage and dynamic allocation

2015-11-03 Thread Justin Uang
Yup, but I'm wondering what happens when an executor does get removed, but
when we're using tachyon. Will the cached data still be available, since
we're using off-heap storage, so the data isn't stored in the executor?

On Tue, Nov 3, 2015 at 4:57 PM Ryan Williams 
wrote:

> fwiw, I think that having cached RDD partitions prevents executors from
> being removed under dynamic allocation by default; see SPARK-8958
> <https://issues.apache.org/jira/browse/SPARK-8958>. The
> "spark.dynamicAllocation.cachedExecutorIdleTimeout" config
> <http://spark.apache.org/docs/latest/configuration.html#dynamic-allocation>
> controls this.
>
> On Fri, Oct 30, 2015 at 12:14 PM Justin Uang 
> wrote:
>
>> Hey guys,
>>
>> According to the docs for 1.5.1, when an executor is removed for dynamic
>> allocation, the cached data is gone. If I use off-heap storage like
>> tachyon, conceptually there isn't this issue anymore, but is the cached
>> data still available in practice? This would be great because then we would
>> be able to set spark.dynamicAllocation.cachedExecutorIdleTimeout to be
>> quite small.
>>
>> ==
>> In addition to writing shuffle files, executors also cache data either on
>> disk or in memory. When an executor is removed, however, all cached data
>> will no longer be accessible. There is currently not yet a solution for
>> this in Spark 1.2. In future releases, the cached data may be preserved
>> through an off-heap storage similar in spirit to how shuffle files are
>> preserved through the external shuffle service.
>> ==
>>
>


Off-heap storage and dynamic allocation

2015-10-30 Thread Justin Uang
Hey guys,

According to the docs for 1.5.1, when an executor is removed for dynamic
allocation, the cached data is gone. If I use off-heap storage like
tachyon, conceptually there isn't this issue anymore, but is the cached
data still available in practice? This would be great because then we would
be able to set spark.dynamicAllocation.cachedExecutorIdleTimeout to be
quite small.
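For reference, the knobs in question; a minimal sketch with illustrative
values of how small we would like to keep the cached-executor timeout:

from pyspark import SparkConf, SparkContext

# Illustrative values: with reliable off-heap storage, the cached-executor
# idle timeout could be kept nearly as small as the plain idle timeout.
conf = (SparkConf()
        .set("spark.dynamicAllocation.enabled", "true")
        .set("spark.shuffle.service.enabled", "true")
        .set("spark.dynamicAllocation.executorIdleTimeout", "60s")
        .set("spark.dynamicAllocation.cachedExecutorIdleTimeout", "120s"))
sc = SparkContext(conf=conf)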

==
In addition to writing shuffle files, executors also cache data either on
disk or in memory. When an executor is removed, however, all cached data
will no longer be accessible. There is currently not yet a solution for
this in Spark 1.2. In future releases, the cached data may be preserved
through an off-heap storage similar in spirit to how shuffle files are
preserved through the external shuffle service.
==


Re: Python UDAFs

2015-10-02 Thread Justin Uang
Cool, filed here: https://issues.apache.org/jira/browse/SPARK-10915

On Fri, Oct 2, 2015 at 3:21 PM Reynold Xin  wrote:

> No, not yet.
>
>
> On Fri, Oct 2, 2015 at 12:20 PM, Justin Uang 
> wrote:
>
>> Hi,
>>
>> Is there a Python API for UDAFs?
>>
>> Thanks!
>>
>> Justin
>>
>
>


Python UDAFs

2015-10-02 Thread Justin Uang
Hi,

Is there a Python API for UDAFs?

Thanks!

Justin


Fast Iteration while developing

2015-09-07 Thread Justin Uang
Hi,

What is the normal workflow for the core devs?

- Do we need to build the assembly jar to be able to run it from the spark
repo?
- Do you use sbt or maven to do the build?
- Is zinc only usable with maven?

I'm asking because my current process is to do a full sbt build, which
means I'm stuck with about a 3-5 minute iteration cycle.

Thanks!

Justin


Re: PySpark on PyPi

2015-08-20 Thread Justin Uang
One other question: Do we have consensus on publishing the pip-installable
source distribution to PyPI? If so, is that something that the maintainers
need to add to the process that they use to publish releases?

On Thu, Aug 20, 2015 at 5:44 PM Justin Uang  wrote:

> I would prefer to just do it without the jar first as well. My hunch is
> that to run spark the way it is intended, we need the wrapper scripts, like
> spark-submit. Does anyone know authoritatively if that is the case?
>
> On Thu, Aug 20, 2015 at 4:54 PM Olivier Girardot <
> o.girar...@lateral-thoughts.com> wrote:
>
>> +1
>> But just to improve the error logging,
>> would it be possible to add some warn logging in pyspark when the
>> SPARK_HOME env variable is pointing to a Spark distribution with a
>> different version from the pyspark package ?
>>
>> Regards,
>>
>> Olivier.
>>
>> 2015-08-20 22:43 GMT+02:00 Brian Granger :
>>
>>> I would start with just the plain python package without the JAR and
>>> then see if it makes sense to add the JAR over time.
>>>
>>> On Thu, Aug 20, 2015 at 12:27 PM, Auberon Lopez 
>>> wrote:
>>> > Hi all,
>>> >
>>> > I wanted to bubble up a conversation from the PR to this discussion to
>>> see
>>> > if there is support the idea of including a Spark assembly JAR in a
>>> PyPI
>>> > release of pyspark. @holdenk recommended this as she already does so
>>> in the
>>> > Sparkling Pandas package. Is this something people are interested in
>>> > pursuing?
>>> >
>>> > -Auberon
>>> >
>>> > On Thu, Aug 20, 2015 at 10:03 AM, Brian Granger 
>>> wrote:
>>> >>
>>> >> Auberon, can you also post this to the Jupyter Google Group?
>>> >>
>>> >> On Wed, Aug 19, 2015 at 12:23 PM, Auberon Lopez <
>>> auberon.lo...@gmail.com>
>>> >> wrote:
>>> >> > Hi all,
>>> >> >
>>> >> > I've created an updated PR for this based off of the previous work
>>> of
>>> >> > @prabinb:
>>> >> > https://github.com/apache/spark/pull/8318
>>> >> >
>>> >> > I am not very familiar with python packaging; feedback is
>>> appreciated.
>>> >> >
>>> >> > -Auberon
>>> >> >
>>> >> > On Mon, Aug 10, 2015 at 12:45 PM, MinRK 
>>> wrote:
>>> >> >>
>>> >> >>
>>> >> >> On Mon, Aug 10, 2015 at 12:28 PM, Matt Goodman >> >
>>> >> >> wrote:
>>> >> >>>
>>> >> >>> I would tentatively suggest also conda packaging.
>>> >> >>
>>> >> >>
>>> >> >> A conda package has the advantage that it can be set up without
>>> >> >> 'installing' the pyspark files, while the PyPI packaging is still
>>> being
>>> >> >> worked out. It can just add a pyspark.pth file pointing to pyspark,
>>> >> >> py4j
>>> >> >> locations. But I think it's a really good idea to package with
>>> conda.
>>> >> >>
>>> >> >> -MinRK
>>> >> >>
>>> >> >>>
>>> >> >>>
>>> >> >>> http://conda.pydata.org/docs/
>>> >> >>>
>>> >> >>> --Matthew Goodman
>>> >> >>>
>>> >> >>> =====
>>> >> >>> Check Out My Website: http://craneium.net
>>> >> >>> Find me on LinkedIn: http://tinyurl.com/d6wlch
>>> >> >>>
>>> >> >>> On Mon, Aug 10, 2015 at 11:23 AM, Davies Liu <
>>> dav...@databricks.com>
>>> >> >>> wrote:
>>> >> >>>>
>>> >> >>>> I think so, any contributions on this are welcome.
>>> >> >>>>
>>> >> >>>> On Mon, Aug 10, 2015 at 11:03 AM, Brian Granger <
>>> elliso...@gmail.com>
>>> >> >>>> wrote:
>>> >> >>>> > Sorry, trying to follow the context here. Does it look like
>>> there
>>> >> >>>> > is
>>> >> >>>> > support for the idea of creating a setup.py file and pypi
>>> p

Re: PySpark on PyPi

2015-08-20 Thread Justin Uang
I would prefer to just do it without the jar first as well. My hunch is
that to run spark the way it is intended, we need the wrapper scripts, like
spark-submit. Does anyone know authoritatively if that is the case?

On Thu, Aug 20, 2015 at 4:54 PM Olivier Girardot <
o.girar...@lateral-thoughts.com> wrote:

> +1
> But just to improve the error logging,
> would it be possible to add some warn logging in pyspark when the
> SPARK_HOME env variable is pointing to a Spark distribution with a
> different version from the pyspark package ?
>
> Regards,
>
> Olivier.
>
> 2015-08-20 22:43 GMT+02:00 Brian Granger :
>
>> I would start with just the plain python package without the JAR and
>> then see if it makes sense to add the JAR over time.
>>
>> On Thu, Aug 20, 2015 at 12:27 PM, Auberon Lopez 
>> wrote:
>> > Hi all,
>> >
>> > I wanted to bubble up a conversation from the PR to this discussion to
>> see
>> > if there is support the idea of including a Spark assembly JAR in a PyPI
>> > release of pyspark. @holdenk recommended this as she already does so in
>> the
>> > Sparkling Pandas package. Is this something people are interested in
>> > pursuing?
>> >
>> > -Auberon
>> >
>> > On Thu, Aug 20, 2015 at 10:03 AM, Brian Granger 
>> wrote:
>> >>
>> >> Auberon, can you also post this to the Jupyter Google Group?
>> >>
>> >> On Wed, Aug 19, 2015 at 12:23 PM, Auberon Lopez <
>> auberon.lo...@gmail.com>
>> >> wrote:
>> >> > Hi all,
>> >> >
>> >> > I've created an updated PR for this based off of the previous work of
>> >> > @prabinb:
>> >> > https://github.com/apache/spark/pull/8318
>> >> >
>> >> > I am not very familiar with python packaging; feedback is
>> appreciated.
>> >> >
>> >> > -Auberon
>> >> >
>> >> > On Mon, Aug 10, 2015 at 12:45 PM, MinRK 
>> wrote:
>> >> >>
>> >> >>
>> >> >> On Mon, Aug 10, 2015 at 12:28 PM, Matt Goodman 
>> >> >> wrote:
>> >> >>>
>> >> >>> I would tentatively suggest also conda packaging.
>> >> >>
>> >> >>
>> >> >> A conda package has the advantage that it can be set up without
>> >> >> 'installing' the pyspark files, while the PyPI packaging is still
>> being
>> >> >> worked out. It can just add a pyspark.pth file pointing to pyspark,
>> >> >> py4j
>> >> >> locations. But I think it's a really good idea to package with
>> conda.
>> >> >>
>> >> >> -MinRK
>> >> >>
>> >> >>>
>> >> >>>
>> >> >>> http://conda.pydata.org/docs/
>> >> >>>
>> >> >>> --Matthew Goodman
>> >> >>>
>> >> >>> =
>> >> >>> Check Out My Website: http://craneium.net
>> >> >>> Find me on LinkedIn: http://tinyurl.com/d6wlch
>> >> >>>
>> >> >>> On Mon, Aug 10, 2015 at 11:23 AM, Davies Liu <
>> dav...@databricks.com>
>> >> >>> wrote:
>> >> >>>>
>> >> >>>> I think so, any contributions on this are welcome.
>> >> >>>>
>> >> >>>> On Mon, Aug 10, 2015 at 11:03 AM, Brian Granger <
>> elliso...@gmail.com>
>> >> >>>> wrote:
>> >> >>>> > Sorry, trying to follow the context here. Does it look like
>> there
>> >> >>>> > is
>> >> >>>> > support for the idea of creating a setup.py file and pypi
>> package
>> >> >>>> > for
>> >> >>>> > pyspark?
>> >> >>>> >
>> >> >>>> > Cheers,
>> >> >>>> >
>> >> >>>> > Brian
>> >> >>>> >
>> >> >>>> > On Thu, Aug 6, 2015 at 3:14 PM, Davies Liu <
>> dav...@databricks.com>
>> >> >>>> > wrote:
>> >> >>>> >> We could do that after 1.5 released, it will have same release
>> >> >>>> >> cycle
>> >> >>>> >> as Spark in the

Re: DataFrame#rdd doesn't respect DataFrame#cache, slowing down CrossValidator

2015-07-31 Thread Justin Uang
Sweet! It's here:
https://issues.apache.org/jira/browse/SPARK-9141?focusedCommentId=14649437&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14649437

On Tue, Jul 28, 2015 at 11:21 PM Michael Armbrust 
wrote:

> Can you add your description of the problem as a comment to that ticket
> and we'll make sure to test both cases and break it out if the root cause
> ends up being different.
>
> On Tue, Jul 28, 2015 at 2:48 PM, Justin Uang 
> wrote:
>
>> Sweet! Does this cover DataFrame#rdd also using the cached query from
>> DataFrame#cache? I think the ticket 9141 is mainly concerned with whether a
>> derived DataFrame (B) of a cached DataFrame (A) uses the cached query of A,
>> not whether the rdd from A.rdd or B.rdd uses the cached query of A.
>>
>> On Tue, Jul 28, 2015 at 11:33 PM Joseph Bradley 
>> wrote:
>>
>>> Thanks for bringing this up!  I talked with Michael Armbrust, and it
>>> sounds like this is a from a bug in DataFrame caching:
>>> https://issues.apache.org/jira/browse/SPARK-9141
>>> It's marked as a blocker for 1.5.
>>> Joseph
>>>
>>> On Tue, Jul 28, 2015 at 2:36 AM, Justin Uang 
>>> wrote:
>>>
>>>> Hey guys,
>>>>
>>>> I'm running into some pretty bad performance issues when it comes to
>>>> using a CrossValidator, because of caching behavior of DataFrames.
>>>>
>>>> The root of the problem is that while I have cached my DataFrame
>>>> representing the features and labels, it is caching at the DataFrame level,
>>>> while CrossValidator/LogisticRegression both drop down to the dataset.rdd
>>>> level, which ignores the caching that I have previously done. This is
>>>> worsened by the fact that for each combination of a fold and a param set
>>>> from the grid, it recomputes my entire input dataset because the caching
>>>> was lost.
>>>>
>>>> My current solution is to force the input DataFrame to be based off of
>>>> a cached RDD, which I did with this horrible hack (had to drop down to java
>>>> from the pyspark because of something to do with vectors not be inferred
>>>> correctly):
>>>>
>>>> def checkpoint_dataframe_caching(df):
>>>>     return DataFrame(
>>>>         sqlContext._ssql_ctx.createDataFrame(df._jdf.rdd().cache(), df._jdf.schema()),
>>>>         sqlContext)
>>>>
>>>> before I pass it into the CrossValidator.fit(). If I do this, I still
>>>> have to cache the underlying rdd once more than necessary (in addition to
>>>> DataFrame#cache()), but at least in cross validation, it doesn't recompute
>>>> the RDD graph anymore.
>>>>
>>>> Note, that input_df.rdd.cache() doesn't work because the python
>>>> CrossValidator implementation applies some more dataframe transformations
>>>> like filter, which then causes filtered_df.rdd to return a completely
>>>> different rdd that recomputes the entire graph.
>>>>
>>>> Is it the intention of Spark SQL that calling DataFrame#rdd removes any
>>>> caching that was done for the query? Is the fix as simple as getting the
>>>> DataFrame#rdd to reference the cached query, or is there something more
>>>> subtle going on.
>>>>
>>>> Best,
>>>>
>>>> Justin
>>>>
>>>
>>>
>


Re: DataFrame#rdd doesn't respect DataFrame#cache, slowing down CrossValidator

2015-07-28 Thread Justin Uang
Sweet! Does this cover DataFrame#rdd also using the cached query from
DataFrame#cache? I think the ticket 9141 is mainly concerned with whether a
derived DataFrame (B) of a cached DataFrame (A) uses the cached query of A,
not whether the rdd from A.rdd or B.rdd uses the cached query of A.
On Tue, Jul 28, 2015 at 11:33 PM Joseph Bradley 
wrote:

> Thanks for bringing this up!  I talked with Michael Armbrust, and it
> sounds like this is a from a bug in DataFrame caching:
> https://issues.apache.org/jira/browse/SPARK-9141
> It's marked as a blocker for 1.5.
> Joseph
>
> On Tue, Jul 28, 2015 at 2:36 AM, Justin Uang 
> wrote:
>
>> Hey guys,
>>
>> I'm running into some pretty bad performance issues when it comes to
>> using a CrossValidator, because of caching behavior of DataFrames.
>>
>> The root of the problem is that while I have cached my DataFrame
>> representing the features and labels, it is caching at the DataFrame level,
>> while CrossValidator/LogisticRegression both drop down to the dataset.rdd
>> level, which ignores the caching that I have previously done. This is
>> worsened by the fact that for each combination of a fold and a param set
>> from the grid, it recomputes my entire input dataset because the caching
>> was lost.
>>
>> My current solution is to force the input DataFrame to be based off of a
>> cached RDD, which I did with this horrible hack (had to drop down to java
>> from the pyspark because of something to do with vectors not be inferred
>> correctly):
>>
>> def checkpoint_dataframe_caching(df):
>>     return DataFrame(
>>         sqlContext._ssql_ctx.createDataFrame(df._jdf.rdd().cache(), df._jdf.schema()),
>>         sqlContext)
>>
>> before I pass it into the CrossValidator.fit(). If I do this, I still
>> have to cache the underlying rdd once more than necessary (in addition to
>> DataFrame#cache()), but at least in cross validation, it doesn't recompute
>> the RDD graph anymore.
>>
>> Note, that input_df.rdd.cache() doesn't work because the python
>> CrossValidator implementation applies some more dataframe transformations
>> like filter, which then causes filtered_df.rdd to return a completely
>> different rdd that recomputes the entire graph.
>>
>> Is it the intention of Spark SQL that calling DataFrame#rdd removes any
>> caching that was done for the query? Is the fix as simple as getting the
>> DataFrame#rdd to reference the cached query, or is there something more
>> subtle going on.
>>
>> Best,
>>
>> Justin
>>
>
>


Re: PySpark on PyPi

2015-07-28 Thread Justin Uang
// ping

do we have any signoff from the pyspark devs to submit a PR to publish to
PyPI?

On Fri, Jul 24, 2015 at 10:50 PM Jeremy Freeman 
wrote:

> Hey all, great discussion, just wanted to +1 that I see a lot of value in
> steps that make it easier to use PySpark as an ordinary python library.
>
> You might want to check out this (https://github.com/minrk/findspark),
> started by Jupyter project devs, that offers one way to facilitate this
> stuff. I’ve also cced them here to join the conversation.
>
> Also, @Jey, I can also confirm that at least in some scenarios (I’ve done
> it in an EC2 cluster in standalone mode) it’s possible to run PySpark jobs
> just using `from pyspark import SparkContext; sc =
> SparkContext(master=“X”)` so long as the environmental variables
> (PYTHONPATH and PYSPARK_PYTHON) are set correctly on *both* workers and
> driver. That said, there’s definitely additional configuration /
> functionality that would require going through the proper submit scripts.
>
> On Jul 22, 2015, at 7:41 PM, Punyashloka Biswal 
> wrote:
>
> I agree with everything Justin just said. An additional advantage of
> publishing PySpark's Python code in a standards-compliant way is the fact
> that we'll be able to declare transitive dependencies (Pandas, Py4J) in a
> way that pip can use. Contrast this with the current situation, where
> df.toPandas() exists in the Spark API but doesn't actually work until you
> install Pandas.
>
> Punya
> On Wed, Jul 22, 2015 at 12:49 PM Justin Uang 
> wrote:
>
>> // + *Davies* for his comments
>> // + Punya for SA
>>
>> For development and CI, like Olivier mentioned, I think it would be
>> hugely beneficial to publish pyspark (only code in the python/ dir) on
>> PyPI. If anyone wants to develop against PySpark APIs, they need to
>> download the distribution and do a lot of PYTHONPATH munging for all the
>> tools (pylint, pytest, IDE code completion). Right now that involves adding
>> python/ and python/lib/py4j-0.8.2.1-src.zip. In case pyspark ever wants to
>> add more dependencies, we would have to manually mirror all the PYTHONPATH
>> munging in the ./pyspark script. With a proper pyspark setup.py which
>> declares its dependencies, and a published distribution, depending on
>> pyspark will just be adding pyspark to my setup.py dependencies.
>>
>> Of course, if we actually want to run parts of pyspark that is backed by
>> Py4J calls, then we need the full spark distribution with either ./pyspark
>> or ./spark-submit, but for things like linting and development, the
>> PYTHONPATH munging is very annoying.
>>
>> I don't think the version-mismatch issues are a compelling reason to not
>> go ahead with PyPI publishing. At runtime, we should definitely enforce
>> that the version has to be exact, which means there is no backcompat
>> nightmare as suggested by Davies in
>> https://issues.apache.org/jira/browse/SPARK-1267. This would mean that
>> even if the user got his pip installed pyspark to somehow get loaded before
>> the spark distribution provided pyspark, then the user would be alerted
>> immediately.
>>
>> *Davies*, if you buy this, should me or someone on my team pick up
>> https://issues.apache.org/jira/browse/SPARK-1267 and
>> https://github.com/apache/spark/pull/464?
>>
>> On Sat, Jun 6, 2015 at 12:48 AM Olivier Girardot <
>> o.girar...@lateral-thoughts.com> wrote:
>>
>>> Ok, I get it. Now what can we do to improve the current situation,
>>> because right now if I want to set-up a CI env for PySpark, I have to :
>>> 1- download a pre-built version of pyspark and unzip it somewhere on
>>> every agent
>>> 2- define the SPARK_HOME env
>>> 3- symlink this distribution pyspark dir inside the python install dir
>>> site-packages/ directory
>>> and if I rely on additional packages (like databricks' Spark-CSV
>>> project), I have to (except if I'm mistaken)
>>> 4- compile/assembly spark-csv, deploy the jar in a specific directory on
>>> every agent
>>> 5- add this jar-filled directory to the Spark distribution's additional
>>> classpath using the conf/spark-default file
>>>
>>> Then finally we can launch our unit/integration-tests.
>>> Some issues are related to spark-packages, some to the lack of
>>> python-based dependency, and some to the way SparkContext are launched when
>>> using pyspark.
>>> I think step 1 and 2 are fair enough
>>> 4 and 5 may already have solutions, I didn't check and considering
>>> spark-shell is downloading such depende

DataFrame#rdd doesn't respect DataFrame#cache, slowing down CrossValidator

2015-07-28 Thread Justin Uang
Hey guys,

I'm running into some pretty bad performance issues when it comes to using
a CrossValidator, because of caching behavior of DataFrames.

The root of the problem is that while I have cached my DataFrame
representing the features and labels, it is caching at the DataFrame level,
while CrossValidator/LogisticRegression both drop down to the dataset.rdd
level, which ignores the caching that I have previously done. This is
worsened by the fact that for each combination of a fold and a param set
from the grid, it recomputes my entire input dataset because the caching
was lost.

My current solution is to force the input DataFrame to be based off of a
cached RDD, which I did with this horrible hack (I had to drop down to Java
from pyspark because of something to do with vectors not being inferred
correctly):

def checkpoint_dataframe_caching(df):
    # Rebuild the DataFrame on top of an explicitly cached JVM RDD so that
    # later .rdd accesses reuse it instead of recomputing the query.
    return DataFrame(
        sqlContext._ssql_ctx.createDataFrame(df._jdf.rdd().cache(), df._jdf.schema()),
        sqlContext)

before I pass it into the CrossValidator.fit(). If I do this, I still have
to cache the underlying rdd once more than necessary (in addition to
DataFrame#cache()), but at least in cross validation, it doesn't recompute
the RDD graph anymore.

Note that input_df.rdd.cache() doesn't work because the python
CrossValidator implementation applies some more dataframe transformations
like filter, which then causes filtered_df.rdd to return a completely
different rdd that recomputes the entire graph.

Is it the intention of Spark SQL that calling DataFrame#rdd removes any
caching that was done for the query? Is the fix as simple as getting the
DataFrame#rdd to reference the cached query, or is there something more
subtle going on.

Best,

Justin


Re: PySpark on PyPi

2015-07-22 Thread Justin Uang
// + *Davies* for his comments
// + Punya for SA

For development and CI, like Olivier mentioned, I think it would be hugely
beneficial to publish pyspark (only code in the python/ dir) on PyPI. If
anyone wants to develop against PySpark APIs, they need to download the
distribution and do a lot of PYTHONPATH munging for all the tools (pylint,
pytest, IDE code completion). Right now that involves adding python/ and
python/lib/py4j-0.8.2.1-src.zip. In case pyspark ever wants to add more
dependencies, we would have to manually mirror all the PYTHONPATH munging
in the ./pyspark script. With a proper pyspark setup.py which declares its
dependencies, and a published distribution, depending on pyspark will just
be adding pyspark to my setup.py dependencies.

Of course, if we actually want to run parts of pyspark that is backed by
Py4J calls, then we need the full spark distribution with either ./pyspark
or ./spark-submit, but for things like linting and development, the
PYTHONPATH munging is very annoying.
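To make the munging concrete, this is roughly what every tool/CI setup has
to do today (a sketch; the py4j zip name varies by Spark version):

import glob
import os
import sys

# Manual PYTHONPATH munging; a published pyspark package with a real setup.py
# would make this unnecessary for linting, testing, and IDE completion.
spark_home = os.environ["SPARK_HOME"]
sys.path.insert(0, os.path.join(spark_home, "python"))
sys.path.insert(0, glob.glob(
    os.path.join(spark_home, "python", "lib", "py4j-*-src.zip"))[0])

import pyspark  # now importable by tools without a pip-installed package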

I don't think the version-mismatch issues are a compelling reason to not go
ahead with PyPI publishing. At runtime, we should definitely enforce that
the version has to be exact, which means there is no backcompat nightmare
as suggested by Davies in https://issues.apache.org/jira/browse/SPARK-1267.
This would mean that even if the user's pip-installed pyspark somehow got
loaded before the pyspark provided by the Spark distribution, the user
would be alerted immediately.

*Davies*, if you buy this, should me or someone on my team pick up
https://issues.apache.org/jira/browse/SPARK-1267 and
https://github.com/apache/spark/pull/464?

On Sat, Jun 6, 2015 at 12:48 AM Olivier Girardot <
o.girar...@lateral-thoughts.com> wrote:

> Ok, I get it. Now what can we do to improve the current situation, because
> right now if I want to set-up a CI env for PySpark, I have to :
> 1- download a pre-built version of pyspark and unzip it somewhere on every
> agent
> 2- define the SPARK_HOME env
> 3- symlink this distribution pyspark dir inside the python install dir
> site-packages/ directory
> and if I rely on additional packages (like databricks' Spark-CSV project),
> I have to (except if I'm mistaken)
> 4- compile/assembly spark-csv, deploy the jar in a specific directory on
> every agent
> 5- add this jar-filled directory to the Spark distribution's additional
> classpath using the conf/spark-default file
>
> Then finally we can launch our unit/integration-tests.
> Some issues are related to spark-packages, some to the lack of
> python-based dependency, and some to the way SparkContext are launched when
> using pyspark.
> I think step 1 and 2 are fair enough
> 4 and 5 may already have solutions, I didn't check and considering
> spark-shell is downloading such dependencies automatically, I think if
> nothing's done yet it will (I guess ?).
>
> For step 3, maybe just adding a setup.py to the distribution would be
> enough, I'm not exactly advocating to distribute a full 300Mb spark
> distribution in PyPi, maybe there's a better compromise ?
>
> Regards,
>
> Olivier.
>
> Le ven. 5 juin 2015 à 22:12, Jey Kottalam  a écrit :
>
>> Couldn't we have a pip installable "pyspark" package that just serves as
>> a shim to an existing Spark installation? Or it could even download the
>> latest Spark binary if SPARK_HOME isn't set during installation. Right now,
>> Spark doesn't play very well with the usual Python ecosystem. For example,
>> why do I need to use a strange incantation when booting up IPython if I
>> want to use PySpark in a notebook with MASTER="local[4]"? It would be much
>> nicer to just type `from pyspark import SparkContext; sc =
>> SparkContext("local[4]")` in my notebook.
>>
>> I did a test and it seems like PySpark's basic unit-tests do pass when
>> SPARK_HOME is set and Py4J is on the PYTHONPATH:
>>
>>
>> PYTHONPATH=$SPARK_HOME/python/:$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH
>> python $SPARK_HOME/python/pyspark/rdd.py
>>
>> -Jey
>>
>>
>> On Fri, Jun 5, 2015 at 10:57 AM, Josh Rosen  wrote:
>>
>>> This has been proposed before:
>>> https://issues.apache.org/jira/browse/SPARK-1267
>>>
>>> There's currently tighter coupling between the Python and Java halves of
>>> PySpark than just requiring SPARK_HOME to be set; if we did this, I bet
>>> we'd run into tons of issues when users try to run a newer version of the
>>> Python half of PySpark against an older set of Java components or
>>> vice-versa.
>>>
>>> On Thu, Jun 4, 2015 at 10:45 PM, Olivier Girardot <
>>> o.girar...@lateral-thoughts.com> wrote:
>>>
 Hi everyone,
 Considering the python API as just a front needing the SPARK_HOME
 defined anyway, I think it would be interesting to deploy the Python part
 of Spark on PyPi in order to handle the dependencies in a Python project
 needing PySpark via pip.

 For now I just symlink the python/pyspark in my python install dir
 site-packages/ in order for PyCharm or other 

Re: how can I write a language "wrapper"?

2015-06-29 Thread Justin Uang
My guess is that if you are just wrapping the Spark SQL APIs, you can get
away with not having to reimplement a lot of the complexities in PySpark,
like storing everything in RDDs as pickled byte arrays, pipelining RDDs,
doing aggregations and joins in the Python interpreters, etc.

Since the canonical representation of objects in Spark SQL is in Scala/the
JVM, you're effectively just proxying calls to the Java side. The only
tricky thing is UDFs, which naturally need to run in an interpreter of the
wrapper language. I'm currently thinking of redesigning UDFs so that rows
are sent in a language-agnostic data format like protobuf or msgpack, so
that every language wrapper just needs to implement the simple protocol of
reading rows in, transforming them, and writing them back out in that same
language-agnostic format.
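A toy sketch of what that wrapper-side loop could look like with msgpack
(purely illustrative; this is not an existing Spark protocol, and msgpack is
an assumed dependency):

import sys
import msgpack  # assumed dependency; protobuf would fill the same role

def worker_loop(fn):
    # The JVM streams encoded rows on stdin; the wrapper language applies
    # the user's function and streams encoded results back on stdout.
    unpacker = msgpack.Unpacker(sys.stdin.buffer, raw=False)
    out = sys.stdout.buffer
    for row in unpacker:
        out.write(msgpack.packb(fn(row)))
    out.flush()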

On Mon, Jun 29, 2015 at 6:39 AM Daniel Darabos <
daniel.dara...@lynxanalytics.com> wrote:

> Hi Vasili,
> It so happens that the entire SparkR code was merged to Apache Spark in a
> single pull request. So you can see at once all the required changes in
> https://github.com/apache/spark/pull/5096. It's 12,043 lines and took
> more than 20 people about a year to write as I understand it.
>
> On Mon, Jun 29, 2015 at 10:33 AM, Vasili I. Galchin 
> wrote:
>
>> Shivaram,
>>
>> Vis-a-vis Haskell support, I am reading DataFrame.R,
>> SparkRBackend*, context.R, et. al., am I headed in the correct
>> direction?/ Yes or no, please give more guidance. Thank you.
>>
>> Kind regards,
>>
>> Vasili
>>
>>
>>
>> On Tue, Jun 23, 2015 at 1:46 PM, Shivaram Venkataraman
>>  wrote:
>> > Every language has its own quirks / features -- so I don't think there
>> > exists a document on how to go about doing this for a new language. The
>> most
>> > related write up I know of is the wiki page on PySpark internals
>> > https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals
>> written
>> > by Josh Rosen -- It covers some of the issues like closure capture,
>> > serialization, JVM communication that you'll need to handle for a new
>> > language.
>> >
>> > Thanks
>> > Shivaram
>> >
>> > On Tue, Jun 23, 2015 at 1:35 PM, Vasili I. Galchin > >
>> > wrote:
>> >>
>> >> Hello,
>> >>
>> >>   I want to add language support for another language(other than
>> >> Scala, Java et. al.). Where is documentation that explains to provide
>> >> support for a new language?
>> >>
>> >> Thank you,
>> >>
>> >> Vasili
>> >
>> >
>>
>> -
>> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
>> For additional commands, e-mail: dev-h...@spark.apache.org
>>
>>
>


Re: Python UDF performance at large scale

2015-06-25 Thread Justin Uang
Sweet, filed here: https://issues.apache.org/jira/browse/SPARK-8632

On Thu, Jun 25, 2015 at 3:05 AM Davies Liu  wrote:

> I'm thinking that the batched synchronous version will be too slow
> (with small batch size) or easy to OOM with large (batch size). If
> it's not that hard, you can give it a try.
>
> On Wed, Jun 24, 2015 at 4:39 PM, Justin Uang 
> wrote:
> > Correct, I was running with a batch size of about 100 when I did the
> tests,
> > because I was worried about deadlocks. Do you have any concerns regarding
> > the batched synchronous version of communication between the Java and
> Python
> > processes, and if not, should I file a ticket and start writing it?
> >
> > On Wed, Jun 24, 2015 at 7:27 PM Davies Liu 
> wrote:
> >>
> >> From you comment, the 2x improvement only happens when you have the
> >> batch size as 1, right?
> >>
> >> On Wed, Jun 24, 2015 at 12:11 PM, Justin Uang 
> >> wrote:
> >> > FYI, just submitted a PR to Pyrolite to remove their StopException.
> >> > https://github.com/irmen/Pyrolite/pull/30
> >> >
> >> > With my benchmark, removing it basically made it about 2x faster.
> >> >
> >> > On Wed, Jun 24, 2015 at 8:33 AM Punyashloka Biswal
> >> > 
> >> > wrote:
> >> >>
> >> >> Hi Davies,
> >> >>
> >> >> In general, do we expect people to use CPython only for "heavyweight"
> >> >> UDFs
> >> >> that invoke an external library? Are there any examples of using
> >> >> Jython,
> >> >> especially performance comparisons to Java/Scala and CPython? When
> >> >> using
> >> >> Jython, do you expect the driver to send code to the executor as a
> >> >> string,
> >> >> or is there a good way to serialized Jython lambdas?
> >> >>
> >> >> (For context, I was unable to serialize Nashorn lambdas when I tried
> to
> >> >> use them in Spark.)
> >> >>
> >> >> Punya
> >> >> On Wed, Jun 24, 2015 at 2:26 AM Davies Liu 
> >> >> wrote:
> >> >>>
> >> >>> Fair points, I also like simpler solutions.
> >> >>>
> >> >>> The overhead of Python task could be a few of milliseconds, which
> >> >>> means we also should eval them as batches (one Python task per
> batch).
> >> >>>
> >> >>> Decreasing the batch size for UDF sounds reasonable to me, together
> >> >>> with other tricks to reduce the data in socket/pipe buffer.
> >> >>>
> >> >>> BTW, what do your UDF looks like? How about to use Jython to run
> >> >>> simple Python UDF (without some external libraries).
> >> >>>
> >> >>> On Tue, Jun 23, 2015 at 8:21 PM, Justin Uang  >
> >> >>> wrote:
> >> >>> > // + punya
> >> >>> >
> >> >>> > Thanks for your quick response!
> >> >>> >
> >> >>> > I'm not sure that using an unbounded buffer is a good solution to
> >> >>> > the
> >> >>> > locking problem. For example, in the situation where I had 500
> >> >>> > columns,
> >> >>> > I am
> >> >>> > in fact storing 499 extra columns on the java side, which might
> make
> >> >>> > me
> >> >>> > OOM
> >> >>> > if I have to store many rows. In addition, if I am using an
> >> >>> > AutoBatchedSerializer, the java side might have to write 1 << 16
> ==
> >> >>> > 65536
> >> >>> > rows before python starts outputting elements, in which case, the
> >> >>> > Java
> >> >>> > side
> >> >>> > has to buffer 65536 complete rows. In general it seems fragile to
> >> >>> > rely
> >> >>> > on
> >> >>> > blocking behavior in the Python coprocess. By contrast, it's very
> >> >>> > easy
> >> >>> > to
> >> >>> > verify the correctness and performance characteristics of the
> >> >>> > synchronous
> >> >>> > blocking solution.
> >> >>> >
> >> >>> >
> >> >>> > On Tue, Jun 23, 2015 at 7:21 PM D

Re: Python UDF performance at large scale

2015-06-24 Thread Justin Uang
Correct, I was running with a batch size of about 100 when I did the tests,
because I was worried about deadlocks. Do you have any concerns regarding
the batched synchronous version of communication between the Java and
Python processes, and if not, should I file a ticket and start writing
it?
On Wed, Jun 24, 2015 at 7:27 PM Davies Liu  wrote:

> From you comment, the 2x improvement only happens when you have the
> batch size as 1, right?
>
> On Wed, Jun 24, 2015 at 12:11 PM, Justin Uang 
> wrote:
> > FYI, just submitted a PR to Pyrolite to remove their StopException.
> > https://github.com/irmen/Pyrolite/pull/30
> >
> > With my benchmark, removing it basically made it about 2x faster.
> >
> > On Wed, Jun 24, 2015 at 8:33 AM Punyashloka Biswal <
> punya.bis...@gmail.com>
> > wrote:
> >>
> >> Hi Davies,
> >>
> >> In general, do we expect people to use CPython only for "heavyweight"
> UDFs
> >> that invoke an external library? Are there any examples of using Jython,
> >> especially performance comparisons to Java/Scala and CPython? When using
> >> Jython, do you expect the driver to send code to the executor as a
> string,
> >> or is there a good way to serialize Jython lambdas?
> >>
> >> (For context, I was unable to serialize Nashorn lambdas when I tried to
> >> use them in Spark.)
> >>
> >> Punya
> >> On Wed, Jun 24, 2015 at 2:26 AM Davies Liu 
> wrote:
> >>>
> >>> Fair points, I also like simpler solutions.
> >>>
> >>> The overhead of a Python task could be a few milliseconds, which
> >>> means we also should eval them as batches (one Python task per batch).
> >>>
> >>> Decreasing the batch size for UDF sounds reasonable to me, together
> >>> with other tricks to reduce the data in the socket/pipe buffer.
> >>>
> >>> BTW, what does your UDF look like? How about using Jython to run
> >>> simple Python UDFs (without external libraries)?
> >>>
> >>> On Tue, Jun 23, 2015 at 8:21 PM, Justin Uang 
> >>> wrote:
> >>> > // + punya
> >>> >
> >>> > Thanks for your quick response!
> >>> >
> >>> > I'm not sure that using an unbounded buffer is a good solution to the
> >>> > locking problem. For example, in the situation where I had 500
> columns,
> >>> > I am
> >>> > in fact storing 499 extra columns on the java side, which might make
> me
> >>> > OOM
> >>> > if I have to store many rows. In addition, if I am using an
> >>> > AutoBatchedSerializer, the java side might have to write 1 << 16 ==
> >>> > 65536
> >>> > rows before python starts outputting elements, in which case, the
> Java
> >>> > side
> >>> > has to buffer 65536 complete rows. In general it seems fragile to
> rely
> >>> > on
> >>> > blocking behavior in the Python coprocess. By contrast, it's very
> easy
> >>> > to
> >>> > verify the correctness and performance characteristics of the
> >>> > synchronous
> >>> > blocking solution.
> >>> >
> >>> >
> >>> > On Tue, Jun 23, 2015 at 7:21 PM Davies Liu 
> >>> > wrote:
> >>> >>
> >>> >> Thanks for looking into it, I like the idea of having
> >>> >> ForkingIterator. If we have an unlimited buffer in it, then we will
> >>> >> not have the deadlock problem, I think. The writing thread will be
> >>> >> blocked by the Python process, so there will not be many rows
> >>> >> buffered (still a potential cause of OOM). At least, this approach is
> >>> >> better than the current one.
> >>> >>
> >>> >> Could you create a JIRA and send out the PR?
> >>> >>
> >>> >> On Tue, Jun 23, 2015 at 3:27 PM, Justin Uang  >
> >>> >> wrote:
> >>> >> > BLUF: BatchPythonEvaluation's implementation is unusable at large
> >>> >> > scale,
> >>> >> > but
> >>> >> > I have a proof-of-concept implementation that avoids caching the
> >>> >> > entire
> >>> >> > dataset.
> >>> >> >
> >>> >> > Hi,
> >>> >> >
> >>> >> > We have been running into performance

Re: Python UDF performance at large scale

2015-06-24 Thread Justin Uang
FYI, just submitted a PR to Pyrolite to remove their StopException.
https://github.com/irmen/Pyrolite/pull/30

With my benchmark, removing it basically made it about 2x faster.

On Wed, Jun 24, 2015 at 8:33 AM Punyashloka Biswal 
wrote:

> Hi Davies,
>
> In general, do we expect people to use CPython only for "heavyweight" UDFs
> that invoke an external library? Are there any examples of using Jython,
> especially performance comparisons to Java/Scala and CPython? When using
> Jython, do you expect the driver to send code to the executor as a string,
> or is there a good way to serialize Jython lambdas?
>
> (For context, I was unable to serialize Nashorn lambdas when I tried to
> use them in Spark.)
>
> Punya
> On Wed, Jun 24, 2015 at 2:26 AM Davies Liu  wrote:
>
>> Fair points, I also like simpler solutions.
>>
>> The overhead of a Python task could be a few milliseconds, which
>> means we also should eval them as batches (one Python task per batch).
>>
>> Decreasing the batch size for UDF sounds reasonable to me, together
>> with other tricks to reduce the data in the socket/pipe buffer.
>>
>> BTW, what does your UDF look like? How about using Jython to run
>> simple Python UDFs (without external libraries)?
>>
>> On Tue, Jun 23, 2015 at 8:21 PM, Justin Uang 
>> wrote:
>> > // + punya
>> >
>> > Thanks for your quick response!
>> >
>> > I'm not sure that using an unbounded buffer is a good solution to the
>> > locking problem. For example, in the situation where I had 500 columns,
>> I am
>> > in fact storing 499 extra columns on the java side, which might make me
>> OOM
>> > if I have to store many rows. In addition, if I am using an
>> > AutoBatchedSerializer, the java side might have to write 1 << 16 ==
>> 65536
>> > rows before python starts outputting elements, in which case, the Java
>> side
>> > has to buffer 65536 complete rows. In general it seems fragile to rely
>> on
>> > blocking behavior in the Python coprocess. By contrast, it's very easy
>> to
>> > verify the correctness and performance characteristics of the
>> synchronous
>> > blocking solution.
>> >
>> >
>> > On Tue, Jun 23, 2015 at 7:21 PM Davies Liu 
>> wrote:
>> >>
>> >> Thanks for looking into it, I like the idea of having
>> >> ForkingIterator. If we have an unlimited buffer in it, then we will not
>> >> have the deadlock problem, I think. The writing thread will be blocked
>> >> by the Python process, so there will not be many rows buffered (still a
>> >> potential cause of OOM). At least, this approach is better than the
>> >> current one.
>> >>
>> >> Could you create a JIRA and send out the PR?
>> >>
>> >> On Tue, Jun 23, 2015 at 3:27 PM, Justin Uang 
>> >> wrote:
>> >> > BLUF: BatchPythonEvaluation's implementation is unusable at large
>> scale,
>> >> > but
>> >> > I have a proof-of-concept implementation that avoids caching the
>> entire
>> >> > dataset.
>> >> >
>> >> > Hi,
>> >> >
>> >> > We have been running into performance problems using Python UDFs with
>> >> > DataFrames at large scale.
>> >> >
>> >> > From the implementation of BatchPythonEvaluation, it looks like the
>> goal
>> >> > was
>> >> > to reuse the PythonRDD code. It caches the entire child RDD so that
>> it
>> >> > can
>> >> > do two passes over the data. One to give to the PythonRDD, then one
>> to
>> >> > join
>> >> > the python lambda results with the original row (which may have java
>> >> > objects
>> >> > that should be passed through).
>> >> >
>> >> > In addition, it caches all the columns, even the ones that don't
>> need to
>> >> > be
>> >> > processed by the Python UDF. In the cases I was working with, I had a
>> >> > 500
>> >> > column table, and I wanted to use a python UDF for one column, and it
>> >> > ended
>> >> > up caching all 500 columns.
>> >> >
>> >> > I have a working solution over here that does it in one pass over the
>> >> > data,
>> >> > avoiding caching
>> >> >
>> >> > (
>> https://github.com/justinuang/spark/commit/c1a415a18d3122

Re: Python UDF performance at large scale

2015-06-23 Thread Justin Uang
// + punya

Thanks for your quick response!

I'm not sure that using an unbounded buffer is a good solution to the
locking problem. For example, in the situation where I had 500 columns, I
am in fact storing 499 extra columns on the java side, which might make me
OOM if I have to store many rows. In addition, if I am using an
AutoBatchedSerializer, the java side might have to write 1 << 16 == 65536
rows before python starts outputting elements, in which case, the Java side
has to buffer 65536 complete rows. In general it seems fragile to rely on
blocking behavior in the Python coprocess. By contrast, it's very easy to
verify the correctness and performance characteristics of the synchronous
blocking solution.


On Tue, Jun 23, 2015 at 7:21 PM Davies Liu  wrote:

> Thanks for looking into it, I like the idea of having
> ForkingIterator. If we have an unlimited buffer in it, then we will not
> have the deadlock problem, I think. The writing thread will be blocked
> by the Python process, so there will not be many rows buffered (still a
> potential cause of OOM). At least, this approach is better than the
> current one.
>
> Could you create a JIRA and send out the PR?
>
> On Tue, Jun 23, 2015 at 3:27 PM, Justin Uang 
> wrote:
> > BLUF: BatchPythonEvaluation's implementation is unusable at large scale,
> but
> > I have a proof-of-concept implementation that avoids caching the entire
> > dataset.
> >
> > Hi,
> >
> > We have been running into performance problems using Python UDFs with
> > DataFrames at large scale.
> >
> > From the implementation of BatchPythonEvaluation, it looks like the goal
> was
> > to reuse the PythonRDD code. It caches the entire child RDD so that it
> can
> > do two passes over the data. One to give to the PythonRDD, then one to
> join
> > the python lambda results with the original row (which may have java
> objects
> > that should be passed through).
> >
> > In addition, it caches all the columns, even the ones that don't need to
> be
> > processed by the Python UDF. In the cases I was working with, I had a 500
> > column table, and I wanted to use a python UDF for one column, and it
> ended
> > up caching all 500 columns.
> >
> > I have a working solution over here that does it in one pass over the
> data,
> > avoiding caching
> > (
> https://github.com/justinuang/spark/commit/c1a415a18d31226ac580f1a9df7985571d03199b
> ).
> > With this patch, I go from a job that takes 20 minutes then OOMs, to a
> job
> > that finishes completely in 3 minutes. It is indeed quite hacky and
> prone to
> > deadlocks since there is buffering in many locations:
> >
> > - NEW: the ForkingIterator LinkedBlockingDeque
> > - batching the rows before pickling them
> > - os buffers on both sides
> > - pyspark.serializers.BatchedSerializer
> >
> > We can avoid deadlock by being very disciplined. For example, we can have
> > the ForkingIterator instead always do a check of whether the
> > LinkedBlockingDeque is full and if so:
> >
> > Java
> > - flush the java pickling buffer
> > - send a flush command to the python process
> > - os.flush the java side
> >
> > Python
> > - flush BatchedSerializer
> > - os.flush()
> >
> > I haven't added this yet. This is getting very complex however. Another
> > model would just be to change the protocol between the java side and the
> > worker to be a synchronous request/response. This has the disadvantage
> that
> > the CPU isn't doing anything when the batch is being sent across, but it
> has
> > the huge advantage of simplicity. In addition, I imagine that the actual
> IO
> > between the processes isn't that slow, but rather the serialization of
> java
> > objects into pickled bytes, and the deserialization/serialization +
> python
> > loops on the python side. Another advantage is that we won't be taking
> more
> > than 100% CPU since only one thread is doing CPU work at a time between
> the
> > executor and the python interpreter.
> >
> > Any thoughts would be much appreciated =)
> >
> > Other improvements:
> > - extract some code of the worker out of PythonRDD so that we can do
> a
> > mapPartitions directly in BatchedPythonEvaluation without resorting to
> the
> > hackery in ForkedRDD.compute(), which uses a cache to ensure that the
> other
> > RDD can get a handle to the same iterator.
> > - read elements and use a size estimator to create the BlockingQueue
> to
> > make sure that we don't store too many things in memory when batching
> > - patch Unpickler to not use StopException for control flow, which is
> > slowing down the java side
> >
> >
>


Python UDF performance at large scale

2015-06-23 Thread Justin Uang
BLUF: BatchPythonEvaluation's implementation is unusable at large scale,
but I have a proof-of-concept implementation that avoids caching the entire
dataset.

Hi,

We have been running into performance problems using Python UDFs with
DataFrames at large scale.

From the implementation of BatchPythonEvaluation, it looks like the goal
was to reuse the PythonRDD code. It caches the entire child RDD so that it
can do two passes over the data. One to give to the PythonRDD, then one to
join the python lambda results with the original row (which may have java
objects that should be passed through).

In addition, it caches all the columns, even the ones that don't need to be
processed by the Python UDF. In the cases I was working with, I had a 500
column table, and I wanted to use a python UDF for one column, and it ended
up caching all 500 columns.

I have a working solution over here that does it in one pass over the data,
avoiding caching (
https://github.com/justinuang/spark/commit/c1a415a18d31226ac580f1a9df7985571d03199b).
With this patch, I go from a job that takes 20 minutes then OOMs, to a job
that finishes completely in 3 minutes. It is indeed quite hacky and prone
to deadlocks since there is buffering in many locations:

- NEW: the ForkingIterator LinkedBlockingDeque
- batching the rows before pickling them
- os buffers on both sides
- pyspark.serializers.BatchedSerializer

We can avoid deadlock by being very disciplined. For example, we can have
the ForkingIterator instead always do a check of whether the
LinkedBlockingDeque is full and if so:

Java
- flush the java pickling buffer
- send a flush command to the python process
- os.flush the java side

Python
- flush BatchedSerializer
- os.flush()

I haven't added this yet. This is getting very complex however. Another
model would just be to change the protocol between the java side and the
worker to be a synchronous request/response. This has the disadvantage that
the CPU isn't doing anything when the batch is being sent across, but it
has the huge advantage of simplicity. In addition, I imagine that the
actual IO between the processes isn't that slow, but rather the
serialization of java objects into pickled bytes, and the
deserialization/serialization + python loops on the python side. Another
advantage is that we won't be taking more than 100% CPU since only one
thread is doing CPU work at a time between the executor and the python
interpreter.
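
To make the synchronous model concrete, here is a very rough sketch of the
executor-side loop. It is illustrative only, not Spark's actual
PythonRDD/BatchPythonEvaluation code; the two batch-transport functions are
stand-ins for the pickling and socket plumbing:

def evalInBatches[T, R](
    input: Iterator[T],
    batchSize: Int,
    writeBatch: Seq[T] => Unit,  // stand-in: pickle one batch, send it, flush
    readBatch: () => Seq[R]      // stand-in: block until the worker's results arrive
): Iterator[(T, R)] = {
  input.grouped(batchSize).flatMap { batch =>
    writeBatch(batch)            // send exactly one batch
    val results = readBatch()    // wait synchronously for its results
    batch.iterator.zip(results.iterator)  // join originals with UDF outputs
  }
}

The tradeoff described above still applies: while a batch is in flight neither
side does useful work, but neither side can deadlock or buffer more than
batchSize rows.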

Any thoughts would be much appreciated =)

Other improvements:
- extract some code of the worker out of PythonRDD so that we can do a
mapPartitions directly in BatchedPythonEvaluation without resorting to the
hackery in ForkedRDD.compute(), which uses a cache to ensure that the other
RDD can get a handle to the same iterator.
- read elements and use a size estimator to create the BlockingQueue to
make sure that we don't store too many things in memory when batching
- patch Unpickler to not use StopException for control flow, which is
slowing down the java side


Re: Catalyst: Reusing already computed expressions within a projection

2015-05-31 Thread Justin Uang
Thanks for pointing to that link! It looks like it’s useful, but it does
look more complicated than the case I’m trying to address.

In my case, we set y = f(x), then we use y later on in future projections (z
= g(y)). In that case, the analysis is trivial: we aren’t trying to find
equivalent expressions, we actually know that z is based on y. In addition,
we are already storing y because it’s one of the projections, so there’s no
tradeoff between time and memory.
Perf gains

I believe that the performance gains can be quite substantial, but can you
check that the case I bring up below will indeed benefit from such an
optimization?

For example, suppose I have a date column (unclean_date) that is stored in
some strange string format. I then use an udf or a hive function that
converts it to the Catalyst date type (cleaned_date). Next, I want to
extract one column with the month, and another with the year, so I can do
groupBys/aggregations later. Currently, every projection/expression based
off of the cleaned_date will have to do the expensive parsing again if I
avoid caching and prefer to do everything in one pass.
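
To make that concrete, here is a rough sketch in the Scala DataFrame API. The
UDFs and the `raw` DataFrame are hypothetical stand-ins for the expensive
parse, not code from an actual pipeline:

import org.apache.spark.sql.functions.udf

// stand-in for an expensive parse from a messy format to "yyyy-MM-dd"
val cleanDate = udf((s: String) => s.trim.replace('/', '-'))
val monthOf   = udf((d: String) => d.substring(5, 7).toInt)
val yearOf    = udf((d: String) => d.substring(0, 4).toInt)

val cleaned = raw.withColumn("cleaned_date", cleanDate(raw("unclean_date")))
val result = cleaned.select(
  cleaned("cleaned_date"),
  monthOf(cleaned("cleaned_date")).as("month"),
  yearOf(cleaned("cleaned_date")).as("year"))

// Because the projections collapse (as in the explain output quoted below),
// "month" and "year" each embed their own copy of cleanDate(unclean_date),
// so the expensive parse runs once per derived column on every row instead
// of just once per row.
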
Code generation phase vs optimization

Is there a reason why doing it at the optimization phase is the wrong
approach? It sounds like we’re actually logically changing the order of
computation with my proposed approach. I do agree, however, that if there
are lower-hanging fruit, then we should tackle those first =)

On Sat, May 30, 2015 at 10:00 PM Michael Armbrust 
wrote:

> I think this is likely something that we'll want to do during the code
> generation phase.  Though its probably not the lowest hanging fruit at this
> point.
>
> On Sun, May 31, 2015 at 5:02 AM, Reynold Xin  wrote:
>
>> I think you are looking for
>> http://en.wikipedia.org/wiki/Common_subexpression_elimination in the
>> optimizer.
>>
>> One thing to note is that as we do more and more optimization like this,
>> the optimization time might increase. Do you see a case where this can
>> bring you substantial performance gains?
>>
>>
>> On Sat, May 30, 2015 at 9:02 AM, Justin Uang 
>> wrote:
>>
>>> On second thought, perhaps this can be done by writing a rule that
>>> builds the DAG of dependencies between expressions, then converts it into
>>> several layers of projections, where each new layer is allowed to depend on
>>> expression results from previous projections?
>>>
>>> Are there any pitfalls to this approach?
>>>
>>> On Sat, May 30, 2015 at 11:30 AM Justin Uang 
>>> wrote:
>>>
>>>> If I do the following
>>>>
>>>> df2 = df.withColumn('y', df['x'] * 7)
>>>> df3 = df2.withColumn('z', df2.y * 3)
>>>> df3.explain()
>>>>
>>>> Then the result is
>>>>
>>>> > Project [date#56,id#57,timestamp#58,x#59,(x#59 * 7.0) AS
>>>> y#64,((x#59 * 7.0) AS y#64 * 3.0) AS z#65]
>>>> >  PhysicalRDD [date#56,id#57,timestamp#58,x#59],
>>>> MapPartitionsRDD[125] at mapPartitions at SQLContext.scala:1163
>>>>
>>>> Effectively I want to compute
>>>>
>>>> y = f(x)
>>>> z = g(y)
>>>>
>>>> The catalyst optimizer realizes that y#64 is the same as the one
>>>> previously computed, however, when building the projection, it is ignoring
>>>> the fact that it had already computed y, so it calculates `x * 7` twice.
>>>>
>>>> y = x * 7
>>>> z = x * 7 * 3
>>>>
>>>> If I wanted to make this fix, would it be possible to do the logic in
>>>> the optimizer phase? I imagine that it's difficult because the expressions
>>>> in InterpretedMutableProjection don't have access to the previous
>>>> expression results, only the input row, and that the design doesn't seem to
>>>> be catered for this.
>>>>
>>>>
>>
>


Re: Catalyst: Reusing already computed expressions within a projection

2015-05-30 Thread Justin Uang
On second thought, perhaps this can be done by writing a rule that builds
the DAG of dependencies between expressions, then converts it into several
layers of projections, where each new layer is allowed to depend on
expression results from previous projections?

Are there any pitfalls to this approach?
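
For the y/z example quoted below, such a rule would roughly split the single
collapsed projection into two layers so that (x#59 * 7.0) is evaluated only
once per row. A hand-written sketch, not actual planner output:

Project [date#56,id#57,timestamp#58,x#59,y#64,(y#64 * 3.0) AS z#65]
 Project [date#56,id#57,timestamp#58,x#59,(x#59 * 7.0) AS y#64]
  PhysicalRDD [date#56,id#57,timestamp#58,x#59], ...
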
On Sat, May 30, 2015 at 11:30 AM Justin Uang  wrote:

> If I do the following
>
> df2 = df.withColumn('y', df['x'] * 7)
> df3 = df2.withColumn('z', df2.y * 3)
> df3.explain()
>
> Then the result is
>
> > Project [date#56,id#57,timestamp#58,x#59,(x#59 * 7.0) AS y#64,((x#59
> * 7.0) AS y#64 * 3.0) AS z#65]
> >  PhysicalRDD [date#56,id#57,timestamp#58,x#59],
> MapPartitionsRDD[125] at mapPartitions at SQLContext.scala:1163
>
> Effectively I want to compute
>
> y = f(x)
> z = g(y)
>
> The catalyst optimizer realizes that y#64 is the same as the one
> previously computed, however, when building the projection, it is ignoring
> the fact that it had already computed y, so it calculates `x * 7` twice.
>
> y = x * 7
> z = x * 7 * 3
>
> If I wanted to make this fix, would it be possible to do the logic in the
> optimizer phase? I imagine that it's difficult because the expressions in
> InterpretedMutableProjection don't have access to the previous expression
> results, only the input row, and that the design doesn't seem to be catered
> for this.
>
>


Catalyst: Reusing already computed expressions within a projection

2015-05-30 Thread Justin Uang
If I do the following

df2 = df.withColumn('y', df['x'] * 7)
df3 = df2.withColumn('z', df2.y * 3)
df3.explain()

Then the result is

> Project [date#56,id#57,timestamp#58,x#59,(x#59 * 7.0) AS y#64,((x#59
* 7.0) AS y#64 * 3.0) AS z#65]
>  PhysicalRDD [date#56,id#57,timestamp#58,x#59], MapPartitionsRDD[125]
at mapPartitions at SQLContext.scala:1163

Effectively I want to compute

y = f(x)
z = g(y)

The catalyst optimizer realizes that y#64 is the same as the one previously
computed, however, when building the projection, it is ignoring the fact
that it had already computed y, so it calculates `x * 7` twice.

y = x * 7
z = x * 7 * 3

If I wanted to make this fix, would it be possible to do the logic in the
optimizer phase? I imagine that it's difficult because the expressions in
InterpretedMutableProjection don't have access to the previous expression
results, only the input row, and that the design doesn't seem to be catered
for this.


Re: Using UDFs in Java without registration

2015-05-30 Thread Justin Uang
The idea of asking for both the argument and return class is interesting. I
don't think we do that for the scala APIs currently, right? In
functions.scala, we only use the TypeTag for RT.

  def udf[RT: TypeTag, A1: TypeTag](f: Function1[A1, RT]): UserDefinedFunction = {
    UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType)
  }

There would only be a small subset of conversions that would make sense
implicitly (e.g. int to double, the typical conversions in programming
languages), but something like (double => int) might be dangerous and
(timestamp => double) wouldn't really make sense. Perhaps it's better to be
explicit about casts?

If we don't care about declaring the types of the arguments, perhaps we can
have all of the java UDF interfaces (UDF1, UDF2, etc) extend a generic
interface called UDF, then have

def define(f: UDF, returnType: Class[_])

to simplify the APIs.
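
A rough sketch of what that could look like inside Spark, assuming UDF1/UDF2
from org.apache.spark.sql.api.java and the same (function, returnType)
constructor that the Scala functions.udf path above uses. Illustrative only,
not an actual implementation:

object UserDefinedFunction {

  def define[T, R](f: UDF1[T, R], returnType: DataType): UserDefinedFunction =
    new UserDefinedFunction((t: T) => f.call(t), returnType)

  def define[T1, T2, R](f: UDF2[T1, T2, R], returnType: DataType): UserDefinedFunction =
    new UserDefinedFunction((t1: T1, t2: T2) => f.call(t1, t2), returnType)

  // ... and so on for however many arities we decide to support
}

Usage would then be something like
df.select(UserDefinedFunction.define(myJavaUdf, DataTypes.IntegerType).apply(df("age"))),
with no registration step.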


On Sat, May 30, 2015 at 3:43 AM Reynold Xin  wrote:

> I think you are right that there is no way to call Java UDF without
> registration right now. Adding another 20 methods to functions would be
> scary. Maybe the best way is to have a companion object
> for UserDefinedFunction, and define UDF there?
>
> e.g.
>
> object UserDefinedFunction {
>
>   def define(f: org.apache.spark.api.java.function.Function0, returnType:
> Class[_]): UserDefinedFunction
>
>   // ... define a few more - maybe up to 5 arguments?
> }
>
> Ideally, we should ask for both argument class and return class, so we can
> do the proper type conversion (e.g. if the UDF expects a string, but the
> input expression is an int, Catalyst can automatically add a cast).
> However, we haven't implemented those in UserDefinedFunction yet.
>
>
>
>
> On Fri, May 29, 2015 at 12:54 PM, Justin Uang 
> wrote:
>
>> I would like to define a UDF in Java via a closure and then use it
>> without registration. In Scala, I believe there are two ways to do this:
>>
>> myUdf = functions.udf({ _ + 5})
>> myDf.select(myUdf(myDf("age")))
>>
>> or
>>
>> myDf.select(functions.callUDF({_ + 5}, DataTypes.IntegerType,
>> myDf("age")))
>>
>> However, both of these don't work for Java UDF. The first one requires
>> TypeTags. For the second one, I was able to hack it by creating a scala
>> AbstractFunction1 and using callUDF, which requires declaring the catalyst
>> DataType instead of using TypeTags. However, it was still nasty because I
>> had to return a scala map instead of a java map.
>>
>> Is there first class support for creating
>> a org.apache.spark.sql.UserDefinedFunction that works with
>> the org.apache.spark.sql.api.java.UDF1? I'm fine with having to
>> declare the catalyst type when creating it.
>>
>> If it doesn't exist, I would be happy to work on it =)
>>
>> Justin
>>
>
>


Using UDFs in Java without registration

2015-05-29 Thread Justin Uang
I would like to define a UDF in Java via a closure and then use it without
registration. In Scala, I believe there are two ways to do this:

val myUdf = functions.udf((x: Int) => x + 5)
myDf.select(myUdf(myDf("age")))

or

myDf.select(functions.callUDF((x: Int) => x + 5, DataTypes.IntegerType,
myDf("age")))

However, both of these don't work for Java UDF. The first one requires
TypeTags. For the second one, I was able to hack it by creating a scala
AbstractFunction1 and using callUDF, which requires declaring the catalyst
DataType instead of using TypeTags. However, it was still nasty because I
had to return a scala map instead of a java map.

Is there first-class support for creating
an org.apache.spark.sql.UserDefinedFunction that works with
the org.apache.spark.sql.api.java.UDF1? I'm fine with having to
declare the catalyst type when creating it.

If it doesn't exist, I would be happy to work on it =)

Justin


Re: Spark 1.4.0 pyspark and pylint breaking

2015-05-26 Thread Justin Uang
Thanks for clarifying! I don't understand Python package and module names
that well, but I thought that the package namespacing would've helped,
since you are in pyspark.sql.types. I guess not?

On Tue, May 26, 2015 at 3:03 PM Davies Liu  wrote:

> There is a module called 'types' in python 3:
>
> davies@localhost:~/work/spark$ python3
> Python 3.4.1 (v3.4.1:c0e311e010fc, May 18 2014, 00:54:21)
> [GCC 4.2.1 (Apple Inc. build 5666) (dot 3)] on darwin
> Type "help", "copyright", "credits" or "license" for more information.
> >>> import types
> >>> types
> <module 'types' from '/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/types.py'>
>
> Without renaming, our `types.py` will conflict with it when you run
> unittests in pyspark/sql/ .
>
> On Tue, May 26, 2015 at 11:57 AM, Justin Uang 
> wrote:
> > In commit 04e44b37, the migration to Python 3, pyspark/sql/types.py was
> > renamed to pyspark/sql/_types.py and then some magic in
> > pyspark/sql/__init__.py dynamically renamed the module back to types. I
> > imagine that this is some naming conflict with Python 3, but what was the
> > error that showed up?
> >
> > The reason why I'm asking about this is because it's messing with pylint,
> > since pylint cannot now statically find the module. I tried also
> importing
> > the package so that __init__ would be run in an init-hook, but that isn't
> > what the discovery mechanism is using. I imagine it's probably just
> crawling
> > the directory structure.
> >
> > One way to work around this would be something akin to this
> > (
> http://stackoverflow.com/questions/9602811/how-to-tell-pylint-to-ignore-certain-imports
> ),
> > where I would have to create a fake module, but I would probably be
> missing
> > a ton of pylint features on users of that module, and it's pretty hacky.
>


Spark 1.4.0 pyspark and pylint breaking

2015-05-26 Thread Justin Uang
In commit 04e44b37, the migration to Python 3, pyspark/sql/types.py was
renamed to pyspark/sql/_types.py and then some magic in
pyspark/sql/__init__.py dynamically renamed the module back to types. I
imagine that this is some naming conflict with Python 3, but what was the
error that showed up?

The reason I'm asking about this is that it's messing with pylint,
since pylint can no longer statically find the module. I also tried importing
the package so that __init__ would be run in an init-hook, but that isn't
what the discovery mechanism is using. I imagine it's probably just
crawling the directory structure.

One way to work around this would be something akin to this (
http://stackoverflow.com/questions/9602811/how-to-tell-pylint-to-ignore-certain-imports),
where I would have to create a fake module, but I would probably be missing
a ton of pylint features on users of that module, and it's pretty hacky.


Re: [VOTE] Release Apache Spark 1.4.0 (RC1)

2015-05-22 Thread Justin Uang
I'm working on one of the Palantir teams using Spark, and here is our
feedback:

We have encountered three issues when upgrading to spark 1.4.0. I'm not
sure they qualify as a -1, as they come from using non-public APIs and
multiple spark contexts for the purposes of testing, but I do want to bring
them up for awareness =)

   1. Our UDT was serializing to a StringType, but now strings are
   represented internally as UTF8String, so we had to change our UDT to use
   UTF8String.apply() and UTF8String.toString() to convert back to String.
   2. createDataFrame when using UDTs used to accept things in the
   serialized catalyst form. Now, they're supposed to be in the UDT java class
   form (I think this change would've affected us in 1.3.1 already, since we
   were in 1.3.0)
   3. derby database lifecycle management issue with HiveContext. We have
   been using a SparkContextResource JUnit Rule that we wrote, and it sets up
   then tears down a SparkContext and HiveContext between unit test runs
   within the same process (possibly the same thread as well). Multiple
   contexts are not being used at once. It used to work in 1.3.0, but now when
   we try to create the HiveContext for the second unit test, it
   complains with the following exception. I have a feeling it might have
   something to do with the Hive object being thread local, and us not
   explicitly closing the HiveContext and everything it holds. The full stack
   trace is here: https://gist.github.com/justinuang/0403d49cdeedf91727cd

Caused by: java.sql.SQLException: Failed to start database
'metastore_db' with class loader
org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1@5dea2446,
see the next exception for details.
at 
org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown
Source)


On Wed, May 20, 2015 at 10:35 AM Imran Rashid  wrote:

> -1
>
> discovered I accidentally removed master & worker json endpoints, will
> restore
> https://issues.apache.org/jira/browse/SPARK-7760
>
> On Tue, May 19, 2015 at 11:10 AM, Patrick Wendell 
> wrote:
>
>> Please vote on releasing the following candidate as Apache Spark version
>> 1.4.0!
>>
>> The tag to be voted on is v1.4.0-rc1 (commit 777a081):
>>
>> https://git-wip-us.apache.org/repos/asf?p=spark.git;a=commit;h=777a08166f1fb144146ba32581d4632c3466541e
>>
>> The release files, including signatures, digests, etc. can be found at:
>> http://people.apache.org/~pwendell/spark-1.4.0-rc1/
>>
>> Release artifacts are signed with the following key:
>> https://people.apache.org/keys/committer/pwendell.asc
>>
>> The staging repository for this release can be found at:
>> https://repository.apache.org/content/repositories/orgapachespark-1092/
>>
>> The documentation corresponding to this release can be found at:
>> http://people.apache.org/~pwendell/spark-1.4.0-rc1-docs/
>>
>> Please vote on releasing this package as Apache Spark 1.4.0!
>>
>> The vote is open until Friday, May 22, at 17:03 UTC and passes
>> if a majority of at least 3 +1 PMC votes are cast.
>>
>> [ ] +1 Release this package as Apache Spark 1.4.0
>> [ ] -1 Do not release this package because ...
>>
>> To learn more about Apache Spark, please see
>> http://spark.apache.org/
>>
>> == How can I help test this release? ==
>> If you are a Spark user, you can help us test this release by
>> taking a Spark 1.3 workload and running on this release candidate,
>> then reporting any regressions.
>>
>> == What justifies a -1 vote for this release? ==
>> This vote is happening towards the end of the 1.4 QA period,
>> so -1 votes should only occur for significant regressions from 1.3.1.
>> Bugs already present in 1.3.X, minor regressions, or bugs related
>> to new features will not block this release.
>>
>> -
>> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
>> For additional commands, e-mail: dev-h...@spark.apache.org
>>
>>
>


UDTs and StringType upgrade issue for Spark 1.4.0

2015-05-22 Thread Justin Uang
We ran into an issue regarding Strings in UDTs when upgrading to Spark
1.4.0-rc. I understand that it's a non-public API, so it's expected, but I
just wanted to bring it up for awareness and so we can maybe change the
release notes to mention it =)

Our UDT was serializing to a StringType, but now strings are represented
internally as UTF8String, so we had to change our UDT to use
UTF8String.apply() and UTF8String.toString() to convert back to String.
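
For reference, a minimal sketch of the change with a made-up UDT (assuming the
usual imports from org.apache.spark.sql.types; this is not the actual UDT from
our pipeline):

case class Coordinate(lat: Double, lon: Double)

class CoordinateUDT extends UserDefinedType[Coordinate] {
  override def sqlType: DataType = StringType

  override def serialize(obj: Any): Any = obj match {
    // pre-1.4 this simply returned the String itself
    case Coordinate(lat, lon) => UTF8String.apply(s"$lat,$lon")
  }

  override def deserialize(datum: Any): Coordinate = datum match {
    case s: UTF8String =>
      val Array(lat, lon) = s.toString.split(",")
      Coordinate(lat.toDouble, lon.toDouble)
  }

  override def userClass: Class[Coordinate] = classOf[Coordinate]
}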


Re: RDD split into multiple RDDs

2015-05-19 Thread Justin Uang
To do it in one pass, conceptually what you would need to do is to consume
the entire parent iterator and store the values either in memory or on
disk, which is generally something you want to avoid given that the parent
iterator length is unbounded. If you need to start spilling to disk, you
might actually get better performance just from doing multiple passes,
provided that you don't have that many unique keys. In fact, the filter
approach that you mentioned earlier is conceptually the same as the
implementation of randomSplit, where each of the split RDDs has access to
the full parent RDD then does the sample.

In addition, building the map is actually very cheap. Since it's lazy, you
only do the filters when you need to iterate across the RDD of a specific
key.
On Wed, Apr 29, 2015 at 9:57 AM Sébastien Soubré-Lanabère <
s.sou...@gmail.com> wrote:

> Hi Juan, Daniel,
>
> thank you for your explanations. Indeed, I don't have a big number of keys,
> at least not enough to get the scheduler stuck.
>
> I was using a method quite similar to what you posted, Juan, and yes it
> works, but I think it would be more efficient to not call filter on each
> key. So, I was thinking of something like:
> - get the iterator of the KV rdd
> - distribute each value into a subset by key and then recreate a rdd from
> this subset
>
> Because spark context parallelize method cannot be used inside a
> transformation, I wonder if I could do it by creating a custom RDD and then
> try to implement something like PairRDDFunctions.lookup method, but
> remplacing Seq[V] of course by a RDD
>
> def lookup(key: K): Seq[V] = {
> self.partitioner match {
>   case Some(p) =>
> val index = p.getPartition(key)
> val process = (it: Iterator[(K, V)]) => {
>   val buf = new ArrayBuffer[V]
>   for (pair <- it if pair._1 == key) {
> buf += pair._2
>   }
>   buf
> } : Seq[V]
> val res = self.context.runJob(self, process, Array(index), false)
> res(0)
>   case None =>
> self.filter(_._1 == key).map(_._2).collect()
> }
>   }
>
>
> 2015-04-29 15:02 GMT+02:00 Juan Rodríguez Hortalá <
> juan.rodriguez.hort...@gmail.com>:
>
> > Hi Daniel,
> >
> > I understood Sébastien was talking about having a high number of keys, I
> > guess I was prejudiced by my own problem! :) Anyway I don't think you
> need
> > to use disk or a database to generate a RDD per key, you can use filter
> > which I guess would be more efficient because IO is avoided, especially
> if
> > the RDD was cached. For example:
> >
> > // in the spark shell
> > import org.apache.spark.rdd.RDD
> > import org.apache.spark.rdd.RDD._
> > import scala.reflect.ClassTag
> >
> > // generate a map from key to rdd of values
> > def groupByKeyToRDDs[K, V](pairRDD: RDD[(K, V)]) (implicit kt:
> > ClassTag[K], vt: ClassTag[V], ord: Ordering[K]): Map[K, RDD[V]] = {
> > val keys = pairRDD.keys.distinct.collect
> > (for (k <- keys) yield
> > k -> (pairRDD filter(_._1 == k) values)
> > ) toMap
> > }
> >
> > // simple demo
> > val xs = sc.parallelize(1 to 1000)
> > val ixs = xs map(x => (x % 10, x))
> > val gs = groupByKeyToRDDs(ixs)
> > gs(1).collect
> >
> > Just an idea.
> >
> > Greetings,
> >
> > Juan Rodriguez
> >
> >
> >
> > 2015-04-29 14:20 GMT+02:00 Daniel Darabos <
> > daniel.dara...@lynxanalytics.com>:
> >
> >> Check out http://stackoverflow.com/a/26051042/3318517. It's a nice
> >> method for saving the RDD into separate files by key in a single pass.
> Then
> >> you can read the files into separate RDDs.
> >>
> >> On Wed, Apr 29, 2015 at 2:10 PM, Juan Rodríguez Hortalá <
> >> juan.rodriguez.hort...@gmail.com> wrote:
> >>
> >>> Hi Sébastien,
> >>>
> >>> I came with a similar problem some time ago, you can see the discussion
> >>> in
> >>> the Spark users mailing list at
> >>>
> >>>
> http://markmail.org/message/fudmem4yy63p62ar#query:+page:1+mid:qv4gw6czf6lb6hpq+state:results
> >>> . My experience was that when you create too many RDDs the Spark
> >>> scheduler
> >>> gets stuck, so if you have many keys in the map you are creating you'll
> >>> probably have problems. On the other hand, the latest example I
> proposed
> >>> in
> >>> that mailing thread was a batch job in which we start from a single RDD
> >>> of
> >>> time tagged data, transform the RDD in a list of RDD corresponding to
> >>> generating windows according to the time tag of the records, and then
> >>> apply
> >>> a transformation of RDD to each window RDD, like for example KMeans.run
> >>> of
> >>> MLlib. This is very similar to what you propose.
> >>> So in my humble opinion the approach of generating thousands of RDDs by
> >>> filtering doesn't work, and a new RDD class should be implemented for
> >>> this.
> >>> I have never implemented a custom RDD, but if you want some help I
> would
> >>> be
> >>> happy to join you in this task
> >>>
> >>
> >> Sebastien said nothing about thousands of keys. This is a valid prob

First-class support for pip/virtualenv in pyspark

2015-04-23 Thread Justin Uang
Hi,

I have been trying to figure out how to ship a python package that I have
been working on, and this has brought up a couple questions to me. Please
note that I'm fairly new to python package management, so any
feedback/corrections is welcome =)

It looks like the --py-files support we have merely adds the .py, .zip, or
.egg to the sys.path, and therefore only supports "built" distributions
that only need to be added to the path. Because of this, it looks like
wheels won't work either, since they involve an installation process (
https://www.python.org/dev/peps/pep-0427/#is-it-possible-to-import-python-code-directly-from-a-wheel-file
).

In addition, any type of distribution that has shared libraries, such as
pandas and numpy wheels, will fail because "ZIP import of dynamic modules
(.pyd, .so) is disallowed" (https://docs.python.org/2/library/zipimport.html
).

The only way to support wheels or other types of source distributions that
require an "installation" step, is to use an installer like pip, in which
case, the natural extension is to use virtualenv. Have we considered having
pyspark manage virtualenvs, and to use pip install to install packages that
are sent across the cluster? I feel like first class support of using pip
install will

- allow us to ship packages that require an install step (numpy, pandas,
etc)
- help users not have to provision the cluster with all the dependencies
- allow multiple applications to run with different environments at the same
time
- allow a user just to specify a top level dependency or requirements.txt,
and have pip install all the transitive dependencies automatically

Thanks!

Justin


Infinite recursion when using SQLContext#createDataFrame(JavaRDD[Row], java.util.List[String])

2015-04-19 Thread Justin Uang
Hi,

I have a question regarding SQLContext#createDataFrame(JavaRDD[Row],
java.util.List[String]). It looks like when I try to call it, it results in
an infinite recursion that overflows the stack. I filed it here:
https://issues.apache.org/jira/browse/SPARK-6999.
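
For reference, the call pattern that triggers it presumably looks roughly like
this (hypothetical data; see the JIRA for the actual reproduction):

import org.apache.spark.api.java.JavaRDD
import org.apache.spark.sql.Row

val rows: JavaRDD[Row] = sc.parallelize(Seq(Row(1, "a"), Row(2, "b"))).toJavaRDD()
// expected: infer the schema from the Rows and use the given column names,
// but the overload ends up calling itself until the stack overflows
val df = sqlContext.createDataFrame(rows, java.util.Arrays.asList("id", "name"))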

What is the best way to fix this? Is the intention that it indeed calls a
scala implementation that infers the schema using the datatypes of the Rows
as well as using the provided column names?

Thanks!

Justin