Re: DBSCAN for MLlib

2015-01-16 Thread Muhammad Ali A'råby
Please find my answers on JIRA page.
Muhammad-Ali 

 On Thursday, January 15, 2015 3:25 AM, Xiangrui Meng  
wrote:
   

 Please find my comments on the JIRA page. -Xiangrui

On Tue, Jan 13, 2015 at 1:49 PM, Muhammad Ali A'råby
 wrote:
> I have to say, I have created a Jira task for it:
> [SPARK-5226] Add DBSCAN Clustering Algorithm to MLlib - ASF JIRA
>
> (JIRA description: MLlib is all k-means now, and I think we should add some new
> clustering algorithms to it. First candidate is DBSCAN as I think.)
>
>
>
>      On Wednesday, January 14, 2015 1:09 AM, Muhammad Ali A'råby 
> wrote:
>
>
>  Dear all,
> I think MLlib needs more clustering algorithms and DBSCAN is my first 
> candidate. I am starting to implement it. Any advice?
> Muhammad-Ali
>
>
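
For readers new to the algorithm, a purely illustrative single-machine sketch of
DBSCAN's density-based expansion (function and parameter names are mine, not the
SPARK-5226 design; a distributed MLlib version would need a partitioned
neighbourhood search rather than this O(n^2) scan):

def dbscanLocal(points: Array[Array[Double]], eps: Double, minPts: Int): Array[Int] = {
  def dist(a: Array[Double], b: Array[Double]): Double =
    math.sqrt(a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum)
  def neighbours(i: Int): Seq[Int] =
    points.indices.filter(j => dist(points(i), points(j)) <= eps)

  val labels = Array.fill(points.length)(0)   // 0 = unvisited, -1 = noise, >0 = cluster id
  var cluster = 0
  for (i <- points.indices if labels(i) == 0) {
    val ns = neighbours(i)
    if (ns.size < minPts) {
      labels(i) = -1                          // provisional noise; may become a border point later
    } else {
      cluster += 1                            // i is a core point: start a new cluster
      labels(i) = cluster
      var frontier = ns.toList
      while (frontier.nonEmpty) {
        val j = frontier.head
        frontier = frontier.tail
        if (labels(j) <= 0) {                 // unvisited or provisional noise
          labels(j) = cluster
          val njs = neighbours(j)
          if (njs.size >= minPts) frontier = njs.toList ::: frontier   // expand through core points
        }
      }
    }
  }
  labels
}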

   

Re: RDD order guarantees

2015-01-16 Thread Ewan Higgs

Yes, I am running on a local file system.

Is there a bug open for this? Mingyu Kim reported the problem last April:
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-reads-partitions-in-a-wrong-order-td4818.html

-Ewan

On 01/16/2015 07:41 PM, Reynold Xin wrote:
You are running on a local file system, right? HDFS orders the files
based on names, but local file systems often don't. I think that's why
there's a difference.


We might be able to do a sort and order the partitions when we create 
an RDD to make this universal though.


On Fri, Jan 16, 2015 at 8:26 AM, Ewan Higgs wrote:


Hi all,
Quick one: when reading files, are the orders of partitions
guaranteed to be preserved? I am finding some weird behaviour
where I run sortByKeys() on an RDD (which has 16 byte keys) and
write it to disk. If I open a python shell and run the following:

for part in range(29):
    print map(ord,
              open('/home/ehiggs/data/terasort_out/part-r-000{0:02}'.format(part),
                   'r').read(16))

Then each partition is in order based on the first value of each
partition.

I can also call TeraValidate.validate from TeraSort and it is
happy with the results. It seems to be on loading the file that
the reordering happens. If this is expected, is there a way to ask
Spark nicely to give me the RDD in the order it was saved?

This is based on trying to fix my TeraValidate code on this branch:
https://github.com/ehiggs/spark/tree/terasort

Thanks,
Ewan








Re: Implementing TinkerPop on top of GraphX

2015-01-16 Thread Kushal Datta
Code updated. Sorry, the wrong branch was uploaded before.

On Fri, Jan 16, 2015 at 2:13 PM, Kushal Datta 
wrote:

> The source code is under a new module named 'graphx'. Let me double check.
>
> On Fri, Jan 16, 2015 at 2:11 PM, Kyle Ellrott 
> wrote:
>
>> Looking at https://github.com/kdatta/tinkerpop3/compare/graphx-gremlin I
>> only see a maven build file. Do you have some source code some place else?
>>
>> I've worked on a Spark-based implementation (
>> https://github.com/kellrott/spark-gremlin ), but it's not done and I've
>> been tied up on other projects.
>> It also looks like TinkerPop3 is a bit of a moving target. I had targeted the
>> work done for gremlin-giraph (
>> http://www.tinkerpop.com/docs/3.0.0.M5/#giraph-gremlin ) that was part
>> of the M5 release, as a base model for implementation. But that appears to
>> have been refactored into gremlin-hadoop (
>> http://www.tinkerpop.com/docs/3.0.0.M6/#hadoop-gremlin ) in the M6
>> release. I need to assess how much this changes the code.
>>
>> Most of the code that needs to be changed from Giraph to Spark will be
>> simply replacing classes with Spark-derived ones. The main place where the
>> logic will need to change is in the 'GraphComputer' class (
>> https://github.com/tinkerpop/tinkerpop3/blob/master/hadoop-gremlin/src/main/java/com/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java
>> ) which is created by the Graph when the 'compute' method is called (
>> https://github.com/tinkerpop/tinkerpop3/blob/master/hadoop-gremlin/src/main/java/com/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java#L135
>> ).
>>
>>
>> Kyle
>>
>>
>>
>> On Fri, Jan 16, 2015 at 1:01 PM, Kushal Datta 
>> wrote:
>>
>>> Hi David,
>>>
>>>
>>> Yes, we are still headed in that direction.
>>> Please take a look at the repo I sent earlier.
>>> I think that's a good starting point.
>>>
>>> Thanks,
>>> -Kushal.
>>>
>>> On Thu, Jan 15, 2015 at 8:31 AM, David Robinson 
>>> wrote:
>>>
>>> > I am new to Spark and GraphX, however, I use Tinkerpop backed graphs
>>> and
>>> > think the idea of using Tinkerpop as the API for GraphX is a great
>>> idea and
>>> > hope you are still headed in that direction.  I noticed that Tinkerpop
>>> 3 is
>>> > moving into the Apache family:
>>> > http://wiki.apache.org/incubator/TinkerPopProposal  which might
>>> alleviate
>>> > concerns about having an API definition "outside" of Spark.
>>> >
>>> > Thanks,
>>> >
>>> >
>>> >
>>> >
>>> > --
>>> > View this message in context:
>>> >
>>> http://apache-spark-developers-list.1001551.n3.nabble.com/Implementing-TinkerPop-on-top-of-GraphX-tp9169p10126.html
>>> > Sent from the Apache Spark Developers List mailing list archive at
>>> > Nabble.com.
>>> >
>>> >
>>> >
>>>
>>
>>
>


Re: Join implementation in SparkSQL

2015-01-16 Thread Yin Huai
Hi Alex,

Can you attach the output of sql("explain extended ").collect.foreach(println)?

Thanks,

Yin

On Fri, Jan 16, 2015 at 1:54 PM, Alessandro Baretta 
wrote:

> Reynold,
>
> The source file you are directing me to is a little too terse for me to
> understand what exactly is going on. Let me tell you what I'm trying to do
> and what problems I'm encountering, so that you might be able to better
> direct my investigation of the SparkSQL codebase.
>
> I am computing the join of three tables, sharing the same primary key,
> composed of three fields, and having several other fields. My first attempt
> at computing this join was in SQL, with a query much like this slightly
> simplified one:
>
>  SELECT
>   a.key1 key1, a.key2 key2, a.key3 key3,
>   a.data1 adata1, a.data2 adata2, ...,
>   b.data1 bdata1, b.data2 bdata2, ...,
>   c.data1 cdata1, c.data2 cdata2, ...
> FROM a, b, c
> WHERE
>   a.key1 = b.key1 AND a.key2 = b.key2 AND a.key3 = b.key3 AND
>   b.key1 = c.key1 AND b.key2 = c.key2 AND b.key3 = c.key3
>
> This code yielded a SparkSQL job containing 40,000 stages, which failed
> after filling up all available disk space on the worker nodes.
>
> I then wrote this join as a plain mapreduce join. The code looks roughly
> like this:
> val a_ = a.map(row => (key(row), ("a", row)))
> val b_ = b.map(row => (key(row), ("b", row)))
> val c_ = c.map(row => (key(row), ("c", row)))
> val join = UnionRDD(sc, List(a_, b_, c_)).groupByKey
>
> This implementation yields approximately 1600 stages and completes in a few
> minutes on a 256 core cluster. The huge difference in scale of the two jobs
> makes me think that SparkSQL is implementing my join as a cartesian product.
> This is the query plan--I'm not sure I can read it, but it does seem to
> imply that the filter conditions are not being pushed far down enough:
>
>  'Project [...]
>  'Filter (('a.key1 = 'b.key1)) && ('a.key2 = b.key2)) && ...)
>   'Join Inner, None
>'Join Inner, None
>
> Is SparkSQL perhaps unable to push join conditions down from the WHERE clause
> into the join itself?
>
> Alex
>
> On Thu, Jan 15, 2015 at 10:36 AM, Reynold Xin  wrote:
>
> > It's a bunch of strategies defined here:
> >
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
> >
> > In most common use cases (e.g. inner equi join), filters are pushed below
> > the join or into the join. Doing a cartesian product followed by a filter
> > is too expensive.
> >
> >
> > On Thu, Jan 15, 2015 at 7:39 AM, Alessandro Baretta <
> alexbare...@gmail.com
> > > wrote:
> >
> >> Hello,
> >>
> >> Where can I find docs about how joins are implemented in SparkSQL? In
> >> particular, I'd like to know whether they are implemented according to
> >> their relational algebra definition as filters on top of a cartesian
> >> product.
> >>
> >> Thanks,
> >>
> >> Alex
> >>
> >
> >
>


Re: Spark SQL API changes and stabilization

2015-01-16 Thread Reynold Xin
That's a good idea. We didn't intentionally break the doc generation. The
doc generation for Catalyst is broken because we use Scala macros and we
haven't had time to investigate how to fix it yet.

If you have a minute and want to investigate, I can merge it in as soon as
possible.





On Fri, Jan 16, 2015 at 2:11 PM, Alessandro Baretta 
wrote:

> Reynold,
>
> Your clarification is much appreciated. One issue though, that I would
> strongly encourage you to work on, is to make sure that the Scaladoc CAN be
> generated manually if needed (a "Use at your own risk" clause would be
> perfectly legitimate here). The reason I say this is that currently even
> hacking SparkBuild.scala to include SparkSQL in the unidoc target doesn't
> help, as scaladoc itself fails with errors such as these.
>
> [error]
> /Users/alex/git/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala:359:
> polymorphic expression cannot be instantiated to expected type;
> [error]  found   : [T(in method
> apply)]org.apache.spark.sql.catalyst.dsl.ScalaUdfBuilder[T(in method apply)]
> [error]  required:
> org.apache.spark.sql.catalyst.dsl.package.ScalaUdfBuilder[T(in method
> functionToUdfBuilder)]
> [error]   implicit def functionToUdfBuilder[T: TypeTag](func:
> Function22[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _,
> _, T]): ScalaUdfBuilder[T] = ScalaUdfBuilder(func)
> [error]
>
>   ^
> [error]
> /Users/alex/git/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala:147:
> value q is not a member of StringContext
> [error]  Note: implicit class Evaluate2 is not applicable here because it
> comes after the application point and it lacks an explicit result type
> [error] q"""
> [error] ^
> [error]
> /Users/alex/git/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala:181:
> value q is not a member of StringContext
> [error] q"""
> [error] ^
> [error]
> /Users/alex/git/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala:198:
> value q is not a member of StringContext
>
> While I understand your desire to discourage users from relying on the
> internal "private" APIs, there is no reason to prevent people from gaining
> a better understanding of how things work by allowing them--with some
> effort--to get to the docs.
>
> Thanks,
>
> Alex
>
> On Thu, Jan 15, 2015 at 10:33 AM, Reynold Xin  wrote:
>
>> Alex,
>>
>> I didn't communicate properly. By "private", I simply meant the
>> expectation that it is not a public API. The plan is to still omit it from
>> the scaladoc/javadoc generation, but no language visibility modifier will
>> be applied on them.
>>
>> After 1.3, you will likely no longer need to use things in sql.catalyst
>> package directly. Programmatically constructing SchemaRDDs is going to be a
>> first-class public API. Data types have already been moved out of the
>> sql.catalyst package and now live in sql.types. They are becoming stable
>> public APIs. When the "data frame" patch is submitted, you will see a
>> public expression library also. There will be few reasons for end users or
>> library developers to hook into things in sql.catalyst. For the bravest and
>> the most advanced, they can still use them, with the expectation that it is
>> subject to change.
>>
>>
>>
>>
>>
>> On Thu, Jan 15, 2015 at 7:53 AM, Alessandro Baretta <
>> alexbare...@gmail.com> wrote:
>>
>>> Reynold,
>>>
>>> Thanks for the heads up. In general, I strongly oppose the use of
>>> "private" to restrict access to certain parts of the API, the reason being
>>> that I might find the need to use some of the internals of a library from
>>> my own project. I find that a @DeveloperAPI annotation serves the same
>>> purpose as "private" without imposing unnecessary restrictions: it
>>> discourages people from using the annotated API and reserves the right for
>>> the core developers to change it suddenly in backwards incompatible ways.
>>>
>>> In particular, I would like to express the desire that the APIs to
>>> programmatically construct SchemaRDDs from an RDD[Row] and a StructType
>>> remain public. All the SparkSQL data type objects should be exposed by the
>>> API, and the jekyll build should not hide the docs as it does now.
>>>
>>> Thanks.
>>>
>>> Alex
>>>
>>> On Wed, Jan 14, 2015 at 9:45 PM, Reynold Xin 
>>> wrote:
>>>
 Hi Spark devs,

 Given the growing number of developers that are building on Spark SQL,
 we
 would like to stabilize the API in 1.3 so users and developers can be
 confident to build on it. This also gives us a chance to improve the
 API.

 In particular, we are proposing the following major changes. This should
 have no impact for most users (i.e. those running SQL through the JDBC
 client or SQLContext.sql method).

 1. Everything in sql.catalyst package is private to the project.

Re: Implementing TinkerPop on top of GraphX

2015-01-16 Thread Kushal Datta
The source code is under a new module named 'graphx'. Let me double check.

On Fri, Jan 16, 2015 at 2:11 PM, Kyle Ellrott  wrote:

> Looking at https://github.com/kdatta/tinkerpop3/compare/graphx-gremlin I
> only see a maven build file. Do you have some source code some place else?
>
> I've worked on a Spark-based implementation (
> https://github.com/kellrott/spark-gremlin ), but it's not done and I've
> been tied up on other projects.
> It also looks like TinkerPop3 is a bit of a moving target. I had targeted the
> work done for gremlin-giraph (
> http://www.tinkerpop.com/docs/3.0.0.M5/#giraph-gremlin ) that was part of
> the M5 release, as a base model for implementation. But that appears to
> have been refactored into gremlin-hadoop (
> http://www.tinkerpop.com/docs/3.0.0.M6/#hadoop-gremlin ) in the M6
> release. I need to assess how much this changes the code.
>
> Most of the code that needs to be changed from Giraph to Spark will be
> simply replacing classes with Spark-derived ones. The main place where the
> logic will need to change is in the 'GraphComputer' class (
> https://github.com/tinkerpop/tinkerpop3/blob/master/hadoop-gremlin/src/main/java/com/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java
> ) which is created by the Graph when the 'compute' method is called (
> https://github.com/tinkerpop/tinkerpop3/blob/master/hadoop-gremlin/src/main/java/com/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java#L135
> ).
>
>
> Kyle
>
>
>
> On Fri, Jan 16, 2015 at 1:01 PM, Kushal Datta 
> wrote:
>
>> Hi David,
>>
>>
>> Yes, we are still headed in that direction.
>> Please take a look at the repo I sent earlier.
>> I think that's a good starting point.
>>
>> Thanks,
>> -Kushal.
>>
>> On Thu, Jan 15, 2015 at 8:31 AM, David Robinson 
>> wrote:
>>
>> > I am new to Spark and GraphX, however, I use Tinkerpop backed graphs and
>> > think the idea of using Tinkerpop as the API for GraphX is a great idea
>> and
>> > hope you are still headed in that direction.  I noticed that Tinkerpop
>> 3 is
>> > moving into the Apache family:
>> > http://wiki.apache.org/incubator/TinkerPopProposal  which might
>> alleviate
>> > concerns about having an API definition "outside" of Spark.
>> >
>> > Thanks,
>> >
>> >
>> >
>> >
>> > --
>> > View this message in context:
>> >
>> http://apache-spark-developers-list.1001551.n3.nabble.com/Implementing-TinkerPop-on-top-of-GraphX-tp9169p10126.html
>> > Sent from the Apache Spark Developers List mailing list archive at
>> > Nabble.com.
>> >
>> > -
>> > To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
>> > For additional commands, e-mail: dev-h...@spark.apache.org
>> >
>> >
>>
>
>


Re: Spark SQL API changes and stabilization

2015-01-16 Thread Alessandro Baretta
Reynold,

Your clarification is much appreciated. One issue though, that I would
strongly encourage you to work on, is to make sure that the Scaladoc CAN be
generated manually if needed (a "Use at your own risk" clause would be
perfectly legitimate here). The reason I say this is that currently even
hacking SparkBuild.scala to include SparkSQL in the unidoc target doesn't
help, as scaladoc itself fails with errors such as these.

[error]
/Users/alex/git/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala:359:
polymorphic expression cannot be instantiated to expected type;
[error]  found   : [T(in method
apply)]org.apache.spark.sql.catalyst.dsl.ScalaUdfBuilder[T(in method apply)]
[error]  required:
org.apache.spark.sql.catalyst.dsl.package.ScalaUdfBuilder[T(in method
functionToUdfBuilder)]
[error]   implicit def functionToUdfBuilder[T: TypeTag](func: Function22[_,
_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, T]):
ScalaUdfBuilder[T] = ScalaUdfBuilder(func)
[error]

^
[error]
/Users/alex/git/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala:147:
value q is not a member of StringContext
[error]  Note: implicit class Evaluate2 is not applicable here because it
comes after the application point and it lacks an explicit result type
[error] q"""
[error] ^
[error]
/Users/alex/git/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala:181:
value q is not a member of StringContext
[error] q"""
[error] ^
[error]
/Users/alex/git/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala:198:
value q is not a member of StringContext

While I understand your desire to discourage users from relying on the
internal "private" APIs, there is no reason to prevent people from gaining
a better understanding of how things work by allowing them--with some
effort--to get to the docs.

Thanks,

Alex

On Thu, Jan 15, 2015 at 10:33 AM, Reynold Xin  wrote:

> Alex,
>
> I didn't communicate properly. By "private", I simply meant the
> expectation that it is not a public API. The plan is to still omit it from
> the scaladoc/javadoc generation, but no language visibility modifier will
> be applied on them.
>
> After 1.3, you will likely no longer need to use things in sql.catalyst
> package directly. Programmatically constructing SchemaRDDs is going to be a
> first-class public API. Data types have already been moved out of the
> sql.catalyst package and now live in sql.types. They are becoming stable
> public APIs. When the "data frame" patch is submitted, you will see a
> public expression library also. There will be few reasons for end users or
> library developers to hook into things in sql.catalyst. For the bravest and
> the most advanced, they can still use them, with the expectation that it is
> subject to change.
>
>
>
>
>
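
For reference, a minimal sketch of the programmatic construction discussed above.
sql.types is the package location mentioned in this thread; applySchema was the
SQLContext entry point at the time (later renamed createDataFrame), so treat the
method name as illustrative rather than the final API:

import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val sqlContext = new SQLContext(sc)   // sc: an existing SparkContext
val rowRDD = sc.parallelize(Seq(Row("alice", 1), Row("bob", 2)))
val schema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("count", IntegerType, nullable = false)))
val schemaRDD = sqlContext.applySchema(rowRDD, schema)
schemaRDD.registerTempTable("people")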
> On Thu, Jan 15, 2015 at 7:53 AM, Alessandro Baretta  > wrote:
>
>> Reynold,
>>
>> Thanks for the heads up. In general, I strongly oppose the use of
>> "private" to restrict access to certain parts of the API, the reason being
>> that I might find the need to use some of the internals of a library from
>> my own project. I find that a @DeveloperAPI annotation serves the same
>> purpose as "private" without imposing unnecessary restrictions: it
>> discourages people from using the annotated API and reserves the right for
>> the core developers to change it suddenly in backwards incompatible ways.
>>
>> In particular, I would like to express the desire that the APIs to
>> programmatically construct SchemaRDDs from an RDD[Row] and a StructType
>> remain public. All the SparkSQL data type objects should be exposed by the
>> API, and the jekyll build should not hide the docs as it does now.
>>
>> Thanks.
>>
>> Alex
>>
>> On Wed, Jan 14, 2015 at 9:45 PM, Reynold Xin  wrote:
>>
>>> Hi Spark devs,
>>>
>>> Given the growing number of developers that are building on Spark SQL, we
>>> would like to stabilize the API in 1.3 so users and developers can be
>>> confident to build on it. This also gives us a chance to improve the API.
>>>
>>> In particular, we are proposing the following major changes. This should
>>> have no impact for most users (i.e. those running SQL through the JDBC
>>> client or SQLContext.sql method).
>>>
>>> 1. Everything in sql.catalyst package is private to the project.
>>>
>>> 2. Redesign SchemaRDD DSL (SPARK-5097): We initially added the DSL for
>>> SchemaRDD and logical plans in order to construct test cases. We have
>>> received feedback from a lot of users that the DSL can be incredibly
>>> powerful. In 1.3, we’d like to refactor the DSL to make it suitable for
>>> not
>>> only constructing test cases, but also in everyday data pipelines. The
>>> new
>>> SchemaRDD API is inspired by the data frame concept in Pandas and R.
>>>
>>> 3. Reconcile Java and Scala APIs.

Re: Implementing TinkerPop on top of GraphX

2015-01-16 Thread Kyle Ellrott
Looking at https://github.com/kdatta/tinkerpop3/compare/graphx-gremlin I
only see a maven build file. Do you have some source code some place else?

I've worked on a Spark-based implementation (
https://github.com/kellrott/spark-gremlin ), but it's not done and I've been
tied up on other projects.
It also looks like TinkerPop3 is a bit of a moving target. I had targeted the
work done for gremlin-giraph (
http://www.tinkerpop.com/docs/3.0.0.M5/#giraph-gremlin ) that was part of
the M5 release, as a base model for implementation. But that appears to
have been refactored into gremlin-hadoop (
http://www.tinkerpop.com/docs/3.0.0.M6/#hadoop-gremlin ) in the M6 release.
I need to assess how much this changes the code.

Most of the code that needs to be changed from Giraph to Spark will be
simply replacing classes with Spark-derived ones. The main place where the
logic will need to change is in the 'GraphComputer' class (
https://github.com/tinkerpop/tinkerpop3/blob/master/hadoop-gremlin/src/main/java/com/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java
) which is created by the Graph when the 'compute' method is called (
https://github.com/tinkerpop/tinkerpop3/blob/master/hadoop-gremlin/src/main/java/com/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java#L135
).
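
For anyone mapping the GraphComputer/VertexProgram model onto Spark, GraphX's
Pregel operator is the closest built-in analogue of that BSP loop. A minimal
sketch (single-source shortest paths; graph is an assumed existing
Graph[_, Double] and the source vertex id is made up):

import org.apache.spark.graphx._

val sourceId: VertexId = 1L                                    // assumed source vertex
val initialGraph = graph.mapVertices((id, _) =>
  if (id == sourceId) 0.0 else Double.PositiveInfinity)
val sssp = Pregel(initialGraph, Double.PositiveInfinity)(
  (id, dist, newDist) => math.min(dist, newDist),              // vertex program
  triplet =>                                                   // send messages along edges
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr)
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    else Iterator.empty,
  (a, b) => math.min(a, b))                                    // merge incoming messages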


Kyle



On Fri, Jan 16, 2015 at 1:01 PM, Kushal Datta 
wrote:

> Hi David,
>
>
> Yes, we are still headed in that direction.
> Please take a look at the repo I sent earlier.
> I think that's a good starting point.
>
> Thanks,
> -Kushal.
>
> On Thu, Jan 15, 2015 at 8:31 AM, David Robinson 
> wrote:
>
> > I am new to Spark and GraphX, however, I use Tinkerpop backed graphs and
> > think the idea of using Tinkerpop as the API for GraphX is a great idea
> and
> > hope you are still headed in that direction.  I noticed that Tinkerpop 3
> is
> > moving into the Apache family:
> > http://wiki.apache.org/incubator/TinkerPopProposal  which might
> alleviate
> > concerns about having an API definition "outside" of Spark.
> >
> > Thanks,
> >
> >
> >
> >
> > --
> > View this message in context:
> >
> http://apache-spark-developers-list.1001551.n3.nabble.com/Implementing-TinkerPop-on-top-of-GraphX-tp9169p10126.html
> > Sent from the Apache Spark Developers List mailing list archive at
> > Nabble.com.
> >
> >
> >
>


Re: Join implementation in SparkSQL

2015-01-16 Thread Alessandro Baretta
Reynold,

The source file you are directing me to is a little too terse for me to
understand what exactly is going on. Let me tell you what I'm trying to do
and what problems I'm encountering, so that you might be able to better
direct my investigation of the SparkSQL codebase.

I am computing the join of three tables, sharing the same primary key,
composed of three fields, and having several other fields. My first attempt
at computing this join was in SQL, with a query much like this slightly
simplified one:

 SELECT
  a.key1 key1, a.key2 key2, a.key3 key3,
  a.data1 adata1, a.data2 adata2, ...,
  b.data1 bdata1, b.data2 bdata2, ...,
  c.data1 cdata1, c.data2 cdata2, ...
FROM a, b, c
WHERE
  a.key1 = b.key1 AND a.key2 = b.key2 AND a.key3 = b.key3 AND
  b.key1 = c.key1 AND b.key2 = c.key2 AND b.key3 = c.key3

This code yielded a SparkSQL job containing 40,000 stages, which failed
after filling up all available disk space on the worker nodes.

I then wrote this join as a plain mapreduce join. The code looks roughly
like this:
val a_ = a.map(row => (key(row), ("a", row)))
val b_ = b.map(row => (key(row), ("b", row)))
val c_ = c.map(row => (key(row), ("c", row)))
val join = UnionRDD(sc, List(a_, b_, c_)).groupByKey

This implementation yields approximately 1600 stages and completes in a few
minutes on a 256 core cluster. The huge difference in scale of the two jobs
makes me think that SparkSQL is implementing my join as a cartesian product.
This is the query plan--I'm not sure I can read it, but it does seem to
imply that the filter conditions are not being pushed far down enough:

 'Project [...]
 'Filter (('a.key1 = 'b.key1)) && ('a.key2 = b.key2)) && ...)
  'Join Inner, None
   'Join Inner, None

Is SparkSQL perhaps unable to push join conditions down from the WHERE clause
into the join itself?
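
For comparison, a hedged sketch of the same query rewritten with explicit
JOIN ... ON conditions (assuming a, b and c are registered as temporary tables on
a SQLContext called sqlContext), which hands the planner the equi-join keys
directly instead of leaving them in the WHERE clause:

val joined = sqlContext.sql("""
  SELECT a.key1, a.key2, a.key3,
         a.data1 adata1, b.data1 bdata1, c.data1 cdata1
  FROM a
  JOIN b ON a.key1 = b.key1 AND a.key2 = b.key2 AND a.key3 = b.key3
  JOIN c ON b.key1 = c.key1 AND b.key2 = c.key2 AND b.key3 = c.key3""")

Running EXPLAIN EXTENDED on that form should show an equi-join strategy (e.g. a
hash join) rather than the nested 'Join Inner, None' plus Filter shown above.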

Alex

On Thu, Jan 15, 2015 at 10:36 AM, Reynold Xin  wrote:

> It's a bunch of strategies defined here:
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
>
> In most common use cases (e.g. inner equi join), filters are pushed below
> the join or into the join. Doing a cartesian product followed by a filter
> is too expensive.
>
>
> On Thu, Jan 15, 2015 at 7:39 AM, Alessandro Baretta  > wrote:
>
>> Hello,
>>
>> Where can I find docs about how joins are implemented in SparkSQL? In
>> particular, I'd like to know whether they are implemented according to
>> their relational algebra definition as filters on top of a cartesian
>> product.
>>
>> Thanks,
>>
>> Alex
>>
>
>


Re: Implementing TinkerPop on top of GraphX

2015-01-16 Thread Kushal Datta
Hi David,


Yes, we are still headed in that direction.
Please take a look at the repo I sent earlier.
I think that's a good starting point.

Thanks,
-Kushal.

On Thu, Jan 15, 2015 at 8:31 AM, David Robinson 
wrote:

> I am new to Spark and GraphX, however, I use Tinkerpop backed graphs and
> think the idea of using Tinkerpop as the API for GraphX is a great idea and
> hope you are still headed in that direction.  I noticed that Tinkerpop 3 is
> moving into the Apache family:
> http://wiki.apache.org/incubator/TinkerPopProposal  which might alleviate
> concerns about having an API definition "outside" of Spark.
>
> Thanks,
>
>
>
>
> --
> View this message in context:
> http://apache-spark-developers-list.1001551.n3.nabble.com/Implementing-TinkerPop-on-top-of-GraphX-tp9169p10126.html
> Sent from the Apache Spark Developers List mailing list archive at
> Nabble.com.
>
>
>


Spectral clustering

2015-01-16 Thread Andrew Musselman
Hi, thinking of picking up this Jira ticket:
https://issues.apache.org/jira/browse/SPARK-4259

Anyone done any work on this to date?  Any thoughts on it before we go too
far in?

Thanks!

Best
Andrew


Spark

2015-01-16 Thread Andrew Musselman



Re: Optimize encoding/decoding strings when using Parquet

2015-01-16 Thread Michael Armbrust
+1 to adding such an optimization to Parquet. The bytes are tagged
specially as UTF8 in the Parquet schema, so it seems like it would be
possible to add this.

On Fri, Jan 16, 2015 at 8:17 AM, Mick Davies 
wrote:

> Hi,
>
> It seems that a reasonably large proportion of query time using Spark SQL
> seems to be spent decoding Parquet Binary objects to produce Java Strings.
> Has anyone considered trying to optimize these conversions, as many are
> duplicated?
>
> Details are outlined in the conversation in the user mailing list
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-amp-Parquet-data-are-reading-very-very-slow-td21061.html
> , I have copied a bit of that discussion here.
>
> It seems that as Spark processes each row from Parquet it makes a call to
> convert the Binary representation for each String column into a Java
> String.
> However in many (probably most) circumstances the underlying Binary
> instance
> from Parquet will have come from a Dictionary, for example when column
> cardinality is low. Therefore Spark is converting the same byte array to a
> copy of the same Java String over and over again. This is bad due to extra
> cpu, extra memory used for these strings, and probably results in more
> expensive grouping comparisons.
>
>
> I tested a simple hack to cache the last Binary->String conversion per
> column in ParquetConverter and this led to a 25% performance improvement
> for
> the queries I used. Admittedly this was over a data set with lots of runs
> of
> the same Strings in the queried columns.
>
> These costs are quite significant for the type of data that I expect will
> be
> stored in Parquet which will often have denormalized tables and probably
> lots of fairly low cardinality string columns
>
> I think a good way to optimize this would be if changes could be made to
> Parquet so that the encoding/decoding of Objects to Binary is handled on
> the Parquet side of the fence. Parquet could deal with Objects (Strings) as the
> client understands them and only use encoding/decoding to store/read from
> underlying storage medium. Doing this I think Parquet could ensure that the
> encoding/decoding of each Object occurs only once.
>
> Does anyone have an opinion on this, has it been considered already?
>
> Cheers Mick
>
>
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-developers-list.1001551.n3.nabble.com/Optimize-encoding-decoding-strings-when-using-Parquet-tp10141.html
> Sent from the Apache Spark Developers List mailing list archive at
> Nabble.com.
>
>
>


Re: RDD order guarantees

2015-01-16 Thread Reynold Xin
You are running on a local file system, right? HDFS orders the files based on
names, but local file systems often don't. I think that's why there's a difference.

We might be able to do a sort and order the partitions when we create an RDD
to make this universal though.
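
Until something like that exists, a minimal workaround sketch for the local file
system case (directory layout and helper name are illustrative): list the part
files yourself, sort them by name, and union them in that order.

import java.io.File
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

def readPartsInOrder(sc: SparkContext, dir: String): RDD[String] = {
  val parts = new File(dir).listFiles()
    .filter(_.getName.startsWith("part-"))
    .map(_.getAbsolutePath)
    .sorted                       // lexicographic order matches part-r-00000, part-r-00001, ...
  parts.map(sc.textFile(_)).reduce(_ union _)
}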

On Fri, Jan 16, 2015 at 8:26 AM, Ewan Higgs  wrote:

> Hi all,
> Quick one: when reading files, are the orders of partitions guaranteed to
> be preserved? I am finding some weird behaviour where I run sortByKeys() on
> an RDD (which has 16 byte keys) and write it to disk. If I open a python
> shell and run the following:
>
> for part in range(29):
>     print map(ord,
>               open('/home/ehiggs/data/terasort_out/part-r-000{0:02}'.format(part),
>                    'r').read(16))
>
> Then each partition is in order based on the first value of each partition.
>
> I can also call TeraValidate.validate from TeraSort and it is happy with
> the results. It seems to be on loading the file that the reordering
> happens. If this is expected, is there a way to ask Spark nicely to give me
> the RDD in the order it was saved?
>
> This is based on trying to fix my TeraValidate code on this branch:
> https://github.com/ehiggs/spark/tree/terasort
>
> Thanks,
> Ewan
>
>
>


Re: Setting JVM options to Spark executors in Standalone mode

2015-01-16 Thread Marcelo Vanzin
On Fri, Jan 16, 2015 at 10:07 AM, Michel Dufresne
 wrote:
> Thanks for your reply; I should have mentioned that spark-env.sh is the
> only option I found because:
>
>- I'm creating the SpeakConf/SparkContext from a Play Application
>(therefore I'm not using spark-submit script)

Then you can set that configuration Zhan mentions directly in your
SparkConf object.

BTW the env variable for what you want is SPARK_EXECUTOR_OPTS, but the
use of env variables to set app configuration is discouraged.
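
A minimal sketch of the SparkConf approach suggested above (the JMX option string
and the way the public address is resolved are assumptions about this particular
setup; spark.executor.extraJavaOptions is the configuration key Zhan mentioned):

import org.apache.spark.{SparkConf, SparkContext}

// Assumption: the slave's public address is known to the driver, e.g. via an env variable.
val publicDns = sys.env.getOrElse("SPARK_PUBLIC_DNS", "localhost")
val conf = new SparkConf()
  .setAppName("play-app")
  .set("spark.executor.extraJavaOptions",
    "-Dcom.sun.management.jmxremote " +
    "-Djava.net.preferIPv4Stack=true " +
    "-Dcom.sun.management.jmxremote.rmi.port=9998 " +
    "-Dcom.sun.management.jmxremote.ssl=false " +
    "-Dcom.sun.management.jmxremote.authenticate=false " +
    s"-Djava.rmi.server.hostname=$publicDns")
val sc = new SparkContext(conf)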


-- 
Marcelo




Re: Setting JVM options to Spark executors in Standalone mode

2015-01-16 Thread Michel Dufresne
Thanks for your reply; I should have mentioned that spark-env.sh is the
only option I found because:

   - I'm passing the public IP address of the slave (which is determined in
   the shell script)
   - I'm creating the SpeakConf/SparkContext from a Play Application
   (therefore I'm not using spark-submit script)

Thanks

On Fri, Jan 16, 2015 at 1:02 PM, Zhan Zhang  wrote:

> You can try to add it in conf/spark-defaults.conf
>
>  # spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value
> -Dnumbers="one two three"
>
> Thanks.
>
> Zhan Zhang
>
> On Jan 16, 2015, at 9:56 AM, Michel Dufresne <
> sparkhealthanalyt...@gmail.com> wrote:
>
> > Hi All,
> >
> > I'm trying to set some JVM options to the executor processes in a
> > standalone cluster. Here's what I have in *spark-env.sh*:
> >
> > jmx_opt="-Dcom.sun.management.jmxremote"
> >> jmx_opt="${jmx_opt} -Djava.net.preferIPv4Stack=true"
> >> jmx_opt="${jmx_opt} -Dcom.sun.management.jmxremote.port="
> >> jmx_opt="${jmx_opt} -Dcom.sun.management.jmxremote.rmi.port=9998"
> >> jmx_opt="${jmx_opt} -Dcom.sun.management.jmxremote.ssl=false"
> >> jmx_opt="${jmx_opt} -Dcom.sun.management.jmxremote.authenticate=false"
> >> jmx_opt="${jmx_opt} -Djava.rmi.server.hostname=${SPARK_PUBLIC_DNS}"
> >> export SPARK_WORKER_OPTS="${jmx_opt}"
> >
> >
> > However the options are showing up on the *daemon* JVM, not the *workers*. It
> > has the same effect as if I was using SPARK_DAEMON_JAVA_OPTS (which should
> > set it on the daemon process).
> >
> > Thanks in advance for your help,
> >
> > Michel
>
>
>


Re: Setting JVM options to Spark executors in Standalone mode

2015-01-16 Thread Zhan Zhang
You can try to add it in conf/spark-defaults.conf

 # spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value 
-Dnumbers="one two three"

Thanks.

Zhan Zhang

On Jan 16, 2015, at 9:56 AM, Michel Dufresne  
wrote:

> Hi All,
> 
> I'm trying to set some JVM options to the executor processes in a
> standalone cluster. Here's what I have in *spark-env.sh*:
> 
> jmx_opt="-Dcom.sun.management.jmxremote"
>> jmx_opt="${jmx_opt} -Djava.net.preferIPv4Stack=true"
>> jmx_opt="${jmx_opt} -Dcom.sun.management.jmxremote.port="
>> jmx_opt="${jmx_opt} -Dcom.sun.management.jmxremote.rmi.port=9998"
>> jmx_opt="${jmx_opt} -Dcom.sun.management.jmxremote.ssl=false"
>> jmx_opt="${jmx_opt} -Dcom.sun.management.jmxremote.authenticate=false"
>> jmx_opt="${jmx_opt} -Djava.rmi.server.hostname=${SPARK_PUBLIC_DNS}"
>> export SPARK_WORKER_OPTS="${jmx_opt}"
> 
> 
> However the options are showing up on the *daemon* JVM, not the *workers*. It
> has the same effect as if I was using SPARK_DAEMON_JAVA_OPTS (which should
> set it on the daemon process).
> 
> Thanks in advance for your help,
> 
> Michel






Setting JVM options to Spark executors in Standalone mode

2015-01-16 Thread Michel Dufresne
Hi All,

I'm trying to set some JVM options to the executor processes in a
standalone cluster. Here's what I have in *spark-env.sh*:

jmx_opt="-Dcom.sun.management.jmxremote"
> jmx_opt="${jmx_opt} -Djava.net.preferIPv4Stack=true"
> jmx_opt="${jmx_opt} -Dcom.sun.management.jmxremote.port="
> jmx_opt="${jmx_opt} -Dcom.sun.management.jmxremote.rmi.port=9998"
> jmx_opt="${jmx_opt} -Dcom.sun.management.jmxremote.ssl=false"
> jmx_opt="${jmx_opt} -Dcom.sun.management.jmxremote.authenticate=false"
> jmx_opt="${jmx_opt} -Djava.rmi.server.hostname=${SPARK_PUBLIC_DNS}"
> export SPARK_WORKER_OPTS="${jmx_opt}"


However the options are showing up on the *daemon* JVM, not the *workers*. It
has the same effect as if I was using SPARK_DAEMON_JAVA_OPTS (which should
set it on the daemon process).

Thanks in advance for your help,

Michel


RDD order guarantees

2015-01-16 Thread Ewan Higgs

Hi all,
Quick one: when reading files, are the orders of partitions guaranteed 
to be preserved? I am finding some weird behaviour where I run 
sortByKeys() on an RDD (which has 16 byte keys) and write it to disk. If 
I open a python shell and run the following:


for part in range(29):
    print map(ord,
              open('/home/ehiggs/data/terasort_out/part-r-000{0:02}'.format(part),
                   'r').read(16))


Then each partition is in order based on the first value of each partition.

I can also call TeraValidate.validate from TeraSort and it is happy with 
the results. It seems to be on loading the file that the reordering 
happens. If this is expected, is there a way to ask Spark nicely to give 
me the RDD in the order it was saved?


This is based on trying to fix my TeraValidate code on this branch:
https://github.com/ehiggs/spark/tree/terasort

Thanks,
Ewan




Optimize encoding/decoding strings when using Parquet

2015-01-16 Thread Mick Davies
Hi, 

It seems that a reasonably large proportion of query time using Spark SQL
seems to be spent decoding Parquet Binary objects to produce Java Strings.
Has anyone considered trying to optimize these conversions as many are
duplicated?

Details are outlined in the conversation on the user mailing list:
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-amp-Parquet-data-are-reading-very-very-slow-td21061.html
I have copied a bit of that discussion here.

It seems that as Spark processes each row from Parquet it makes a call to
convert the Binary representation for each String column into a Java String.
However in many (probably most) circumstances the underlying Binary instance
from Parquet will have come from a Dictionary, for example when column
cardinality is low. Therefore Spark is converting the same byte array to a
copy of the same Java String over and over again. This is bad due to extra
cpu, extra memory used for these strings, and probably results in more
expensive grouping comparisons. 


I tested a simple hack to cache the last Binary->String conversion per
column in ParquetConverter and this led to a 25% performance improvement for
the queries I used. Admittedly this was over a data set with lots of runs of
the same Strings in the queried columns. 
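
A minimal sketch of that per-column cache, assuming Parquet's Binary type and its
toStringUsingUTF8 decoder (class and field names here are illustrative, not the
actual ParquetConverter change):

import parquet.io.api.Binary   // parquet-mr package name at the time; treat as an assumption

class CachedStringConverter {
  private var lastBinary: Binary = _
  private var lastString: String = _

  // Dictionary-encoded columns hand back equal Binary values over and over,
  // so re-decode only when the incoming value differs from the previous one.
  def convert(value: Binary): String = {
    if (lastBinary != null && lastBinary == value) {
      lastString
    } else {
      lastBinary = value
      lastString = value.toStringUsingUTF8
      lastString
    }
  }
}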

These costs are quite significant for the type of data that I expect will be
stored in Parquet which will often have denormalized tables and probably
lots of fairly low cardinality string columns 

I think a good way to optimize this would be if changes could be made to
Parquet so that the encoding/decoding of Objects to Binary is handled on
the Parquet side of the fence. Parquet could deal with Objects (Strings) as the
client understands them and only use encoding/decoding to store/read from
underlying storage medium. Doing this I think Parquet could ensure that the
encoding/decoding of each Object occurs only once. 

Does anyone have an opinion on this, has it been considered already?

Cheers Mick







--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/Optimize-encoding-decoding-strings-when-using-Parquet-tp10141.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.




Fwd: LinearRegressionWithSGD accuracy

2015-01-16 Thread Robin East


Sent from my iPhone

Begin forwarded message:

> From: Robin East 
> Date: 16 January 2015 11:35:23 GMT
> To: Joseph Bradley 
> Cc: Yana Kadiyska , Devl Devel 
> 
> Subject: Re: LinearRegressionWithSGD accuracy
> 
> Yes with scaled data intercept would be 5000 but the code as it stands is 
> running a model where intercept will be 0.00. You need to call 
> setIntercept(true) to include the intercept in the model.
> 
> Robin
> 
> Sent from my iPhone
> 
>> On 16 Jan 2015, at 02:01, Joseph Bradley  wrote:
>> 
>> Good point about using the intercept.  When scaling uses the mean (shifting 
>> the feature values), then the "true" model now has an intercept of 5000.5, 
>> whereas the original data's "true" model has an intercept of 0.  I think 
>> that's the issue.
>> 
>>> On Thu, Jan 15, 2015 at 5:16 PM, Yana Kadiyska  
>>> wrote:
>>> I can actually reproduce his MSE -- with the scaled data only (non-scaled 
>>> works out just fine)
>>> 
>>> import org.apache.spark.mllib.regression._
>>> import org.apache.spark.mllib.linalg.{Vector, Vectors}
>>> 
>>> val t=(1 to 10000).map(x=>(x,x))
>>> val rdd = sc.parallelize(t)
>>> val parsedData =
>>>   rdd.map(q=>LabeledPoint(q._1.toDouble,Vectors.dense(q._2.toDouble)))
>>> 
>>> val lr = new LinearRegressionWithSGD()
>>> lr.optimizer.setStepSize(0.0001)
>>> lr.optimizer.setNumIterations(100)
>>> 
>>> val scaledData = parsedData.map(x => LabeledPoint(x.label,
>>>   scaler.transform(Vectors.dense(x.features.toArray))))
>>> val model = lr.run(scaledData)
>>> 
>>> val valuesAndPreds = scaledData.map { point =>
>>>   val prediction = model.predict(point.features)
>>>   (prediction,point.label)
>>> }
>>> val MSE = valuesAndPreds.map{case(v, p) => math.pow((v - p), 2)}.mean()
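
(The scaler used in the quoted snippet is not shown; a minimal sketch of how it
would typically be built, assuming MLlib's StandardScaler fitted on the feature
vectors:)

import org.apache.spark.mllib.feature.StandardScaler

// withMean = true subtracts the column mean, withStd = true divides by the standard deviation.
val scaler = new StandardScaler(withMean = true, withStd = true)
  .fit(parsedData.map(_.features))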
>>> Last few lines read as:
>>> 
>>> 15/01/15 16:16:40 INFO GradientDescent: GradientDescent.runMiniBatchSGD 
>>> finished. Last 10 stochastic losses 3.3338313007386144E7, 
>>> 3.333831299679853E7, 3.333831298621632E7, 3.333831297563938E7, 
>>> 3.3338312965067785E7, 3.3338312954501465E7, 3.333831294394051E7, 
>>> 3.3338312933384743E7, 3.33383129228344E7, 3.3338312912289333E7
>>> 15/01/15 16:16:40 WARN LinearRegressionWithSGD: The input data was not 
>>> directly cached, which may hurt performance if its parent RDDs are also 
>>> uncached.
>>> model: org.apache.spark.mllib.regression.LinearRegressionModel = 
>>> (weights=[0.00356790226811], intercept=0.0)
>>> 
>>> So I am a bit puzzled as I was under the impression that a scaled model 
>>> would only converge faster. Non-scaled version produced near perfect 
>>> results at alpha=0.0001,numIterations=100
>>> 
>>> According to R the weights should be a lot higher:
>>> y=seq(1, 10000)
>>> X=scale(a, center = TRUE, scale = TRUE)
>>> dt=data.frame(y,X)
>>> names(dt) = c("y","x")
>>> model= lm(y~x,data=dt)
>>> #intercept:5000.5,2886.896
>>> new <- data.frame(x=dt$x)
>>> preds = predict(model,new)
>>> mean( (preds-dt$y)^2 , na.rm = TRUE )
>>> Coefficients:
>>> (Intercept)            x
>>>      5000.5     2886.896
>>> 
>>> I did have success with the following model and scaled features as shown in 
>>> the original code block:
>>> 
>>> val lr = new LinearRegressionWithSGD().setIntercept(true)
>>> lr.optimizer.setStepSize(0.1)
>>> lr.optimizer.setNumIterations(1000)
>>> 
>>> scala> model
>>> res12: org.apache.spark.mllib.regression.LinearRegressionModel = 
>>> (weights=[2886.885094323781], intercept=5000.48169121784)
>>> MSE: Double = 4.472548743491049E-4
>>> 
>>> Not sure that it's a question for the dev list as much as someone who 
>>> understands ML well -- I'd appreciate if you guys have any insight on why 
>>> the small alpha/numIters did so poorly on the scaled data (I've removed the 
>>> dev list)
>>> 
>>> 
>>> 
>>> 
 On Thu, Jan 15, 2015 at 3:23 PM, Joseph Bradley  
 wrote:
>>> 
 It looks like you're training on the non-scaled data but testing on the
 scaled data.  Have you tried this training & testing on only the scaled
 data?
 
 On Thu, Jan 15, 2015 at 10:42 AM, Devl Devel 
 wrote:
 
 > Thanks, that helps a bit at least with the NaN but the MSE is still very
 > high even with that step size and 10k iterations:
 >
 > training Mean Squared Error = 3.3322561285919316E7
 >
 > Does this method need say 100k iterations?
 >
 >
 >
 >
 >
 >
 > On Thu, Jan 15, 2015 at 5:42 PM, Robin East 
 > wrote:
 >
 > > -dev, +user
 > >
 > > You’ll need to set the gradient descent step size to something small - 
 > > a
 > > bit of trial and error shows that 0.0001 works.
 > >
 > > You’ll need to create a LinearRegressionWithSGD instance and set the 
 > > step
 > > size explicitly:
 > >
 > > val lr = new LinearRegressionWithSGD()
 > > lr.optimizer.setStepSize(0.0001)
 > > lr.optimizer.setNumIterations(100)
 > > val model = lr.run(parsedData)
 > >
 > > On 15 Jan 2015, at 16:46, devl.developmen