Re: spark-submit on YARN is slow

2014-12-06 Thread Sandy Ryza
Great to hear!

-Sandy

On Fri, Dec 5, 2014 at 11:17 PM, Denny Lee  wrote:

> Okay, my bad for not testing out the documented arguments - once I use the
> correct ones, the query completes in ~55s (I can probably make it
> faster).   Thanks for the help, eh?!
>
>
>
> On Fri Dec 05 2014 at 10:34:50 PM Denny Lee  wrote:
>
>> Sorry for the delay in my response - for my spark calls for stand-alone
>> and YARN, I am using the --executor-memory and --total-executor-cores for
>> the submission.  In standalone, my baseline query completes in ~40s while
>> in YARN, it completes in ~1800s.  It does not appear from the RM web UI
>> that it's asking for more resources than available but by the same token, it
>> appears that it's only using a small amount of cores and available memory.
>>
>> Saying this, let me re-try using the --executor-cores,
>> --executor-memory, and --num-executors arguments as suggested (and
>> documented) vs. the --total-executor-cores
>>
>>
>> On Fri Dec 05 2014 at 1:14:53 PM Andrew Or  wrote:
>>
>>> Hey Arun I've seen that behavior before. It happens when the cluster
>>> doesn't have enough resources to offer and the RM hasn't given us our
>>> containers yet. Can you check the RM Web UI at port 8088 to see whether
>>> your application is requesting more resources than the cluster has to offer?
>>>
>>> 2014-12-05 12:51 GMT-08:00 Sandy Ryza :
>>>
>>> Hey Arun,

 The sleeps would only cause at most around 5 seconds of overhead.  The idea
 was to give executors some time to register.  In more recent versions, they
 were replaced with the spark.scheduler.minRegisteredResourcesRatio and
 spark.scheduler.maxRegisteredResourcesWaitingTime settings.  As of 1.1, by
 default YARN will wait until either 30 seconds have passed or 80% of the
 requested executors have registered.
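A minimal sketch of setting these two properties from application code (the same keys can also be passed with --conf on spark-submit; the values below are illustrative, not the defaults mentioned above):

import org.apache.spark.{SparkConf, SparkContext}

// Sketch only: wait up to 60s, or until 90% of the requested executors have
// registered, before scheduling the first tasks. In 1.x the wait time is
// given in milliseconds.
val conf = new SparkConf()
  .setAppName("yarn-startup-tuning")
  .set("spark.scheduler.minRegisteredResourcesRatio", "0.9")
  .set("spark.scheduler.maxRegisteredResourcesWaitingTime", "60000")

val sc = new SparkContext(conf)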

 -Sandy

 On Fri, Dec 5, 2014 at 12:46 PM, Ashish Rangole 
 wrote:

> Likely this is not the case here, but one thing to point out with YARN
> parameters like --num-executors is that they should be specified *before*
> the app jar and app args on the spark-submit command line; otherwise the app only
> gets the default number of containers, which is 2.
> On Dec 5, 2014 12:22 PM, "Sandy Ryza"  wrote:
>
>> Hi Denny,
>>
>> Those sleeps were only at startup, so if jobs are taking
>> significantly longer on YARN, that should be a different problem.  When 
>> you
>> ran on YARN, did you use the --executor-cores, --executor-memory, and
>> --num-executors arguments?  When running against a standalone cluster, by
>> default Spark will make use of all the cluster resources, but when 
>> running
>> against YARN, Spark defaults to a couple tiny executors.
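A minimal sketch of the corresponding configuration keys, assuming a Spark version where --num-executors maps to the spark.executor.instances conf key (values are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

// Sketch only: conf-key equivalents of the spark-submit flags above.
val conf = new SparkConf()
  .setAppName("yarn-sizing")
  .set("spark.executor.memory", "4g")     // --executor-memory 4g
  .set("spark.executor.cores", "4")       // --executor-cores 4
  .set("spark.executor.instances", "10")  // --num-executors 10

val sc = new SparkContext(conf)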
>>
>> -Sandy
>>
>> On Fri, Dec 5, 2014 at 11:32 AM, Denny Lee 
>> wrote:
>>
>>> My submissions of Spark on YARN (CDH 5.2) resulted in a few thousand
>>> steps. If I was running this on standalone cluster mode the query 
>>> finished
>>> in 55s but on YARN, the query was still running 30min later. Would the 
>>> hard
>>> coded sleeps potentially be in play here?
>>> On Fri, Dec 5, 2014 at 11:23 Sandy Ryza 
>>> wrote:
>>>
 Hi Tobias,

 What version are you using?  In some recent versions, we had a
 couple of large hardcoded sleeps on the Spark side.

 -Sandy

 On Fri, Dec 5, 2014 at 11:15 AM, Andrew Or 
 wrote:

> Hey Tobias,
>
> As you suspect, the reason why it's slow is because the resource
> manager in YARN takes a while to grant resources. This is because YARN
> needs to first set up the application master container, and then this 
> AM
> needs to request more containers for Spark executors. I think this 
> accounts
> for most of the overhead. The remaining source probably comes from 
> how our
> own YARN integration code polls application (every second) and cluster
> resource states (every 5 seconds IIRC). I haven't explored in detail
> whether there are optimizations there that can speed this up, but I 
> believe
> most of the overhead comes from YARN itself.
>
> In other words, no I don't know of any quick fix on your end that
> you can do to speed this up.
>
> -Andrew
>
>
> 2014-12-03 20:10 GMT-08:00 Tobias Pfeiffer :
>
> Hi,
>>
>> I am using spark-submit to submit my application to YARN in
>> "yarn-cluster" mode. I have both the Spark assembly jar file as well 
>> as my
>> application jar file put in HDFS and can see from the logging output 
>> that
>> both files are used from there. However, it still takes about 10 
>> seconds
>> for my application's yarnAppState to switch from ACCEPTED to

RE: Java RDD Union

2014-12-06 Thread Ron Ayoub
With that said, and the nature of iterative algorithms that Spark is advertised 
for, isn't this a bit of an unnecessary restriction since I don't see where the 
problem is. For instance, it is clear that when aggregating you need operations 
to be associative because of the way they are divided and combined. But since 
forEach works on an individual item the same problem doesn't exist. 
As an example, during a k-means algorithm you have to continually update 
cluster assignments per data item along with perhaps distance from centroid.  
So if you can't update items in place you have to literally create thousands 
upon thousands of RDDs. Does Spark have some kind of trick like reuse behind 
the scenes - fully persistent data objects or whatever. How can it possibly be 
efficient for 'iterative' algorithms when it is creating so many RDDs as 
opposed to one? 

> From: so...@cloudera.com
> Date: Fri, 5 Dec 2014 14:58:37 -0600
> Subject: Re: Java RDD Union
> To: ronalday...@live.com; user@spark.apache.org
> 
> foreach also creates a new RDD, and does not modify an existing RDD.
> However, in practice, nothing stops you from fiddling with the Java
> objects inside an RDD when you get a reference to them in a method
> like this. This is definitely a bad idea, as there is certainly no
> guarantee that any other operations will see any, some or all of these
> edits.
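A small illustration of that contrast (the Doc type is hypothetical):

import org.apache.spark.rdd.RDD

// Hypothetical record type with a mutable field, for illustration only.
case class Doc(id: Long, var cluster: Int)

// Fiddling with the objects inside an RDD: may appear to work locally, but
// there is no guarantee any later operation will see these edits.
def reassignInPlace(docs: RDD[Doc]): Unit =
  docs.foreach(d => d.cluster = 42)

// The safe alternative: derive a new, immutable RDD with map.
def reassign(docs: RDD[Doc]): RDD[Doc] =
  docs.map(d => d.copy(cluster = 42))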
> 
> On Fri, Dec 5, 2014 at 2:40 PM, Ron Ayoub  wrote:
> > I tricked myself into thinking it was uniting things correctly. I see I'm
> > wrong now.
> >
> > I have a question regarding your comment that RDDs are immutable. Can you
> > change values in an RDD using forEach? Does that violate immutability? I've
> > been using forEach to modify RDDs but perhaps I've tricked myself once again
> > into believing it is working. I have object references so perhaps it is
> > working serendipitously in local mode since the references are in fact not
> > changing but their referents are, and somehow this will no longer work
> > when clustering.
> >
> > Thanks for comments.
> >
> >> From: so...@cloudera.com
> >> Date: Fri, 5 Dec 2014 14:22:38 -0600
> >> Subject: Re: Java RDD Union
> >> To: ronalday...@live.com
> >> CC: user@spark.apache.org
> >
> >>
> >> No, RDDs are immutable. union() creates a new RDD, and does not modify
> >> an existing RDD. Maybe this obviates the question. I'm not sure what
> >> you mean about releasing from memory. If you want to repartition the
> >> unioned RDD, you repartition the result of union(), not anything else.
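A minimal sketch of that usage (the element type and partition count are illustrative):

import org.apache.spark.rdd.RDD

// union() leaves both inputs untouched; capture its result and repartition
// that result if needed.
def combine(internal: RDD[(Long, Long, Double)],
            leaf: RDD[(Long, Long, Double)]): RDD[(Long, Long, Double)] = {
  val all = internal.union(leaf) // a new RDD; internal and leaf are unchanged
  all.coalesce(8)                // coalesce the unioned result, not the inputs
}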
> >>
> >> On Fri, Dec 5, 2014 at 1:27 PM, Ron Ayoub  wrote:
> >> > I'm a bit confused regarding expected behavior of unions. I'm running on
> >> > 8
> >> > cores. I have an RDD that is used to collect cluster associations
> >> > (cluster
> >> > id, content id, distance) for internal clusters as well as leaf clusters
> >> > since I'm doing hierarchical k-means and need all distances for sorting
> >> > documents appropriately upon examination.
> >> >
> >> > It appears that Union simply adds items in the argument to the RDD
> >> > instance
> >> > the method is called on rather than just returning a new RDD. If I want
> >> > to
> >> > do Union this was as more of an add/append should I be capturing the
> >> > return
> >> > value and releasing it from memory. Need help clarifying the semantics
> >> > here.
> >> >
> >> > Also, in another related thread someone mentioned coalesce after union.
> >> > Would I need to do the same on the instance RDD I'm calling Union on.
> >> >
> >> > Perhaps a method such as append would be useful and clearer.
> >>
> >> -
> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >> For additional commands, e-mail: user-h...@spark.apache.org
> >>
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 
  

Re: Java RDD Union

2014-12-06 Thread Sean Owen
I guess a major problem with this is that you lose fault tolerance.
You have no way of recreating the local state of the mutable RDD if a
partition is lost.

Why would you need thousands of RDDs for k-means? It's a few per iteration.

An RDD is more bookkeeping than data structure, itself. They don't
inherently take up resources, unless you mark them to be persisted.
You're paying the cost of copying objects to create one RDD from the next,
but that's mostly it.
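A rough sketch of that per-iteration shape (a simplified k-means pass; not the thread's actual code or MLlib's implementation):

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

// Index of the nearest centroid by squared Euclidean distance.
def closest(p: Array[Double], centroids: Array[Array[Double]]): Int =
  centroids.indices.minBy { i =>
    centroids(i).zip(p).map { case (c, x) => (c - x) * (c - x) }.sum
  }

// Each pass derives a new assignment RDD from the cached points; nothing is
// mutated in place, and only a handful of RDDs exist per iteration.
def kMeansPasses(points: RDD[Array[Double]],
                 init: Array[Array[Double]],
                 iterations: Int): RDD[(Int, Array[Double])] = {
  points.cache()
  def assign(cs: Array[Array[Double]]): RDD[(Int, Array[Double])] =
    points.map(p => (closest(p, cs), p))        // a new RDD each call

  var centroids = init
  var assigned  = assign(centroids)
  for (_ <- 2 to iterations) {
    // recompute centroids on the driver from the previous assignment
    centroids = assigned
      .mapValues(p => (p, 1L))
      .reduceByKey { case ((a, na), (b, nb)) =>
        (a.zip(b).map(t => t._1 + t._2), na + nb)
      }
      .mapValues { case (sum, n) => sum.map(_ / n) }
      .values
      .collect()
    assigned = assign(centroids)
  }
  assigned
}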

On Sat, Dec 6, 2014 at 6:28 AM, Ron Ayoub  wrote:
> With that said, and the nature of iterative algorithms that Spark is
> advertised for, isn't this a bit of an unnecessary restriction since I don't
> see where the problem is. For instance, it is clear that when aggregating
> you need operations to be associative because of the way they are divided
> and combined. But since forEach works on an individual item the same problem
> doesn't exist.
>
> As an example, during a k-means algorithm you have to continually update
> cluster assignments per data item along with perhaps distance from centroid.
> So if you can't update items in place you have to literally create thousands
> upon thousands of RDDs. Does Spark have some kind of trick like reuse behind
> the scenes - fully persistent data objects or whatever. How can it possibly
> be efficient for 'iterative' algorithms when it is creating so many RDDs as
> opposed to one?

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Modifying an RDD in forEach

2014-12-06 Thread Ron Ayoub
This is from a separate thread with a differently named title. 
Why can't you modify the actual contents of an RDD using forEach? It appears to 
be working for me. What I'm doing is changing cluster assignments and distances 
per data item for each iteration of the clustering algorithm. The clustering 
algorithm is massive and iterates thousands of times. As I understand it now, 
you are supposed to create new RDDs on each pass. This is a hierarchical k-means 
that I'm doing and hence it consists of many iterations rather than large 
iterations.
So I understand the restriction of why operations, when aggregating and reducing 
etc., need to be associative. However, forEach operates on a single item. So 
being that Spark is advertised as being great for iterative algorithms since it 
operates in-memory, how can it be good to create thousands upon thousands of 
RDDs during the course of an iterative algorithm?  Does Spark have some kind of 
trick like reuse behind the scenes - fully persistent data objects or whatever? 
How can it possibly be efficient for 'iterative' algorithms when it is creating 
so many RDDs as opposed to one? 
Or is the answer that I should keep doing what I'm doing because it is working 
even though it is not theoretically sound and aligned with functional ideas. I 
personally just want it to be fast and be able to operate on up to 500 million 
data items.  

RE: Java RDD Union

2014-12-06 Thread Ron Ayoub
Hierarchical k-means requires a massive number of iterations whereas flat k-means 
does not but I've found flat to be generally useless since in most UIs it is 
nice to be able to drill down into more and more specific clusters. If you have 
100 million documents and your branching factor is 8 (8-secting k-means) then 
you will be picking a cluster to split and iterating thousands of times. So per 
split you iterate maybe 6 or 7 times to get new cluster assignments and there 
are ultimately going to be 5,000 to 50,000 splits depending on split criterion 
and cluster variances etc... 
In this case fault tolerance doesn't matter. I've found that the distributed 
aspect of RDD is what I'm looking for and don't care or need the resilience 
part as much. It is a one off algorithm and that can just be run again if 
something goes wrong. Once the data is created it is done with Spark. 
But anyway, that is the very thing Spark is advertised for. 

> From: so...@cloudera.com
> Date: Sat, 6 Dec 2014 06:39:10 -0600
> Subject: Re: Java RDD Union
> To: ronalday...@live.com
> CC: user@spark.apache.org
> 
> I guess a major problem with this is that you lose fault tolerance.
> You have no way of recreating the local state of the mutable RDD if a
> partition is lost.
> 
> Why would you need thousands of RDDs for k-means? It's a few per iteration.
> 
> An RDD is more bookkeeping than data structure, itself. They don't
> inherently take up resources, unless you mark them to be persisted.
> You're paying the cost of copying objects to create one RDD from the next,
> but that's mostly it.
> 
> On Sat, Dec 6, 2014 at 6:28 AM, Ron Ayoub  wrote:
> > With that said, and the nature of iterative algorithms that Spark is
> > advertised for, isn't this a bit of an unnecessary restriction since I don't
> > see where the problem is. For instance, it is clear that when aggregating
> > you need operations to be associative because of the way they are divided
> > and combined. But since forEach works on an individual item the same problem
> > doesn't exist.
> >
> > As an example, during a k-means algorithm you have to continually update
> > cluster assignments per data item along with perhaps distance from centroid.
> > So if you can't update items in place you have to literally create thousands
> > upon thousands of RDDs. Does Spark have some kind of trick like reuse behind
> > the scenes - fully persistent data objects or whatever. How can it possibly
> > be efficient for 'iterative' algorithms when it is creating so many RDDs as
> > opposed to one?
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 
  

Re: Modifying an RDD in forEach

2014-12-06 Thread Mayur Rustagi
You'll benefit by viewing Matei's talk at Yahoo on Spark internals and how
it optimizes execution of iterative jobs.
Simple answer is
1. Spark doesn't materialize an RDD when you do an iteration but lazily
captures the transformation functions in the RDD (only the function and
closure; no data operation actually happens).
2. When you finally execute and want to cause effects (save to disk,
collect on master etc.) it views the DAG of execution and optimizes what it
can reason about (eliminating intermediate states, performing multiple
transformations in one task, leveraging partitioning where available, among
others).
Bottom line: it doesn't matter how many RDDs you have in your DAG chain as
long as Spark can optimize the functions in that DAG to create minimal
materialization on its way to final output.
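A tiny sketch of that laziness (illustrative values):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("lazy-dag").setMaster("local[*]"))

val base  = sc.parallelize(1 to 1000000)
val step1 = base.map(_ * 2)            // recorded in the lineage, not executed
val step2 = step1.map(_ + 1)           // recorded, not executed
val step3 = step2.filter(_ % 3 == 0)   // recorded, not executed

// Only the action below builds the DAG; the three functions above are
// pipelined into a single stage and run together.
println(step3.count())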

Regards
Mayur
 On 06-Dec-2014 6:12 pm, "Ron Ayoub"  wrote:

> This is from a separate thread with a differently named title.
>
> Why can't you modify the actual contents of an RDD using forEach? It
> appears to be working for me. What I'm doing is changing cluster
> assignments and distances per data item for each iteration of the
> clustering algorithm. The clustering algorithm is massive and iterates
> thousands of times. As I understand it now, you are supposed to create new
> RDDs on each pass. This is a hierachical k-means that I'm doing and hence
> it is consist of many iterations rather than large iterations.
>
> So I understand the restriction of why operation when aggregating and
> reducing etc, need to be associative. However, forEach operates on a single
> item. So being that Spark is advertised as being great for iterative
> algorithms since it operates in-memory, how can it be good to create
> thousands upon thousands of RDDs during the course of an iterative
> algorithm?  Does Spark have some kind of trick like reuse behind the
> scenes - fully persistent data objects or whatever? How can it possibly be
> efficient for 'iterative' algorithms when it is creating so many RDDs as
> opposed to one?
>
> Or is the answer that I should keep doing what I'm doing because it is
> working even though it is not theoretically sound and aligned with
> functional ideas. I personally just want it to be fast and be able to
> operate on up to 500 million data items.
>


Re: Hive Problem in Pig generated Parquet file schema in CREATE EXTERNAL TABLE (e.g. bag::col1)

2014-12-06 Thread Jianshi Huang
Very interesting, the line doing the drop table throws an exception. After
removing it, everything works.

Jianshi

On Sat, Dec 6, 2014 at 9:11 AM, Jianshi Huang 
wrote:

> Here's the solution I got after talking with Liancheng:
>
> 1) using backquote `..` to wrap up all illegal characters
>
> val rdd = parquetFile(file)
> val schema = rdd.schema.fields.map(f => s"`${f.name}`
> ${HiveMetastoreTypes.toMetastoreType(f.dataType)}").mkString(",\n")
>
> val ddl_13 = s"""
>   |CREATE EXTERNAL TABLE $name (
>   |  $schema
>   |)
>   |STORED AS PARQUET
>   |LOCATION '$file'
>   """.stripMargin
>
> sql(ddl_13)
>
> 2) create a new Schema and do applySchema to generate a new SchemaRDD, had
> to drop and register table
>
> val t = table(name)
> val newSchema = StructType(t.schema.fields.map(s => s.copy(name =
> s.name.replaceAll(".*?::", ""))))
> sql(s"drop table $name")
> applySchema(t, newSchema).registerTempTable(name)
>
> I'm testing it for now.
>
> Thanks for the help!
>
>
> Jianshi
>
> On Sat, Dec 6, 2014 at 8:41 AM, Jianshi Huang 
> wrote:
>
>> Hi,
>>
>> I had to use Pig for some preprocessing and to generate Parquet files for
>> Spark to consume.
>>
>> However, due to Pig's limitation, the generated schema contains Pig's
>> identifier
>>
>> e.g.
>> sorted::id, sorted::cre_ts, ...
>>
>> I tried to put the schema inside CREATE EXTERNAL TABLE, e.g.
>>
>>   create external table pmt (
>> sorted::id bigint
>>   )
>>   stored as parquet
>>   location '...'
>>
>> Obviously it didn't work, I also tried removing the identifier sorted::,
>> but the resulting rows contain only nulls.
>>
>> Any idea how to create a table in HiveContext from these Parquet files?
>>
>> Thanks,
>> Jianshi
>> --
>> Jianshi Huang
>>
>> LinkedIn: jianshi
>> Twitter: @jshuang
>> Github & Blog: http://huangjs.github.com/
>>
>
>
>
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>



-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: Including data nucleus tools

2014-12-06 Thread spark.dubovsky.jakub
Hi again,

I have tried to recompile and run this again with new assembly created by

./make-distribution.sh -Phadoop-2.3 -Dhadoop.version=2.3.0-cdh5.1.3 -Pyarn -Phive -DskipTests

It results in exactly the same error. Any other hints?
Bonus question: Should the class org.datanucleus.api.jdo.JDOPersistenceManagerFactory
be part of the assembly? Because it is not in the jar now.

  thanks in advance
  Jakub



-- Original message --
From: DB Tsai 
To: spark.dubovsky.ja...@seznam.cz
Date: 5. 12. 2014 22:53:32
Subject: Re: Including data nucleus tools

"

Can you try to run the same job using the assembly packaged by
make-distribution as we discussed in the other thread.





Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai




On Fri, Dec 5, 2014 at 12:25 PM, spark.dubovsky.ja...@seznam.cz wrote:
Hi all,

  I have created an assembly jar from 1.2 snapshot source by running [1], which 
sets the correct version of hadoop for our cluster and uses the hive profile. I also
have written a relatively simple test program which starts by reading data 
from parquet using a hive context. I compile the code against the assembly jar 
created and then submitted it on a cluster using [2]. The job fails in its 
early stage on creating the HiveContext itself. The important part of the stack 
trace is [3].

  Could some of you please explain what is wrong and how it should be fixed?
I have found only SPARK-4532
(https://issues.apache.org/jira/browse/SPARK-4532) when looking for 
something related. The fix for that bug is merged in the source I have used, so this 
is ruled out...

  Thanks for help

  Jakub

[1] ./sbt/sbt -Dhadoop.version=2.3.0-cdh5.1.3 -Pyarn -Phive assembly/assembly

[2] ./bin/spark-submit --num-executors 200 --master yarn-cluster --conf spark.yarn.jar=assembly/target/scala-2.10/spark-assembly-1.2.1-SNAPSHOT-hadoop2.3.0-cdh5.1.3.jar --class org.apache.spark.mllib.CreateGuidDomainDictionary root-0.1.jar ...some-args-here

[3]
14/12/05 20:28:15 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient)
Exception in thread "Driver" java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate
...
Caused by: java.lang.ClassNotFoundException: org.datanucleus.api.jdo.JDOPersistenceManagerFactory
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
...

"



"

PySpark Loading Json Following by groupByKey seems broken in spark 1.1.1

2014-12-06 Thread Brad Willard
When I run a groupByKey it seems to create a single task after the
groupByKey that never stops executing. I'm loading a smallish json dataset
that is 4 million. This is the code I'm running.

rdd = sql_context.jsonFile(uri)
rdd = rdd.cache()

grouped = rdd.map(lambda row: (row.id, row)).groupByKey(160)

grouped.take(1)

The groupByKey stage takes a few minutes with 160 tasks which is expected.
However, it then creates a single task "runjob at PythonRDD.scala:300" that
never ends. I gave up after 30 minutes.


 







--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/PySpark-Loading-Json-Following-by-groupByKey-seems-broken-in-spark-1-1-1-tp20559.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: cartesian on pyspark not paralleised

2014-12-06 Thread Akhil Das
You could try increasing the level of parallelism
(spark.default.parallelism) while creating the SparkContext.
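A minimal sketch (shown on the Scala API; the same conf key applies from PySpark, and the value is illustrative):

import org.apache.spark.{SparkConf, SparkContext}

// Default number of partitions used when an operation does not specify one.
val conf = new SparkConf()
  .setAppName("cartesian-parallelism")
  .set("spark.default.parallelism", "256")

val sc = new SparkContext(conf)

// If the product itself ends up under-partitioned, the result can also be
// repartitioned explicitly, e.g. rdd1.cartesian(rdd2).repartition(256).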

Thanks
Best Regards

On Fri, Dec 5, 2014 at 6:37 PM, Antony Mayi 
wrote:

> Hi,
>
> using pyspark 1.1.0 on YARN 2.5.0. all operations run nicely in parallel -
> I can see multiple python processes spawned on each nodemanager but from
> some reason when running cartesian there is only single python process
> running on each node. the task is indicating thousands of partitions so
> don't understand why it is not running with higher parallelism. The
> performance is obviously poor although other operations rock.
>
> any idea how to improve this?
>
> thank you,
> Antony.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Where can you get nightly builds?

2014-12-06 Thread Simone Franzini
I recently read in the mailing list that there are now nightly builds
available. However, I can't find them anywhere. Is this really done? If so,
where can I get them?

Thanks,
Simone Franzini, PhD

http://www.linkedin.com/in/simonefranzini


Re: Where can you get nightly builds?

2014-12-06 Thread Ted Yu
See https://amplab.cs.berkeley.edu/jenkins/view/Spark/

See also https://issues.apache.org/jira/browse/SPARK-1517

Cheers

On Sat, Dec 6, 2014 at 6:41 AM, Simone Franzini 
wrote:

> I recently read in the mailing list that there are now nightly builds
> available. However, I can't find them anywhere. Is this really done? If so,
> where can I get them?
>
> Thanks,
> Simone Franzini, PhD
>
> http://www.linkedin.com/in/simonefranzini
>


Is there a way to force spark to use specific ips?

2014-12-06 Thread Ashic Mahtab
Hi,
It appears that spark is always attempting to use the driver's hostname to 
connect / broadcast. This is usually fine, except when the cluster doesn't have 
DNS configured. For example, in a vagrant cluster with a private network. The 
workers and masters, and the host (where the driver runs from) can all see each 
other by ip. I can also specify --conf "spark.driver.host=192.168.40.1", and 
that results in the workers being able to connect to the driver. However, when 
trying to broadcast anything, it's still trying to use the hostname of the 
host. Now, I can set up a host entry in etc/hosts, but was wondering if there's 
a way to not require the hassle. Is there any way I can force spark to always 
use ips and not hostnames?
Thanks,
Ashic.

  

Re: Running two different Spark jobs vs multi-threading RDDs

2014-12-06 Thread Corey Nolet
Reading the documentation a little more closely, I'm using the wrong
terminology. I'm using stages to refer to what spark is calling a job. I
guess application (more than one spark context) is what I'm asking about
On Dec 5, 2014 5:19 PM, "Corey Nolet"  wrote:

> I've read in the documentation that RDDs can be run concurrently when
> submitted in separate threads. I'm curious how the scheduler would handle
> propagating these down to the tasks.
>
> I have 3 RDDs:
> - one RDD which loads some initial data, transforms it and caches it
> - two RDDs which use the cached RDD to provide reports
>
> I'm trying to figure out how the resources will be scheduled to perform
> these stages if I were to concurrently run the two RDDs that depend on the
> first RDD. Would the two RDDs run sequentially? Will they both run @ the
> same time and be smart about how they are caching?
>
> Would this be a time when I'd want to use Tachyon instead and run this as
> 2 separate physical jobs: one to place the shared data in the RAMDISK and
> one to run the two dependent RDDs concurrently? Or would it even be best in
> that case to run 3 completely separate jobs?
>
> We're planning on using YARN so there's 2 levels of scheduling going on.
> We're trying to figure out the best way to utilize the resources so that we
> are fully saturating the system and making sure there's constantly work
> being done rather than anything spinning gears waiting on upstream
> processing to occur (in mapreduce, we'd just submit a ton of jobs and have
> them wait in line).
>


Re: Hive Problem in Pig generated Parquet file schema in CREATE EXTERNAL TABLE (e.g. bag::col1)

2014-12-06 Thread Jianshi Huang
Hmm... another issue I found doing this approach is that ANALYZE TABLE ...
COMPUTE STATISTICS will fail to attach the metadata to the table, and later
broadcast join and such will fail...

Any idea how to fix this issue?

Jianshi

On Sat, Dec 6, 2014 at 9:10 PM, Jianshi Huang 
wrote:

> Very interesting, the line doing the drop table throws an exception.
> After removing it, everything works.
>
> Jianshi
>
> On Sat, Dec 6, 2014 at 9:11 AM, Jianshi Huang 
> wrote:
>
>> Here's the solution I got after talking with Liancheng:
>>
>> 1) using backquote `..` to wrap up all illegal characters
>>
>> val rdd = parquetFile(file)
>> val schema = rdd.schema.fields.map(f => s"`${f.name}`
>> ${HiveMetastoreTypes.toMetastoreType(f.dataType)}").mkString(",\n")
>>
>> val ddl_13 = s"""
>>   |CREATE EXTERNAL TABLE $name (
>>   |  $schema
>>   |)
>>   |STORED AS PARQUET
>>   |LOCATION '$file'
>>   """.stripMargin
>>
>> sql(ddl_13)
>>
>> 2) create a new Schema and do applySchema to generate a new SchemaRDD,
>> had to drop and register table
>>
>> val t = table(name)
>> val newSchema = StructType(t.schema.fields.map(s => s.copy(name =
>> s.name.replaceAll(".*?::", ""))))
>> sql(s"drop table $name")
>> applySchema(t, newSchema).registerTempTable(name)
>>
>> I'm testing it for now.
>>
>> Thanks for the help!
>>
>>
>> Jianshi
>>
>> On Sat, Dec 6, 2014 at 8:41 AM, Jianshi Huang 
>> wrote:
>>
>>> Hi,
>>>
>>> I had to use Pig for some preprocessing and to generate Parquet files
>>> for Spark to consume.
>>>
>>> However, due to Pig's limitation, the generated schema contains Pig's
>>> identifier
>>>
>>> e.g.
>>> sorted::id, sorted::cre_ts, ...
>>>
>>> I tried to put the schema inside CREATE EXTERNAL TABLE, e.g.
>>>
>>>   create external table pmt (
>>> sorted::id bigint
>>>   )
>>>   stored as parquet
>>>   location '...'
>>>
>>> Obviously it didn't work, I also tried removing the identifier sorted::,
>>> but the resulting rows contain only nulls.
>>>
>>> Any idea how to create a table in HiveContext from these Parquet files?
>>>
>>> Thanks,
>>> Jianshi
>>> --
>>> Jianshi Huang
>>>
>>> LinkedIn: jianshi
>>> Twitter: @jshuang
>>> Github & Blog: http://huangjs.github.com/
>>>
>>
>>
>>
>> --
>> Jianshi Huang
>>
>> LinkedIn: jianshi
>> Twitter: @jshuang
>> Github & Blog: http://huangjs.github.com/
>>
>
>
>
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>



-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: SQL query in scala API

2014-12-06 Thread Arun Luthra
Thanks, I will try this.

On Fri, Dec 5, 2014 at 1:19 AM, Cheng Lian  wrote:

>  Oh, sorry. So neither SQL nor Spark SQL is preferred. Then you may write
> your own aggregation with aggregateByKey:
>
> users.aggregateByKey((0, Set.empty[String]))({ case ((count, seen), user) =>
>   (count + 1, seen + user)
> }, { case ((count0, seen0), (count1, seen1)) =>
>   (count0 + count1, seen0 ++ seen1)
> }).mapValues { case (count, seen) =>
>   (count, seen.size)
> }
>
> On 12/5/14 3:47 AM, Arun Luthra wrote:
>
>   Is that Spark SQL? I'm wondering if it's possible without spark SQL.
>
> On Wed, Dec 3, 2014 at 8:08 PM, Cheng Lian  wrote:
>
>>  You may do this:
>>
>> table("users").groupBy('zip)('zip, count('user), countDistinct('user))
>>
>>  On 12/4/14 8:47 AM, Arun Luthra wrote:
>>
>>  I'm wondering how to do this kind of SQL query with PairRDDFunctions.
>>
>>  SELECT zip, COUNT(user), COUNT(DISTINCT user)
>> FROM users
>> GROUP BY zip
>>
>>  In the Spark scala API, I can make an RDD (called "users") of key-value
>> pairs where the keys are zip (as in ZIP code) and the values are user id's.
>> Then I can compute the count and distinct count like this:
>>
>>  val count = users.mapValues(_ => 1).reduceByKey(_ + _)
>> val countDistinct = users.distinct().mapValues(_ => 1).reduceByKey(_ + _)
>>
>>  Then, if I want count and countDistinct in the same table, I have to
>> join them on the key.
>>
>>  Is there a way to do this without doing a join (and without using SQL
>> or spark SQL)?
>>
>>  Arun
>>
>>  ​
>>
>
>​
>


Re: Running two different Spark jobs vs multi-threading RDDs

2014-12-06 Thread Aaron Davidson
You can actually submit multiple jobs to a single SparkContext in different
threads. In the case you mentioned with 2 stages having a common parent,
both will wait for the parent stage to complete and then the two will
execute in parallel, sharing the cluster resources.

Solutions that submit multiple applications are also reasonable, but then
you have to manage the job dependencies yourself.
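A small sketch of that pattern (illustrative data; a shared cached parent feeding two concurrently submitted actions):

import org.apache.spark.{SparkConf, SparkContext}
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

val sc = new SparkContext(new SparkConf().setAppName("concurrent-reports").setMaster("local[*]"))

// Shared parent: loaded once, cached, then used by both report jobs.
val parent = sc.parallelize(1 to 1000000).map(_ * 2).cache()

// Two actions submitted from separate threads run as separate jobs against
// the same SparkContext and share its resources.
// (spark.scheduler.mode=FAIR can be set for round-robin sharing between jobs.)
val reportA = Future { parent.filter(_ % 3 == 0).count() }
val reportB = Future { parent.filter(_ % 5 == 0).count() }

println(Await.result(reportA, Duration.Inf))
println(Await.result(reportB, Duration.Inf))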

On Sat, Dec 6, 2014 at 8:41 AM, Corey Nolet  wrote:

> Reading the documentation a little more closely, I'm using the wrong
> terminology. I'm using stages to refer to what spark is calling a job. I
> guess application (more than one spark context) is what I'm asking about
> On Dec 5, 2014 5:19 PM, "Corey Nolet"  wrote:
>
>> I've read in the documentation that RDDs can be run concurrently when
>> submitted in separate threads. I'm curious how the scheduler would handle
>> propagating these down to the tasks.
>>
>> I have 3 RDDs:
>> - one RDD which loads some initial data, transforms it and caches it
>> - two RDDs which use the cached RDD to provide reports
>>
>> I'm trying to figure out how the resources will be scheduled to perform
>> these stages if I were to concurrently run the two RDDs that depend on the
>> first RDD. Would the two RDDs run sequentially? Will they both run @ the
>> same time and be smart about how they are caching?
>>
>> Would this be a time when I'd want to use Tachyon instead and run this as
>> 2 separate physical jobs: one to place the shared data in the RAMDISK and
>> one to run the two dependent RDDs concurrently? Or would it even be best in
>> that case to run 3 completely separate jobs?
>>
>> We're planning on using YARN so there's 2 levels of scheduling going on.
>> We're trying to figure out the best way to utilize the resources so that we
>> are fully saturating the system and making sure there's constantly work
>> being done rather than anything spinning gears waiting on upstream
>> processing to occur (in mapreduce, we'd just submit a ton of jobs and have
>> them wait in line).
>>
>


RE: Adding Spark Cassandra dependency breaks Spark Streaming?

2014-12-06 Thread Ashic Mahtab
Update:
It seems the following combo causes things in spark streaming to go missing:
spark-core 1.1.0
spark-streaming 1.1.0
spark-cassandra-connector 1.1.0
The moment I add the three together, things like StreamingContext and Seconds 
are unavailable. sbt assembly fails saying those aren't there. Sbt clean / 
deleting .ivy2 and .m2 doesn't resolve the issue.
I've also set up an 1.1.1 spark cluster, and created a jar with the following 
dependencies:
spark-core 1.1.1
spark-streaming 1.1.1
spark-sql 1.1.1
spark-cassandra-connector 1.1.0
Everything runs perfectly.
I'll be upgrading my clusters to 1.1.1 anyway, but I am intrigued...I'm fairly 
new to sbt, scala and the jvm in general. Any idea how having spark streaming 
1.1.0 and spark cassandra connector 1.1.0 together would cause classes in spark 
streaming to go missing?
Here's the full sbt file if anybody is interested:
import sbt._
import Keys._



name := "untitled19"

version := "1.0"

scalaVersion := "2.10.4"

val sparkCore = "org.apache.spark" %% "spark-core" % "1.1.0" % "provided"
val sparkStreaming = "org.apache.spark" %% "spark-streaming" % "1.1.0" % 
"provided"
val sparkSql = "org.apache.spark" %% "spark-sql" % "1.1.0" % "provided"
val sparkCassandra = "com.datastax.spark" %% "spark-cassandra-connector" % 
"1.1.0" withSources() withJavadoc()

libraryDependencies ++= Seq(
  sparkCore,
  sparkSql,
  sparkStreaming,
  sparkCassandra
)

resolvers += "Akka Repository" at "http://repo.akka.io/releases/";

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs@_*) =>
(xs map (_.toLowerCase)) match {
  case ("manifest.mf" :: Nil) | ("index.list" :: Nil) | ("dependencies" :: 
Nil) => MergeStrategy.discard
  case _ => MergeStrategy.discard
}
  case _ => MergeStrategy.first
} 
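A minimal sketch of one way to keep the Spark artifacts on a single version (factoring the version into one val; the module list mirrors the file above):

val sparkVersion = "1.1.1"

libraryDependencies ++= Seq(
  "org.apache.spark"   %% "spark-core"      % sparkVersion % "provided",
  "org.apache.spark"   %% "spark-sql"       % sparkVersion % "provided",
  "org.apache.spark"   %% "spark-streaming" % sparkVersion % "provided",
  "com.datastax.spark" %% "spark-cassandra-connector" % "1.1.0"
)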
Regards,
Ashic.

From: as...@live.com
To: yuzhih...@gmail.com
CC: user@spark.apache.org
Subject: RE: Adding Spark Cassandra dependency breaks Spark Streaming?
Date: Sat, 6 Dec 2014 03:54:19 +




Getting this on the home machine as well. Not referencing the spark cassandra 
connector in libraryDependencies compiles. 
I've recently updated IntelliJ to 14. Could that be causing an issue? 

From: as...@live.com
To: yuzhih...@gmail.com
CC: user@spark.apache.org
Subject: RE: Adding Spark Cassandra dependency breaks Spark Streaming?
Date: Fri, 5 Dec 2014 19:24:46 +




Sorry... I really don't have enough maven know-how to do this quickly. I tried the 
pom below, and IntelliJ could find org.apache.spark.streaming.StreamingContext 
and org.apache.spark.streaming.Seconds, but not 
org.apache.spark.streaming.receiver.Receiver. Is there something specific I can 
try? I'll try sbt on the home machine in about a couple of hours.


<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                             http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>untitled100</groupId>
    <artifactId>untiled100</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>com.datastax.spark</groupId>
            <artifactId>spark-cassandra-connector_2.10</artifactId>
            <version>1.1.0</version>
        </dependency>
    </dependencies>
</project>

Date: Fri, 5 Dec 2014 10:58:51 -0800
Subject: Re: Adding Spark Cassandra dependency breaks Spark Streaming?
From: yuzhih...@gmail.com
To: as...@live.com
CC: user@spark.apache.org

Can you try with maven ?
diff --git a/streaming/pom.xml b/streaming/pom.xml
index b8b8f2e..6cc8102 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -68,6 +68,11 @@
       <artifactId>junit-interface</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.datastax.spark</groupId>
+      <artifactId>spark-cassandra-connector_2.10</artifactId>
+      <version>1.1.0</version>
+    </dependency>
   </dependencies>
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>

You can use the following command: mvn -pl core,streaming package -DskipTests

Cheers
On Fri, Dec 5, 2014 at 9:35 AM, Ashic Mahtab  wrote:



Hi,
Seems adding the cassandra connector and spark streaming causes "issues". I've 
added my build and code file. Running "sbt compile" gives weird errors like 
Seconds is not part of org.apache.spark.streaming and object Receiver is not a 
member of package org.apache.spark.streaming.receiver. If I take out 
cassandraConnector from the list of dependencies, "sbt compile" succeeds.
How is adding the dependency removing things from spark streaming packages? Is 
there something I can do (perhaps in sbt) to not have this break?

Here's my build file:
import sbt.Keys._
import sbt._
name := "untitled99"
version := "1.0"
scalaVersion := "2.10.4"
val spark = "org.apache.spark" %% "spark-core" % "1.1.0"
val sparkStreaming = "org.apache.spark" %% "spark-streaming" % "1.1.0"
val cassandraConnector = "com.datastax.spark" %% "spark-cassandra-connector" % 
"1.1.0" withSources() withJavadoc()
libraryDependencies ++= Seq(
cassandraConnector,
spark,
sparkStreaming
)
resolvers += "Akka Repository" at "http://repo.akka.io

Re: Where can you get nightly builds?

2014-12-06 Thread Nicholas Chammas
To expand on Ted's response, there are currently no nightly builds
published for users to use. You can watch SPARK-1517 (which Ted linked to)
to be updated when that happens.

On Sat Dec 06 2014 at 10:19:10 AM Ted Yu  wrote:

> See https://amplab.cs.berkeley.edu/jenkins/view/Spark/
>
> See also https://issues.apache.org/jira/browse/SPARK-1517
>
> Cheers
>
> On Sat, Dec 6, 2014 at 6:41 AM, Simone Franzini 
> wrote:
>
>> I recently read in the mailing list that there are now nightly builds
>> available. However, I can't find them anywhere. Is this really done? If so,
>> where can I get them?
>>
>> Thanks,
>> Simone Franzini, PhD
>>
>> http://www.linkedin.com/in/simonefranzini
>>
>
>


RE: Adding Spark Cassandra dependency breaks Spark Streaming?

2014-12-06 Thread Ashic Mahtab
Hi,
Just checked: cassandra connector 1.1.0-beta1 runs fine. The issue seems 
to be 1.1.0 for spark streaming and 1.1.0 cassandra connector (final).
Regards,
Ashic.

Date: Sat, 6 Dec 2014 13:52:20 -0500
Subject: Re: Adding Spark Cassandra dependency breaks Spark Streaming?
From: jayunit100.apa...@gmail.com
To: as...@live.com

This is working for me as a dependency set for spark streaming app w/ cassandra.

https://github.com/jayunit100/SparkBlueprint/blob/master/build.sbt

libraryDependencies += "com.datastax.spark" %% 
"spark-cassandra-connector" % "1.1.0-beta1" withSources() withJavadoc()
  
  




  
  

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.0"
  
  




  
  

libraryDependencies +=  "org.scalatest" % "scalatest_2.10.0-M4" % 
"1.9-2.10.0-M4-B1"
  
  




  
  

libraryDependencies +=  "junit" % "junit" % "4.8.1" % "test"
  
  




  
  

libraryDependencies += "org.apache.spark" %% "spark-mllib" % "1.1.0"
  
  




  
  

libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.1.0"
  
  




  
  

libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.1.0"
  
  




On Sat, Dec 6, 2014 at 12:29 PM, Ashic Mahtab  wrote:



Update:
It seems the following combo causes things in spark streaming to go missing:
spark-core 1.1.0
spark-streaming 1.1.0
spark-cassandra-connector 1.1.0
The moment I add the three together, things like StreamingContext and Seconds 
are unavailable. sbt assembly fails saying those aren't there. Sbt clean / 
deleting .ivy2 and .m2 doesn't resolve the issue.
I've also set up an 1.1.1 spark cluster, and created a jar with the following 
dependencies:
spark-core 1.1.1
spark-streaming 1.1.1
spark-sql 1.1.1
spark-cassandra-connector 1.1.0
Everything runs perfectly.
I'll be upgrading my clusters to 1.1.1 anyway, but I am intrigued...I'm fairly 
new to sbt, scala and the jvm in general. Any idea how having spark streaming 
1.1.0 and spark cassandra connector 1.1.0 together would cause classes in spark 
streaming to go missing?
Here's the full sbt file if anybody is interested:
import sbt._
import Keys._



name := "untitled19"

version := "1.0"

scalaVersion := "2.10.4"

val sparkCore = "org.apache.spark" %% "spark-core" % "1.1.0" % "provided"
val sparkStreaming = "org.apache.spark" %% "spark-streaming" % "1.1.0" % 
"provided"
val sparkSql = "org.apache.spark" %% "spark-sql" % "1.1.0" % "provided"
val sparkCassandra = "com.datastax.spark" %% "spark-cassandra-connector" % 
"1.1.0" withSources() withJavadoc()

libraryDependencies ++= Seq(
  sparkCore,
  sparkSql,
  sparkStreaming,
  sparkCassandra
)

resolvers += "Akka Repository" at "http://repo.akka.io/releases/";

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs@_*) =>
(xs map (_.toLowerCase)) match {
  case ("manifest.mf" :: Nil) | ("index.list" :: Nil) | ("dependencies" :: 
Nil) => MergeStrategy.discard
  case _ => MergeStrategy.discard
}
  case _ => MergeStrategy.first
} 
Regards,
Ashic.

From: as...@live.com
To: yuzhih...@gmail.com
CC: user@spark.apache.org
Subject: RE: Adding Spark Cassandra dependency breaks Spark Streaming?
Date: Sat, 6 Dec 2014 03:54:19 +




Getting this on the home machine as well. Not referencing the spark cassandra 
connector in libraryDependencies compiles. 
I've recently updated IntelliJ to 14. Could that be causing an issue? 

From: as...@live.com
To: yuzhih...@gmail.com
CC: user@spark.apache.org
Subject: RE: Adding Spark Cassandra dependency breaks Spark Streaming?
Date: Fri, 5 Dec 2014 19:24:46 +




Sorry...really don't have enough maven know how to do this quickly. I tried the 
pom below, and IntelliJ could find org.apache.spark.streaming.StreamingContext 
and org.apache.spark.streaming.Seconds, but not 
org.apache.spark.streaming.receiver.Receiver. Is there something specific I can 
try? I'll try sbt on the home machine in about a couple of hours.


<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                             http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>untitled100</groupId>
    <artifactId>untiled100</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>com.datastax.spark</groupId>
            <artifactId>spark-cassandra-connector_2.10</artifactId>
            <version>1.1.0</version>
        </dependency>
    </dependencies>
</project>

Date: Fri, 5 Dec 2014 10:58:51 -0800
Subject: Re: Adding Spark Cassandra dependency breaks Spark Streaming?
From: yuzhih.

Re: Including data nucleus tools

2014-12-06 Thread Michael Armbrust
On Sat, Dec 6, 2014 at 5:53 AM,  wrote:
>
> Bonus question: Should the class
> org.datanucleus.api.jdo.JDOPersistenceManagerFactory be part of assembly?
> Because it is not in jar now.
>

No, these jars cannot be put into the assembly because they have extra
metadata files that live in the same location (so if you put them all in an
assembly they overwrite each other).  This metadata is used in discovery.
Instead they must be manually put on the classpath in their original form
(usually using --jars).


Re: Modifying an RDD in forEach

2014-12-06 Thread Mohit Jaggi
Ron,
“appears to be working” might be true when there are no failures. on large 
datasets being processed on a large number of machines, failures of several 
types(server, network, disk etc) can happen. At that time, Spark will not 
“know” that you changed the RDD in-place and will use any version of any 
partition of the RDD to be retried. Retries require idempotency and that is 
difficult without immutability. I believe, this is one of the primary reasons 
for making RDDs immutable in Spark (mutable isn't even an option worth 
considering). In general mutating something in a distributed system is a hard 
problem. It can be solved (e.g. in NoSQL or newSQL databases) but Spark is not 
a transactional data store.

If you are building an iterative machine learning algorithm which usually have 
a “reduce” step at the end of every iteration, then the lazy evaluation is 
unlikely to be useful. On the other hand, if these intermediate RDDs stay in 
the young generation of the JVM heap [I am not sure if RDD cache management 
somehow changes this, so I could be wrong] they are garbage collected quickly 
and with very little overhead.

This is the price of scaling out :-)

Hope this helps,
Mohit.

> On Dec 6, 2014, at 5:02 AM, Mayur Rustagi  wrote:
> 
> You'll benefit by viewing Matei's talk in Yahoo on Spark internals and how it 
> optimizes execution of iterative jobs.
> Simple answer is 
> 1. Spark doesn't materialize RDD when you do an iteration but lazily captures 
> the transformation functions in RDD.(only function and closure , no data 
> operation actually happens)
> 2. When you finally execute and want to cause effects (save to disk , collect 
> on master etc) it views the DAG of execution and optimizes what it can reason 
> (eliminating intermediate states , performing multiple Transformations in one 
> tasks, leveraging partitioning where available among others)
> Bottom line it doesn't matter how many RDD you have in your DAG chain as long 
> as Spark can optimize the functions in that DAG to create minimal 
> materialization on its way to final output. 
> 
> Regards
> Mayur
> On 06-Dec-2014 6:12 pm, "Ron Ayoub"  > wrote:
> This is from a separate thread with a differently named title. 
> 
> Why can't you modify the actual contents of an RDD using forEach? It appears 
> to be working for me. What I'm doing is changing cluster assignments and 
> distances per data item for each iteration of the clustering algorithm. The 
> clustering algorithm is massive and iterates thousands of times. As I 
> understand it now, you are supposed to create new RDDs on each pass. This is 
> a hierachical k-means that I'm doing and hence it is consist of many 
> iterations rather than large iterations.
> 
> So I understand the restriction of why operation when aggregating and 
> reducing etc, need to be associative. However, forEach operates on a single 
> item. So being that Spark is advertised as being great for iterative 
> algorithms since it operates in-memory, how can it be good to create 
> thousands upon thousands of RDDs during the course of an iterative algorithm? 
>  Does Spark have some kind of trick like reuse behind the scenes - fully 
> persistent data objects or whatever? How can it possibly be efficient for 
> 'iterative' algorithms when it is creating so many RDDs as opposed to one? 
> 
> Or is the answer that I should keep doing what I'm doing because it is 
> working even though it is not theoretically sound and aligned with functional 
> ideas. I personally just want it to be fast and be able to operate on up to 
> 500 million data items. 



Spark on YARN memory utilization

2014-12-06 Thread Denny Lee
This is perhaps more of a YARN question than a Spark question, but I was
just curious as to how memory is allocated in YARN via the various
configurations.  For example, if I spin up my cluster with 4GB with a
different number of executors as noted below

 4GB executor-memory x 10 executors = 46GB  (4GB x 10 = 40 + 6)
 4GB executor-memory x 4 executors = 19GB (4GB x 4 = 16 + 3)
 4GB executor-memory x 2 executors = 10GB (4GB x 2 = 8 + 2)

The pattern when observing the RM is that there is a container for each
executor and one additional container.  From the basis of memory, it looks
like there is an additional (1GB + (0.5GB x # executors)) that is allocated
in YARN.

Just wondering why this is - or is this just an artifact of YARN itself?

Thanks!


RE: Modifying an RDD in forEach

2014-12-06 Thread Ron Ayoub
These are very interesting comments. The vast majority of cases I'm working on 
are going to be in the 3 million range and 100 million was thrown out as 
something to shoot for. I upped it to 500 million. But all things considering, 
I believe I may be able to directly translate what I have to Java Streams API 
and run 100 million docs on 32 cores in under an hour or two which would suit 
our needs. Up until this point I've been focused on the computational aspect. 
If I can scale up to clustering 100 million documents on a single machine I can 
probably directly translate what I have to Java Streams API and be faster. It 
is that scaling out that changes things. These are interesting comments. I 
think in this hierarchical k-means case the lazy evaluation becomes almost 
useless and perhaps even an impediment. Part of the problem is that I've been a 
bit too focused on math/information retrieval and have to update a bit on 
functional approach to programming so I can better utilize the tools. But it 
does appear that Spark may not be the best option for this need. I don't need 
resiliency or fault tolerance as much as I need to be able to execute an 
algorithm on a large amount of data fast and then be done with it. I'm now 
thinking that in the 100 million document range I may be ok clustering feature 
vectors with no more than 25 features per doc on a single machine with 32 cores 
and a load of memory. I might directly translate what I have to Java 8 Streams 
API. 
There are also questions of proportion. Perhaps what I have is not big enough to 
warrant or require scaling out. I may have other uses for Spark in traditional 
map-reduce algorithms such as counting pairs of shared shingles for near dupe 
detection, but to this point I've found Oracle's parallel-pipelined table 
functions, while not glamorous, are doing quite well in the DB. 
I'm just a bit confused still on why it is advertised ideal for iterative 
algorithms when iterative algorithms have that point per iteration where things 
do get evaluated and laziness is not terribly useful. Ideal for massive 
in-memory cluster computing yes - but iterative... ? not sure. I have that book 
"Functional Programming in Scala" and I hope to read it someday and enrich my 
understanding here. 

Subject: Re: Modifying an RDD in forEach
From: mohitja...@gmail.com
Date: Sat, 6 Dec 2014 13:13:50 -0800
CC: ronalday...@live.com; user@spark.apache.org
To: mayur.rust...@gmail.com

Ron, “appears to be working” might be true when there are no failures. On large 
datasets being processed on a large number of machines, failures of several 
types(server, network, disk etc) can happen. At that time, Spark will not 
“know” that you changed the RDD in-place and will use any version of any 
partition of the RDD to be retried. Retries require idempotency and that is 
difficult without immutability. I believe, this is one of the primary reasons 
for making RDDs immutable in Spark (mutable isn't even an option worth 
considering). In general mutating something in a distributed system is a hard 
problem. It can be solved (e.g. in NoSQL or newSQL databases) but Spark is not 
a transactional data store.
If you are building an iterative machine learning algorithm which usually have 
a “reduce” step at the end of every iteration, then the lazy evaluation is 
unlikely to be useful. On the other hand, if these intermediate RDDs stay in 
the young generation of the JVM heap [I am not sure if RDD cache management 
somehow changes this, so I could be wrong] they are garbage collected quickly 
and with very little overhead.
This is the price of scaling out :-) Hope this helps, Mohit.
On Dec 6, 2014, at 5:02 AM, Mayur Rustagi  
wrote:You'll benefit by viewing Matei's talk in Yahoo on Spark internals and 
how it optimizes execution of iterative jobs.

Simple answer is 

1. Spark doesn't materialize RDD when you do an iteration but lazily captures 
the transformation functions in RDD.(only function and closure , no data 
operation actually happens)

2. When you finally execute and want to cause effects (save to disk , collect 
on master etc) it views the DAG of execution and optimizes what it can reason 
(eliminating intermediate states , performing multiple Transformations in one 
tasks, leveraging partitioning where available among others)

Bottom line it doesn't matter how many RDD you have in your DAG chain as long 
as Spark can optimize the functions in that DAG to create minimal 
materialization on its way to final output. 
Regards

Mayur


On 06-Dec-2014 6:12 pm, "Ron Ayoub"  wrote:



This is from a separate thread with a differently named title. 
Why can't you modify the actual contents of an RDD using forEach? It appears to 
be working for me. What I'm doing is changing cluster assignments and distances 
per data item for each iteration of the clustering algorithm. The clustering 
algorithm is massive and iterates thousands of times. As I understand it now, 
you are supposed to create new RDD

Re: Modifying an RDD in forEach

2014-12-06 Thread Mohit Jaggi
“ideal for iterative workloads” is a comparison to hadoop map-reduce. if you 
are happy with a single machine, by all means, do that.

scaling out may be useful when:
1) you want to finish the task faster by using more machines. this may not 
involve any additional cost if you are using utility computing like AWS. e.g. 
if the cost of 1 machine for 1 hour is the same as the cost of 60 machines for 
a minute, but you get your results 60 times faster
2) if you may have larger data. at some point you will run out of “vertical 
scaling” options or they will become prohibitively expensive [e.g. you had 
everything working for 100 million docs but then you got 10 more docs. now do 
you buy and install more DIMMs in your server?]
3) if you are using utility computing like AWS and there is a cliff drop in 
pricing for smaller machines [this is common]
4) what if you want to modify your algorithm and now it needs a few more bytes 
of memory? go to the store, buy DIMMs, and install in your server?

> On Dec 6, 2014, at 1:42 PM, Ron Ayoub  wrote:
> 
> These are very interesting comments. The vast majority of cases I'm working 
> on are going to be in the 3 million range and 100 million was thrown out as 
> something to shoot for. I upped it to 500 million. But all things 
> considering, I believe I may be able to directly translate what I have to 
> Java Streams API and run 100 million docs on 32 cores in under an hour or two 
> which would suit our needs. Up until this point I've been focused on 
> computational aspect 
> 
> If I can scale up to clustering 100 million documents on a single machine I 
> can probably directly translate what I have to Java Streams API and be 
> faster. It is that scaling out that changes things. These are interesting 
> comments. I think in this hierarchical k-means case the lazy evaluation 
> becomes almost useless and perhaps even an impediment. Part of the problem is 
> that I've been a bit too focused on math/information retrieval and have to 
> update a bit on functional approach to programming so I can better utilize 
> the tools But it does appear that Spark may not be the best option for this 
> need. I don't need resiliency or fault tolerance as much as I need to be able 
> to execute an algorithm on a large amount of data fast and then be done with 
> it. I'm now thinking that in the 100 million document range I may be ok 
> clustering feature vectors with no more than 25 features per doc on a single 
> machine with 32 cores and a load of memory. I might directly translate what I 
> have to Java 8 Streams API. 
> 
> There is also questions of proportion. Perhaps what I have is not big enough 
> to warrant or require scaling out. I may have other uses for Spark in 
> traditional map-reduce algorithms such as counting pairs of shared shingles 
> for near dupe detection but to this point I've found Oracles 
> parallel-pipelined table functions, while not glamorous are doing quite well 
> in DB. 
> 
> I'm just a bit confused still on why it is advertised ideal for iterative 
> algorithms when iterative algorithms have that point per iteration where 
> things do get evaluated and laziness is not terribly useful. Ideal for 
> massive in-memory cluster computing yes - but iterative... ? not sure. I have 
> that book "Functional Programming in Scala" and I hope to read it someday and 
> enrich my understanding here. 
> 
> Subject: Re: Modifying an RDD in forEach
> From: mohitja...@gmail.com
> Date: Sat, 6 Dec 2014 13:13:50 -0800
> CC: ronalday...@live.com; user@spark.apache.org
> To: mayur.rust...@gmail.com
> 
> Ron,
> “Appears to be working” might be true when there are no failures. On large 
> datasets being processed on a large number of machines, failures of several 
> types (server, network, disk, etc.) can happen. At that point, Spark will not 
> “know” that you changed the RDD in place and may use any version of any 
> partition of the RDD when retrying. Retries require idempotency, and that is 
> difficult without immutability. I believe this is one of the primary reasons 
> for making RDDs immutable in Spark (mutable isn't even an option worth 
> considering). In general, mutating something in a distributed system is a hard 
> problem. It can be solved (e.g. in NoSQL or NewSQL databases), but Spark is 
> not a transactional data store.
> 
> If you are building an iterative machine learning algorithm, which usually 
> has a “reduce” step at the end of every iteration, then the lazy evaluation 
> is unlikely to be useful. On the other hand, if these intermediate RDDs stay 
> in the young generation of the JVM heap [I am not sure if RDD cache 
> management somehow changes this, so I could be wrong], they are garbage 
> collected quickly and with very little overhead.
> 
> This is the price of scaling out :-)
>   
> Hope this helps,
> Mohit.
> 
> On Dec 6, 2014, at 5:02 AM, Mayur Rustagi  > wrote:
> 
> You'll benefit by viewing Matei's talk in Yahoo
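As a concrete illustration of the in-place mutation pitfall discussed above, here is a 
minimal sketch (assuming a spark-shell session where sc is available; the data is made up):

// Mutating state from inside foreach "appears to work" locally, but the closure
// is serialized to executors, and task retries may see a different version of a
// partition, so the result is unreliable.
val counts = sc.parallelize(1 to 1000000).map(x => (x % 10, 1L))

var total = 0L                                  // driver-side variable
counts.foreach { case (_, c) => total += c }    // increments executor-side copies; the driver's total is not reliably updated

// The retry-safe, idiomatic alternative derives a new value instead of mutating:
val total2 = counts.map(_._2).reduce(_ + _)     // idempotent under task retries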

Re: Spark on YARN memory utilization

2014-12-06 Thread Arun Ahuja
Hi Denny,

This is due to the spark.yarn.memoryOverhead parameter. Depending on what
version of Spark you are on, the default may differ, but it should
be the larger of 1024mb per executor or .07 * executorMemory.

When you set executor memory, the YARN resource request is executorMemory +
yarnOverhead.
- Arun

On Sat, Dec 6, 2014 at 4:27 PM, Denny Lee  wrote:

> This is perhaps more of a YARN question than a Spark question, but I was
> just curious as to how memory is allocated in YARN via the various
> configurations.  For example, if I spin up my cluster with 4GB executors and
> a different number of executors as noted below:
>
>  4GB executor-memory x 10 executors = 46GB  (4GB x 10 = 40 + 6)
>  4GB executor-memory x 4 executors = 19GB (4GB x 4 = 16 + 3)
>  4GB executor-memory x 2 executors = 10GB (4GB x 2 = 8 + 2)
>
> The pattern when observing the RM is that there is a container for each
> executor and one additional container.  In terms of memory, it looks
> like an additional (1GB + (0.5GB x # executors)) is allocated in YARN.
>
> Just wondering why this is - or is it just an artifact of YARN itself?
>
> Thanks!
>
>
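A back-of-the-envelope sketch of the sizing Arun describes (the per-executor overhead
formula is from his reply; the minimum overhead constant and the ApplicationMaster
container size vary by version and are left as parameters here, and YARN additionally
rounds each request up to its minimum allocation increment, which is why observed
totals can differ slightly):

// rough YARN memory estimate (Scala sketch)
def yarnRequestMb(executorMemoryMb: Int, numExecutors: Int,
                  minOverheadMb: Int, amContainerMb: Int): Int = {
  val overheadMb = math.max(minOverheadMb, (0.07 * executorMemoryMb).toInt)
  numExecutors * (executorMemoryMb + overheadMb) + amContainerMb
}

// e.g. yarnRequestMb(4096, 10, minOverheadMb = 384, amContainerMb = 1024)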


Re: Is there a way to force spark to use specific ips?

2014-12-06 Thread Matt Narrell
It's much easier if you access your nodes by name.  If you’re using Vagrant, use 
the hosts provisioner:  https://github.com/adrienthebo/vagrant-hosts 


mn

> On Dec 6, 2014, at 8:37 AM, Ashic Mahtab  wrote:
> 
> Hi,
> It appears that Spark is always attempting to use the driver's hostname to 
> connect / broadcast. This is usually fine, except when the cluster doesn't 
> have DNS configured - for example, in a Vagrant cluster with a private 
> network. The workers, the masters, and the host (where the driver runs from) 
> can all see each other by IP. I can also specify --conf 
> "spark.driver.host=192.168.40.1", and that results in the workers being able 
> to connect to the driver. However, when trying to broadcast anything, it's 
> still trying to use the hostname of the host. I can set up a host entry 
> in /etc/hosts, but was wondering if there's a way to avoid the hassle. 
> Is there any way I can force Spark to always use IPs and not hostnames?
> 
> Thanks,
> Ashic.
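For reference, a sketch of the settings that usually come into play in a DNS-less
setup like the one described above (the 192.168.40.x addresses are the ones from the
question; whether this fully avoids hostname use for broadcast depends on the Spark
version, so treat it as a starting point rather than a guaranteed fix):

# on the driver machine, when submitting:
export SPARK_LOCAL_IP=192.168.40.1
./bin/spark-submit --conf spark.driver.host=192.168.40.1 ...

# on each worker, in conf/spark-env.sh (that node's own private IP):
export SPARK_LOCAL_IP=192.168.40.10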



Re: Spark on YARN memory utilization

2014-12-06 Thread Denny Lee
Got it - thanks!
On Sat, Dec 6, 2014 at 14:56 Arun Ahuja  wrote:

> Hi Denny,
>
> This is due to the spark.yarn.memoryOverhead parameter. Depending on what
> version of Spark you are on, the default may differ, but it should
> be the larger of 1024mb per executor or .07 * executorMemory.
>
> When you set executor memory, the YARN resource request is executorMemory
> + yarnOverhead.
>
> - Arun
>
> On Sat, Dec 6, 2014 at 4:27 PM, Denny Lee  wrote:
>
>> This is perhaps more of a YARN question than a Spark question, but I was
>> just curious as to how memory is allocated in YARN via the various
>> configurations.  For example, if I spin up my cluster with 4GB executors and
>> a different number of executors as noted below:
>>
>>  4GB executor-memory x 10 executors = 46GB  (4GB x 10 = 40 + 6)
>>  4GB executor-memory x 4 executors = 19GB (4GB x 4 = 16 + 3)
>>  4GB executor-memory x 2 executors = 10GB (4GB x 2 = 8 + 2)
>>
>> The pattern when observing the RM is that there is a container for each
>> executor and one additional container.  In terms of memory, it looks
>> like an additional (1GB + (0.5GB x # executors)) is allocated in YARN.
>>
>> Just wondering why this is - or is it just an artifact of YARN itself?
>>
>> Thanks!
>>
>>
>


run JavaAPISuite with maven

2014-12-06 Thread Koert Kuipers
when i run "mvn test -pl core", i dont see JavaAPISuite being run. or if it
is, its being very very quiet about it. is this by design?


Re: run JavaAPISuite with maven

2014-12-06 Thread Ted Yu
In master branch, I only found JavaAPISuite in comment:

spark tyu$ find . -name '*.scala' -exec grep JavaAPISuite {} \; -print
   * For usage example, see test case JavaAPISuite.testJavaJdbcRDD.
   * converted into a `Object` array. For usage example, see test case
JavaAPISuite.testJavaJdbcRDD.
./core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala

FYI

On Sat, Dec 6, 2014 at 5:43 PM, Koert Kuipers  wrote:

> when i run "mvn test -pl core", i dont see JavaAPISuite being run. or if
> it is, its being very very quiet about it. is this by design?
>


java.lang.ExceptionInInitializerError/Unable to load YARN support

2014-12-06 Thread maven
All,

I just built Spark-1.2 on my enterprise server (which has Hadoop 2.3 with
YARN). Here're the steps I followed for the build:

$ mvn -Pyarn -Phadoop-2.3 -Dhadoop.version=2.3.0 -DskipTests clean package
$ export SPARK_HOME=/path/to/spark/folder
$ export HADOOP_CONF_DIR=/etc/hadoop/conf

However, when I try to open the shell either locally or on YARN, I get the
following error:

Exception in thread "main" java.lang.ExceptionInInitializerError
at org.apache.spark.util.Utils$.getSparkOrYarnConfig(Utils.scala:1784)
at org.apache.spark.storage.BlockManager.(BlockManager.scala:105)
at org.apache.spark.storage.BlockManager.(BlockManager.scala:180)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:292)
at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:159)
at org.apache.spark.SparkContext.(SparkContext.scala:232)
at water.SparklingWaterDriver$.main(SparklingWaterDriver.scala:19)
at water.SparklingWaterDriver.main(SparklingWaterDriver.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:360)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.spark.SparkException: Unable to load YARN support
at
org.apache.spark.deploy.SparkHadoopUtil$.liftedTree1$1(SparkHadoopUtil.scala:199)
at
org.apache.spark.deploy.SparkHadoopUtil$.(SparkHadoopUtil.scala:194)
at 
org.apache.spark.deploy.SparkHadoopUtil$.(SparkHadoopUtil.scala)
... 15 more
Caused by: java.lang.IllegalArgumentException: Invalid rule: L
RULE:[2:$1@$0](.*@XXXCOMPANY.COM)s/(.*)@XXXCOMPANY.COM/$1/L
DEFAULT
at
org.apache.hadoop.security.authentication.util.KerberosName.parseRules(KerberosName.java:321)
at
org.apache.hadoop.security.authentication.util.KerberosName.setRules(KerberosName.java:386)
at
org.apache.hadoop.security.HadoopKerberosName.setConfiguration(HadoopKerberosName.java:75)
at
org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:247)
at
org.apache.hadoop.security.UserGroupInformation.setConfiguration(UserGroupInformation.java:283)
at 
org.apache.spark.deploy.SparkHadoopUtil.(SparkHadoopUtil.scala:43)
at
org.apache.spark.deploy.yarn.YarnSparkHadoopUtil.(YarnSparkHadoopUtil.scala:45)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at java.lang.Class.newInstance(Class.java:374)
at
org.apache.spark.deploy.SparkHadoopUtil$.liftedTree1$1(SparkHadoopUtil.scala:196)
... 17 more

I'm able to work with pre-installed Spark 1.0 without any issues. Any
thoughts on what may be causing this?

NR




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-ExceptionInInitializerError-Unable-to-load-YARN-support-tp20560.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
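One pattern that matches the "Invalid rule: L" message in the trace above is the
trailing /L (lowercase) flag on the hadoop.security.auth_to_local rule, which the
older Kerberos-name parser on Spark's classpath may not understand. Purely as a
hedged sketch (the property name is standard Hadoop; the realm is the placeholder
from the stack trace), the same rule without the /L suffix would look like:

<property>
  <name>hadoop.security.auth_to_local</name>
  <value>
    RULE:[2:$1@$0](.*@XXXCOMPANY.COM)s/(.*)@XXXCOMPANY.COM/$1/
    DEFAULT
  </value>
</property>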



Re: Including data nucleus tools

2014-12-06 Thread spark.dubovsky.jakub
Next try. I copied the whole dist directory created by the make-distribution 
script to the cluster, not just the assembly jar. Then I used

./bin/spark-submit --num-executors 200 --master yarn-cluster --class 
org.apache.spark.mllib.CreateGuidDomainDictionary ../spark/root-0.1.jar ${args}

  ...to run the app again. The startup scripts printed this message:

"Spark assembly has been built with Hive, including Datanucleus jars on 
classpath"

  ...so I thought I was finally there. But the job started and failed on the same
ClassNotFound exception as before. Is the "classpath" from the script message just 
the classpath of the driver? Or is it the same classpath which is affected by the 
--jars option? I was trying to find out from the scripts but I was not able to find 
where the --jars option is processed.

  thanks


-- Original message --
From: Michael Armbrust 
To: spark.dubovsky.ja...@seznam.cz
Date: 6. 12. 2014 20:39:13
Subject: Re: Including data nucleus tools

"



On Sat, Dec 6, 2014 at 5:53 AM, mailto:/skin/default/img/empty.gif)> wrote:"
Bonus question: Should the class org.datanucleus.api.jdo.
JDOPersistenceManagerFactory be part of assembly? Because it is not in jar 
now.

"



No these jars cannot be put into the assembly because they have extra 
metadata files that live in the same location (so if you put them all in an 
assembly they overrwrite each other).  This metadata is used in discovery.  
Instead they must be manually put on the classpath in their original form 
(usually using --jars). 



 
"

Re: java.lang.ExceptionInInitializerError/Unable to load YARN support

2014-12-06 Thread maven
I noticed that when I unset HADOOP_CONF_DIR, I'm able to work in local
mode without any errors. 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-ExceptionInInitializerError-Unable-to-load-YARN-support-tp20560p20561.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: run JavaAPISuite with maven

2014-12-06 Thread Ted Yu
Pardon me, the test is here:

sql/core/src/test/java/org/apache/spark/sql/api/java/JavaAPISuite.java

You can run 'mvn test' under sql/core

Cheers

On Sat, Dec 6, 2014 at 5:55 PM, Ted Yu  wrote:

> In master branch, I only found JavaAPISuite in comment:
>
> spark tyu$ find . -name '*.scala' -exec grep JavaAPISuite {} \; -print
>* For usage example, see test case JavaAPISuite.testJavaJdbcRDD.
>* converted into a `Object` array. For usage example, see test case
> JavaAPISuite.testJavaJdbcRDD.
> ./core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
>
> FYI
>
> On Sat, Dec 6, 2014 at 5:43 PM, Koert Kuipers  wrote:
>
>> when i run "mvn test -pl core", i dont see JavaAPISuite being run. or if
>> it is, its being very very quiet about it. is this by design?
>>
>
>


Recovered executor num in yarn-client mode

2014-12-06 Thread yuemeng1

Hi, all
  I have a (maybe clumsy) question about the number of recovered executors in 
yarn-client mode. My situation is as follows:


  We have a 1 (resource manager) + 3 (node manager) cluster. An app is 
running with one driver on the resource manager and 12 executors across the 
node managers, 4 executors on each node manager machine. For some reason the 
4 executors on one machine disassociated/failed, and then only 2 executors 
were recovered.
  My question is: why were 4 executors not recovered? Who decides, and how, 
the number of recovered executors?


 Thanks


"vcores used" in cluster metrics(yarn resource manager ui) when running spark on yarn

2014-12-06 Thread yuemeng1

Hi, all
When I run an app with this command:  ./bin/spark-sql --master 
yarn-client --num-executors 2 --executor-cores 3, I noticed that the YARN 
resource manager UI shows `vcores used` in the cluster metrics as 3. It seems 
`vcores used` shows the wrong number (should it be 7?), or am I missing something?


Thanks



Re: Hive Problem in Pig generated Parquet file schema in CREATE EXTERNAL TABLE (e.g. bag::col1)

2014-12-06 Thread Jianshi Huang
Ok, found another possible bug in Hive.

My current solution is to use ALTER TABLE CHANGE to rename the columns.

The problem is that after renaming the columns, the values of the columns
all became NULL.

Before renaming:
scala> sql("select `sorted::cre_ts` from pmt limit 1").collect
res12: Array[org.apache.spark.sql.Row] = Array([12/02/2014 07:38:54])

Execute renaming:
scala> sql("alter table pmt change `sorted::cre_ts` cre_ts string")
res13: org.apache.spark.sql.SchemaRDD =
SchemaRDD[972] at RDD at SchemaRDD.scala:108
== Query Plan ==


After renaming:
scala> sql("select cre_ts from pmt limit 1").collect
res16: Array[org.apache.spark.sql.Row] = Array([null])

I created a JIRA for it:

  https://issues.apache.org/jira/browse/SPARK-4781


Jianshi

On Sun, Dec 7, 2014 at 1:06 AM, Jianshi Huang 
wrote:

> Hmm... another issue I found with this approach is that ANALYZE TABLE ...
> COMPUTE STATISTICS will fail to attach the metadata to the table, and later
> broadcast join and such will fail...
>
> Any idea how to fix this issue?
>
> Jianshi
>
> On Sat, Dec 6, 2014 at 9:10 PM, Jianshi Huang 
> wrote:
>
>> Very interesting, the line doing the drop table throws an exception.
>> After removing it, everything works.
>>
>> Jianshi
>>
>> On Sat, Dec 6, 2014 at 9:11 AM, Jianshi Huang 
>> wrote:
>>
>>> Here's the solution I got after talking with Liancheng:
>>>
>>> 1) using backquotes `..` to wrap up all illegal characters
>>>
>>> val rdd = parquetFile(file)
>>> val schema = rdd.schema.fields.map(f => s"`${f.name}`
>>> ${HiveMetastoreTypes.toMetastoreType(f.dataType)}").mkString(",\n")
>>>
>>> val ddl_13 = s"""
>>>   |CREATE EXTERNAL TABLE $name (
>>>   |  $schema
>>>   |)
>>>   |STORED AS PARQUET
>>>   |LOCATION '$file'
>>>   """.stripMargin
>>>
>>> sql(ddl_13)
>>>
>>> 2) create a new Schema and do applySchema to generate a new SchemaRDD,
>>> had to drop and register table
>>>
>>> val t = table(name)
>>> val newSchema = StructType(t.schema.fields.map(s => s.copy(name =
>>> s.name.replaceAll(".*?::", ""))))
>>> sql(s"drop table $name")
>>> applySchema(t, newSchema).registerTempTable(name)
>>>
>>> I'm testing it for now.
>>>
>>> Thanks for the help!
>>>
>>>
>>> Jianshi
>>>
>>> On Sat, Dec 6, 2014 at 8:41 AM, Jianshi Huang 
>>> wrote:
>>>
 Hi,

 I had to use Pig for some preprocessing and to generate Parquet files
 for Spark to consume.

 However, due to Pig's limitation, the generated schema contains Pig's
 identifier

 e.g.
 sorted::id, sorted::cre_ts, ...

 I tried to put the schema inside CREATE EXTERNAL TABLE, e.g.

   create external table pmt (
 sorted::id bigint
   )
   stored as parquet
   location '...'

 Obviously it didn't work, I also tried removing the identifier
 sorted::, but the resulting rows contain only nulls.

 Any idea how to create a table in HiveContext from these Parquet files?

 Thanks,
 Jianshi
 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github & Blog: http://huangjs.github.com/

>>>
>>>
>>>
>>> --
>>> Jianshi Huang
>>>
>>> LinkedIn: jianshi
>>> Twitter: @jshuang
>>> Github & Blog: http://huangjs.github.com/
>>>
>>
>>
>>
>> --
>> Jianshi Huang
>>
>> LinkedIn: jianshi
>> Twitter: @jshuang
>> Github & Blog: http://huangjs.github.com/
>>
>
>
>
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>



-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: run JavaAPISuite with maven

2014-12-06 Thread Koert Kuipers
Ted,
i mean
core/src/test/java/org/apache/spark/JavaAPISuite.java

On Sat, Dec 6, 2014 at 9:27 PM, Ted Yu  wrote:

> Pardon me, the test is here:
>
> sql/core/src/test/java/org/apache/spark/sql/api/java/JavaAPISuite.java
>
> You can run 'mvn test' under sql/core
>
> Cheers
>
> On Sat, Dec 6, 2014 at 5:55 PM, Ted Yu  wrote:
>
>> In master branch, I only found JavaAPISuite in comment:
>>
>> spark tyu$ find . -name '*.scala' -exec grep JavaAPISuite {} \; -print
>>* For usage example, see test case JavaAPISuite.testJavaJdbcRDD.
>>* converted into a `Object` array. For usage example, see test case
>> JavaAPISuite.testJavaJdbcRDD.
>> ./core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
>>
>> FYI
>>
>> On Sat, Dec 6, 2014 at 5:43 PM, Koert Kuipers  wrote:
>>
>>> when i run "mvn test -pl core", i dont see JavaAPISuite being run. or if
>>> it is, its being very very quiet about it. is this by design?
>>>
>>
>>
>


Re: run JavaAPISuite with maven

2014-12-06 Thread Ted Yu
I tried to run tests for core but there were failures. e.g. :

ExternalAppendOnlyMapSuite:
- simple insert
- insert with collision
- ordering
- null keys and values
- simple aggregator
- simple cogroup
Spark assembly has been built with Hive, including Datanucleus jars on
classpath
- spilling *** FAILED ***
  org.apache.spark.SparkException: Job aborted due to stage failure:
Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in
stage 0.0 (TID 6, localhost): java.lang.ClassNotFoundException:
org.apache.spark.rdd.RDD$$anonfun$map$1
  at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
  at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
  at java.security.AccessController.doPrivileged(Native Method)
  at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  at java.lang.Class.forName0(Native Method)
  at java.lang.Class.forName(Class.java:270)

BTW I didn't find JavaAPISuite in test output either.

Cheers

On Sat, Dec 6, 2014 at 9:12 PM, Koert Kuipers  wrote:

> Ted,
> i mean
> core/src/test/java/org/apache/spark/JavaAPISuite.java
>
> On Sat, Dec 6, 2014 at 9:27 PM, Ted Yu  wrote:
>
>> Pardon me, the test is here:
>>
>> sql/core/src/test/java/org/apache/spark/sql/api/java/JavaAPISuite.java
>>
>> You can run 'mvn test' under sql/core
>>
>> Cheers
>>
>> On Sat, Dec 6, 2014 at 5:55 PM, Ted Yu  wrote:
>>
>>> In master branch, I only found JavaAPISuite in comment:
>>>
>>> spark tyu$ find . -name '*.scala' -exec grep JavaAPISuite {} \; -print
>>>* For usage example, see test case JavaAPISuite.testJavaJdbcRDD.
>>>* converted into a `Object` array. For usage example, see test case
>>> JavaAPISuite.testJavaJdbcRDD.
>>> ./core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
>>>
>>> FYI
>>>
>>> On Sat, Dec 6, 2014 at 5:43 PM, Koert Kuipers  wrote:
>>>
 when i run "mvn test -pl core", i dont see JavaAPISuite being run. or
 if it is, its being very very quiet about it. is this by design?

>>>
>>>
>>
>


Convert RDD[Map[String, Any]] to SchemaRDD

2014-12-06 Thread Jianshi Huang
Hi,

What's the best way to convert RDD[Map[String, Any]] to a SchemaRDD?

I'm currently converting each Map to a JSON String and do
JsonRDD.inferSchema.

How about adding inferSchema support to Map[String, Any] directly? It would
be very useful.

Thanks,
-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/
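A minimal sketch of the JSON round-trip approach described above (Spark 1.x API,
assuming a spark-shell session; the toJson helper is a naive, hypothetical stand-in
for a real JSON library and quotes every value as a string, so the inferred column
types will all be strings):

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)

// serialize each Map to a JSON object string (illustrative helper only)
def toJson(m: Map[String, Any]): String =
  m.map { case (k, v) => "\"" + k + "\":\"" + v + "\"" }.mkString("{", ",", "}")

val maps = sc.parallelize(Seq(Map("id" -> 1, "name" -> "a"), Map("id" -> 2, "name" -> "b")))
val schemaRDD = sqlContext.jsonRDD(maps.map(toJson))  // schema inference happens here
schemaRDD.printSchema()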


Re: run JavaAPISuite with maven

2014-12-06 Thread Michael Armbrust
Not sure about maven, but you can run that test with sbt:

sbt/sbt "sql/test-only org.apache.spark.sql.api.java.JavaAPISuite"

On Sat, Dec 6, 2014 at 9:59 PM, Ted Yu  wrote:

> I tried to run tests for core but there were failures. e.g. :
>
> ExternalAppendOnlyMapSuite:
> - simple insert
> - insert with collision
> - ordering
> - null keys and values
> - simple aggregator
> - simple cogroup
> Spark assembly has been built with Hive, including Datanucleus jars on
> classpath
> - spilling *** FAILED ***
>   org.apache.spark.SparkException: Job aborted due to stage failure:
> Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in
> stage 0.0 (TID 6, localhost): java.lang.ClassNotFoundException:
> org.apache.spark.rdd.RDD$$anonfun$map$1
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:270)
>
> BTW I didn't find JavaAPISuite in test output either.
>
> Cheers
>
> On Sat, Dec 6, 2014 at 9:12 PM, Koert Kuipers  wrote:
>
>> Ted,
>> i mean
>> core/src/test/java/org/apache/spark/JavaAPISuite.java
>>
>> On Sat, Dec 6, 2014 at 9:27 PM, Ted Yu  wrote:
>>
>>> Pardon me, the test is here:
>>>
>>> sql/core/src/test/java/org/apache/spark/sql/api/java/JavaAPISuite.java
>>>
>>> You can run 'mvn test' under sql/core
>>>
>>> Cheers
>>>
>>> On Sat, Dec 6, 2014 at 5:55 PM, Ted Yu  wrote:
>>>
 In master branch, I only found JavaAPISuite in comment:

 spark tyu$ find . -name '*.scala' -exec grep JavaAPISuite {} \; -print
* For usage example, see test case JavaAPISuite.testJavaJdbcRDD.
* converted into a `Object` array. For usage example, see test case
 JavaAPISuite.testJavaJdbcRDD.
 ./core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala

 FYI

 On Sat, Dec 6, 2014 at 5:43 PM, Koert Kuipers 
 wrote:

> when i run "mvn test -pl core", i dont see JavaAPISuite being run. or
> if it is, its being very very quiet about it. is this by design?
>


>>>
>>
>


Re: Convert RDD[Map[String, Any]] to SchemaRDD

2014-12-06 Thread Jianshi Huang
Hmm..

I've created a JIRA: https://issues.apache.org/jira/browse/SPARK-4782

Jianshi

On Sun, Dec 7, 2014 at 2:32 PM, Jianshi Huang 
wrote:

> Hi,
>
> What's the best way to convert RDD[Map[String, Any]] to a SchemaRDD?
>
> I'm currently converting each Map to a JSON String and do
> JsonRDD.inferSchema.
>
> How about adding inferSchema support to Map[String, Any] directly? It
> would be very useful.
>
> Thanks,
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>



-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/