Re: Spark SQL configurations

2015-03-26 Thread Akhil Das
If you can share the stacktrace, then we can give you proper guidance.
For running on YARN, everything is described here:
https://spark.apache.org/docs/latest/running-on-yarn.html

Thanks
Best Regards

On Fri, Mar 27, 2015 at 8:21 AM, ÐΞ€ρ@Ҝ (๏̯͡๏)  wrote:

> Hello,
> Can someone share with me the list of commands (including export statements)
> that you use to run Spark SQL over a YARN cluster? I am unable to get it
> running on my YARN cluster and keep running into exceptions.
>
> I understand I need to share the specific exception. This is more about
> wanting to know whether I have missed anything before running Spark SQL.
> Regards,
> Deepak
>
>


Re: SparkSQL overwrite parquet file does not generate _common_metadata

2015-03-26 Thread Pei-Lun Lee
I'm using 1.0.4

Thanks,
--
Pei-Lun

On Fri, Mar 27, 2015 at 2:32 PM, Cheng Lian  wrote:

>  Hm, which version of Hadoop are you using? Actually there should also be
> a _metadata file together with _common_metadata. I was using Hadoop 2.4.1
> btw. I'm not sure whether Hadoop version matters here, but I did observe
> cases where Spark behaves differently because of semantic differences of
> the same API in different Hadoop versions.
>
> Cheng
>
> On 3/27/15 11:33 AM, Pei-Lun Lee wrote:
>
> Hi Cheng,
>
>  on my computer, executing res0.save("xxx", org.apache.spark.sql.SaveMode.
> Overwrite) produces:
>
>  peilunlee@pllee-mini:~/opt/spark-1.3...rc3-bin-hadoop1$ ls -l xxx
> total 32
> -rwxrwxrwx  1 peilunlee  staff    0 Mar 27 11:29 _SUCCESS*
> -rwxrwxrwx  1 peilunlee  staff  272 Mar 27 11:29 part-r-1.parquet*
> -rwxrwxrwx  1 peilunlee  staff  272 Mar 27 11:29 part-r-2.parquet*
> -rwxrwxrwx  1 peilunlee  staff  272 Mar 27 11:29 part-r-3.parquet*
> -rwxrwxrwx  1 peilunlee  staff  488 Mar 27 11:29 part-r-4.parquet*
>
>  while res0.save("xxx") produces:
>
>  peilunlee@pllee-mini:~/opt/spark-1.3...rc3-bin-hadoop1$ ls -l xxx
> total 40
> -rwxrwxrwx  1 peilunlee  staff    0 Mar 27 11:29 _SUCCESS*
> -rwxrwxrwx  1 peilunlee  staff  250 Mar 27 11:29 _common_metadata*
> -rwxrwxrwx  1 peilunlee  staff  272 Mar 27 11:29 part-r-1.parquet*
> -rwxrwxrwx  1 peilunlee  staff  272 Mar 27 11:29 part-r-2.parquet*
> -rwxrwxrwx  1 peilunlee  staff  272 Mar 27 11:29 part-r-3.parquet*
> -rwxrwxrwx  1 peilunlee  staff  488 Mar 27 11:29 part-r-4.parquet*
>
> On Thu, Mar 26, 2015 at 7:26 PM, Cheng Lian  wrote:
>
>>  I couldn’t reproduce this with the following spark-shell snippet:
>>
>> scala> import sqlContext.implicits._
>> scala> Seq((1, 2)).toDF("a", "b")
>> scala> res0.save("xxx", org.apache.spark.sql.SaveMode.Overwrite)
>> scala> res0.save("xxx", org.apache.spark.sql.SaveMode.Overwrite)
>>
>> The _common_metadata file is typically much smaller than _metadata,
>> because it doesn’t contain row group information, and thus can be faster to
>> read than _metadata.
>>
>> Cheng
>>
>> On 3/26/15 12:48 PM, Pei-Lun Lee wrote:
>>
>> Hi,
>>
>>  When I save a parquet file with SaveMode.Overwrite, it never generates
>> _common_metadata, whether it overwrites an existing dir or not.
>> Is this expected behavior?
>> And what is the benefit of _common_metadata? Will reading perform better
>> when it is present?
>>
>>  Thanks,
>> --
>> Pei-Lun
>>
>>
>
>
>


Can spark sql read existing tables created in hive

2015-03-26 Thread ๏̯͡๏
I have a few tables that are created in Hive. I want to transform the data
stored in these Hive tables using Spark SQL. Is this even possible?

So far I have seen that I can create new tables using the Spark SQL dialect.
However, when I run show tables or do desc hive_table, it says table not
found.

I am now wondering whether this support is present in Spark SQL or not.
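For reference, a minimal spark-shell sketch of what reading an existing Hive table through HiveContext looks like, assuming a Spark 1.3 build with Hive support and the hive-site.xml that points at the existing metastore copied into Spark's conf directory (dw_bid is just an example table name):

import org.apache.spark.sql.hive.HiveContext

// HiveContext picks up table definitions from the Hive metastore configured in
// hive-site.xml, so existing Hive tables become visible to Spark SQL.
val hiveContext = new HiveContext(sc)
hiveContext.sql("show tables").collect().foreach(println)

// Query and transform an existing Hive table with Spark SQL.
val bids = hiveContext.sql("select * from dw_bid limit 10")
bids.show()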

-- 
Deepak


Re: SparkSQL overwrite parquet file does not generate _common_metadata

2015-03-26 Thread Cheng Lian
Hm, which version of Hadoop are you using? Actually there should also be 
a _metadata file together with _common_metadata. I was using Hadoop 
2.4.1 btw. I'm not sure whether Hadoop version matters here, but I did 
observe cases where Spark behaves differently because of semantic 
differences of the same API in different Hadoop versions.


Cheng

On 3/27/15 11:33 AM, Pei-Lun Lee wrote:

Hi Cheng,

on my computer, executing res0.save("xxx",
org.apache.spark.sql.SaveMode.Overwrite) produces:


peilunlee@pllee-mini:~/opt/spark-1.3...rc3-bin-hadoop1$ ls -l xxx
total 32
-rwxrwxrwx  1 peilunlee  staff    0 Mar 27 11:29 _SUCCESS*
-rwxrwxrwx  1 peilunlee  staff  272 Mar 27 11:29 part-r-1.parquet*
-rwxrwxrwx  1 peilunlee  staff  272 Mar 27 11:29 part-r-2.parquet*
-rwxrwxrwx  1 peilunlee  staff  272 Mar 27 11:29 part-r-3.parquet*
-rwxrwxrwx  1 peilunlee  staff  488 Mar 27 11:29 part-r-4.parquet*

while res0.save("xxx") produces:

peilunlee@pllee-mini:~/opt/spark-1.3...rc3-bin-hadoop1$ ls -l xxx
total 40
-rwxrwxrwx  1 peilunlee  staff    0 Mar 27 11:29 _SUCCESS*
-rwxrwxrwx  1 peilunlee  staff  250 Mar 27 11:29 _common_metadata*
-rwxrwxrwx  1 peilunlee  staff  272 Mar 27 11:29 part-r-1.parquet*
-rwxrwxrwx  1 peilunlee  staff  272 Mar 27 11:29 part-r-2.parquet*
-rwxrwxrwx  1 peilunlee  staff  272 Mar 27 11:29 part-r-3.parquet*
-rwxrwxrwx  1 peilunlee  staff  488 Mar 27 11:29 part-r-4.parquet*

On Thu, Mar 26, 2015 at 7:26 PM, Cheng Lian  wrote:


I couldn’t reproduce this with the following spark-shell snippet:

scala> import sqlContext.implicits._
scala> Seq((1, 2)).toDF("a", "b")
scala> res0.save("xxx", org.apache.spark.sql.SaveMode.Overwrite)
scala> res0.save("xxx", org.apache.spark.sql.SaveMode.Overwrite)

The _common_metadata file is typically much smaller than
_metadata, because it doesn’t contain row group information, and
thus can be faster to read than _metadata.

Cheng

On 3/26/15 12:48 PM, Pei-Lun Lee wrote:


Hi,

When I save a parquet file with SaveMode.Overwrite, it never
generates _common_metadata, whether it overwrites an existing dir
or not.
Is this expected behavior?
And what is the benefit of _common_metadata? Will reading
perform better when it is present?

Thanks,
--
Pei-Lun







Add partition support in saveAsParquet

2015-03-26 Thread Jianshi Huang
Hi,

Anyone has similar request?

https://issues.apache.org/jira/browse/SPARK-6561

When we save a DataFrame into Parquet files, we also want to have it
partitioned.

The proposed API looks like this:

def saveAsParquet(path: String, partitionColumns: Seq[String])
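Until an API like that exists, one hedged workaround sketch against the Spark 1.3 DataFrame API is to write one Hive-style subdirectory per value of the partition column by hand; the DataFrame df, the column name date, and the output path below are hypothetical:

import org.apache.spark.sql.SaveMode

// Collect the distinct values of the intended (low-cardinality) partition column,
// then save each slice of the DataFrame under its own date=... subdirectory.
val partitionValues = df.select("date").distinct().collect().map(_.getString(0))
partitionValues.foreach { d =>
  df.filter(df("date") === d)
    .save(s"/output/parquet/date=$d", SaveMode.Overwrite)
}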



-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: spark-sql throws org.datanucleus.store.rdbms.connectionpool.DatastoreDriverNotFoundException

2015-03-26 Thread ๏̯͡๏
Ok.
I modified as per your suggestions

export SPARK_HOME=/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4
export SPARK_JAR=$SPARK_HOME/lib/spark-assembly-1.3.0-hadoop2.4.0.jar
export HADOOP_CONF_DIR=/apache/hadoop/conf

cd $SPARK_HOME
./bin/spark-sql -v  --driver-class-path
 
/apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/home/dvasthimal/spark1.3/spark-avro_2.10-1.0.0.jar:/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar:/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar:/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar:/home/dvasthimal/spark1.3/mysql-connector-java-5.1.34.jar


At the spark-sql prompt I ran show tables and desc dw_bid. Each throws the
exception below.





spark-sql> desc dw_bid;
15/03/26 23:10:14 WARN conf.HiveConf: DEPRECATED: Configuration property
hive.metastore.local no longer has any effect. Make sure to provide a valid
value for hive.metastore.uris if you are connecting to a remote metastore.
15/03/26 23:10:14 WARN conf.HiveConf: DEPRECATED: hive.metastore.ds.retry.*
no longer has any effect.  Use hive.hmshandler.retry.* instead
15/03/26 23:10:14 INFO parse.ParseDriver: Parsing command: desc dw_bid
15/03/26 23:10:14 INFO parse.ParseDriver: Parse Completed
15/03/26 23:10:15 INFO metastore.HiveMetaStore: 0: get_table : db=default
tbl=dw_bid
15/03/26 23:10:15 INFO HiveMetaStore.audit: ugi=dvasthi...@corp.ebay.com
ip=unknown-ip-addr cmd=get_table : db=default tbl=dw_bid
15/03/26 23:10:15 INFO spark.SparkContext: Starting job: collect at
SparkPlan.scala:83
15/03/26 23:10:15 INFO scheduler.DAGScheduler: Got job 0 (collect at
SparkPlan.scala:83) with 1 output partitions (allowLocal=false)
15/03/26 23:10:15 INFO scheduler.DAGScheduler: Final stage: Stage 0(collect
at SparkPlan.scala:83)
15/03/26 23:10:15 INFO scheduler.DAGScheduler: Parents of final stage:
List()
15/03/26 23:10:15 INFO scheduler.DAGScheduler: Missing parents: List()
15/03/26 23:10:15 INFO scheduler.DAGScheduler: Submitting Stage 0
(MapPartitionsRDD[1] at map at SparkPlan.scala:83), which has no missing
parents
15/03/26 23:10:16 INFO scheduler.TaskSchedulerImpl: Cancelling stage 0
15/03/26 23:10:16 INFO scheduler.DAGScheduler: Job 0 failed: collect at
SparkPlan.scala:83, took 0.078101 s
15/03/26 23:10:16 ERROR thriftserver.SparkSQLDriver: Failed in [desc dw_bid]
org.apache.spark.SparkException: Job aborted due to stage failure: Task
serialization failed: java.lang.reflect.InvocationTargetException
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.lang.reflect.Constructor.newInstance(Constructor.java:526)
org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:68)
org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:60)
org.apache.spark.broadcast.TorrentBroadcast.org
$apache$spark$broadcast$TorrentBroadcast$$setConf(TorrentBroadcast.scala:73)
org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:79)
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
org.apache.spark.SparkContext.broadcast(SparkContext.scala:1051)
org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:839)
org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:778)
org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:762)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1362)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:847)
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark

Re: SparkContext.wholeTextFiles throws not serializable exception

2015-03-26 Thread Xi Shen
I have to use .lines.toArray.toSeq

A little tricky.
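For anyone hitting the same exception, a minimal sketch of that workaround (the input path is a placeholder): String.lines returns an Iterator, and the filtered Iterator is what fails to serialize, so it has to be materialized inside the closure.

val docs = sc.wholeTextFiles("hdfs:///path/to/input")  // hypothetical path
  .map { case (name, content) =>
    // toArray.toSeq turns the non-serializable Iterator into a plain collection
    (name, content.lines.filter(!_.isEmpty).toArray.toSeq)
  }
docs.first()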




Xi Shen
about.me/davidshen

On Fri, Mar 27, 2015 at 4:41 PM, Xi Shen  wrote:

> Hi,
>
> I want to load my data in this way:
>
> sc.wholeTextFiles(opt.input) map { x => (x._1,
> x._2.lines.filter(!_.isEmpty).toSeq) }
>
>
> But I got
>
> java.io.NotSerializableException: scala.collection.Iterator$$anon$13
>
> But if I use "x._2.split('\n')", I can get the expected result. I want to
> know what's wrong with using the "lines()" function.
>
>
> Thanks,
>
> Xi Shen
> about.me/davidshen
>


SparkContext.wholeTextFiles throws not serializable exception

2015-03-26 Thread Xi Shen
Hi,

I want to load my data in this way:

sc.wholeTextFiles(opt.input) map { x => (x._1,
x._2.lines.filter(!_.isEmpty).toSeq) }


But I got

java.io.NotSerializableException: scala.collection.Iterator$$anon$13

But if I use "x._2.split('\n')", I can get the expected result. I want to
know what's wrong with using the "lines()" function.


Thanks,

Xi Shen
about.me/davidshen


Re: How to get a top X percent of a distribution represented as RDD

2015-03-26 Thread Debasish Das
In that case you can directly use count-min-sketch from algebird... they
work fine with Spark aggregateBy, but I have found the java BPQ from Spark
much faster than, say, the algebird Heap data structure...

On Thu, Mar 26, 2015 at 10:04 PM, Charles Hayden 
wrote:

>  You could also consider using a count-min data structure such as in
> https://github.com/laserson/dsq
>
> to get approximate quantiles, then use whatever values you want to filter
> the original sequence.
>  --
> *From:* Debasish Das 
> *Sent:* Thursday, March 26, 2015 9:45 PM
> *To:* Aung Htet
> *Cc:* user
> *Subject:* Re: How to get a top X percent of a distribution represented
> as RDD
>
>  Idea is to use a heap and get topK elements from every partition...then
> use aggregateBy and for combOp do a merge routine from
> mergeSort...basically get 100 items from partition 1, 100 items from
> partition 2, merge them so that you get sorted 200 items and take 100...for
> merge you can use heap as well...Matei had a BPQ inside Spark which we use
> all the time...Passing arrays over wire is better than passing full heap
> objects and merge routine on array should run faster but needs experiment...
>
> On Thu, Mar 26, 2015 at 9:26 PM, Aung Htet  wrote:
>
>> Hi Debasish,
>>
>> Thanks for your suggestions. In-memory version is quite useful. I do not
>> quite understand how you can use aggregateBy to get 10% top K elements. Can
>> you please give an example?
>>
>> Thanks,
>> Aung
>>
>> On Fri, Mar 27, 2015 at 2:40 PM, Debasish Das 
>> wrote:
>>
>>> You can do it in-memory as well... get 10% topK elements from each
>>> partition and use merge from any sort algorithm like timsort... basically
>>> aggregateBy
>>>
>>>  Your version uses shuffle but this version is 0 shuffle..assuming your
>>> data set is cached you will be using in-memory allReduce through
>>> treeAggregate...
>>>
>>>  But this is only good for top 10% or bottom 10%...if you need to do it
>>> for top 30% then may be the shuffle version will work better...
>>>
>>> On Thu, Mar 26, 2015 at 8:31 PM, Aung Htet  wrote:
>>>
 Hi all,

  I have a distribution represented as an RDD of tuples, in rows of
 (segment, score)
 For each segment, I want to discard tuples with top X percent scores.
 This seems hard to do in Spark RDD.

  A naive algorithm would be -

  1) Sort RDD by segment & score (descending)
 2) Within each segment, number the rows from top to bottom.
 3) For each  segment, calculate the cut off index. i.e. 90 for 10% cut
 off out of a segment with 100 rows.
 4) For the entire RDD, filter rows with row num <= cut off index

 This does not look like a good algorithm. I would really appreciate if
 someone can suggest a better way to implement this in Spark.

  Regards,
 Aung

>>>
>>>
>>
>


Re: How to get a top X percent of a distribution represented as RDD

2015-03-26 Thread Charles Hayden
You could also consider using a count-min data structure such as in
https://github.com/laserson/dsq

to get approximate quantiles, then use whatever values you want to filter the
original sequence.


From: Debasish Das 
Sent: Thursday, March 26, 2015 9:45 PM
To: Aung Htet
Cc: user
Subject: Re: How to get a top X percent of a distribution represented as RDD

Idea is to use a heap and get topK elements from every partition...then use 
aggregateBy and for combOp do a merge routine from mergeSort...basically get 
100 items from partition 1, 100 items from partition 2, merge them so that you 
get sorted 200 items and take 100...for merge you can use heap as well...Matei 
had a BPQ inside Spark which we use all the time...Passing arrays over wire is 
better than passing full heap objects and merge routine on array should run 
faster but needs experiment...

On Thu, Mar 26, 2015 at 9:26 PM, Aung Htet <aung@gmail.com> wrote:
Hi Debasish,

Thanks for your suggestions. In-memory version is quite useful. I do not quite 
understand how you can use aggregateBy to get 10% top K elements. Can you 
please give an example?

Thanks,
Aung

On Fri, Mar 27, 2015 at 2:40 PM, Debasish Das <debasish.da...@gmail.com> wrote:
You can do it in-memory as well... get 10% topK elements from each partition
and use merge from any sort algorithm like timsort... basically aggregateBy

Your version uses shuffle but this version is 0 shuffle..assuming your data set 
is cached you will be using in-memory allReduce through treeAggregate...

But this is only good for top 10% or bottom 10%...if you need to do it for top 
30% then may be the shuffle version will work better...

On Thu, Mar 26, 2015 at 8:31 PM, Aung Htet <aung@gmail.com> wrote:
Hi all,

I have a distribution represented as an RDD of tuples, in rows of (segment, 
score)
For each segment, I want to discard tuples with top X percent scores. This 
seems hard to do in Spark RDD.

A naive algorithm would be -

1) Sort RDD by segment & score (descending)
2) Within each segment, number the rows from top to bottom.
3) For each  segment, calculate the cut off index. i.e. 90 for 10% cut off out 
of a segment with 100 rows.
4) For the entire RDD, filter rows with row num <= cut off index

This does not look like a good algorithm. I would really appreciate if someone 
can suggest a better way to implement this in Spark.

Regards,
Aung





Re: How to get a top X percent of a distribution represented as RDD

2015-03-26 Thread Debasish Das
Idea is to use a heap and get topK elements from every partition...then use
aggregateBy and for combOp do a merge routine from mergeSort...basically
get 100 items from partition 1, 100 items from partition 2, merge them so
that you get sorted 200 items and take 100...for merge you can use heap as
well...Matei had a BPQ inside Spark which we use all the time...Passing
arrays over wire is better than passing full heap objects and merge routine
on array should run faster but needs experiment...
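A minimal Scala sketch of that shape of computation, using aggregateByKey with a small sorted buffer standing in for a real bounded heap; the RDD name scores, the (segment, score) pairing, and k = 100 are assumptions:

val k = 100  // number of rows that make up the top X percent of a segment
val topKPerSegment = scores.aggregateByKey(Vector.empty[Double])(
  // per partition: fold each score in, keeping only the k largest seen so far
  (buf, score) => (buf :+ score).sortBy(-_).take(k),
  // across partitions: merge two partial top-k lists and truncate back to k
  (a, b) => (a ++ b).sortBy(-_).take(k)
)

A bounded priority queue avoids the repeated sorting, but the aggregateByKey structure stays the same.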

On Thu, Mar 26, 2015 at 9:26 PM, Aung Htet  wrote:

> Hi Debasish,
>
> Thanks for your suggestions. In-memory version is quite useful. I do not
> quite understand how you can use aggregateBy to get 10% top K elements. Can
> you please give an example?
>
> Thanks,
> Aung
>
> On Fri, Mar 27, 2015 at 2:40 PM, Debasish Das 
> wrote:
>
>> You can do it in-memory as well... get 10% topK elements from each
>> partition and use merge from any sort algorithm like timsort... basically
>> aggregateBy
>>
>> Your version uses shuffle but this version is 0 shuffle..assuming your
>> data set is cached you will be using in-memory allReduce through
>> treeAggregate...
>>
>> But this is only good for top 10% or bottom 10%...if you need to do it
>> for top 30% then may be the shuffle version will work better...
>>
>> On Thu, Mar 26, 2015 at 8:31 PM, Aung Htet  wrote:
>>
>>> Hi all,
>>>
>>> I have a distribution represented as an RDD of tuples, in rows of
>>> (segment, score)
>>> For each segment, I want to discard tuples with top X percent scores.
>>> This seems hard to do in Spark RDD.
>>>
>>> A naive algorithm would be -
>>>
>>> 1) Sort RDD by segment & score (descending)
>>> 2) Within each segment, number the rows from top to bottom.
>>> 3) For each  segment, calculate the cut off index. i.e. 90 for 10% cut
>>> off out of a segment with 100 rows.
>>> 4) For the entire RDD, filter rows with row num <= cut off index
>>>
>>> This does not look like a good algorithm. I would really appreciate if
>>> someone can suggest a better way to implement this in Spark.
>>>
>>> Regards,
>>> Aung
>>>
>>
>>
>


Re: How to get a top X percent of a distribution represented as RDD

2015-03-26 Thread Aung Htet
Hi Debasish,

Thanks for your suggestions. In-memory version is quite useful. I do not
quite understand how you can use aggregateBy to get 10% top K elements. Can
you please give an example?

Thanks,
Aung

On Fri, Mar 27, 2015 at 2:40 PM, Debasish Das 
wrote:

> You can do it in-memory as well... get 10% topK elements from each
> partition and use merge from any sort algorithm like timsort... basically
> aggregateBy
>
> Your version uses shuffle but this version is 0 shuffle..assuming your
> data set is cached you will be using in-memory allReduce through
> treeAggregate...
>
> But this is only good for top 10% or bottom 10%...if you need to do it for
> top 30% then may be the shuffle version will work better...
>
> On Thu, Mar 26, 2015 at 8:31 PM, Aung Htet  wrote:
>
>> Hi all,
>>
>> I have a distribution represented as an RDD of tuples, in rows of
>> (segment, score)
>> For each segment, I want to discard tuples with top X percent scores.
>> This seems hard to do in Spark RDD.
>>
>> A naive algorithm would be -
>>
>> 1) Sort RDD by segment & score (descending)
>> 2) Within each segment, number the rows from top to bottom.
>> 3) For each  segment, calculate the cut off index. i.e. 90 for 10% cut
>> off out of a segment with 100 rows.
>> 4) For the entire RDD, filter rows with row num <= cut off index
>>
>> This does not look like a good algorithm. I would really appreciate if
>> someone can suggest a better way to implement this in Spark.
>>
>> Regards,
>> Aung
>>
>
>


Re: spark-sql throws org.datanucleus.store.rdbms.connectionpool.DatastoreDriverNotFoundException

2015-03-26 Thread Cheng Lian

Hey Deepak,

It seems that your hive-site.xml says your Hive metastore setup is using 
MySQL. If that's not the case, you need to adjust your hive-site.xml 
configurations. As for the version of MySQL driver, it should match the 
MySQL server.


Cheng

On 3/27/15 11:07 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) wrote:
I do not use MySQL. I want to read Hive tables from Spark SQL and
transform them in Spark SQL. Why do I need a MySQL driver? If I still
need it, which version should I use?


Assuming I need it, I downloaded the latest version of it from
http://mvnrepository.com/artifact/mysql/mysql-connector-java/5.1.34
and ran the following commands. I no longer see the above exception;
however, I see a new one.






export SPARK_HOME=/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4
export SPARK_JAR=$SPARK_HOME/lib/spark-assembly-1.3.0-hadoop2.4.0.jar
export 
SPARK_CLASSPATH=/apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/home/dvasthimal/spark1.3/spark-avro_2.10-1.0.0.jar:/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar:/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar:/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar:*/home/dvasthimal/spark1.3/mysql-connector-java-5.1.34.jar*

export HADOOP_CONF_DIR=/apache/hadoop/conf
cd $SPARK_HOME
./bin/spark-sql
Spark assembly has been built with Hive, including Datanucleus jars on 
classpath

...
...

spark-sql>

spark-sql>

spark-sql>


show tables;

15/03/26 20:03:57 INFO metastore.HiveMetaStore: 0: get_tables: 
db=default pat=.*


15/03/26 20:03:57 INFO HiveMetaStore.audit: 
ugi=dvasthi...@corp.ebay.com 
ip=unknown-ip-addr cmd=get_tables:
db=default pat=.*


15/03/26 20:03:58 INFO spark.SparkContext: Starting job: collect at 
SparkPlan.scala:83


15/03/26 20:03:58 INFO scheduler.DAGScheduler: Got job 1 (collect at 
SparkPlan.scala:83) with 1 output partitions (allowLocal=false)


15/03/26 20:03:58 INFO scheduler.DAGScheduler: Final stage: Stage 
1(collect at SparkPlan.scala:83)


15/03/26 20:03:58 INFO scheduler.DAGScheduler: Parents of final stage: 
List()


15/03/26 20:03:58 INFO scheduler.DAGScheduler: Missing parents: List()

15/03/26 20:03:58 INFO scheduler.DAGScheduler: Submitting Stage 1 
(MapPartitionsRDD[3] at map at SparkPlan.scala:83), which has no 
missing parents


15/03/26 20:03:58 INFO scheduler.TaskSchedulerImpl: Cancelling stage 1

15/03/26 20:03:58 INFO scheduler.StatsReportListener: Finished stage: 
org.apache.spark.scheduler.StageInfo@2bfd9c4d


15/03/26 20:03:58 INFO scheduler.DAGScheduler: Job 1 failed: collect 
at SparkPlan.scala:83, took 0.005163 s


15/03/26 20:03:58 ERROR thriftserver.SparkSQLDriver: Failed in [show 
tables]


org.apache.spark.SparkException: Job aborted due to stage failure: 
Task serialization failed: java.lang.reflect.InvocationTargetException


sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)

sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)

sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)

java.lang.reflect.Constructor.newInstance(Constructor.java:526)

org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:68)

org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:60)

org.apache.spark.broadcast.TorrentBroadcast.org 
$apache$spark$broadcast$TorrentBroadcast$$setConf(TorrentBroadcast.scala:73)


org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:79)

org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)

org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)

org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)

org.apache.spark.SparkContext.broadcast(SparkContext.scala:1051)

org.apache.spark.scheduler.DAGScheduler.org 
$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:839)


org.apache.spark.scheduler.DAGScheduler.org 
$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:778)


org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:762)

org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1362)

org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)

org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)


at org.apache.spark.scheduler.DAGScheduler.org 
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1

Re: Cross-compatibility of YARN shuffle service

2015-03-26 Thread Sandy Ryza
Hi Matt,

I'm not sure whether we have documented compatibility guidelines here.
However, a strong goal is to keep the external shuffle service compatible
so that many versions of Spark can run against the same shuffle service.

-Sandy

On Wed, Mar 25, 2015 at 6:44 PM, Matt Cheah  wrote:

> Hi everyone,
>
> I am considering moving from Spark-Standalone to YARN. The context is that
> there are multiple Spark applications that are using different versions of
> Spark that all want to use the same YARN cluster.
>
> My question is: if I use a single Spark YARN shuffle service jar on the
> Node Manager, will the service work properly with all of the Spark
> applications, regardless of the specific versions of the applications? Or,
> is it the case that, if I want to use the external shuffle service, I
> need to have all of my applications using the same version of Spark?
>
> Thanks,
>
> -Matt Cheah
>


Re: How to get a top X percent of a distribution represented as RDD

2015-03-26 Thread Debasish Das
You can do it in-memory as well... get 10% topK elements from each
partition and use merge from any sort algorithm like timsort... basically
aggregateBy

Your version uses shuffle but this version is 0 shuffle..assuming your data
set is cached you will be using in-memory allReduce through treeAggregate...

But this is only good for top 10% or bottom 10%...if you need to do it for
top 30% then may be the shuffle version will work better...

On Thu, Mar 26, 2015 at 8:31 PM, Aung Htet  wrote:

> Hi all,
>
> I have a distribution represented as an RDD of tuples, in rows of
> (segment, score)
> For each segment, I want to discard tuples with top X percent scores. This
> seems hard to do in Spark RDD.
>
> A naive algorithm would be -
>
> 1) Sort RDD by segment & score (descending)
> 2) Within each segment, number the rows from top to bottom.
> 3) For each  segment, calculate the cut off index. i.e. 90 for 10% cut off
> out of a segment with 100 rows.
> 4) For the entire RDD, filter rows with row num <= cut off index
>
> This does not look like a good algorithm. I would really appreciate if
> someone can suggest a better way to implement this in Spark.
>
> Regards,
> Aung
>


Re: What is best way to run spark job in "yarn-cluster" mode from java program(servlet container) and NOT using spark-submit command.

2015-03-26 Thread Noorul Islam K M
Sandy Ryza  writes:

> Creating a SparkContext and setting master as yarn-cluster unfortunately
> will not work.
>
> SPARK-4924 added APIs for doing this in Spark, but won't be included until
> 1.4.
>
> -Sandy
>

Did you look into something like [1]? With that you can make rest API
call from your java code.

Thanks and Regards
Noorul

[1] https://github.com/spark-jobserver/spark-jobserver
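For completeness, a hedged sketch of what the SPARK-4924 route mentioned above turned into (the org.apache.spark.launcher.SparkLauncher API that ships with Spark 1.4); the Spark home, jar, class, and arguments below are placeholders:

import org.apache.spark.launcher.SparkLauncher

// Launches spark-submit programmatically from a long-running JVM (for example a
// servlet container) and hands back a java.lang.Process to monitor.
val process = new SparkLauncher()
  .setSparkHome("/opt/spark")
  .setAppResource("/path/to/my-spark-app.jar")
  .setMainClass("com.example.MySparkJob")
  .setMaster("yarn-cluster")
  .addAppArgs("--input", "/data/in")
  .launch()
process.waitFor()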


> On Tue, Mar 17, 2015 at 3:19 AM, Akhil Das 
> wrote:
>
>> Create SparkContext set master as yarn-cluster then run it as a standalone
>> program?
>>
>> Thanks
>> Best Regards
>>
>> On Tue, Mar 17, 2015 at 1:27 AM, rrussell25  wrote:
>>
>>> Hi, were you ever able to determine a satisfactory approach for this
>>> problem?
>>> I have a similar situation and would prefer to execute the job directly
>>> from
>>> java code within my jms listener and/or servlet container.
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/What-is-best-way-to-run-spark-job-in-yarn-cluster-mode-from-java-program-servlet-container-and-NOT-u-tp21817p22086.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: SparkSQL overwrite parquet file does not generate _common_metadata

2015-03-26 Thread Pei-Lun Lee
Hi Cheng,

on my computer, executing res0.save("xxx", org.apache.spark.sql.SaveMode.
Overwrite) produces:

peilunlee@pllee-mini:~/opt/spark-1.3...rc3-bin-hadoop1$ ls -l xxx
total 32
-rwxrwxrwx  1 peilunlee  staff    0 Mar 27 11:29 _SUCCESS*
-rwxrwxrwx  1 peilunlee  staff  272 Mar 27 11:29 part-r-1.parquet*
-rwxrwxrwx  1 peilunlee  staff  272 Mar 27 11:29 part-r-2.parquet*
-rwxrwxrwx  1 peilunlee  staff  272 Mar 27 11:29 part-r-3.parquet*
-rwxrwxrwx  1 peilunlee  staff  488 Mar 27 11:29 part-r-4.parquet*

while res0.save("xxx") produces:

peilunlee@pllee-mini:~/opt/spark-1.3...rc3-bin-hadoop1$ ls -l xxx
total 40
-rwxrwxrwx  1 peilunlee  staff    0 Mar 27 11:29 _SUCCESS*
-rwxrwxrwx  1 peilunlee  staff  250 Mar 27 11:29 _common_metadata*
-rwxrwxrwx  1 peilunlee  staff  272 Mar 27 11:29 part-r-1.parquet*
-rwxrwxrwx  1 peilunlee  staff  272 Mar 27 11:29 part-r-2.parquet*
-rwxrwxrwx  1 peilunlee  staff  272 Mar 27 11:29 part-r-3.parquet*
-rwxrwxrwx  1 peilunlee  staff  488 Mar 27 11:29 part-r-4.parquet*

On Thu, Mar 26, 2015 at 7:26 PM, Cheng Lian  wrote:

>  I couldn’t reproduce this with the following spark-shell snippet:
>
> scala> import sqlContext.implicits._
> scala> Seq((1, 2)).toDF("a", "b")
> scala> res0.save("xxx", org.apache.spark.sql.SaveMode.Overwrite)
> scala> res0.save("xxx", org.apache.spark.sql.SaveMode.Overwrite)
>
> The _common_metadata file is typically much smaller than _metadata,
> because it doesn’t contain row group information, and thus can be faster to
> read than _metadata.
>
> Cheng
>
> On 3/26/15 12:48 PM, Pei-Lun Lee wrote:
>
>   Hi,
>
> When I save a parquet file with SaveMode.Overwrite, it never generates
> _common_metadata, whether it overwrites an existing dir or not.
> Is this expected behavior?
> And what is the benefit of _common_metadata? Will reading perform better
> when it is present?
>
>  Thanks,
> --
> Pei-Lun
>
>


How to get a top X percent of a distribution represented as RDD

2015-03-26 Thread Aung Htet
Hi all,

I have a distribution represented as an RDD of tuples, in rows of (segment,
score)
For each segment, I want to discard tuples with top X percent scores. This
seems hard to do in Spark RDD.

A naive algorithm would be -

1) Sort RDD by segment & score (descending)
2) Within each segment, number the rows from top to bottom.
3) For each  segment, calculate the cut off index. i.e. 90 for 10% cut off
out of a segment with 100 rows.
4) For the entire RDD, filter rows with row num <= cut off index
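For concreteness, a spark-shell sketch of steps 1-4 above, assuming the distribution is an RDD[(String, Double)] named scores and a 10% cut:

// groupByKey pulls each whole segment onto a single task, which is why this
// naive version does not scale well to very large segments.
val pct = 0.10
val kept = scores.groupByKey().flatMap { case (segment, ss) =>
  val sorted = ss.toArray.sortBy(-_)          // sort a segment's scores, highest first
  val cutoff = (sorted.length * pct).toInt    // e.g. 10 rows of a 100-row segment
  sorted.drop(cutoff).map(s => (segment, s))  // discard the top X percent
}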

This does not look like a good algorithm. I would really appreciate if
someone can suggest a better way to implement this in Spark.

Regards,
Aung


Re: Combining Many RDDs

2015-03-26 Thread Noorul Islam K M
Yang Chen  writes:

> Hi Noorul,
>
> Thank you for your suggestion. I tried that, but ran out of memory. I did
> some search and found some suggestions
> that we should try to avoid rdd.union(
> http://stackoverflow.com/questions/28343181/memory-efficient-way-of-union-a-sequence-of-rdds-from-files-in-apache-spark
> ).
> I will try to come up with some other ways.
>

I think you are using rdd.union(), but I was referring to
SparkContext.union(). I am not sure about the number of RDDs that you
have but I had no issues with memory when I used it to combine 2000
RDDs. Having said that I had other performance issues with spark
cassandra connector.
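A small spark-shell sketch of the difference, using 2000 toy RDDs as a stand-in for the per-item results:

// Each input chunk becomes its own small RDD (placeholder data).
val perChunk = (1 to 2000).map(i => sc.parallelize(Seq(i, i * 2, i * 3)))

// perChunk.reduce(_ union _) would nest 2000 pairwise unions into a deep lineage;
// a single SparkContext.union keeps the plan flat and far cheaper to build.
val combined = sc.union(perChunk)
combined.count()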

Thanks and Regards
Noorul

>
> On Thu, Mar 26, 2015 at 1:13 PM, Noorul Islam K M  wrote:
>
>> sparkx  writes:
>>
>> > Hi,
>> >
>> > I have a Spark job and a dataset of 0.5 Million items. Each item performs
>> > some sort of computation (joining a shared external dataset, if that does
>> > matter) and produces an RDD containing 20-500 result items. Now I would
>> like
>> > to combine all these RDDs and perform a next job. What I have found out
>> is
>> > that the computation itself is quite fast, but combining these RDDs takes
>> > much longer time.
>> >
>> > val result = data// 0.5M data items
>> >   .map(compute(_))   // Produces an RDD - fast
>> >   .reduce(_ ++ _)  // Combining RDDs - slow
>> >
>> > I have also tried to collect results from compute(_) and use a flatMap,
>> but
>> > that is also slow.
>> >
>> > Is there a way to efficiently do this? I'm thinking about writing this
>> > result to HDFS and reading from disk for the next job, but am not sure if
>> > that's a preferred way in Spark.
>> >
>>
>> Are you looking for SparkContext.union() [1] ?
>>
>> This is not performing well with spark cassandra connector. I am not
>> sure whether this will help you.
>>
>> Thanks and Regards
>> Noorul
>>
>> [1]
>> http://spark.apache.org/docs/1.3.0/api/scala/index.html#org.apache.spark.SparkContext
>>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark-sql throws org.datanucleus.store.rdbms.connectionpool.DatastoreDriverNotFoundException

2015-03-26 Thread Denny Lee
If you're not using MySQL as your metastore for Hive, out of curiosity what
are you using?

The error you are seeing is common when the correct driver for Spark to
connect to the Hive metastore isn't on the classpath.

As well, I noticed that you're using SPARK_CLASSPATH which has been
deprecated.  Depending on your scenario, you may want to use --jars,
--driver-class-path, or extraClassPath.  A good thread on this topic can be
found at
http://mail-archives.us.apache.org/mod_mbox/spark-user/201503.mbox/%3C01a901d0547c$a23ba480$e6b2ed80$@innowireless.com%3E
.

For example, when I connect to my own Hive metastore via Spark 1.3, I
reference the --driver-class-path where in my case I am using MySQL as my
Hive metastore:

./bin/spark-sql --master spark://$standalone$:7077 --driver-class-path
mysql-connector-$version$.jar

HTH!


On Thu, Mar 26, 2015 at 8:09 PM ÐΞ€ρ@Ҝ (๏̯͡๏)  wrote:

> I do not use MySQL. I want to read Hive tables from Spark SQL and
> transform them in Spark SQL. Why do I need a MySQL driver? If I still need
> it, which version should I use?
>
> Assuming I need it, I downloaded the latest version of it from
> http://mvnrepository.com/artifact/mysql/mysql-connector-java/5.1.34 and
> ran the following commands. I no longer see the above exception; however, I
> see a new one.
>
>
>
>
>
> export SPARK_HOME=/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4
> export SPARK_JAR=$SPARK_HOME/lib/spark-assembly-1.3.0-hadoop2.4.0.jar
> export
> SPARK_CLASSPATH=/apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/home/dvasthimal/spark1.3/spark-avro_2.10-1.0.0.jar:/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar:/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar:/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar:
> /home/dvasthimal/spark1.3/mysql-connector-java-5.1.34.jar
> export HADOOP_CONF_DIR=/apache/hadoop/conf
> cd $SPARK_HOME
> ./bin/spark-sql
> Spark assembly has been built with Hive, including Datanucleus jars on
> classpath
> ...
> ...
>
> spark-sql>
>
> spark-sql>
>
> spark-sql>
>
>
> show tables;
>
> 15/03/26 20:03:57 INFO metastore.HiveMetaStore: 0: get_tables: db=default
> pat=.*
>
> 15/03/26 20:03:57 INFO HiveMetaStore.audit: ugi=dvasthi...@corp.ebay.com
> ip=unknown-ip-addr cmd=get_tables: db=default pat=.*
>
> 15/03/26 20:03:58 INFO spark.SparkContext: Starting job: collect at
> SparkPlan.scala:83
>
> 15/03/26 20:03:58 INFO scheduler.DAGScheduler: Got job 1 (collect at
> SparkPlan.scala:83) with 1 output partitions (allowLocal=false)
>
> 15/03/26 20:03:58 INFO scheduler.DAGScheduler: Final stage: Stage
> 1(collect at SparkPlan.scala:83)
>
> 15/03/26 20:03:58 INFO scheduler.DAGScheduler: Parents of final stage:
> List()
>
> 15/03/26 20:03:58 INFO scheduler.DAGScheduler: Missing parents: List()
>
> 15/03/26 20:03:58 INFO scheduler.DAGScheduler: Submitting Stage 1
> (MapPartitionsRDD[3] at map at SparkPlan.scala:83), which has no missing
> parents
>
> 15/03/26 20:03:58 INFO scheduler.TaskSchedulerImpl: Cancelling stage 1
>
> 15/03/26 20:03:58 INFO scheduler.StatsReportListener: Finished stage:
> org.apache.spark.scheduler.StageInfo@2bfd9c4d
>
> 15/03/26 20:03:58 INFO scheduler.DAGScheduler: Job 1 failed: collect at
> SparkPlan.scala:83, took 0.005163 s
>
> 15/03/26 20:03:58 ERROR thriftserver.SparkSQLDriver: Failed in [show
> tables]
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task
> serialization failed: java.lang.reflect.InvocationTargetException
>
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>
>
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>
>
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>
> java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>
>
> org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:68)
>
>
> org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:60)
>
> org.apache.spark.broadcast.TorrentBroadcast.org
> $apache$spark$broadcast$TorrentBroadcast$$setConf(TorrentBroadcast.scala:73)
>
>
> org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:79)
>
>
> org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
>
>
> org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
>
>
> org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
>
> org.apache.spark.SparkContext.broadcast(SparkContext.scala:1051)
>
> org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:839)
>
> org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$sc

Re: spark-sql throws org.datanucleus.store.rdbms.connectionpool.DatastoreDriverNotFoundException

2015-03-26 Thread ๏̯͡๏
I do not use MySQL. I want to read Hive tables from Spark SQL and transform
them in Spark SQL. Why do I need a MySQL driver? If I still need it, which
version should I use?

Assuming I need it, I downloaded the latest version of it from
http://mvnrepository.com/artifact/mysql/mysql-connector-java/5.1.34 and ran
the following commands. I no longer see the above exception; however, I see
a new one.





export SPARK_HOME=/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4
export SPARK_JAR=$SPARK_HOME/lib/spark-assembly-1.3.0-hadoop2.4.0.jar
export
SPARK_CLASSPATH=/apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/home/dvasthimal/spark1.3/spark-avro_2.10-1.0.0.jar:/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar:/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar:/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar:
/home/dvasthimal/spark1.3/mysql-connector-java-5.1.34.jar
export HADOOP_CONF_DIR=/apache/hadoop/conf
cd $SPARK_HOME
./bin/spark-sql
Spark assembly has been built with Hive, including Datanucleus jars on
classpath
...
...

spark-sql>

spark-sql>

spark-sql>


show tables;

15/03/26 20:03:57 INFO metastore.HiveMetaStore: 0: get_tables: db=default
pat=.*

15/03/26 20:03:57 INFO HiveMetaStore.audit: ugi=dvasthi...@corp.ebay.com
ip=unknown-ip-addr cmd=get_tables: db=default pat=.*

15/03/26 20:03:58 INFO spark.SparkContext: Starting job: collect at
SparkPlan.scala:83

15/03/26 20:03:58 INFO scheduler.DAGScheduler: Got job 1 (collect at
SparkPlan.scala:83) with 1 output partitions (allowLocal=false)

15/03/26 20:03:58 INFO scheduler.DAGScheduler: Final stage: Stage 1(collect
at SparkPlan.scala:83)

15/03/26 20:03:58 INFO scheduler.DAGScheduler: Parents of final stage:
List()

15/03/26 20:03:58 INFO scheduler.DAGScheduler: Missing parents: List()

15/03/26 20:03:58 INFO scheduler.DAGScheduler: Submitting Stage 1
(MapPartitionsRDD[3] at map at SparkPlan.scala:83), which has no missing
parents

15/03/26 20:03:58 INFO scheduler.TaskSchedulerImpl: Cancelling stage 1

15/03/26 20:03:58 INFO scheduler.StatsReportListener: Finished stage:
org.apache.spark.scheduler.StageInfo@2bfd9c4d

15/03/26 20:03:58 INFO scheduler.DAGScheduler: Job 1 failed: collect at
SparkPlan.scala:83, took 0.005163 s

15/03/26 20:03:58 ERROR thriftserver.SparkSQLDriver: Failed in [show tables]

org.apache.spark.SparkException: Job aborted due to stage failure: Task
serialization failed: java.lang.reflect.InvocationTargetException

sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)

sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)

sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)

java.lang.reflect.Constructor.newInstance(Constructor.java:526)

org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:68)

org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:60)

org.apache.spark.broadcast.TorrentBroadcast.org
$apache$spark$broadcast$TorrentBroadcast$$setConf(TorrentBroadcast.scala:73)

org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:79)

org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)

org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)

org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)

org.apache.spark.SparkContext.broadcast(SparkContext.scala:1051)

org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:839)

org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:778)

org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:762)

org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1362)

org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)

org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)


at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)

at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)

at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)

at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)

at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)

at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:847)

at org.apache.spark

k-means can only run on one executor with one thread?

2015-03-26 Thread Xi Shen
Hi,

I have a large data set, and I expect to get 5000 clusters.

I load the raw data, convert it into DenseVectors, then repartition and
cache, and finally give the RDD[Vector] to KMeans.train().

Now the job is running, and the data are loaded. But according to the Spark
UI, all data are loaded onto one executor. I checked that executor, and its
CPU workload is very low. I think it is using only 1 of the 8 cores, and the
other 3 executors are idle.

Did I miss something? Is it possible to distribute the workload to all 4
executors?
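A hedged sketch of the load path with an explicit repartition before caching, which is usually what spreads the cached blocks, and therefore the k-means tasks, across all executors; the path, delimiter, and partition count are assumptions:

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

val raw = sc.textFile("hdfs:///path/to/points")  // placeholder path
val vectors = raw
  .map(line => Vectors.dense(line.split(',').map(_.toDouble)))
  .repartition(32)   // roughly executors * cores, e.g. 4 * 8 here
  .cache()
vectors.count()      // materialize the cache; check block distribution in the UI

val model = KMeans.train(vectors, 5000, 20)  // k = 5000, 20 iterations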


Thanks,
David


Re: spark-sql throws org.datanucleus.store.rdbms.connectionpool.DatastoreDriverNotFoundException

2015-03-26 Thread Cheng Lian
As the exception suggests, you don't have the MySQL JDBC driver on your
classpath.



On 3/27/15 10:45 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) wrote:

I am unable to run spark-sql from the command line. I attempted the following

1)

export SPARK_HOME=/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4
export SPARK_JAR=$SPARK_HOME/lib/spark-assembly-1.3.0-hadoop2.4.0.jar
export 
SPARK_CLASSPATH=/apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar

cd $SPARK_HOME

./bin/spark-sql

./bin/spark-sql
2)

export SPARK_HOME=/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4
export SPARK_JAR=$SPARK_HOME/lib/spark-assembly-1.3.0-hadoop2.4.0.jar
export 
SPARK_CLASSPATH=/apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar

cd $SPARK_HOME

./bin/spark-sql --jars 
/home/dvasthimal/spark1.3/spark-avro_2.10-1.0.0.jar,/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar,/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar,/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar



3)

export SPARK_HOME=/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4
export SPARK_JAR=$SPARK_HOME/lib/spark-assembly-1.3.0-hadoop2.4.0.jar
export 
SPARK_CLASSPATH=/apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/home/dvasthimal/spark1.3/spark-avro_2.10-1.0.0.jar:/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar:/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar:/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar

export HADOOP_CONF_DIR=/apache/hadoop/conf
cd $SPARK_HOME
./bin/spark-sql



Each time I get the below exception:


Spark assembly has been built with Hive, including Datanucleus jars on 
classpath
15/03/26 19:43:49 WARN conf.HiveConf: DEPRECATED: Configuration 
property hive.metastore.local no longer has any effect. Make sure to 
provide a valid value for hive.metastore.uris if you are connecting to 
a remote metastore.
15/03/26 19:43:49 WARN conf.HiveConf: DEPRECATED: 
hive.metastore.ds.retry.* no longer has any effect.  Use 
hive.hmshandler.retry.* instead
15/03/26 19:43:49 INFO metastore.HiveMetaStore: 0: Opening raw store 
with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
15/03/26 19:43:49 INFO metastore.ObjectStore: ObjectStore, initialize 
called
15/03/26 19:43:50 INFO DataNucleus.Persistence: Property 
datanucleus.cache.level2 unknown - will be ignored
15/03/26 19:43:50 INFO DataNucleus.Persistence: Property 
hive.metastore.integral.jdo.pushdown unknown - will be ignored
Exception in thread "main" java.lang.RuntimeException: 
java.lang.RuntimeException: Unable to instantiate 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient
at 
org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:346)
at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:101)
at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)

at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.RuntimeException: Unable to instantiate 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient
at 
org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1412)
at 
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:62)
at 
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:72)
at 
org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:2453)

at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:2465)
at 
org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:340)

... 11 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newIns

Spark SQL configurations

2015-03-26 Thread ๏̯͡๏
Hello,
Can someone share with me the list of commands (including export statements)
that you use to run Spark SQL over a YARN cluster? I am unable to get it
running on my YARN cluster and keep running into exceptions.

I understand I need to share the specific exception. This is more about
wanting to know whether I have missed anything before running Spark SQL.
Regards,
Deepak


spark-sql throws org.datanucleus.store.rdbms.connectionpool.DatastoreDriverNotFoundException

2015-03-26 Thread ๏̯͡๏
I am unable to run spark-sql from the command line. I attempted the following

1)

export SPARK_HOME=/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4
export SPARK_JAR=$SPARK_HOME/lib/spark-assembly-1.3.0-hadoop2.4.0.jar
export
SPARK_CLASSPATH=/apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar
cd $SPARK_HOME

./bin/spark-sql

./bin/spark-sql
2)

export SPARK_HOME=/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4
export SPARK_JAR=$SPARK_HOME/lib/spark-assembly-1.3.0-hadoop2.4.0.jar
export
SPARK_CLASSPATH=/apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar
cd $SPARK_HOME

./bin/spark-sql --jars
/home/dvasthimal/spark1.3/spark-avro_2.10-1.0.0.jar,/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar,/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar,/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar


3)

export SPARK_HOME=/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4
export SPARK_JAR=$SPARK_HOME/lib/spark-assembly-1.3.0-hadoop2.4.0.jar
export
SPARK_CLASSPATH=/apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/home/dvasthimal/spark1.3/spark-avro_2.10-1.0.0.jar:/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar:/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar:/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar
export HADOOP_CONF_DIR=/apache/hadoop/conf
cd $SPARK_HOME
./bin/spark-sql



Each time I get the below exception:


Spark assembly has been built with Hive, including Datanucleus jars on
classpath
15/03/26 19:43:49 WARN conf.HiveConf: DEPRECATED: Configuration property
hive.metastore.local no longer has any effect. Make sure to provide a valid
value for hive.metastore.uris if you are connecting to a remote metastore.
15/03/26 19:43:49 WARN conf.HiveConf: DEPRECATED: hive.metastore.ds.retry.*
no longer has any effect.  Use hive.hmshandler.retry.* instead
15/03/26 19:43:49 INFO metastore.HiveMetaStore: 0: Opening raw store with
implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
15/03/26 19:43:49 INFO metastore.ObjectStore: ObjectStore, initialize called
15/03/26 19:43:50 INFO DataNucleus.Persistence: Property
datanucleus.cache.level2 unknown - will be ignored
15/03/26 19:43:50 INFO DataNucleus.Persistence: Property
hive.metastore.integral.jdo.pushdown unknown - will be ignored
Exception in thread "main" java.lang.RuntimeException:
java.lang.RuntimeException: Unable to instantiate
org.apache.hadoop.hive.metastore.HiveMetaStoreClient
at
org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:346)
at
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:101)
at
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.RuntimeException: Unable to instantiate
org.apache.hadoop.hive.metastore.HiveMetaStoreClient
at
org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1412)
at
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:62)
at
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:72)
at
org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:2453)
at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:2465)
at
org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:340)
... 11 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at
org.apache.hadoop.hive.metastore.MetaStoreUtil

Re: HQL function Rollup and Cube

2015-03-26 Thread ๏̯͡๏
Did you manage to connect to the Hive metastore from Spark SQL? I copied the Hive
conf file into the Spark conf folder, but when I run show tables, or do select *
from dw_bid (dw_bid is stored in Hive), it says the table is not found.
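
For reference, a minimal sketch of the check being attempted here, assuming
hive-site.xml has been copied into $SPARK_HOME/conf and the datanucleus jars are
on the driver classpath (the table name dw_bid is taken from above; everything
else is a placeholder):

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)   // sc is the SparkContext from the shell
hiveContext.sql("show tables").collect().foreach(println)           // should list dw_bid once the metastore is reachable
hiveContext.sql("select * from dw_bid limit 10").collect().foreach(println)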



On Thu, Mar 26, 2015 at 11:43 PM, Chang Lim  wrote:

> Solved. In the IDE, the project settings were missing the dependent lib jars
> (the jar files under spark-xx/lib). When these jars are not set, I get a
> class-not-found error about the datanucleus classes (compared to an
> out-of-memory error in the Spark shell).
>
> In the context of the Spark shell, these dependent jars need to be passed in
> at the spark-shell command line.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/HQL-function-Rollup-and-Cube-tp22241p22246.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Deepak


Re: Missing an output location for shuffle. : (

2015-03-26 Thread 李铖
Here is the worker track.

15/03/26 16:05:47 INFO Worker: Asked to kill executor
app-20150326160534-0005/1
15/03/26 16:05:47 INFO ExecutorRunner: Runner thread for executor
app-20150326160534-0005/1 interrupted
15/03/26 16:05:47 INFO ExecutorRunner: Killing process!
15/03/26 16:05:47 ERROR FileAppender: Error writing stream to file
/opt/cloud/spark/work/app-20150326160534-0005/1/stderr
java.io.IOException: Stream closed
at java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:162)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:272)
at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
at java.io.FilterInputStream.read(FilterInputStream.java:107)
at
org.apache.spark.util.logging.FileAppender.appendStreamToFile(FileAppender.scala:70)
at
org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply$mcV$sp(FileAppender.scala:39)
at
org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply(FileAppender.scala:39)
at
org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply(FileAppender.scala:39)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1460)
at
org.apache.spark.util.logging.FileAppender$$anon$1.run(FileAppender.scala:38)
15/03/26 16:05:48 INFO Worker: Executor app-20150326160534-0005/1 finished
with state KILLED exitStatus 1
15/03/26 16:05:48 WARN ReliableDeliverySupervisor: Association with remote
system [akka.tcp://sparkExecutor@cloud1:37365] has failed, address is now
gated for [5000] ms. Reason is: [Disassociated].


2015-03-26 23:01 GMT+08:00 Michael Armbrust :

> I would suggest looking for errors in the logs of your executors.
>
> On Thu, Mar 26, 2015 at 3:20 AM, 李铖  wrote:
>
>> Again, when I run a Spark SQL query over a larger file, the error below occurs.
>> Has anyone fixed this? Please help me.
>> Here is the track.
>>
>> org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
>> location for shuffle 0
>> at
>> org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:386)
>> at
>> org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:383)
>> at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> at
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>> at
>> org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:382)
>> at
>> org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:178)
>> at
>> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
>> at
>> org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
>> at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>> at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>> at org.apache.spark.sql.SchemaRDD.compute(SchemaRDD.scala:120)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>> at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>> at
>> org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:242)
>> at
>> org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:204)
>> at
>> org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:204)
>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1460)
>> at
>> org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:203)
>>
>> )
>>
>>
>


Re: WordCount example

2015-03-26 Thread Saisai Shao
Hi,

Did you run the word count example in Spark local mode or another mode? In
local mode you have to set local[n] with n >= 2; for other modes, make sure
more than one core is available, because the receiver inside Spark Streaming
runs as a long-running task that will occupy at least one core.

Besides that, use lsof -p  or netstat to make sure the Spark executor backend
is connected to the nc process. Also grep the executor's log for lines like
"Connecting to  " and "Connected to  ", which show that the receiver is
correctly connected to the nc process.

Thanks
Jerry

2015-03-27 8:45 GMT+08:00 Mohit Anchlia :

> What's the best way to troubleshoot inside spark to see why Spark is not
> connecting to nc on port ? I don't see any errors either.
>
> On Thu, Mar 26, 2015 at 2:38 PM, Mohit Anchlia 
> wrote:
>
>> I am trying to run the word count example but for some reason it's not
>> working as expected. I start "nc" server on port  and then submit the
>> spark job to the cluster. Spark job gets successfully submitting but I
>> never see any connection from spark getting established. I also tried to
>> type words on the console where "nc" is listening and waiting on the
>> prompt, however I don't see any output. I also don't see any errors.
>>
>> Here is the conf:
>>
>> SparkConf conf = *new* SparkConf().setMaster(masterUrl).setAppName(
>> "NetworkWordCount");
>>
>> JavaStreamingContext *jssc* = *new* JavaStreamingContext(conf, Durations.
>> *seconds*(1));
>>
>> JavaReceiverInputDStream lines = jssc.socketTextStream(
>> "localhost", );
>>
>
>


Re: Missing an output location for shuffle. : (

2015-03-26 Thread 李铖
Here is the work track:

[Pasted monitoring table, column boundaries lost in the plain-text conversion:
one row per host on 172.100.8.x / 172.100.9.x giving IP address, MAC address,
packet/byte counters, and uptime (hours/minutes/seconds), roughly 130 rows.]

FetchFailedException during shuffle

2015-03-26 Thread Chen Song
Using Spark 1.3.0 on CDH 5.1.0, I ran into a fetch failed exception.

I searched this mailing list but did not find anything like this reported.
What could be the reason for the error?

org.apache.spark.shuffle.FetchFailedException: [EMPTY_INPUT] Cannot
decompress empty stream
at 
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
at 
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
at 
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at 
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160)
at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:130)
at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:127)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:127)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTa

Re: What is best way to run spark job in "yarn-cluster" mode from java program(servlet container) and NOT using spark-submit command.

2015-03-26 Thread Sandy Ryza
Creating a SparkContext and setting master as yarn-cluster unfortunately
will not work.

SPARK-4924 added APIs for doing this in Spark, but won't be included until
1.4.
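
For what it is worth, a rough sketch of the SPARK-4924 launcher API as it is
slated to appear in 1.4 (not usable on the releases discussed here; the method
names are from the work-in-progress SparkLauncher and may differ in the final
release, and the jar path, main class and config value are placeholders):

import org.apache.spark.launcher.SparkLauncher

val app: Process = new SparkLauncher()
  .setAppResource("/path/to/my-app.jar")      // placeholder application jar
  .setMainClass("com.example.MyApp")          // placeholder main class
  .setMaster("yarn-cluster")
  .setConf("spark.executor.memory", "2g")     // placeholder config
  .launch()                                   // spawns spark-submit as a child process
app.waitFor()                                 // block until the application exits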

-Sandy

On Tue, Mar 17, 2015 at 3:19 AM, Akhil Das 
wrote:

> Create SparkContext set master as yarn-cluster then run it as a standalone
> program?
>
> Thanks
> Best Regards
>
> On Tue, Mar 17, 2015 at 1:27 AM, rrussell25  wrote:
>
>> Hi, were you ever able to determine a satisfactory approach for this
>> problem?
>> I have a similar situation and would prefer to execute the job directly
>> from
>> java code within my jms listener and/or servlet container.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/What-is-best-way-to-run-spark-job-in-yarn-cluster-mode-from-java-program-servlet-container-and-NOT-u-tp21817p22086.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: shuffle write size

2015-03-26 Thread Chen Song
Can anyone shed some light on this?

On Tue, Mar 17, 2015 at 5:23 PM, Chen Song  wrote:

> I have a map reduce job that reads from three logs and joins them on some
> key column. The underlying data is protobuf messages in sequence
> files. Between mappers and reducers, the underlying raw byte arrays for
> protobuf messages are shuffled . Roughly, for 1G input from HDFS, there is
> 2G data output from map phase.
>
> I am testing spark jobs (v1.3.0) on the same input. I found that shuffle
> write is 3 - 4 times input size. I tried passing protobuf Message object
> and Array[Byte], but neither gives good shuffle write output.
>
> Is there any good practice on shuffling
>
> * protobuf messages
> * raw byte array
>
> Chen
>
>


-- 
Chen Song


Difference behaviour of DateType in SparkSQL between 1.2 and 1.3

2015-03-26 Thread Wush Wu
Dear all,

I am trying to upgrade Spark from 1.2 to 1.3 and switch the existing API for
creating a SchemaRDD over to DataFrame.

After testing, I noticed that the following behavior has changed:

```
import java.sql.Date
import com.bridgewell.SparkTestUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.{DataType, DateType}
import org.junit.runner.RunWith
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.junit.JUnitRunner
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag

case class TestDateClass(x : Date)

@RunWith(classOf[JUnitRunner])
class DataFrameDateSuite extends SparkTestUtils with ShouldMatchers {

  sparkTest("Validate Date") {


def test[TestClass <: Product : ClassTag : TypeTag, T :
ClassTag](params : Seq[T], tp : DataType, f : (T => TestClass)): Unit = {

  val hc = new HiveContext(sc)
  val rdd : RDD[TestClass] = sc.parallelize(params).map(f(_))
  hc.createDataFrame(rdd).registerTempTable("test")
  // code that works for spark-1.2
  // hc.registerRDDTable(hc.createSchemaRDD(rdd), "test")
  val row = hc.sql("SELECT * FROM test").first
  row(0).asInstanceOf[Date] shouldEqual params.head
}

test[TestDateClass, Date](Array(new Date(86400)), DateType, new
TestDateClass(_))

  }

}
```

The above test passed in Spark 1.2 but fails now. If I print getTime() on the
two Date objects (before saving to Hive and after loading from Hive), the
values are 86400 and -2880.

Am I misusing the API, or is this a bug?

Best,
Wush


Re: Why k-means cluster hang for a long time?

2015-03-26 Thread Xi Shen
Hi Burak,

After I added .repartition(sc.defaultParallelism), I can see from the log
the partition number is set to 32. But in the Spark UI, it seems all the
data are loaded onto one executor. Previously they were loaded onto 4
executors.
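
For reference, the pipeline now looks roughly like this (a sketch only: the
line parsing and the comma delimiter are placeholders, as in the earlier snippet):

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

val data = sc.textFile("very/large/text/file")
  .map(l => Vectors.dense(l.split(',').map(_.toDouble)))  // placeholder parse of the ~300 features
  .repartition(sc.defaultParallelism)                     // 32 partitions with 4 executors * 8 cores
  .cache()
data.count()                                              // materialize the cache before training
val model = KMeans.train(data, 5000, 500)                 // k = 5000, maxIterations = 500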

Any idea?


Thanks,
David


On Fri, Mar 27, 2015 at 11:01 AM Xi Shen  wrote:

> How do I get the number of cores that I specified at the command line? I
> want to use "spark.default.parallelism". I have 4 executors, each has 8
> cores. According to
> https://spark.apache.org/docs/1.2.0/configuration.html#execution-behavior,
> the "spark.default.parallelism" value will be 4 * 8 = 32...I think it is
> too large, or inappropriate. Please give some suggestion.
>
> I have already used cache, and count to pre-cache.
>
> I can try with smaller k for testing, but eventually I will have to use k
> = 5000 or even large. Because I estimate our data set would have that much
> of clusters.
>
>
> Thanks,
> David
>
>
> On Fri, Mar 27, 2015 at 10:40 AM Burak Yavuz  wrote:
>
>> Hi David,
>> The number of centroids (k=5000) seems too large and is probably the
>> cause of the code taking too long.
>>
>> Can you please try the following:
>> 1) Repartition data to the number of available cores with
>> .repartition(numCores)
>> 2) cache data
>> 3) call .count() on data right before k-means
>> 4) try k=500 (even less if possible)
>>
>> Thanks,
>> Burak
>>
>> On Mar 26, 2015 4:15 PM, "Xi Shen"  wrote:
>> >
>> > The code is very simple.
>> >
>> > val data = sc.textFile("very/large/text/file") map { l =>
>> >   // turn each line into dense vector
>> >   Vectors.dense(...)
>> > }
>> >
>> > // the resulting data set is about 40k vectors
>> >
>> > KMeans.train(data, k=5000, maxIterations=500)
>> >
>> > I just kill my application. In the log I found this:
>> >
>> > 15/03/26 11:42:43 INFO storage.BlockManagerMaster: Updated info of
>> block broadcast_26_piece0
>> > 15/03/26 23:02:57 WARN server.TransportChannelHandler: Exception in
>> connection from workernode0.xshe3539-hadoop-sydney.q10.internal.cloudapp.
>> net/100.72.84.107:56277
>> > java.io.IOException: An existing connection was forcibly closed by the
>> remote host
>> >
>> > Notice the time gap. I think it means the work node did not generate
>> any log at all for about 12hrs...does it mean they are not working at all?
>> >
>> > But when testing with very small data set, my application works and
>> output expected data.
>> >
>> >
>> > Thanks,
>> > David
>> >
>> >
>> > On Fri, Mar 27, 2015 at 10:04 AM Burak Yavuz  wrote:
>> >>
>> >> Can you share the code snippet of how you call k-means? Do you cache
>> the data before k-means? Did you repartition the data?
>> >>
>> >> On Mar 26, 2015 4:02 PM, "Xi Shen"  wrote:
>> >>>
>> >>> OH, the job I talked about has ran more than 11 hrs without a
>> result...it doesn't make sense.
>> >>>
>> >>>
>> >>> On Fri, Mar 27, 2015 at 9:48 AM Xi Shen 
>> wrote:
>> 
>>  Hi Burak,
>> 
>>  My iterations is set to 500. But I think it should also stop of the
>> centroid coverages, right?
>> 
>>  My spark is 1.2.0, working in windows 64 bit. My data set is about
>> 40k vectors, each vector has about 300 features, all normalised. All work
>> node have sufficient memory and disk space.
>> 
>>  Thanks,
>>  David
>> 
>> 
>>  On Fri, 27 Mar 2015 02:48 Burak Yavuz  wrote:
>> >
>> > Hi David,
>> >
>> > When the number of runs are large and the data is not properly
>> partitioned, it seems that K-Means is hanging according to my experience.
>> Especially setting the number of runs to something high drastically
>> increases the work in executors. If that's not the case, can you give more
>> info on what Spark version you are using, your setup, and your dataset?
>> >
>> > Thanks,
>> > Burak
>> >
>> > On Mar 26, 2015 5:10 AM, "Xi Shen"  wrote:
>> >>
>> >> Hi,
>> >>
>> >> When I run k-means cluster with Spark, I got this in the last two
>> lines in the log:
>> >>
>> >> 15/03/26 11:42:42 INFO spark.ContextCleaner: Cleaned broadcast 26
>> >> 15/03/26 11:42:42 INFO spark.ContextCleaner: Cleaned shuffle 5
>> >>
>> >>
>> >>
>> >> Then it hangs for a long time. There's no active job. The driver
>> machine is idle. I cannot access the work node, I am not sure if they are
>> busy.
>> >>
>> >> I understand k-means may take a long time to finish. But why no
>> active job? no log?
>> >>
>> >>
>> >> Thanks,
>> >> David
>> >>
>>
>


Re: WordCount example

2015-03-26 Thread Mohit Anchlia
What's the best way to troubleshoot inside spark to see why Spark is not
connecting to nc on port ? I don't see any errors either.

On Thu, Mar 26, 2015 at 2:38 PM, Mohit Anchlia 
wrote:

> I am trying to run the word count example but for some reason it's not
> working as expected. I start "nc" server on port  and then submit the
> spark job to the cluster. Spark job gets successfully submitting but I
> never see any connection from spark getting established. I also tried to
> type words on the console where "nc" is listening and waiting on the
> prompt, however I don't see any output. I also don't see any errors.
>
> Here is the conf:
>
> SparkConf conf = *new* SparkConf().setMaster(masterUrl).setAppName(
> "NetworkWordCount");
>
> JavaStreamingContext *jssc* = *new* JavaStreamingContext(conf, Durations.
> *seconds*(1));
>
> JavaReceiverInputDStream lines = jssc.socketTextStream("localhost",
> );
>


Re: Spark History Server : jobs link doesn't open

2015-03-26 Thread , Roy
In the log I found this:

2015-03-26 19:42:09,531 WARN org.eclipse.jetty.servlet.ServletHandler:
Error for /history/application_1425934191900_87572
org.spark-project.guava.common.util.concurrent.ExecutionError:
java.lang.OutOfMemoryError: GC overhead limit exceeded
at 
org.spark-project.guava.common.cache.LocalCache$Segment.get(LocalCache.java:2261)
at 
org.spark-project.guava.common.cache.LocalCache.get(LocalCache.java:4000)


thanks

On Thu, Mar 26, 2015 at 7:27 PM, , Roy  wrote:

> We have Spark on YARN, with Cloudera Manager 5.3.2 and CDH 5.3.2
>
> Jobs link on spark History server  doesn't open and shows following
> message :
>
> HTTP ERROR: 500
>
> Problem accessing /history/application_1425934191900_87572. Reason:
>
> Server Error
>
> --
> *Powered by Jetty://*
>
>


FakeClassTag in Java API

2015-03-26 Thread kmader
The Java API uses FakeClassTag for all of the implicit class tags fed to RDDs
during creation, mapping, etc. I am working on a more generic Scala library
where I won't always have the type information beforehand. Is it safe /
accepted practice to use FakeClassTag in these situations as well? It was my
understanding that Scala requires a ClassTag in order to build new Arrays;
what happens when one is not present?
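
For comparison, when the caller does have the type, the usual Scala-side
pattern is a ClassTag context bound, a minimal sketch assuming an existing
SparkContext named sc:

import scala.reflect.ClassTag
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// the ClassTag is captured at the call site and passed to Spark implicitly
def toRdd[T: ClassTag](sc: SparkContext, xs: Seq[T]): RDD[T] = sc.parallelize(xs)

val rdd = toRdd(sc, Seq(1, 2, 3))   // ClassTag[Int] supplied automatically
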
Thanks,
Kevin



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/FakeClassTag-in-Java-API-tp22253.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: foreachRDD execution

2015-03-26 Thread Tathagata Das
Yes, that is the correct understanding. There are undocumented parameters
that allow that, but I do not recommend using those :)
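
As a sketch of what that means in practice (assuming a DStream of user ids
named dstream; the Cassandra read-update-write is reduced to a println
placeholder):

// runs once per 10-second batch; the next batch's call does not start
// until this one returns, so these blocks never run in parallel
dstream.foreachRDD { rdd =>
  val counts = rdd.map(user => (user, 1L)).reduceByKey(_ + _).collect()
  counts.foreach { case (user, count) =>
    println(s"$user -> $count")   // placeholder for the join/update against Cassandra
  }
}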

TD

On Wed, Mar 25, 2015 at 6:57 AM, Luis Ángel Vicente Sánchez <
langel.gro...@gmail.com> wrote:

> I have a simple and probably dumb question about foreachRDD.
>
> We are using spark streaming + cassandra to compute concurrent users every
> 5min. Our batch size is 10secs and our block interval is 2.5secs.
>
> At the end of the world we are using foreachRDD to join the data in the
> RDD with existing data in Cassandra, update the counters and then save it
> back to Cassandra.
>
> To the best of my understanding, in this scenario, spark streaming
> produces one RDD every 10secs and foreachRDD executes them sequentially,
> that is, foreachRDD would never run in parallel.
>
> Am I right?
>
> Regards,
>
> Luis
>
>
>


Re: Why k-means cluster hang for a long time?

2015-03-26 Thread Xi Shen
How do I get the number of cores that I specified at the command line? I
want to use "spark.default.parallelism". I have 4 executors, each has 8
cores. According to
https://spark.apache.org/docs/1.2.0/configuration.html#execution-behavior,
the "spark.default.parallelism" value will be 4 * 8 = 32...I think it is
too large, or inappropriate. Please give some suggestion.

I have already used cache, and count to pre-cache.

I can try a smaller k for testing, but eventually I will have to use k = 5000
or even larger, because I estimate our data set has that many clusters.


Thanks,
David


On Fri, Mar 27, 2015 at 10:40 AM Burak Yavuz  wrote:

> Hi David,
> The number of centroids (k=5000) seems too large and is probably the cause
> of the code taking too long.
>
> Can you please try the following:
> 1) Repartition data to the number of available cores with
> .repartition(numCores)
> 2) cache data
> 3) call .count() on data right before k-means
> 4) try k=500 (even less if possible)
>
> Thanks,
> Burak
>
> On Mar 26, 2015 4:15 PM, "Xi Shen"  wrote:
> >
> > The code is very simple.
> >
> > val data = sc.textFile("very/large/text/file") map { l =>
> >   // turn each line into dense vector
> >   Vectors.dense(...)
> > }
> >
> > // the resulting data set is about 40k vectors
> >
> > KMeans.train(data, k=5000, maxIterations=500)
> >
> > I just kill my application. In the log I found this:
> >
> > 15/03/26 11:42:43 INFO storage.BlockManagerMaster: Updated info of block
> broadcast_26_piece0
> > 15/03/26 23:02:57 WARN server.TransportChannelHandler: Exception in
> connection from
> workernode0.xshe3539-hadoop-sydney.q10.internal.cloudapp.net/100.72.84.107:56277
> > java.io.IOException: An existing connection was forcibly closed by the
> remote host
> >
> > Notice the time gap. I think it means the work node did not generate any
> log at all for about 12hrs...does it mean they are not working at all?
> >
> > But when testing with very small data set, my application works and
> output expected data.
> >
> >
> > Thanks,
> > David
> >
> >
> > On Fri, Mar 27, 2015 at 10:04 AM Burak Yavuz  wrote:
> >>
> >> Can you share the code snippet of how you call k-means? Do you cache
> the data before k-means? Did you repartition the data?
> >>
> >> On Mar 26, 2015 4:02 PM, "Xi Shen"  wrote:
> >>>
> >>> OH, the job I talked about has ran more than 11 hrs without a
> result...it doesn't make sense.
> >>>
> >>>
> >>> On Fri, Mar 27, 2015 at 9:48 AM Xi Shen  wrote:
> 
>  Hi Burak,
> 
>  My iterations is set to 500. But I think it should also stop of the
> centroid coverages, right?
> 
>  My spark is 1.2.0, working in windows 64 bit. My data set is about
> 40k vectors, each vector has about 300 features, all normalised. All work
> node have sufficient memory and disk space.
> 
>  Thanks,
>  David
> 
> 
>  On Fri, 27 Mar 2015 02:48 Burak Yavuz  wrote:
> >
> > Hi David,
> >
> > When the number of runs are large and the data is not properly
> partitioned, it seems that K-Means is hanging according to my experience.
> Especially setting the number of runs to something high drastically
> increases the work in executors. If that's not the case, can you give more
> info on what Spark version you are using, your setup, and your dataset?
> >
> > Thanks,
> > Burak
> >
> > On Mar 26, 2015 5:10 AM, "Xi Shen"  wrote:
> >>
> >> Hi,
> >>
> >> When I run k-means cluster with Spark, I got this in the last two
> lines in the log:
> >>
> >> 15/03/26 11:42:42 INFO spark.ContextCleaner: Cleaned broadcast 26
> >> 15/03/26 11:42:42 INFO spark.ContextCleaner: Cleaned shuffle 5
> >>
> >>
> >>
> >> Then it hangs for a long time. There's no active job. The driver
> machine is idle. I cannot access the work node, I am not sure if they are
> busy.
> >>
> >> I understand k-means may take a long time to finish. But why no
> active job? no log?
> >>
> >>
> >> Thanks,
> >> David
> >>
>


Re: Can't access file in spark, but can in hadoop

2015-03-26 Thread Ted Yu
Looks like the following assertion failed:
  Preconditions.checkState(storageIDsCount == locs.size());

locs is List
Can you enhance the assertion to log more information ?

Cheers

On Thu, Mar 26, 2015 at 3:06 PM, Dale Johnson  wrote:

> There seems to be a special kind of "corrupted according to Spark" state of
> file in HDFS.  I have isolated a set of files (maybe 1% of all files I need
> to work with) which are producing the following stack dump when I try to
> sc.textFile() open them.  When I try to open directories, most large
> directories contain at least one file of this type.  Curiously, the
> following two lines fail inside of a Spark job, but not inside of a Scoobi
> job:
>
> val conf = new org.apache.hadoop.conf.Configuration
> val fs = org.apache.hadoop.fs.FileSystem.get(conf)
>
> The stack trace follows:
>
> 15/03/26 14:22:43 INFO yarn.ApplicationMaster: Final app status: FAILED,
> exitCode: 15, (reason: User class threw exception: null)
> Exception in thread "Driver" java.lang.IllegalStateException
> at
>
> org.spark-project.guava.common.base.Preconditions.checkState(Preconditions.java:133)
> at
> org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:673)
> at
>
> org.apache.hadoop.hdfs.protocolPB.PBHelper.convertLocatedBlock(PBHelper.java:1100)
> at
> org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:1118)
> at
> org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:1251)
> at
> org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:1354)
> at
> org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:1363)
> at
>
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getListing(ClientNamenodeProtocolTranslatorPB.java:518)
> at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1743)
> at
>
> org.apache.hadoop.hdfs.DistributedFileSystem$15.<init>(DistributedFileSystem.java:738)
> at
>
> org.apache.hadoop.hdfs.DistributedFileSystem.listLocatedStatus(DistributedFileSystem.java:727)
> at
> org.apache.hadoop.fs.FileSystem.listLocatedStatus(FileSystem.java:1662)
> at org.apache.hadoop.fs.FileSystem$5.<init>(FileSystem.java:1724)
> at org.apache.hadoop.fs.FileSystem.listFiles(FileSystem.java:1721)
> at
>
> com.ebay.ss.niffler.miner.speller.SpellQueryLaunch$$anonfun$main$2.apply(SpellQuery.scala:1125)
> at
>
> com.ebay.ss.niffler.miner.speller.SpellQueryLaunch$$anonfun$main$2.apply(SpellQuery.scala:1123)
> at
>
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at
> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> at
>
> com.ebay.ss.niffler.miner.speller.SpellQueryLaunch$.main(SpellQuery.scala:1123)
> at
> com.ebay.ss.niffler.miner.speller.SpellQueryLaunch.main(SpellQuery.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
>
> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:427)
> 15/03/26 14:22:43 INFO yarn.ApplicationMaster: Invoking sc stop from
> shutdown hook
>
> It appears to have found the three copies of the given HDFS block, but is
> performing some sort of validation with them before giving them back to
> spark to schedule the job.  But there is an assert failing.
>
> I've tried this with 1.2.0, 1.2.1 and 1.3.0, and I get the exact same
> error,
> but I've seen the line numbers change on the HDFS libraries, but not the
> function names.  I've tried recompiling myself with different hadoop
> versions, and it's the same.  We're running hadoop 2.4.1 on our cluster.
>
> A google search turns up absolutely nothing on this.
>
> Any insight at all would be appreciated.
>
> Dale Johnson
> Applied Researcher
> eBay.com
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Can-t-access-file-in-spark-but-can-in-hadoop-tp22251.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


RE: Date and decimal datatype not working

2015-03-26 Thread BASAK, ANANDA
Thanks all. I am installing Spark 1.3 now. Thought that I should better sync 
with the daily evolution of this new technology.
So once I install that, I will try to use the Spark-CSV library.

Regards
Ananda

From: Dean Wampler [mailto:deanwamp...@gmail.com]
Sent: Wednesday, March 25, 2015 1:17 PM
To: BASAK, ANANDA
Cc: Yin Huai; user@spark.apache.org
Subject: Re: Date and decimal datatype not working

Recall that the input isn't actually read until you do something that forces
evaluation, like calling saveAsTextFile. You didn't show the whole stack trace
here, but it probably occurred while parsing an input line where one of your
long fields is actually an empty string.

Because this is such a common problem, I usually define a "parse" method that 
converts input text to the desired schema. It catches parse exceptions like 
this and reports the bad line at least. If you can return a default long in 
this case, say 0, that makes it easier to return something.
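
A minimal sketch of that pattern for this case (the field layout and the pipe
delimiter are taken from the ROW_A snippet quoted below in this thread; the Int
and BigDecimal fields would want the same treatment):

// defensive parse: an empty or malformed long becomes 0 instead of killing the job
def parseLong(s: String, line: String, default: Long = 0L): Long =
  try s.trim.toLong
  catch {
    case _: NumberFormatException =>
      println(s"Bad long value '$s' in line: $line")   // report the bad line
      default
  }

val parsed = sc.textFile("/Myhome/SPARK/files/table_a_file.txt").map { line =>
  val p = line.split('|')   // single-character split, so '|' is not treated as a regex
  ROW_A(parseLong(p(0), line), p(1), p(2).trim.toInt, p(3),
        BigDecimal(p(4)), BigDecimal(p(5)), BigDecimal(p(6)))
}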

dean



Dean Wampler, Ph.D.
Author: Programming Scala, 2nd 
Edition (O'Reilly)
Typesafe
@deanwampler
http://polyglotprogramming.com

On Wed, Mar 25, 2015 at 11:48 AM, BASAK, ANANDA 
mailto:ab9...@att.com>> wrote:
Thanks. This library is only available with Spark 1.3. I am using version 
1.2.1. Before I upgrade to 1.3, I want to try what can be done in 1.2.1.

So I am using following:
val MyDataset = sqlContext.sql("my select query")

MyDataset.map(t => 
t(0)+"|"+t(1)+"|"+t(2)+"|"+t(3)+"|"+t(4)+"|"+t(5)).saveAsTextFile("/my_destination_path")

But it is giving following error:
15/03/24 17:05:51 ERROR Executor: Exception in task 1.0 in stage 13.0 (TID 106)
java.lang.NumberFormatException: For input string: ""
at 
java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Long.parseLong(Long.java:453)
at java.lang.Long.parseLong(Long.java:483)
at 
scala.collection.immutable.StringLike$class.toLong(StringLike.scala:230)

is there something wrong with the TSTAMP field which is Long datatype?

Thanks & Regards
---
Ananda Basak

From: Yin Huai [mailto:yh...@databricks.com]
Sent: Monday, March 23, 2015 8:55 PM

To: BASAK, ANANDA
Cc: user@spark.apache.org
Subject: Re: Date and decimal datatype not working

To store to csv file, you can use 
Spark-CSV library.

On Mon, Mar 23, 2015 at 5:35 PM, BASAK, ANANDA 
mailto:ab9...@att.com>> wrote:
Thanks. This worked well as per your suggestions. I had to run following:
val TABLE_A = 
sc.textFile("/Myhome/SPARK/files/table_a_file.txt").map(_.split("|")).map(p => 
ROW_A(p(0).trim.toLong, p(1), p(2).trim.toInt, p(3), BigDecimal(p(4)), 
BigDecimal(p(5)), BigDecimal(p(6

Now I am stuck at another step. I have run a SQL query that selects all the
fields, with a where clause filtering TSTAMP by a date range and an order by
TSTAMP clause. That is running fine.

Then I am trying to store the output in a CSV file. I am using the
saveAsTextFile("filename") function, but it is giving an error. Can you please
help me with the proper syntax to store the output in a CSV file?


Thanks & Regards
---
Ananda Basak

From: BASAK, ANANDA
Sent: Tuesday, March 17, 2015 3:08 PM
To: Yin Huai
Cc: user@spark.apache.org
Subject: RE: Date and decimal datatype not working

Ok, thanks for the suggestions. Let me try and will confirm all.

Regards
Ananda

From: Yin Huai [mailto:yh...@databricks.com]
Sent: Tuesday, March 17, 2015 3:04 PM
To: BASAK, ANANDA
Cc: user@spark.apache.org
Subject: Re: Date and decimal datatype not working

p(0) is a String. So, you need to explicitly convert it to a Long. e.g. 
p(0).trim.toLong. You also need to do it for p(2). For those BigDecimals value, 
you need to create BigDecimal objects from your String values.

On Tue, Mar 17, 2015 at 5:55 PM, BASAK, ANANDA 
mailto:ab9...@att.com>> wrote:
Hi All,
I am very new in Spark world. Just started some test coding from last week. I 
am using spark-1.2.1-bin-hadoop2.4 and scala coding.
I am having issues while using Date and decimal data types. Following is my 
code that I am simply running on scala prompt. I am trying to define a table 
and point that to my flat file containing raw data (pipe delimited format). 
Once that is done, I will run some SQL queries and put the output data in to 
another flat file with pipe delimited format.

***
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.createSchemaRDD


// Define row and table
case class ROW_A(
  TSTAMP:   Long,
  USIDAN: String,
  SECNT:Int,
  SECT:   String,
  BLOCK_NUM:BigDecimal,
  BLO

Re: Spark History Server : jobs link doesn't open

2015-03-26 Thread Marcelo Vanzin
bcc: user@, cc: cdh-user@

I recommend using CDH's mailing list whenever you have a problem with CDH.

That being said, you haven't provided enough info to debug the
problem. Since you're using CM, you can easily go look at the History
Server's logs and see what the underlying error is.


On Thu, Mar 26, 2015 at 4:27 PM, , Roy  wrote:
> We have Spark on YARN, with Cloudera Manager 5.3.2 and CDH 5.3.2
>
> Jobs link on spark History server  doesn't open and shows following message
> :
>
> HTTP ERROR: 500
>
> Problem accessing /history/application_1425934191900_87572. Reason:
>
> Server Error
>
> 
> Powered by Jetty://
>



-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Building spark 1.2 from source requires more dependencies

2015-03-26 Thread Xi Shen
It is brought in by another dependency, so you do not need to specify it
explicitly... I think this is what Ted means.

On Fri, Mar 27, 2015 at 9:48 AM Pala M Muthaia 
wrote:

> +spark-dev
>
> Yes, the dependencies are there. I guess my question is how come the build
> is succeeding in the mainline then, without adding these dependencies?
>
> On Thu, Mar 26, 2015 at 3:44 PM, Ted Yu  wrote:
>
>> Looking at output from dependency:tree, servlet-api is brought in by the
>> following:
>>
>> [INFO] +- org.apache.cassandra:cassandra-all:jar:1.2.6:compile
>> [INFO] |  +- org.antlr:antlr:jar:3.2:compile
>> [INFO] |  +- com.googlecode.json-simple:json-simple:jar:1.1:compile
>> [INFO] |  +- org.yaml:snakeyaml:jar:1.6:compile
>> [INFO] |  +- edu.stanford.ppl:snaptree:jar:0.1:compile
>> [INFO] |  +- org.mindrot:jbcrypt:jar:0.3m:compile
>> [INFO] |  +- org.apache.thrift:libthrift:jar:0.7.0:compile
>> [INFO] |  |  \- javax.servlet:servlet-api:jar:2.5:compile
>>
>> FYI
>>
>> On Thu, Mar 26, 2015 at 3:36 PM, Pala M Muthaia <
>> mchett...@rocketfuelinc.com> wrote:
>>
>>> Hi,
>>>
>>> We are trying to build spark 1.2 from source (tip of the branch-1.2 at
>>> the moment). I tried to build spark using the following command:
>>>
>>> mvn -U -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive
>>> -Phive-thriftserver -DskipTests clean package
>>>
>>> I encountered various missing class definition exceptions (e.g: class
>>> javax.servlet.ServletException not found).
>>>
>>> I eventually got the build to succeed after adding the following set of
>>> dependencies to the spark-core's pom.xml:
>>>
>>> <dependency>
>>>   <groupId>javax.servlet</groupId>
>>>   <artifactId>servlet-api</artifactId>
>>>   <version>3.0</version>
>>> </dependency>
>>>
>>> <dependency>
>>>   <groupId>org.eclipse.jetty</groupId>
>>>   <artifactId>jetty-io</artifactId>
>>> </dependency>
>>>
>>> <dependency>
>>>   <groupId>org.eclipse.jetty</groupId>
>>>   <artifactId>jetty-http</artifactId>
>>> </dependency>
>>>
>>> <dependency>
>>>   <groupId>org.eclipse.jetty</groupId>
>>>   <artifactId>jetty-servlet</artifactId>
>>> </dependency>
>>>
>>> Pretty much all of the missing class definition errors came up while
>>> building HttpServer.scala, and went away after the above dependencies were
>>> included.
>>>
>>> My guess is official build for spark 1.2 is working already. My question
>>> is what is wrong with my environment or setup, that requires me to add
>>> dependencies to pom.xml in this manner, to get this build to succeed.
>>>
>>> Also, i am not sure if this build would work at runtime for us, i am
>>> still testing this out.
>>>
>>>
>>> Thanks,
>>> pala
>>>
>>
>>
>


Spark History Server : jobs link doesn't open

2015-03-26 Thread , Roy
We have Spark on YARN, with Cloudera Manager 5.3.2 and CDH 5.3.2

Jobs link on spark History server  doesn't open and shows following message
:

HTTP ERROR: 500

Problem accessing /history/application_1425934191900_87572. Reason:

Server Error

--
*Powered by Jetty://*


Re: Why k-means cluster hang for a long time?

2015-03-26 Thread Xi Shen
The code is very simple.

val data = sc.textFile("very/large/text/file") map { l =>
  // turn each line into dense vector
  Vectors.dense(...)
}

// the resulting data set is about 40k vectors

KMeans.train(data, k=5000, maxIterations=500)

I just kill my application. In the log I found this:

15/03/26 *11:42:43* INFO storage.BlockManagerMaster: Updated info of block
broadcast_26_piece0
15/03/26 *23:02:57* WARN server.TransportChannelHandler: Exception in
connection from
workernode0.xshe3539-hadoop-sydney.q10.internal.cloudapp.net/100.72.84.107:56277
java.io.IOException: An existing connection was forcibly closed by the
remote host

Notice the time gap. I think it means the worker node did not generate any
log at all for about 12 hrs... does it mean they are not working at all?

But when testing with a very small data set, my application works and outputs
the expected data.


Thanks,
David


On Fri, Mar 27, 2015 at 10:04 AM Burak Yavuz  wrote:

> Can you share the code snippet of how you call k-means? Do you cache the
> data before k-means? Did you repartition the data?
> On Mar 26, 2015 4:02 PM, "Xi Shen"  wrote:
>
>> OH, the job I talked about has ran more than 11 hrs without a result...it
>> doesn't make sense.
>>
>>
>> On Fri, Mar 27, 2015 at 9:48 AM Xi Shen  wrote:
>>
>>> Hi Burak,
>>>
>>> My iterations is set to 500. But I think it should also stop of the
>>> centroid coverages, right?
>>>
>>> My spark is 1.2.0, working in windows 64 bit. My data set is about 40k
>>> vectors, each vector has about 300 features, all normalised. All work node
>>> have sufficient memory and disk space.
>>>
>>> Thanks,
>>> David
>>>
>>> On Fri, 27 Mar 2015 02:48 Burak Yavuz  wrote:
>>>
 Hi David,

 When the number of runs are large and the data is not properly
 partitioned, it seems that K-Means is hanging according to my experience.
 Especially setting the number of runs to something high drastically
 increases the work in executors. If that's not the case, can you give more
 info on what Spark version you are using, your setup, and your dataset?

 Thanks,
 Burak
 On Mar 26, 2015 5:10 AM, "Xi Shen"  wrote:

> Hi,
>
> When I run k-means cluster with Spark, I got this in the last two
> lines in the log:
>
> 15/03/26 11:42:42 INFO spark.ContextCleaner: Cleaned broadcast 26
> 15/03/26 11:42:42 INFO spark.ContextCleaner: Cleaned shuffle 5
>
>
>
> Then it hangs for a long time. There's no active job. The driver
> machine is idle. I cannot access the work node, I am not sure if they are
> busy.
>
> I understand k-means may take a long time to finish. But why no active
> job? no log?
>
>
> Thanks,
> David
>
>


Re: Why k-means cluster hang for a long time?

2015-03-26 Thread Xi Shen
OH, the job I talked about has run for more than 11 hrs without a result... it
doesn't make sense.


On Fri, Mar 27, 2015 at 9:48 AM Xi Shen  wrote:

> Hi Burak,
>
> My iterations is set to 500. But I think it should also stop of the
> centroid coverages, right?
>
> My spark is 1.2.0, working in windows 64 bit. My data set is about 40k
> vectors, each vector has about 300 features, all normalised. All work node
> have sufficient memory and disk space.
>
> Thanks,
> David
>
> On Fri, 27 Mar 2015 02:48 Burak Yavuz  wrote:
>
>> Hi David,
>>
>> When the number of runs are large and the data is not properly
>> partitioned, it seems that K-Means is hanging according to my experience.
>> Especially setting the number of runs to something high drastically
>> increases the work in executors. If that's not the case, can you give more
>> info on what Spark version you are using, your setup, and your dataset?
>>
>> Thanks,
>> Burak
>> On Mar 26, 2015 5:10 AM, "Xi Shen"  wrote:
>>
>>> Hi,
>>>
>>> When I run k-means cluster with Spark, I got this in the last two lines
>>> in the log:
>>>
>>> 15/03/26 11:42:42 INFO spark.ContextCleaner: Cleaned broadcast 26
>>> 15/03/26 11:42:42 INFO spark.ContextCleaner: Cleaned shuffle 5
>>>
>>>
>>>
>>> Then it hangs for a long time. There's no active job. The driver machine
>>> is idle. I cannot access the work node, I am not sure if they are busy.
>>>
>>> I understand k-means may take a long time to finish. But why no active
>>> job? no log?
>>>
>>>
>>> Thanks,
>>> David
>>>
>>>


Recreating the Mesos/Spark paper's experiments

2015-03-26 Thread hbogert
Hi all, 

For my master thesis I will be characterising performance of two-level
schedulers like Mesos and after reading the paper:
 https://www.cs.berkeley.edu/~alig/papers/mesos.pdf
where Spark is also introduced I am wondering how some experiments and
results came about. 
If this is not the place to ask these questions, or someone knows better
places, please let me know.

I am wondering whether the experiment would show the same results if we used
the current release of Spark, because in the macro-benchmarks (Fig. 5c) we can
see 4 instances (though the text talks of 5 instances) of Spark applications
being run. During one instance Spark seems to grow elastically, especially
between [0,200] and [900,1100].

Already this would be problematic to recreate in current Spark on Mesos,
because once an application context starts, it 1) allocates 
all available nodes in the cluster and does not scale up or down during that
application’s lifetime in CoarseGrained mode — or 2) it 
allocates all memory, and does not release it, though it scales up and down
with regard to CPUs in FineGrained mode. 
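
As a minimal sketch of how the two modes are selected today (property names
from the standard Spark-on-Mesos configuration; the master URL and sizes are
placeholders):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setMaster("mesos://zk://zk-host:2181/mesos")   // placeholder Mesos master URL
  .set("spark.mesos.coarse", "true")              // coarse-grained: resources held for the app's lifetime
  .set("spark.executor.memory", "4g")             // memory reserved per node, held even when idle
// spark.mesos.coarse = "false" (the default) gives the fine-grained behaviour described above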

Even in FineGrained mode it would not work well if there are other
frameworks who need a lot of memory, because they simply wouldn’t be able
to allocate it, because even during idle times of a spark application, the
cluster’s memory is taken. Of course we could limit the memory usage, but
this defeats the purpose of having Mesos.

Does someone know, 
1) Was there a memory limit for Spark during the experiments in the paper
(and thus was the nowadays FineGrained mode chosen), so that other
frameworks would also be able to run?
 or 
2) Was the Spark architecture vastly different back then?

Any other remarks, even anecdotal, are very welcome

Hans



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Recreating-the-Mesos-Spark-paper-s-experiments-tp22252.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: iPython Notebook + Spark + Accumulo -- best practice?

2015-03-26 Thread David Holiday
will do! I've got to clear with my boss what I can post and in what manner, but 
I'll definitely do what I can to put some working code out into the world so 
the next person who runs into this brick wall can benefit from all this :-D

DAVID HOLIDAY
Software Engineer
760 607 3300 | Office
312 758 8385 | Mobile
dav...@annaisystems.com


www.AnnaiSystems.com

On Mar 26, 2015, at 3:03 PM, andy petrella 
mailto:andy.petre...@gmail.com>> wrote:


That's purely awesome! Don't hesitate to contribute your notebook back to the
spark-notebook repo, even if it's rough; I'll help clean it up if needed.

The vagrant is also appealing 😆

Congrats!

Le jeu 26 mars 2015 22:22, David Holiday 
mailto:dav...@annaisystems.com>> a écrit :
w0t! that did it! t/y so much!

I'm going to put together a pastebin or something that has all the code put 
together so if anyone else runs into this issue they will have some working 
code to help them figure out what's going on.

DAVID HOLIDAY
Software Engineer
760 607 3300 | Office
312 758 8385 | Mobile
dav...@annaisystems.com



www.AnnaiSystems.com

On Mar 26, 2015, at 12:24 PM, Corey Nolet 
mailto:cjno...@gmail.com>> wrote:

Spark uses a SerializableWritable [1] to java serialize writable objects. I've 
noticed (at least in Spark 1.2.1) that it breaks down with some objects when 
Kryo is used instead of regular java serialization. Though it is  wrapping the 
actual AccumuloInputFormat (another example of something you may want to do in 
the future), we have Accumulo working to load data from a table into Spark SQL 
[2]. The way Spark uses the InputFormat is very straightforward.

[1] 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SerializableWritable.scala
[2] 
https://github.com/calrissian/accumulo-recipes/blob/master/thirdparty/spark/src/main/scala/org/calrissian/accumulorecipes/spark/sql/EventStoreCatalyst.scala#L76

On Thu, Mar 26, 2015 at 3:06 PM, Nick Pentreath 
mailto:nick.pentre...@gmail.com>> wrote:
I'm guessing the Accumulo Key and Value classes are not serializable, so you 
would need to do something like

val rdd = sc.newAPIHadoopRDD(...).map { case (key, value) => 
(extractScalaType(key), extractScalaType(value)) }

Where extractScalaType converts the Key or Value to a standard Scala type or
case class or whatever - basically it extracts the data from the Key or Value
in a form usable in Scala
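
A minimal sketch of that mapping, assuming the AccumuloInputFormat job setup
shown below and treating the row id as a String and the value as UTF-8 text
(any richer extraction goes in place of these two conversions):

import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat
import org.apache.accumulo.core.data.{Key, Value}

val rdd = sc.newAPIHadoopRDD(
    hadoopJob.getConfiguration,        // the Job configured for AccumuloInputFormat, as below
    classOf[AccumuloInputFormat],
    classOf[Key],
    classOf[Value])
  .map { case (key, value) =>
    (key.getRow.toString, new String(value.get, "UTF-8"))   // plain, serializable Scala types
  }
rdd.first                              // no longer drags Key/Value through the serializer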

—
Sent from Mailbox



On Thu, Mar 26, 2015 at 8:59 PM, Russ Weeks 
mailto:rwe...@newbrightidea.com>> wrote:

Hi, David,

This is the code that I use to create a JavaPairRDD from an Accumulo table:

JavaSparkContext sc = new JavaSparkContext(conf);
Job hadoopJob = Job.getInstance(conf,"TestSparkJob");
job.setInputFormatClass(AccumuloInputFormat.class);
AccumuloInputFormat.setZooKeeperInstance(job,
conf.get(ZOOKEEPER_INSTANCE_NAME),
conf.get(ZOOKEEPER_HOSTS)
);
AccumuloInputFormat.setConnectorInfo(job,
conf.get(ACCUMULO_AGILE_USERNAME),
new PasswordToken(conf.get(ACCUMULO_AGILE_PASSWORD))
);
AccumuloInputFormat.setInputTableName(job, conf.get(ACCUMULO_TABLE_NAME));
AccumuloInputFormat.setScanAuthorizations(job, auths);
JavaPairRDD<Key, Value> values =
sc.newAPIHadoopRDD(hadoopJob.getConfiguration(), AccumuloInputFormat.class,
Key.class, Value.class);

Key.class and Value.class are from org.apache.accumulo.core.data. I use a 
WholeRowIterator so that the Value is actually an encoded representation of an 
entire logical row; it's a useful convenience if you can be sure that your rows 
always fit in memory.

I haven't tested it since Spark 1.0.1 but I doubt anything important has 
changed.

Regards,
-Russ


On Thu, Mar 26, 2015 at 11:41 AM, David Holiday 
mailto:dav...@annaisystems.com>> wrote:

 progress!

i was able to figure out why the 'input INFO not set' error was occurring. the 
eagle-eyed among you will no doubt see the following code is missing a closing 
')'

AbstractInputFormat.setConnectorInfo(jobConf, "root", new 
PasswordToken("password")

as I'm doing this in spark-notebook, I'd been clicking the execute button and 
moving on because I wasn't seeing an error. what I forgot was that notebook is 
going to do what spark-shell will do when you leave off a closing ')' -- it 
will wait forever for you to add it. so the error was the result of the 
'setConnectorInfo' method never getting executed.

unfortunately, I'm still unable to shove the accumulo table data into an RDD 
that's useable to me. when I execute

rddX.count

I get back

res15: Long = 1

which is the correct response - there are 10,000 rows of data in the table I 
pointed to. however, when I try to grab the first element of data thusly:

rddX.first

I get the following error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0

Re: Why k-means cluster hang for a long time?

2015-03-26 Thread Xi Shen
Hi Burak,

My iteration limit is set to 500, but I think it should also stop on centroid
convergence, right?

My Spark is 1.2.0, running on 64-bit Windows. My data set is about 40k
vectors; each vector has about 300 features, all normalised. All worker nodes
have sufficient memory and disk space.

Thanks,
David
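
As a hedged aside: MLlib's k-means has a convergence tolerance (epsilon) in
addition to the iteration cap, so it can stop early once the centroids barely
move between iterations. A minimal Spark 1.2-era sketch with illustrative
values only (the toy data stands in for the 40k normalised vectors):

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

val data = sc.parallelize(Seq(
  Vectors.dense(0.0, 0.1), Vectors.dense(0.9, 1.0), Vectors.dense(1.0, 0.9))).cache()

val model = new KMeans()
  .setK(2)                  // the thread used k = 500
  .setMaxIterations(500)
  .setRuns(1)               // large run counts multiply the work on executors
  .setEpsilon(1e-4)         // convergence tolerance: stop once centroids barely move
  .run(data)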

On Fri, 27 Mar 2015 02:48 Burak Yavuz  wrote:

> Hi David,
>
> When the number of runs are large and the data is not properly
> partitioned, it seems that K-Means is hanging according to my experience.
> Especially setting the number of runs to something high drastically
> increases the work in executors. If that's not the case, can you give more
> info on what Spark version you are using, your setup, and your dataset?
>
> Thanks,
> Burak
> On Mar 26, 2015 5:10 AM, "Xi Shen"  wrote:
>
>> Hi,
>>
>> When I run k-means cluster with Spark, I got this in the last two lines
>> in the log:
>>
>> 15/03/26 11:42:42 INFO spark.ContextCleaner: Cleaned broadcast 26
>> 15/03/26 11:42:42 INFO spark.ContextCleaner: Cleaned shuffle 5
>>
>>
>>
>> Then it hangs for a long time. There's no active job. The driver machine
>> is idle. I cannot access the work node, I am not sure if they are busy.
>>
>> I understand k-means may take a long time to finish. But why no active
>> job? no log?
>>
>>
>> Thanks,
>> David
>>
>>


Re: Building spark 1.2 from source requires more dependencies

2015-03-26 Thread Pala M Muthaia
+spark-dev

Yes, the dependencies are there. I guess my question is how come the build
is succeeding in the mainline then, without adding these dependencies?

On Thu, Mar 26, 2015 at 3:44 PM, Ted Yu  wrote:

> Looking at output from dependency:tree, servlet-api is brought in by the
> following:
>
> [INFO] +- org.apache.cassandra:cassandra-all:jar:1.2.6:compile
> [INFO] |  +- org.antlr:antlr:jar:3.2:compile
> [INFO] |  +- com.googlecode.json-simple:json-simple:jar:1.1:compile
> [INFO] |  +- org.yaml:snakeyaml:jar:1.6:compile
> [INFO] |  +- edu.stanford.ppl:snaptree:jar:0.1:compile
> [INFO] |  +- org.mindrot:jbcrypt:jar:0.3m:compile
> [INFO] |  +- org.apache.thrift:libthrift:jar:0.7.0:compile
> [INFO] |  |  \- javax.servlet:servlet-api:jar:2.5:compile
>
> FYI
>
> On Thu, Mar 26, 2015 at 3:36 PM, Pala M Muthaia <
> mchett...@rocketfuelinc.com> wrote:
>
>> Hi,
>>
>> We are trying to build spark 1.2 from source (tip of the branch-1.2 at
>> the moment). I tried to build spark using the following command:
>>
>> mvn -U -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive
>> -Phive-thriftserver -DskipTests clean package
>>
>> I encountered various missing class definition exceptions (e.g: class
>> javax.servlet.ServletException not found).
>>
>> I eventually got the build to succeed after adding the following set of
>> dependencies to the spark-core's pom.xml:
>>
>> 
>>   javax.servlet
>>   *servlet-api*
>>   3.0
>> 
>>
>> 
>>   org.eclipse.jetty
>>   *jetty-io*
>> 
>>
>> 
>>   org.eclipse.jetty
>>   *jetty-http*
>> 
>>
>> 
>>   org.eclipse.jetty
>>   *jetty-servlet*
>> 
>>
>> Pretty much all of the missing class definition errors came up while
>> building HttpServer.scala, and went away after the above dependencies were
>> included.
>>
>> My guess is official build for spark 1.2 is working already. My question
>> is what is wrong with my environment or setup, that requires me to add
>> dependencies to pom.xml in this manner, to get this build to succeed.
>>
>> Also, i am not sure if this build would work at runtime for us, i am
>> still testing this out.
>>
>>
>> Thanks,
>> pala
>>
>
>


Re: Building spark 1.2 from source requires more dependencies

2015-03-26 Thread Ted Yu
Looking at output from dependency:tree, servlet-api is brought in by the
following:

[INFO] +- org.apache.cassandra:cassandra-all:jar:1.2.6:compile
[INFO] |  +- org.antlr:antlr:jar:3.2:compile
[INFO] |  +- com.googlecode.json-simple:json-simple:jar:1.1:compile
[INFO] |  +- org.yaml:snakeyaml:jar:1.6:compile
[INFO] |  +- edu.stanford.ppl:snaptree:jar:0.1:compile
[INFO] |  +- org.mindrot:jbcrypt:jar:0.3m:compile
[INFO] |  +- org.apache.thrift:libthrift:jar:0.7.0:compile
[INFO] |  |  \- javax.servlet:servlet-api:jar:2.5:compile

FYI

On Thu, Mar 26, 2015 at 3:36 PM, Pala M Muthaia  wrote:

> Hi,
>
> We are trying to build spark 1.2 from source (tip of the branch-1.2 at the
> moment). I tried to build spark using the following command:
>
> mvn -U -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive
> -Phive-thriftserver -DskipTests clean package
>
> I encountered various missing class definition exceptions (e.g: class
> javax.servlet.ServletException not found).
>
> I eventually got the build to succeed after adding the following set of
> dependencies to the spark-core's pom.xml:
>
> 
>   javax.servlet
>   *servlet-api*
>   3.0
> 
>
> 
>   org.eclipse.jetty
>   *jetty-io*
> 
>
> 
>   org.eclipse.jetty
>   *jetty-http*
> 
>
> 
>   org.eclipse.jetty
>   *jetty-servlet*
> 
>
> Pretty much all of the missing class definition errors came up while
> building HttpServer.scala, and went away after the above dependencies were
> included.
>
> My guess is official build for spark 1.2 is working already. My question
> is what is wrong with my environment or setup, that requires me to add
> dependencies to pom.xml in this manner, to get this build to succeed.
>
> Also, i am not sure if this build would work at runtime for us, i am still
> testing this out.
>
>
> Thanks,
> pala
>


Re: K Means cluster with spark

2015-03-26 Thread Xi Shen
Hi Sandeep,

I followed the DenseKMeans example which comes with the spark package.

My total vectors are about 40k, and my k=500. All my code are written in
Scala.

Thanks,
David

On Fri, 27 Mar 2015 05:51 sandeep vura  wrote:

> Hi Shen,
>
> I am also working on k means clustering with spark. May i know which links
> you are following to get understand k means clustering with spark and also
> need sample k means program to process in spark. which is written in scala.
>
> Regards,
> Sandeep.v
>


Building spark 1.2 from source requires more dependencies

2015-03-26 Thread Pala M Muthaia
Hi,

We are trying to build spark 1.2 from source (tip of the branch-1.2 at the
moment). I tried to build spark using the following command:

mvn -U -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive
-Phive-thriftserver -DskipTests clean package

I encountered various missing class definition exceptions (e.g: class
javax.servlet.ServletException not found).

I eventually got the build to succeed after adding the following set of
dependencies to the spark-core's pom.xml:

<dependency>
  <groupId>javax.servlet</groupId>
  <artifactId>servlet-api</artifactId>
  <version>3.0</version>
</dependency>

<dependency>
  <groupId>org.eclipse.jetty</groupId>
  <artifactId>jetty-io</artifactId>
</dependency>

<dependency>
  <groupId>org.eclipse.jetty</groupId>
  <artifactId>jetty-http</artifactId>
</dependency>

<dependency>
  <groupId>org.eclipse.jetty</groupId>
  <artifactId>jetty-servlet</artifactId>
</dependency>

Pretty much all of the missing class definition errors came up while
building HttpServer.scala, and went away after the above dependencies were
included.

My guess is the official build for Spark 1.2 is already working. My question is
what is wrong with my environment or setup that requires me to add
dependencies to pom.xml in this manner to get the build to succeed.

Also, I am not sure whether this build would work at runtime for us; I am still
testing this out.


Thanks,
pala


Re: RDD.map does not allowed to preservesPartitioning?

2015-03-26 Thread Zhan Zhang
Thanks all for the quick response.

Thanks.

Zhan Zhang

On Mar 26, 2015, at 3:14 PM, Patrick Wendell  wrote:

> I think we have a version of mapPartitions that allows you to tell
> Spark the partitioning is preserved:
> 
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L639
> 
> We could also add a map function that does same. Or you can just write
> your map using an iterator.
> 
> - Patrick
> 
> On Thu, Mar 26, 2015 at 3:07 PM, Jonathan Coveney  wrote:
>> This is just a deficiency of the api, imo. I agree: mapValues could
>> definitely be a function (K, V)=>V1. The option isn't set by the function,
>> it's on the RDD. So you could look at the code and do this.
>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala
>> 
>> def mapValues[U](f: V => U): RDD[(K, U)] = {
>>val cleanF = self.context.clean(f)
>>new MapPartitionsRDD[(K, U), (K, V)](self,
>>  (context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) },
>>  preservesPartitioning = true)
>>  }
>> 
>> What you want:
>> 
>> def mapValues[U](f: (K, V) => U): RDD[(K, U)] = {
>>val cleanF = self.context.clean(f)
>>new MapPartitionsRDD[(K, U), (K, V)](self,
>>  (context, pid, iter) => iter.map { case t@(k, _) => (k, cleanF(t)) },
>>  preservesPartitioning = true)
>>  }
>> 
>> One of the nice things about spark is that making such new operators is very
>> easy :)
>> 
>> 2015-03-26 17:54 GMT-04:00 Zhan Zhang :
>> 
>>> Thanks Jonathan. You are right regarding rewrite the example.
>>> 
>>> I mean providing such option to developer so that it is controllable. The
>>> example may seems silly, and I don't know the use cases.
>>> 
>>> But for example, if I also want to operate both the key and value part to
>>> generate some new value with keeping key part untouched. Then mapValues may
>>> not be able to  do this.
>>> 
>>> Changing the code to allow this is trivial, but I don't know whether there
>>> is some special reason behind this.
>>> 
>>> Thanks.
>>> 
>>> Zhan Zhang
>>> 
>>> 
>>> 
>>> 
>>> On Mar 26, 2015, at 2:49 PM, Jonathan Coveney  wrote:
>>> 
>>> I believe if you do the following:
>>> 
>>> 
>>> sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4)).map((_,1)).reduceByKey(_+_).mapValues(_+1).reduceByKey(_+_).toDebugString
>>> 
>>> (8) MapPartitionsRDD[34] at reduceByKey at :23 []
>>> |  MapPartitionsRDD[33] at mapValues at :23 []
>>> |  ShuffledRDD[32] at reduceByKey at :23 []
>>> +-(8) MapPartitionsRDD[31] at map at :23 []
>>>|  ParallelCollectionRDD[30] at parallelize at :23 []
>>> 
>>> The difference is that spark has no way to know that your map closure
>>> doesn't change the key. if you only use mapValues, it does. Pretty cool that
>>> they optimized that :)
>>> 
>>> 2015-03-26 17:44 GMT-04:00 Zhan Zhang :
 
 Hi Folks,
 
 Does anybody know what is the reason not allowing preserverPartitioning
 in RDD.map? Do I miss something here?
 
 Following example involves two shuffles. I think if preservePartitioning
 is allowed, we can avoid the second one, right?
 
 val r1 = sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4))
 val r2 = r1.map((_, 1))
 val r3 = r2.reduceByKey(_+_)
 val r4 = r3.map(x=>(x._1, x._2 + 1))
 val r5 = r4.reduceByKey(_+_)
 r5.collect.foreach(println)
 
 scala> r5.toDebugString
 res2: String =
 (8) ShuffledRDD[4] at reduceByKey at :29 []
 +-(8) MapPartitionsRDD[3] at map at :27 []
|  ShuffledRDD[2] at reduceByKey at :25 []
+-(8) MapPartitionsRDD[1] at map at :23 []
   |  ParallelCollectionRDD[0] at parallelize at :21 []
 
 Thanks.
 
 Zhan Zhang
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 
>>> 
>>> 
>> 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: RDD.map does not allowed to preservesPartitioning?

2015-03-26 Thread Patrick Wendell
I think we have a version of mapPartitions that allows you to tell
Spark the partitioning is preserved:

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L639

We could also add a map function that does same. Or you can just write
your map using an iterator.

- Patrick
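
To make that concrete, here is a hedged sketch of the example from this thread
rewritten with mapPartitions and preservesPartitioning = true, so the second
reduceByKey should not need another shuffle (this only holds because the keys
are left untouched):

val r1 = sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4))
val r3 = r1.map((_, 1)).reduceByKey(_+_)

// Same as r3.map(x => (x._1, x._2 + 1)), but written over the partition
// iterator so we can promise Spark the partitioning is unchanged.
val r4 = r3.mapPartitions(
  iter => iter.map { case (k, v) => (k, v + 1) },
  preservesPartitioning = true)

val r5 = r4.reduceByKey(_+_)   // no second shuffle: partitioning was preserved
r5.collect.foreach(println)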

On Thu, Mar 26, 2015 at 3:07 PM, Jonathan Coveney  wrote:
> This is just a deficiency of the api, imo. I agree: mapValues could
> definitely be a function (K, V)=>V1. The option isn't set by the function,
> it's on the RDD. So you could look at the code and do this.
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala
>
>  def mapValues[U](f: V => U): RDD[(K, U)] = {
> val cleanF = self.context.clean(f)
> new MapPartitionsRDD[(K, U), (K, V)](self,
>   (context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) },
>   preservesPartitioning = true)
>   }
>
> What you want:
>
>  def mapValues[U](f: (K, V) => U): RDD[(K, U)] = {
> val cleanF = self.context.clean(f)
> new MapPartitionsRDD[(K, U), (K, V)](self,
>   (context, pid, iter) => iter.map { case t@(k, _) => (k, cleanF(t)) },
>   preservesPartitioning = true)
>   }
>
> One of the nice things about spark is that making such new operators is very
> easy :)
>
> 2015-03-26 17:54 GMT-04:00 Zhan Zhang :
>
>> Thanks Jonathan. You are right regarding rewrite the example.
>>
>> I mean providing such option to developer so that it is controllable. The
>> example may seems silly, and I don't know the use cases.
>>
>> But for example, if I also want to operate both the key and value part to
>> generate some new value with keeping key part untouched. Then mapValues may
>> not be able to  do this.
>>
>> Changing the code to allow this is trivial, but I don't know whether there
>> is some special reason behind this.
>>
>> Thanks.
>>
>> Zhan Zhang
>>
>>
>>
>>
>> On Mar 26, 2015, at 2:49 PM, Jonathan Coveney  wrote:
>>
>> I believe if you do the following:
>>
>>
>> sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4)).map((_,1)).reduceByKey(_+_).mapValues(_+1).reduceByKey(_+_).toDebugString
>>
>> (8) MapPartitionsRDD[34] at reduceByKey at :23 []
>>  |  MapPartitionsRDD[33] at mapValues at :23 []
>>  |  ShuffledRDD[32] at reduceByKey at :23 []
>>  +-(8) MapPartitionsRDD[31] at map at :23 []
>> |  ParallelCollectionRDD[30] at parallelize at :23 []
>>
>> The difference is that spark has no way to know that your map closure
>> doesn't change the key. if you only use mapValues, it does. Pretty cool that
>> they optimized that :)
>>
>> 2015-03-26 17:44 GMT-04:00 Zhan Zhang :
>>>
>>> Hi Folks,
>>>
>>> Does anybody know what is the reason not allowing preserverPartitioning
>>> in RDD.map? Do I miss something here?
>>>
>>> Following example involves two shuffles. I think if preservePartitioning
>>> is allowed, we can avoid the second one, right?
>>>
>>>  val r1 = sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4))
>>>  val r2 = r1.map((_, 1))
>>>  val r3 = r2.reduceByKey(_+_)
>>>  val r4 = r3.map(x=>(x._1, x._2 + 1))
>>>  val r5 = r4.reduceByKey(_+_)
>>>  r5.collect.foreach(println)
>>>
>>> scala> r5.toDebugString
>>> res2: String =
>>> (8) ShuffledRDD[4] at reduceByKey at :29 []
>>>  +-(8) MapPartitionsRDD[3] at map at :27 []
>>> |  ShuffledRDD[2] at reduceByKey at :25 []
>>> +-(8) MapPartitionsRDD[1] at map at :23 []
>>>|  ParallelCollectionRDD[0] at parallelize at :21 []
>>>
>>> Thanks.
>>>
>>> Zhan Zhang
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>
>>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Can't access file in spark, but can in hadoop

2015-03-26 Thread Dale Johnson
There seems to be a special kind of "corrupted according to Spark" state of
file in HDFS.  I have isolated a set of files (maybe 1% of all files I need
to work with) which are producing the following stack dump when I try to
sc.textFile() open them.  When I try to open directories, most large
directories contain at least one file of this type.  Curiously, the
following two lines fail inside of a Spark job, but not inside of a Scoobi
job:

val conf = new org.apache.hadoop.conf.Configuration
val fs = org.apache.hadoop.fs.FileSystem.get(conf)

The stack trace follows:

15/03/26 14:22:43 INFO yarn.ApplicationMaster: Final app status: FAILED,
exitCode: 15, (reason: User class threw exception: null)
Exception in thread "Driver" java.lang.IllegalStateException
at
org.spark-project.guava.common.base.Preconditions.checkState(Preconditions.java:133)
at org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:673)
at
org.apache.hadoop.hdfs.protocolPB.PBHelper.convertLocatedBlock(PBHelper.java:1100)
at 
org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:1118)
at 
org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:1251)
at 
org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:1354)
at 
org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:1363)
at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getListing(ClientNamenodeProtocolTranslatorPB.java:518)
at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1743)
at
org.apache.hadoop.hdfs.DistributedFileSystem$15.(DistributedFileSystem.java:738)
at
org.apache.hadoop.hdfs.DistributedFileSystem.listLocatedStatus(DistributedFileSystem.java:727)
at 
org.apache.hadoop.fs.FileSystem.listLocatedStatus(FileSystem.java:1662)
at org.apache.hadoop.fs.FileSystem$5.(FileSystem.java:1724)
at org.apache.hadoop.fs.FileSystem.listFiles(FileSystem.java:1721)
at
com.ebay.ss.niffler.miner.speller.SpellQueryLaunch$$anonfun$main$2.apply(SpellQuery.scala:1125)
at
com.ebay.ss.niffler.miner.speller.SpellQueryLaunch$$anonfun$main$2.apply(SpellQuery.scala:1123)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at
com.ebay.ss.niffler.miner.speller.SpellQueryLaunch$.main(SpellQuery.scala:1123)
at
com.ebay.ss.niffler.miner.speller.SpellQueryLaunch.main(SpellQuery.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:427)
15/03/26 14:22:43 INFO yarn.ApplicationMaster: Invoking sc stop from
shutdown hook

It appears to have found the three copies of the given HDFS block, but is
performing some sort of validation with them before giving them back to
spark to schedule the job.  But there is an assert failing.

I've tried this with 1.2.0, 1.2.1 and 1.3.0, and I get the exact same error,
but I've seen the line numbers change on the HDFS libraries, but not the
function names.  I've tried recompiling myself with different hadoop
versions, and it's the same.  We're running hadoop 2.4.1 on our cluster.

A google search turns up absolutely nothing on this.

Any insight at all would be appreciated.

Dale Johnson
Applied Researcher
eBay.com




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Can-t-access-file-in-spark-but-can-in-hadoop-tp22251.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: RDD.map does not allowed to preservesPartitioning?

2015-03-26 Thread Jonathan Coveney
This is just a deficiency of the api, imo. I agree: mapValues could
definitely be a function (K, V)=>V1. The option isn't set by the function,
it's on the RDD. So you could look at the code and do this.
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala

 def mapValues[U](f: V => U): RDD[(K, U)] = {
val cleanF = self.context.clean(f)
new MapPartitionsRDD[(K, U), (K, V)](self,
  (context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) },
  preservesPartitioning = true)
  }

What you want:

 def mapValues[U](f: (K, V) => U): RDD[(K, U)] = {
val cleanF = self.context.clean(f)
new MapPartitionsRDD[(K, U), (K, V)](self,
  (context, pid, iter) => iter.map { case t@(k, _) => (k, cleanF(t)) },
  preservesPartitioning = true)
  }

One of the nice things about spark is that making such new operators is
very easy :)

2015-03-26 17:54 GMT-04:00 Zhan Zhang :

>  Thanks Jonathan. You are right regarding rewrite the example.
>
>  I mean providing such option to developer so that it is controllable.
> The example may seems silly, and I don’t know the use cases.
>
> But for example, if I also want to operate both the key and value part to
> generate some new value with keeping key part untouched. Then mapValues may
> not be able to  do this.
>
>  Changing the code to allow this is trivial, but I don’t know whether
> there is some special reason behind this.
>
>  Thanks.
>
>  Zhan Zhang
>
>
>
>
>  On Mar 26, 2015, at 2:49 PM, Jonathan Coveney  wrote:
>
>  I believe if you do the following:
>
>
> sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4)).map((_,1)).reduceByKey(_+_).mapValues(_+1).reduceByKey(_+_).toDebugString
>
>  (8) MapPartitionsRDD[34] at reduceByKey at :23 []
>  |  MapPartitionsRDD[33] at mapValues at :23 []
>  |  ShuffledRDD[32] at reduceByKey at :23 []
>  +-(8) MapPartitionsRDD[31] at map at :23 []
> |  ParallelCollectionRDD[30] at parallelize at :23 []
>
>  The difference is that spark has no way to know that your map closure
> doesn't change the key. if you only use mapValues, it does. Pretty cool
> that they optimized that :)
>
> 2015-03-26 17:44 GMT-04:00 Zhan Zhang :
>
>> Hi Folks,
>>
>> Does anybody know what is the reason not allowing preserverPartitioning
>> in RDD.map? Do I miss something here?
>>
>> Following example involves two shuffles. I think if preservePartitioning
>> is allowed, we can avoid the second one, right?
>>
>>  val r1 = sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4))
>>  val r2 = r1.map((_, 1))
>>  val r3 = r2.reduceByKey(_+_)
>>  val r4 = r3.map(x=>(x._1, x._2 + 1))
>>  val r5 = r4.reduceByKey(_+_)
>>  r5.collect.foreach(println)
>>
>> scala> r5.toDebugString
>> res2: String =
>> (8) ShuffledRDD[4] at reduceByKey at :29 []
>>  +-(8) MapPartitionsRDD[3] at map at :27 []
>> |  ShuffledRDD[2] at reduceByKey at :25 []
>> +-(8) MapPartitionsRDD[1] at map at :23 []
>>|  ParallelCollectionRDD[0] at parallelize at :21 []
>>
>> Thanks.
>>
>> Zhan Zhang
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>


Re: iPython Notebook + Spark + Accumulo -- best practice?

2015-03-26 Thread andy petrella
That's purely awesome! Don't hesitate to contribute your notebook back to the
spark notebook repo, even if it's rough - I'll help clean it up if needed.

The Vagrant setup is also appealing 😆

Congrats!

On Thu, 26 Mar 2015 22:22 David Holiday  wrote:

>  w0t! that did it! t/y so much!
>
>  I'm going to put together a pastebin or something that has all the code
> put together so if anyone else runs into this issue they will have some
> working code to help them figure out what's going on.
>
> DAVID HOLIDAY
>  Software Engineer
>  760 607 3300 | Office
>  312 758 8385 | Mobile
>  dav...@annaisystems.com 
>
>
>
> www.AnnaiSystems.com
>
>  On Mar 26, 2015, at 12:24 PM, Corey Nolet  wrote:
>
>  Spark uses a SerializableWritable [1] to java serialize writable
> objects. I've noticed (at least in Spark 1.2.1) that it breaks down with
> some objects when Kryo is used instead of regular java serialization.
> Though it is  wrapping the actual AccumuloInputFormat (another example of
> something you may want to do in the future), we have Accumulo working to
> load data from a table into Spark SQL [2]. The way Spark uses the
> InputFormat is very straightforward.
>
>  [1]
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SerializableWritable.scala
> [2]
> https://github.com/calrissian/accumulo-recipes/blob/master/thirdparty/spark/src/main/scala/org/calrissian/accumulorecipes/spark/sql/EventStoreCatalyst.scala#L76
>
> On Thu, Mar 26, 2015 at 3:06 PM, Nick Pentreath 
> wrote:
>
>> I'm guessing the Accumulo Key and Value classes are not serializable, so
>> you would need to do something like
>>
>>  val rdd = sc.newAPIHadoopRDD(...).map { case (key, value) =>
>> (extractScalaType(key), extractScalaType(value)) }
>>
>>  Where 'extractScalaType converts the key or Value to a standard Scala
>> type or case class or whatever - basically extracts the data from the Key
>> or Value in a form usable in Scala
>>
>> —
>> Sent from Mailbox 
>>
>>
>>   On Thu, Mar 26, 2015 at 8:59 PM, Russ Weeks 
>> wrote:
>>
>>>   Hi, David,
>>>
>>>  This is the code that I use to create a JavaPairRDD from an Accumulo
>>> table:
>>>
>>>  JavaSparkContext sc = new JavaSparkContext(conf);
>>> Job hadoopJob = Job.getInstance(conf,"TestSparkJob");
>>> job.setInputFormatClass(AccumuloInputFormat.class);
>>> AccumuloInputFormat.setZooKeeperInstance(job,
>>> conf.get(ZOOKEEPER_INSTANCE_NAME,
>>> conf.get(ZOOKEEPER_HOSTS)
>>> );
>>> AccumuloInputFormat.setConnectorInfo(job,
>>> conf.get(ACCUMULO_AGILE_USERNAME),
>>> new PasswordToken(conf.get(ACCUMULO_AGILE_PASSWORD))
>>> );
>>> AccumuloInputFormat.setInputTableName(job,
>>> conf.get(ACCUMULO_TABLE_NAME));
>>> AccumuloInputFormat.setScanAuthorizations(job, auths);
>>> JavaPairRDD values =
>>> sc.newAPIHadoopRDD(hadoopJob.getConfiguration(), AccumuloInputFormat.class,
>>> Key.class, Value.class);
>>>
>>>  Key.class and Value.class are from org.apache.accumulo.core.data. I
>>> use a WholeRowIterator so that the Value is actually an encoded
>>> representation of an entire logical row; it's a useful convenience if you
>>> can be sure that your rows always fit in memory.
>>>
>>>  I haven't tested it since Spark 1.0.1 but I doubt anything important
>>> has changed.
>>>
>>>  Regards,
>>> -Russ
>>>
>>>
>>>  On Thu, Mar 26, 2015 at 11:41 AM, David Holiday <
>>> dav...@annaisystems.com> wrote:
>>>
   * progress!*

 i was able to figure out why the 'input INFO not set' error was
 occurring. the eagle-eyed among you will no doubt see the following code is
 missing a closing '('

 AbstractInputFormat.setConnectorInfo(jobConf, "root", new 
 PasswordToken("password")

 as I'm doing this in spark-notebook, I'd been clicking the execute
 button and moving on because I wasn't seeing an error. what I forgot was
 that notebook is going to do what spark-shell will do when you leave off a
 closing ')' -- *it will wait forever for you to add it*. so the error
 was the result of the 'setConnectorInfo' method never getting executed.

 unfortunately, I'm still unable to shove the accumulo table data into
 an RDD that's useable to me. when I execute

 rddX.count

 I get back

 res15: Long = 1

 which is the correct response - there are 10,000 rows of data in the
 table I pointed to. however, when I try to grab the first element of data
 thusly:

 rddX.first

 I get the following error:

 org.apache.spark.SparkException: Job aborted due to stage failure: Task
 0.0 in stage 0.0 (TID 0) had a not serializable result:
 org.apache.accumulo.core.data.Key

 any thoughts on where to go from here?
 DAVID HOLIDAY
  Software Engineer
  760 607 3300 | Office
  312 758 8385 | Mobile
  dav...@annaisystems.com 


  

 www.AnnaiSystems.com 
>>>

Re: RDD.map does not allowed to preservesPartitioning?

2015-03-26 Thread Zhan Zhang
Thanks Jonathan. You are right about rewriting the example.

I mean providing such an option to the developer so that it is controllable. The
example may seem silly, and I don’t know all the use cases.

But suppose, for example, that I also want to operate on both the key and the value
to generate some new value while keeping the key untouched. Then mapValues may not
be able to do this.

Changing the code to allow this is trivial, but I don’t know whether there is
some special reason behind it.

Thanks.

Zhan Zhang



On Mar 26, 2015, at 2:49 PM, Jonathan Coveney 
mailto:jcove...@gmail.com>> wrote:

I believe if you do the following:

sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4)).map((_,1)).reduceByKey(_+_).mapValues(_+1).reduceByKey(_+_).toDebugString

(8) MapPartitionsRDD[34] at reduceByKey at :23 []
 |  MapPartitionsRDD[33] at mapValues at :23 []
 |  ShuffledRDD[32] at reduceByKey at :23 []
 +-(8) MapPartitionsRDD[31] at map at :23 []
|  ParallelCollectionRDD[30] at parallelize at :23 []

The difference is that spark has no way to know that your map closure doesn't 
change the key. if you only use mapValues, it does. Pretty cool that they 
optimized that :)

2015-03-26 17:44 GMT-04:00 Zhan Zhang 
mailto:zzh...@hortonworks.com>>:
Hi Folks,

Does anybody know what is the reason not allowing preserverPartitioning in 
RDD.map? Do I miss something here?

Following example involves two shuffles. I think if preservePartitioning is 
allowed, we can avoid the second one, right?

 val r1 = sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4))
 val r2 = r1.map((_, 1))
 val r3 = r2.reduceByKey(_+_)
 val r4 = r3.map(x=>(x._1, x._2 + 1))
 val r5 = r4.reduceByKey(_+_)
 r5.collect.foreach(println)

scala> r5.toDebugString
res2: String =
(8) ShuffledRDD[4] at reduceByKey at :29 []
 +-(8) MapPartitionsRDD[3] at map at :27 []
|  ShuffledRDD[2] at reduceByKey at :25 []
+-(8) MapPartitionsRDD[1] at map at :23 []
   |  ParallelCollectionRDD[0] at parallelize at :21 []

Thanks.

Zhan Zhang

-
To unsubscribe, e-mail: 
user-unsubscr...@spark.apache.org
For additional commands, e-mail: 
user-h...@spark.apache.org





Re: Combining Many RDDs

2015-03-26 Thread Kelvin Chu
Hi, I used union() before and yes, it can be slow sometimes. I _guess_ your
variable 'data' is a Scala collection and compute() returns an RDD. Right?
If yes, I tried the approach below to operate on only one RDD during the
whole computation (yes, I also saw that too many RDDs hurt performance).

Change compute() to return a Scala collection instead of an RDD.

val result = sc.parallelize(data)  // create and partition the 0.5M items in a single RDD
  .flatMap(compute(_))             // still one RDD, each item already joined with the external data

Hope this helps.

Kelvin
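
Filled out as a hedged, self-contained sketch - the Item and Result types and
the body of compute() below are stand-ins, not from the thread:

import org.apache.spark.rdd.RDD

case class Item(id: Int)
case class Result(id: Int, value: Int)

// Stand-in for the real compute(): returns a plain Scala collection
// (20-500 results per item), not an RDD.
def compute(item: Item): Seq[Result] = (1 to 20).map(v => Result(item.id, v))

val data = (1 to 500000).map(Item(_))     // the 0.5M input items
val result: RDD[Result] =
  sc.parallelize(data)                    // a single RDD over all items
    .flatMap(compute)                     // expand in place - no per-item RDDs, no unions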

On Thu, Mar 26, 2015 at 2:37 PM, Yang Chen  wrote:

> Hi Mark,
>
> That's true, but in neither way can I combine the RDDs, so I have to avoid
> unions.
>
> Thanks,
> Yang
>
> On Thu, Mar 26, 2015 at 5:31 PM, Mark Hamstra 
> wrote:
>
>> RDD#union is not the same thing as SparkContext#union
>>
>> On Thu, Mar 26, 2015 at 2:27 PM, Yang Chen  wrote:
>>
>>> Hi Noorul,
>>>
>>> Thank you for your suggestion. I tried that, but ran out of memory. I
>>> did some search and found some suggestions
>>> that we should try to avoid rdd.union(
>>> http://stackoverflow.com/questions/28343181/memory-efficient-way-of-union-a-sequence-of-rdds-from-files-in-apache-spark
>>> ).
>>> I will try to come up with some other ways.
>>>
>>> Thank you,
>>> Yang
>>>
>>> On Thu, Mar 26, 2015 at 1:13 PM, Noorul Islam K M 
>>> wrote:
>>>
 sparkx  writes:

 > Hi,
 >
 > I have a Spark job and a dataset of 0.5 Million items. Each item
 performs
 > some sort of computation (joining a shared external dataset, if that
 does
 > matter) and produces an RDD containing 20-500 result items. Now I
 would like
 > to combine all these RDDs and perform a next job. What I have found
 out is
 > that the computation itself is quite fast, but combining these RDDs
 takes
 > much longer time.
 >
 > val result = data// 0.5M data items
 >   .map(compute(_))   // Produces an RDD - fast
 >   .reduce(_ ++ _)  // Combining RDDs - slow
 >
 > I have also tried to collect results from compute(_) and use a
 flatMap, but
 > that is also slow.
 >
 > Is there a way to efficiently do this? I'm thinking about writing this
 > result to HDFS and reading from disk for the next job, but am not
 sure if
 > that's a preferred way in Spark.
 >

 Are you looking for SparkContext.union() [1] ?

 This is not performing well with spark cassandra connector. I am not
 sure whether this will help you.

 Thanks and Regards
 Noorul

 [1]
 http://spark.apache.org/docs/1.3.0/api/scala/index.html#org.apache.spark.SparkContext

>>>
>>>
>>>
>>> --
>>> Yang Chen
>>> Dept. of CISE, University of Florida
>>> Mail: y...@yang-cs.com
>>> Web: www.cise.ufl.edu/~yang
>>>
>>
>>
>
>
> --
> Yang Chen
> Dept. of CISE, University of Florida
> Mail: y...@yang-cs.com
> Web: www.cise.ufl.edu/~yang
>


Re: RDD.map does not allowed to preservesPartitioning?

2015-03-26 Thread Jonathan Coveney
I believe if you do the following:

sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4)).map((_,1)).reduceByKey(_+_).mapValues(_+1).reduceByKey(_+_).toDebugString

(8) MapPartitionsRDD[34] at reduceByKey at :23 []
 |  MapPartitionsRDD[33] at mapValues at :23 []
 |  ShuffledRDD[32] at reduceByKey at :23 []
 +-(8) MapPartitionsRDD[31] at map at :23 []
|  ParallelCollectionRDD[30] at parallelize at :23 []

The difference is that spark has no way to know that your map closure
doesn't change the key. if you only use mapValues, it does. Pretty cool
that they optimized that :)

2015-03-26 17:44 GMT-04:00 Zhan Zhang :

> Hi Folks,
>
> Does anybody know what is the reason not allowing preserverPartitioning in
> RDD.map? Do I miss something here?
>
> Following example involves two shuffles. I think if preservePartitioning
> is allowed, we can avoid the second one, right?
>
>  val r1 = sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4))
>  val r2 = r1.map((_, 1))
>  val r3 = r2.reduceByKey(_+_)
>  val r4 = r3.map(x=>(x._1, x._2 + 1))
>  val r5 = r4.reduceByKey(_+_)
>  r5.collect.foreach(println)
>
> scala> r5.toDebugString
> res2: String =
> (8) ShuffledRDD[4] at reduceByKey at :29 []
>  +-(8) MapPartitionsRDD[3] at map at :27 []
> |  ShuffledRDD[2] at reduceByKey at :25 []
> +-(8) MapPartitionsRDD[1] at map at :23 []
>|  ParallelCollectionRDD[0] at parallelize at :21 []
>
> Thanks.
>
> Zhan Zhang
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


RDD.map does not allowed to preservesPartitioning?

2015-03-26 Thread Zhan Zhang
Hi Folks,

Does anybody know the reason for not allowing preservesPartitioning in 
RDD.map? Am I missing something here?

The following example involves two shuffles. I think if preservesPartitioning were 
allowed, we could avoid the second one, right?

 val r1 = sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4))
 val r2 = r1.map((_, 1))
 val r3 = r2.reduceByKey(_+_)
 val r4 = r3.map(x=>(x._1, x._2 + 1))
 val r5 = r4.reduceByKey(_+_)
 r5.collect.foreach(println)

scala> r5.toDebugString
res2: String =
(8) ShuffledRDD[4] at reduceByKey at :29 []
 +-(8) MapPartitionsRDD[3] at map at :27 []
|  ShuffledRDD[2] at reduceByKey at :25 []
+-(8) MapPartitionsRDD[1] at map at :23 []
   |  ParallelCollectionRDD[0] at parallelize at :21 []

Thanks.

Zhan Zhang

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



WordCount example

2015-03-26 Thread Mohit Anchlia
I am trying to run the word count example but for some reason it's not
working as expected. I start the "nc" server on port  and then submit the
Spark job to the cluster. The Spark job gets submitted successfully, but I
never see any connection from Spark getting established. I also tried
typing words on the console where "nc" is listening and waiting at the
prompt; however, I don't see any output. I also don't see any errors.

Here is the conf:

SparkConf conf = new SparkConf().setMaster(masterUrl).setAppName(
"NetworkWordCount");

JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost",
);
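
One hedged observation: the snippet stops after creating the DStream. No
receiver is started - and therefore nothing ever connects to nc - until at
least one output operation is declared and the streaming context is started.
A minimal Scala sketch of the missing tail (the port 9999 and the local master
URL are illustrative, since the original values are not in the message):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

val masterUrl = "local[2]"   // the receiver occupies one core; keep one for processing
val conf = new SparkConf().setMaster(masterUrl).setAppName("NetworkWordCount")
val ssc  = new StreamingContext(conf, Seconds(1))

val lines  = ssc.socketTextStream("localhost", 9999)   // illustrative port
val counts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

counts.print()            // at least one output operation is required
ssc.start()               // nothing connects to the socket before this
ssc.awaitTermination()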


Re: Combining Many RDDs

2015-03-26 Thread Yang Chen
Hi Mark,

That's true, but neither way lets me combine the RDDs, so I have to avoid
unions.

Thanks,
Yang

On Thu, Mar 26, 2015 at 5:31 PM, Mark Hamstra 
wrote:

> RDD#union is not the same thing as SparkContext#union
>
> On Thu, Mar 26, 2015 at 2:27 PM, Yang Chen  wrote:
>
>> Hi Noorul,
>>
>> Thank you for your suggestion. I tried that, but ran out of memory. I did
>> some search and found some suggestions
>> that we should try to avoid rdd.union(
>> http://stackoverflow.com/questions/28343181/memory-efficient-way-of-union-a-sequence-of-rdds-from-files-in-apache-spark
>> ).
>> I will try to come up with some other ways.
>>
>> Thank you,
>> Yang
>>
>> On Thu, Mar 26, 2015 at 1:13 PM, Noorul Islam K M 
>> wrote:
>>
>>> sparkx  writes:
>>>
>>> > Hi,
>>> >
>>> > I have a Spark job and a dataset of 0.5 Million items. Each item
>>> performs
>>> > some sort of computation (joining a shared external dataset, if that
>>> does
>>> > matter) and produces an RDD containing 20-500 result items. Now I
>>> would like
>>> > to combine all these RDDs and perform a next job. What I have found
>>> out is
>>> > that the computation itself is quite fast, but combining these RDDs
>>> takes
>>> > much longer time.
>>> >
>>> > val result = data// 0.5M data items
>>> >   .map(compute(_))   // Produces an RDD - fast
>>> >   .reduce(_ ++ _)  // Combining RDDs - slow
>>> >
>>> > I have also tried to collect results from compute(_) and use a
>>> flatMap, but
>>> > that is also slow.
>>> >
>>> > Is there a way to efficiently do this? I'm thinking about writing this
>>> > result to HDFS and reading from disk for the next job, but am not sure
>>> if
>>> > that's a preferred way in Spark.
>>> >
>>>
>>> Are you looking for SparkContext.union() [1] ?
>>>
>>> This is not performing well with spark cassandra connector. I am not
>>> sure whether this will help you.
>>>
>>> Thanks and Regards
>>> Noorul
>>>
>>> [1]
>>> http://spark.apache.org/docs/1.3.0/api/scala/index.html#org.apache.spark.SparkContext
>>>
>>
>>
>>
>> --
>> Yang Chen
>> Dept. of CISE, University of Florida
>> Mail: y...@yang-cs.com
>> Web: www.cise.ufl.edu/~yang
>>
>
>


-- 
Yang Chen
Dept. of CISE, University of Florida
Mail: y...@yang-cs.com
Web: www.cise.ufl.edu/~yang


Re: Combining Many RDDs

2015-03-26 Thread Mark Hamstra
RDD#union is not the same thing as SparkContext#union
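
A hedged sketch of the difference (sizes and names are illustrative): folding
over RDD#union nests one UnionRDD per call, giving a very deep lineage, while
SparkContext#union wraps the whole sequence in a single UnionRDD:

import org.apache.spark.rdd.RDD

val parts: Seq[RDD[Int]] = (1 to 1000).map(i => sc.parallelize(Seq(i)))

val chained = parts.reduce(_ union _)   // one nested UnionRDD per fold step - deep lineage
val flat    = sc.union(parts)           // one UnionRDD over all 1000 parents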

On Thu, Mar 26, 2015 at 2:27 PM, Yang Chen  wrote:

> Hi Noorul,
>
> Thank you for your suggestion. I tried that, but ran out of memory. I did
> some search and found some suggestions
> that we should try to avoid rdd.union(
> http://stackoverflow.com/questions/28343181/memory-efficient-way-of-union-a-sequence-of-rdds-from-files-in-apache-spark
> ).
> I will try to come up with some other ways.
>
> Thank you,
> Yang
>
> On Thu, Mar 26, 2015 at 1:13 PM, Noorul Islam K M 
> wrote:
>
>> sparkx  writes:
>>
>> > Hi,
>> >
>> > I have a Spark job and a dataset of 0.5 Million items. Each item
>> performs
>> > some sort of computation (joining a shared external dataset, if that
>> does
>> > matter) and produces an RDD containing 20-500 result items. Now I would
>> like
>> > to combine all these RDDs and perform a next job. What I have found out
>> is
>> > that the computation itself is quite fast, but combining these RDDs
>> takes
>> > much longer time.
>> >
>> > val result = data// 0.5M data items
>> >   .map(compute(_))   // Produces an RDD - fast
>> >   .reduce(_ ++ _)  // Combining RDDs - slow
>> >
>> > I have also tried to collect results from compute(_) and use a flatMap,
>> but
>> > that is also slow.
>> >
>> > Is there a way to efficiently do this? I'm thinking about writing this
>> > result to HDFS and reading from disk for the next job, but am not sure
>> if
>> > that's a preferred way in Spark.
>> >
>>
>> Are you looking for SparkContext.union() [1] ?
>>
>> This is not performing well with spark cassandra connector. I am not
>> sure whether this will help you.
>>
>> Thanks and Regards
>> Noorul
>>
>> [1]
>> http://spark.apache.org/docs/1.3.0/api/scala/index.html#org.apache.spark.SparkContext
>>
>
>
>
> --
> Yang Chen
> Dept. of CISE, University of Florida
> Mail: y...@yang-cs.com
> Web: www.cise.ufl.edu/~yang
>


Re: Fuzzy GroupBy

2015-03-26 Thread Sean Owen
The grouping is determined by the POJO's equals() method. You can also
call groupBy() to group by some function of the POJOs. For example if
you're grouping Doubles into nearly-equal bunches, you could group by
their .intValue()
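
A hedged sketch of that idea in Scala (the normalisation shown - trim plus
lowercase - is only a placeholder for whatever similarity-preserving key
function fits the data):

case class Pojo(prop1: String, prop2: String)

val records = sc.parallelize(Seq(
  Pojo("Toyota Prius ", "a"), Pojo("toyota prius", "b"), Pojo("Toyota Camry", "c")))

// Group on a canonicalised form of prop1 instead of the exact value.
val grouped = records.groupBy(p => p.prop1.trim.toLowerCase)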

On Thu, Mar 26, 2015 at 8:47 PM, Mihran Shahinian  wrote:
> I would like to group records, but instead of grouping on exact key I want
> to be able to compute the similarity of keys on my own. Is there a
> recommended way of doing this?
>
> here is my starting point
>
> final JavaRDD< pojo > records = spark.parallelize(getListofPojos()).cache();
>
> class pojo {
>  String prop1
>  String prop2
> }
>
> during groupBy I would like to compute similarity between prop1 for each
> pojo.
>
> Much appreciated,
> Mihran

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Combining Many RDDs

2015-03-26 Thread Yang Chen
Hi Noorul,

Thank you for your suggestion. I tried that, but ran out of memory. I did
some search and found some suggestions
that we should try to avoid rdd.union(
http://stackoverflow.com/questions/28343181/memory-efficient-way-of-union-a-sequence-of-rdds-from-files-in-apache-spark
).
I will try to come up with some other ways.

Thank you,
Yang

On Thu, Mar 26, 2015 at 1:13 PM, Noorul Islam K M  wrote:

> sparkx  writes:
>
> > Hi,
> >
> > I have a Spark job and a dataset of 0.5 Million items. Each item performs
> > some sort of computation (joining a shared external dataset, if that does
> > matter) and produces an RDD containing 20-500 result items. Now I would
> like
> > to combine all these RDDs and perform a next job. What I have found out
> is
> > that the computation itself is quite fast, but combining these RDDs takes
> > much longer time.
> >
> > val result = data// 0.5M data items
> >   .map(compute(_))   // Produces an RDD - fast
> >   .reduce(_ ++ _)  // Combining RDDs - slow
> >
> > I have also tried to collect results from compute(_) and use a flatMap,
> but
> > that is also slow.
> >
> > Is there a way to efficiently do this? I'm thinking about writing this
> > result to HDFS and reading from disk for the next job, but am not sure if
> > that's a preferred way in Spark.
> >
>
> Are you looking for SparkContext.union() [1] ?
>
> This is not performing well with spark cassandra connector. I am not
> sure whether this will help you.
>
> Thanks and Regards
> Noorul
>
> [1]
> http://spark.apache.org/docs/1.3.0/api/scala/index.html#org.apache.spark.SparkContext
>



-- 
Yang Chen
Dept. of CISE, University of Florida
Mail: y...@yang-cs.com
Web: www.cise.ufl.edu/~yang


Re: iPython Notebook + Spark + Accumulo -- best practice?

2015-03-26 Thread David Holiday
w0t! that did it! t/y so much!

I'm going to put together a pastebin or something that has all the code put 
together so if anyone else runs into this issue they will have some working 
code to help them figure out what's going on.

DAVID HOLIDAY
Software Engineer
760 607 3300 | Office
312 758 8385 | Mobile
dav...@annaisystems.com


www.AnnaiSystems.com

On Mar 26, 2015, at 12:24 PM, Corey Nolet 
mailto:cjno...@gmail.com>> wrote:

Spark uses a SerializableWritable [1] to java serialize writable objects. I've 
noticed (at least in Spark 1.2.1) that it breaks down with some objects when 
Kryo is used instead of regular java serialization. Though it is  wrapping the 
actual AccumuloInputFormat (another example of something you may want to do in 
the future), we have Accumulo working to load data from a table into Spark SQL 
[2]. The way Spark uses the InputFormat is very straightforward.

[1] 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SerializableWritable.scala
[2] 
https://github.com/calrissian/accumulo-recipes/blob/master/thirdparty/spark/src/main/scala/org/calrissian/accumulorecipes/spark/sql/EventStoreCatalyst.scala#L76

On Thu, Mar 26, 2015 at 3:06 PM, Nick Pentreath 
mailto:nick.pentre...@gmail.com>> wrote:
I'm guessing the Accumulo Key and Value classes are not serializable, so you 
would need to do something like

val rdd = sc.newAPIHadoopRDD(...).map { case (key, value) => 
(extractScalaType(key), extractScalaType(value)) }

Where 'extractScalaType converts the key or Value to a standard Scala type or 
case class or whatever - basically extracts the data from the Key or Value in a 
form usable in Scala

—
Sent from Mailbox



On Thu, Mar 26, 2015 at 8:59 PM, Russ Weeks 
mailto:rwe...@newbrightidea.com>> wrote:

Hi, David,

This is the code that I use to create a JavaPairRDD from an Accumulo table:

JavaSparkContext sc = new JavaSparkContext(conf);
Job hadoopJob = Job.getInstance(conf,"TestSparkJob");
job.setInputFormatClass(AccumuloInputFormat.class);
AccumuloInputFormat.setZooKeeperInstance(job,
conf.get(ZOOKEEPER_INSTANCE_NAME,
conf.get(ZOOKEEPER_HOSTS)
);
AccumuloInputFormat.setConnectorInfo(job,
conf.get(ACCUMULO_AGILE_USERNAME),
new PasswordToken(conf.get(ACCUMULO_AGILE_PASSWORD))
);
AccumuloInputFormat.setInputTableName(job, conf.get(ACCUMULO_TABLE_NAME));
AccumuloInputFormat.setScanAuthorizations(job, auths);
JavaPairRDD values = 
sc.newAPIHadoopRDD(hadoopJob.getConfiguration(), AccumuloInputFormat.class, 
Key.class, Value.class);

Key.class and Value.class are from org.apache.accumulo.core.data. I use a 
WholeRowIterator so that the Value is actually an encoded representation of an 
entire logical row; it's a useful convenience if you can be sure that your rows 
always fit in memory.

I haven't tested it since Spark 1.0.1 but I doubt anything important has 
changed.

Regards,
-Russ


On Thu, Mar 26, 2015 at 11:41 AM, David Holiday 
mailto:dav...@annaisystems.com>> wrote:

 progress!

i was able to figure out why the 'input INFO not set' error was occurring. the 
eagle-eyed among you will no doubt see the following code is missing a closing 
'('

AbstractInputFormat.setConnectorInfo(jobConf, "root", new 
PasswordToken("password")

as I'm doing this in spark-notebook, I'd been clicking the execute button and 
moving on because I wasn't seeing an error. what I forgot was that notebook is 
going to do what spark-shell will do when you leave off a closing ')' -- it 
will wait forever for you to add it. so the error was the result of the 
'setConnectorInfo' method never getting executed.

unfortunately, I'm still unable to shove the accumulo table data into an RDD 
that's useable to me. when I execute

rddX.count

I get back

res15: Long = 1

which is the correct response - there are 10,000 rows of data in the table I 
pointed to. however, when I try to grab the first element of data thusly:

rddX.first

I get the following error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in 
stage 0.0 (TID 0) had a not serializable result: 
org.apache.accumulo.core.data.Key

any thoughts on where to go from here?

DAVID HOLIDAY
Software Engineer
760 607 3300 | Office
312 758 8385 | Mobile
dav...@annaisystems.com




www.AnnaiSystems.com

On Mar 26, 2015, at 8:35 AM, David Holiday 
mailto:dav...@annaisystems.com>> wrote:

hi Nick

Unfortunately the Accumulo docs are woefully inadequate, and in some places, 
flat wrong. I'm not sure if this is a case where the docs are 'flat wrong', or 
if there's some wrinke with spark-notebook in the mix that's messing everything 
up. I've been working with some people on stack overflow on this same issue 
(including one of the people from the spark-notebook

Re: Spark SQL queries hang forever

2015-03-26 Thread Michael Armbrust
Is it possible to jstack the executors and see where they are hanging?

On Thu, Mar 26, 2015 at 2:02 PM, Jon Chase  wrote:

> Spark 1.3.0 on YARN (Amazon EMR), cluster of 10 m3.2xlarge (8cpu, 30GB),
> executor memory 20GB, driver memory 10GB
>
> I'm using Spark SQL, mainly via spark-shell, to query 15GB of data spread
> out over roughly 2,000 Parquet files and my queries frequently hang. Simple
> queries like "select count(*) from ..." on the entire data set work ok.
> Slightly more demanding ones with group by's and some aggregate functions
> (percentile_approx, avg, etc.) work ok as well, as long as I have some
> criteria in my where clause to keep the number of rows down.
>
> Once I hit some limit on query complexity and rows processed, my queries
> start to hang.  I've left them for up to an hour without seeing any
> progress.  No OOM's either - the job is just stuck.
>
> I've tried setting spark.sql.shuffle.partitions to 400 and even 800, but
> with the same results: usually near the end of the tasks (like 780 of 800
> complete), progress just stops:
>
> 15/03/26 20:53:29 INFO scheduler.TaskSetManager: Finished task 788.0 in
> stage 1.0 (TID 1618) in 800 ms on
> ip-10-209-22-211.eu-west-1.compute.internal (748/800)
> 15/03/26 20:53:29 INFO scheduler.TaskSetManager: Finished task 793.0 in
> stage 1.0 (TID 1623) in 622 ms on
> ip-10-105-12-41.eu-west-1.compute.internal (749/800)
> 15/03/26 20:53:29 INFO scheduler.TaskSetManager: Finished task 797.0 in
> stage 1.0 (TID 1627) in 616 ms on ip-10-90-2-201.eu-west-1.compute.internal
> (750/800)
> 15/03/26 20:53:29 INFO scheduler.TaskSetManager: Finished task 799.0 in
> stage 1.0 (TID 1629) in 611 ms on ip-10-90-2-201.eu-west-1.compute.internal
> (751/800)
> 15/03/26 20:53:29 INFO scheduler.TaskSetManager: Finished task 795.0 in
> stage 1.0 (TID 1625) in 669 ms on
> ip-10-105-12-41.eu-west-1.compute.internal (752/800)
>
> ^^^ this is where it stays forever
>
> Looking at the Spark UI, several of the executors still list active
> tasks.  I do see that the Shuffle Read for executors that don't have any
> tasks remaining is around 100MB, whereas it's more like 10MB for the
> executors that still have tasks.
>
> The first stage, mapPartitions, always completes fine.  It's the second
> stage (takeOrdered), that hangs.
>
> I've had this issue in 1.2.0 and 1.2.1 as well as 1.3.0.  I've also
> encountered it when using JSON files (instead of Parquet).
>
> Thoughts?  I'm blocked on using Spark SQL b/c most of the queries I do are
> having this issue.
>


Spark SQL queries hang forever

2015-03-26 Thread Jon Chase
Spark 1.3.0 on YARN (Amazon EMR), cluster of 10 m3.2xlarge (8cpu, 30GB),
executor memory 20GB, driver memory 10GB

I'm using Spark SQL, mainly via spark-shell, to query 15GB of data spread
out over roughly 2,000 Parquet files and my queries frequently hang. Simple
queries like "select count(*) from ..." on the entire data set work ok.
Slightly more demanding ones with group by's and some aggregate functions
(percentile_approx, avg, etc.) work ok as well, as long as I have some
criteria in my where clause to keep the number of rows down.

Once I hit some limit on query complexity and rows processed, my queries
start to hang.  I've left them for up to an hour without seeing any
progress.  No OOM's either - the job is just stuck.

I've tried setting spark.sql.shuffle.partitions to 400 and even 800, but
with the same results: usually near the end of the tasks (like 780 of 800
complete), progress just stops:

15/03/26 20:53:29 INFO scheduler.TaskSetManager: Finished task 788.0 in
stage 1.0 (TID 1618) in 800 ms on
ip-10-209-22-211.eu-west-1.compute.internal (748/800)
15/03/26 20:53:29 INFO scheduler.TaskSetManager: Finished task 793.0 in
stage 1.0 (TID 1623) in 622 ms on
ip-10-105-12-41.eu-west-1.compute.internal (749/800)
15/03/26 20:53:29 INFO scheduler.TaskSetManager: Finished task 797.0 in
stage 1.0 (TID 1627) in 616 ms on ip-10-90-2-201.eu-west-1.compute.internal
(750/800)
15/03/26 20:53:29 INFO scheduler.TaskSetManager: Finished task 799.0 in
stage 1.0 (TID 1629) in 611 ms on ip-10-90-2-201.eu-west-1.compute.internal
(751/800)
15/03/26 20:53:29 INFO scheduler.TaskSetManager: Finished task 795.0 in
stage 1.0 (TID 1625) in 669 ms on
ip-10-105-12-41.eu-west-1.compute.internal (752/800)

^^^ this is where it stays forever

Looking at the Spark UI, several of the executors still list active tasks.
I do see that the Shuffle Read for executors that don't have any tasks
remaining is around 100MB, whereas it's more like 10MB for the executors
that still have tasks.

The first stage, mapPartitions, always completes fine.  It's the second
stage (takeOrdered), that hangs.

I've had this issue in 1.2.0 and 1.2.1 as well as 1.3.0.  I've also
encountered it when using JSON files (instead of Parquet).

Thoughts?  I'm blocked on using Spark SQL b/c most of the queries I do are
having this issue.


Error in creating log directory

2015-03-26 Thread pzilaro
I get the following error message when I start pyspark shell.
The config has the following settings-
# spark.masterspark://master:7077
# spark.eventLog.enabled  true
# spark.eventLog.dir  hdfs://namenode:8021/directory
# spark.serializerorg.apache.spark.serializer.KryoSerializer
spark.eventLog.dir=/user/spark/applicationHistory
spark.eventLog.enabled=true
spark.yarn.historyServer.address=name101-car.ldcint.com:10020


[pzilaro@name101-car conf]$ pyspark
Python 2.6.6 (r266:84292, Jan 22 2014, 09:42:36)
[GCC 4.4.7 20120313 (Red Hat 4.4.7-4)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
15/03/26 13:46:06 INFO spark.SecurityManager: Changing view acls to: pzilaro
15/03/26 13:46:06 INFO spark.SecurityManager: Changing modify acls to:
pzilaro
15/03/26 13:46:06 INFO spark.SecurityManager: SecurityManager:
authentication disabled; ui acls disabled; users with view permissions:
Set(pzilaro); users with modify permissions: Set(pzilaro)
15/03/26 13:46:07 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/03/26 13:46:07 INFO Remoting: Starting remoting
15/03/26 13:46:07 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://sparkdri...@name101-car.ldcint.com:48040]
15/03/26 13:46:07 INFO Remoting: Remoting now listens on addresses:
[akka.tcp://sparkdri...@name101-car.ldcint.com:48040]
15/03/26 13:46:07 INFO util.Utils: Successfully started service
'sparkDriver' on port 48040.
15/03/26 13:46:07 INFO spark.SparkEnv: Registering MapOutputTracker
15/03/26 13:46:07 INFO spark.SparkEnv: Registering BlockManagerMaster
15/03/26 13:46:07 INFO storage.DiskBlockManager: Created local directory at
/tmp/spark-local-20150326134607-072e
15/03/26 13:46:07 INFO storage.MemoryStore: MemoryStore started with
capacity 265.4 MB
15/03/26 13:46:08 INFO spark.HttpFileServer: HTTP File server directory is
/tmp/spark-2f342a3a-c5bb-474d-867b-8bd5b9f9d1ac
15/03/26 13:46:08 INFO spark.HttpServer: Starting HTTP Server
15/03/26 13:46:08 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/03/26 13:46:08 INFO server.AbstractConnector: Started
SocketConnector@0.0.0.0:55296
15/03/26 13:46:08 INFO util.Utils: Successfully started service 'HTTP file
server' on port 55296.
15/03/26 13:46:08 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/03/26 13:46:08 INFO server.AbstractConnector: Started
SelectChannelConnector@0.0.0.0:4040
15/03/26 13:46:08 INFO util.Utils: Successfully started service 'SparkUI' on
port 4040.
15/03/26 13:46:08 INFO ui.SparkUI: Started SparkUI at
http://name101-car.ldcint.com:4040
15/03/26 13:46:08 INFO util.AkkaUtils: Connecting to HeartbeatReceiver:
akka.tcp://sparkdri...@name101-car.ldcint.com:48040/user/HeartbeatReceiver
15/03/26 13:46:08 INFO netty.NettyBlockTransferService: Server created on
55241
15/03/26 13:46:08 INFO storage.BlockManagerMaster: Trying to register
BlockManager
15/03/26 13:46:08 INFO storage.BlockManagerMasterActor: Registering block
manager localhost:55241 with 265.4 MB RAM, BlockManagerId(,
localhost, 55241)
15/03/26 13:46:08 INFO storage.BlockManagerMaster: Registered BlockManager
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/shell.py", line 45, in 
sc = SparkContext(appName="PySparkShell", pyFiles=add_files)
  File "/usr/lib/spark/python/pyspark/context.py", line 105, in __init__
conf, jsc)
  File "/usr/lib/spark/python/pyspark/context.py", line 153, in _do_init
self._jsc = jsc or self._initialize_context(self._conf._jconf)
  File "/usr/lib/spark/python/pyspark/context.py", line 201, in
_initialize_context
return self._jvm.JavaSparkContext(jconf)
  File
"/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line
701, in __call__
  File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py",
line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling
None.org.apache.spark.api.java.JavaSparkContext.
: java.io.IOException: Error in creating log directory:
file:/user/spark/applicationHistory//local-1427402768636
at
org.apache.spark.util.FileLogger.createLogDir(FileLogger.scala:133)
at org.apache.spark.util.FileLogger.start(FileLogger.scala:115)
at
org.apache.spark.scheduler.EventLoggingListener.start(EventLoggingListener.scala:74)
at org.apache.spark.SparkContext.(SparkContext.scala:353)
at
org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:61)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
at
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:21

Fuzzy GroupBy

2015-03-26 Thread Mihran Shahinian
I would like to group records, but instead of grouping on exact key I want
to be able to compute the similarity of keys on my own. Is there a
recommended way of doing this?

here is my starting point

final JavaRDD<pojo> records = spark.parallelize(getListofPojos()).cache();
class pojo {
 String prop1;
 String prop2;
}

during groupBy I would like to compute similarity between prop1 for each
pojo.

Much appreciated,
Mihran


RE: EsHadoopSerializationException: java.net.SocketTimeoutException: Read timed out

2015-03-26 Thread Adrian Mocanu
I also get stack overflow every now and then without having any recursive calls:

java.lang.StackOverflowError: null
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1479) 
~[na:1.7.0_75]
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) 
~[na:1.7.0_75]
at 
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) 
~[na:1.7.0_75]
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) 
~[na:1.7.0_75]
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) 
~[na:1.7.0_75]
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) 
~[na:1.7.0_75]
at 
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) 
~[na:1.7.0_75]
at 
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) 
~[na:1.7.0_75]
at 
scala.collection.immutable.$colon$colon.writeObject(List.scala:379) 
~[scala-library.jar:na]
at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source) 
~[na:na]
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[na:1.7.0_75]
this is a huge stack trace... but it keeps repeating

What could this be from?

From: Adrian Mocanu [mailto:amoc...@verticalscope.com]
Sent: March 26, 2015 2:10 PM
To: u...@spark.incubator.apache.org; user@spark.apache.org
Subject: EsHadoopSerializationException: java.net.SocketTimeoutException: Read 
timed out

Hi
I need help fixing a time out exception thrown from ElasticSearch Hadoop. The 
ES cluster is up all the time.
I use ElasticSearch Hadoop to read data from ES into RDDs. I get a collection 
of these RDD which I traverse (with foreachRDD) and create more RDDs from each 
one RDD in the collection. The resulting RDDs I put in a Queue from which I 
create a DStream.
After about 10 minutes of running, the program's debug output hangs for a bit 
then throws:
org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: 
java.net.SocketTimeoutException: Read timed out

This is the output:
 [data from elastic search like the next line (in green)]
13:55:26.620 [Executor task launch worker-0] DEBUG httpclient.wire.content - << 
"toyota sprinter","toyota crown","toyota tundra","toyota prius","toyota 
aa","toyota stout","toyota camry","toyota vista","toyota","toyota 
classic","toyota sprinter","toyota crown","toyota tundra","toyota 
prius","toyota aa","toyota stout","toyota camry","toyota 
vista","toyota","toyota classic","toyota sprinter","toyota crown","toyota 
tundra","toyota prius","toyota aa","toyota stout","toyota camry","toyota 
vista","toyota","toyota classic","toyota sprinter","toyota crown","toyota 
tundra","toyota prius","toyota aa","toyota stout","toyota camry","toyota 
vista"],"timestamp":[1373976139000],"links.entity.rank":[0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9]}}]}}"
13:55:26.620 [Executor task launch worker-0] DEBUG 
o.a.c.httpclient.HttpMethodBase - Resorting to protocol version default close 
connection policy
13:55:26.620 [Executor task launch worker-0] DEBUG 
o.a.c.httpclient.HttpMethodBase - Should NOT close connection, using HTTP/1.1
13:55:26.620 [Executor task launch worker-0] DEBUG 
o.a.c.httpclient.HttpConnection - Releasing connection back to connection 
manager.
13:55:26.631 [Executor task launch worker-0] ERROR 
org.apache.spark.executor.Executor - Exception in task 1.0 in stage 4.0 (TID 10)
org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: 
java.net.SocketTimeoutException: Read timed out
at 
org.elasticsearch.hadoop.serialization.json.JacksonJsonParser.nextToken(JacksonJsonParser.java:86)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.serialization.ScrollReader.list(ScrollReader.java:245) 
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:203) 
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:277) 
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:200) 
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:156)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:102) 
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:81) 
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.

Re: Parallel actions from driver

2015-03-26 Thread Sean Owen
You can do this much more simply, I think, with Scala's parallel
collections (try .par). There's nothing wrong with doing this, no.

Here, something is getting caught in your closure, maybe
unintentionally, that's not serializable. It's not directly related to
the parallelism.
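
Roughly, reusing the names from your snippet below (an untested sketch, not a
drop-in fix):

val ids = Seq(1, 2, 3).par   // parallel collection: each element runs on its own driver thread
ids.foreach { t =>
  val commons = events.filter(_._1 == t).map(_._2.common)
  saveAsParquetFile(sqlContext, commons, s"$t/common")
  EventTypes.all.foreach { et =>
    val eventData = events.filter(ev => ev._1 == t && ev._2.eventType == et).map(_._2.data)
    saveAsParquetFile(sqlContext, eventData, s"$t/$et")
  }
}

The actions still run on the cluster; .par just issues them concurrently from
the driver.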

On Thu, Mar 26, 2015 at 3:54 PM, Aram Mkrtchyan
 wrote:
> Hi.
>
> I'm trying to trigger DataFrame's save method in parallel from my driver.
> For that purposes I use ExecutorService and Futures, here's my code:
>
>
> val futures = [1,2,3].map( t => pool.submit( new Runnable {
>
> override def run(): Unit = {
> val commons = events.filter(_._1 == t).map(_._2.common)
> saveAsParquetFile(sqlContext, commons, s"$t/common")
> EventTypes.all.foreach { et =>
> val eventData = events.filter(ev => ev._1 == t && ev._2.eventType ==
> et).map(_._2.data)
> saveAsParquetFile(sqlContext, eventData, s"$t/$et")
> }
> }
>
> }))
> futures.foreach(_.get)
>
> It throws "Task is not Serializable" exception. Is it legal to use threads
> in driver to trigger actions?

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: iPython Notebook + Spark + Accumulo -- best practice?

2015-03-26 Thread Corey Nolet
Spark uses a SerializableWritable [1] to java serialize writable objects.
I've noticed (at least in Spark 1.2.1) that it breaks down with some
objects when Kryo is used instead of regular java serialization. Though it
is  wrapping the actual AccumuloInputFormat (another example of something
you may want to do in the future), we have Accumulo working to load data
from a table into Spark SQL [2]. The way Spark uses the InputFormat is very
straightforward.

[1]
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SerializableWritable.scala
[2]
https://github.com/calrissian/accumulo-recipes/blob/master/thirdparty/spark/src/main/scala/org/calrissian/accumulorecipes/spark/sql/EventStoreCatalyst.scala#L76

On Thu, Mar 26, 2015 at 3:06 PM, Nick Pentreath 
wrote:

> I'm guessing the Accumulo Key and Value classes are not serializable, so
> you would need to do something like
>
> val rdd = sc.newAPIHadoopRDD(...).map { case (key, value) =>
> (extractScalaType(key), extractScalaType(value)) }
>
> Where 'extractScalaType converts the key or Value to a standard Scala type
> or case class or whatever - basically extracts the data from the Key or
> Value in a form usable in Scala
>
> —
> Sent from Mailbox 
>
>
> On Thu, Mar 26, 2015 at 8:59 PM, Russ Weeks 
> wrote:
>
>> Hi, David,
>>
>> This is the code that I use to create a JavaPairRDD from an Accumulo
>> table:
>>
>>  JavaSparkContext sc = new JavaSparkContext(conf);
>> Job hadoopJob = Job.getInstance(conf,"TestSparkJob");
>> job.setInputFormatClass(AccumuloInputFormat.class);
>> AccumuloInputFormat.setZooKeeperInstance(job,
>> conf.get(ZOOKEEPER_INSTANCE_NAME,
>> conf.get(ZOOKEEPER_HOSTS)
>> );
>> AccumuloInputFormat.setConnectorInfo(job,
>> conf.get(ACCUMULO_AGILE_USERNAME),
>> new PasswordToken(conf.get(ACCUMULO_AGILE_PASSWORD))
>> );
>> AccumuloInputFormat.setInputTableName(job, conf.get(ACCUMULO_TABLE_NAME));
>> AccumuloInputFormat.setScanAuthorizations(job, auths);
>> JavaPairRDD values =
>> sc.newAPIHadoopRDD(hadoopJob.getConfiguration(), AccumuloInputFormat.class,
>> Key.class, Value.class);
>>
>> Key.class and Value.class are from org.apache.accumulo.core.data. I use a
>> WholeRowIterator so that the Value is actually an encoded representation of
>> an entire logical row; it's a useful convenience if you can be sure that
>> your rows always fit in memory.
>>
>> I haven't tested it since Spark 1.0.1 but I doubt anything important has
>> changed.
>>
>> Regards,
>> -Russ
>>
>>
>> On Thu, Mar 26, 2015 at 11:41 AM, David Holiday 
>> wrote:
>>
>>>  * progress!*
>>>
>>> i was able to figure out why the 'input INFO not set' error was
>>> occurring. the eagle-eyed among you will no doubt see the following code is
>>> missing a closing '('
>>>
>>> AbstractInputFormat.setConnectorInfo(jobConf, "root", new 
>>> PasswordToken("password")
>>>
>>> as I'm doing this in spark-notebook, I'd been clicking the execute
>>> button and moving on because I wasn't seeing an error. what I forgot was
>>> that notebook is going to do what spark-shell will do when you leave off a
>>> closing ')' -- *it will wait forever for you to add it*. so the error
>>> was the result of the 'setConnectorInfo' method never getting executed.
>>>
>>> unfortunately, I'm still unable to shove the accumulo table data into an
>>> RDD that's useable to me. when I execute
>>>
>>> rddX.count
>>>
>>> I get back
>>>
>>> res15: Long = 1
>>>
>>> which is the correct response - there are 10,000 rows of data in the
>>> table I pointed to. however, when I try to grab the first element of data
>>> thusly:
>>>
>>> rddX.first
>>>
>>> I get the following error:
>>>
>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>>> 0.0 in stage 0.0 (TID 0) had a not serializable result:
>>> org.apache.accumulo.core.data.Key
>>>
>>> any thoughts on where to go from here?
>>>   DAVID HOLIDAY
>>>  Software Engineer
>>>  760 607 3300 | Office
>>>  312 758 8385 | Mobile
>>>  dav...@annaisystems.com 
>>>
>>>
>>> 
>>>
>>> www.AnnaiSystems.com
>>>
>>>  On Mar 26, 2015, at 8:35 AM, David Holiday 
>>> wrote:
>>>
>>>  hi Nick
>>>
>>> Unfortunately the Accumulo docs are woefully inadequate, and in some
>>> places, flat wrong. I'm not sure if this is a case where the docs are 'flat
>>> wrong', or if there's some wrinke with spark-notebook in the mix that's
>>> messing everything up. I've been working with some people on stack overflow
>>> on this same issue (including one of the people from the spark-notebook
>>> team):
>>>
>>>
>>> http://stackoverflow.com/questions/29244530/how-do-i-create-a-spark-rdd-from-accumulo-1-6-in-spark-notebook?noredirect=1#comment46755938_29244530
>>>
>>> if you click the link you can see the entire thread of code, responses
>>> from notebook, etc. I'm going to try invoking the same techniques both from
>>> within a stand-alone scala problem and from the shell itself to see if I
>>> can get some traction. I'll repo

Re: iPython Notebook + Spark + Accumulo -- best practice?

2015-03-26 Thread Nick Pentreath
I'm guessing the Accumulo Key and Value classes are not serializable, so you 
would need to do something like 




val rdd = sc.newAPIHadoopRDD(...).map { case (key, value) => 
(extractScalaType(key), extractScalaType(value)) }




Where 'extractScalaType converts the key or Value to a standard Scala type or 
case class or whatever - basically extracts the data from the Key or Value in a 
form usable in Scala 
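
For example, in Scala, something like this rough sketch (assuming a Hadoop Job
configured as in Russ's snippet below; here I just pull the row ID and the value
bytes out as Strings):

import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat
import org.apache.accumulo.core.data.{Key, Value}

val rdd = sc.newAPIHadoopRDD(hadoopJob.getConfiguration,
    classOf[AccumuloInputFormat], classOf[Key], classOf[Value])
  .map { case (key, value) =>
    // keep only plain, serializable Scala values
    (key.getRow.toString, new String(value.get()))
  }
rdd.first()   // now a (String, String) pair, so no NotSerializableException on Key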



—
Sent from Mailbox

On Thu, Mar 26, 2015 at 8:59 PM, Russ Weeks 
wrote:

> Hi, David,
> This is the code that I use to create a JavaPairRDD from an Accumulo table:
> JavaSparkContext sc = new JavaSparkContext(conf);
> Job hadoopJob = Job.getInstance(conf,"TestSparkJob");
> job.setInputFormatClass(AccumuloInputFormat.class);
> AccumuloInputFormat.setZooKeeperInstance(job,
> conf.get(ZOOKEEPER_INSTANCE_NAME,
> conf.get(ZOOKEEPER_HOSTS)
> );
> AccumuloInputFormat.setConnectorInfo(job,
> conf.get(ACCUMULO_AGILE_USERNAME),
> new PasswordToken(conf.get(ACCUMULO_AGILE_PASSWORD))
> );
> AccumuloInputFormat.setInputTableName(job, conf.get(ACCUMULO_TABLE_NAME));
> AccumuloInputFormat.setScanAuthorizations(job, auths);
> JavaPairRDD values =
> sc.newAPIHadoopRDD(hadoopJob.getConfiguration(), AccumuloInputFormat.class,
> Key.class, Value.class);
> Key.class and Value.class are from org.apache.accumulo.core.data. I use a
> WholeRowIterator so that the Value is actually an encoded representation of
> an entire logical row; it's a useful convenience if you can be sure that
> your rows always fit in memory.
> I haven't tested it since Spark 1.0.1 but I doubt anything important has
> changed.
> Regards,
> -Russ
> On Thu, Mar 26, 2015 at 11:41 AM, David Holiday 
> wrote:
>>  * progress!*
>>
>> i was able to figure out why the 'input INFO not set' error was occurring.
>> the eagle-eyed among you will no doubt see the following code is missing a
>> closing '('
>>
>> AbstractInputFormat.setConnectorInfo(jobConf, "root", new 
>> PasswordToken("password")
>>
>> as I'm doing this in spark-notebook, I'd been clicking the execute button
>> and moving on because I wasn't seeing an error. what I forgot was that
>> notebook is going to do what spark-shell will do when you leave off a
>> closing ')' -- *it will wait forever for you to add it*. so the error was
>> the result of the 'setConnectorInfo' method never getting executed.
>>
>> unfortunately, I'm still unable to shove the accumulo table data into an
>> RDD that's useable to me. when I execute
>>
>> rddX.count
>>
>> I get back
>>
>> res15: Long = 1
>>
>> which is the correct response - there are 10,000 rows of data in the table
>> I pointed to. however, when I try to grab the first element of data thusly:
>>
>> rddX.first
>>
>> I get the following error:
>>
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>> 0.0 in stage 0.0 (TID 0) had a not serializable result:
>> org.apache.accumulo.core.data.Key
>>
>> any thoughts on where to go from here?
>>  DAVID HOLIDAY
>>  Software Engineer
>>  760 607 3300 | Office
>>  312 758 8385 | Mobile
>>  dav...@annaisystems.com 
>>
>>
>>
>> www.AnnaiSystems.com
>>
>>  On Mar 26, 2015, at 8:35 AM, David Holiday 
>> wrote:
>>
>>  hi Nick
>>
>>  Unfortunately the Accumulo docs are woefully inadequate, and in some
>> places, flat wrong. I'm not sure if this is a case where the docs are 'flat
>> wrong', or if there's some wrinke with spark-notebook in the mix that's
>> messing everything up. I've been working with some people on stack overflow
>> on this same issue (including one of the people from the spark-notebook
>> team):
>>
>>
>> http://stackoverflow.com/questions/29244530/how-do-i-create-a-spark-rdd-from-accumulo-1-6-in-spark-notebook?noredirect=1#comment46755938_29244530
>>
>>  if you click the link you can see the entire thread of code, responses
>> from notebook, etc. I'm going to try invoking the same techniques both from
>> within a stand-alone scala problem and from the shell itself to see if I
>> can get some traction. I'll report back when I have more data.
>>
>>  cheers (and thx!)
>>
>>
>>
>> DAVID HOLIDAY
>>  Software Engineer
>>  760 607 3300 | Office
>>  312 758 8385 | Mobile
>>  dav...@annaisystems.com 
>>
>>
>> 
>> www.AnnaiSystems.com 
>>
>>  On Mar 25, 2015, at 11:43 PM, Nick Pentreath 
>> wrote:
>>
>>  From a quick look at this link -
>> http://accumulo.apache.org/1.6/accumulo_user_manual.html#_mapreduce - it
>> seems you need to call some static methods on AccumuloInputFormat in order
>> to set the auth, table, and range settings. Try setting these config
>> options first and then call newAPIHadoopRDD?
>>
>> On Thu, Mar 26, 2015 at 2:34 AM, David Holiday 
>> wrote:
>>
>>> hi Irfan,
>>>
>>>  thanks for getting back to me - i'll try the accumulo list to be sure.
>>> what is the normal use case for spark though? I'm surprised that hooking it
>>> into something as common and popular as accumulo isn't more of an every-day
>>> task.
>>>
>>> DAVID 

Re: iPython Notebook + Spark + Accumulo -- best practice?

2015-03-26 Thread Russ Weeks
Hi, David,

This is the code that I use to create a JavaPairRDD from an Accumulo table:

JavaSparkContext sc = new JavaSparkContext(conf);
Job hadoopJob = Job.getInstance(conf, "TestSparkJob");
hadoopJob.setInputFormatClass(AccumuloInputFormat.class);
AccumuloInputFormat.setZooKeeperInstance(hadoopJob,
conf.get(ZOOKEEPER_INSTANCE_NAME),
conf.get(ZOOKEEPER_HOSTS)
);
AccumuloInputFormat.setConnectorInfo(hadoopJob,
conf.get(ACCUMULO_AGILE_USERNAME),
new PasswordToken(conf.get(ACCUMULO_AGILE_PASSWORD))
);
AccumuloInputFormat.setInputTableName(hadoopJob, conf.get(ACCUMULO_TABLE_NAME));
AccumuloInputFormat.setScanAuthorizations(hadoopJob, auths);
JavaPairRDD<Key, Value> values =
sc.newAPIHadoopRDD(hadoopJob.getConfiguration(), AccumuloInputFormat.class,
Key.class, Value.class);

Key.class and Value.class are from org.apache.accumulo.core.data. I use a
WholeRowIterator so that the Value is actually an encoded representation of
an entire logical row; it's a useful convenience if you can be sure that
your rows always fit in memory.

I haven't tested it since Spark 1.0.1 but I doubt anything important has
changed.

Regards,
-Russ


On Thu, Mar 26, 2015 at 11:41 AM, David Holiday 
wrote:

>  * progress!*
>
> i was able to figure out why the 'input INFO not set' error was occurring.
> the eagle-eyed among you will no doubt see the following code is missing a
> closing '('
>
> AbstractInputFormat.setConnectorInfo(jobConf, "root", new 
> PasswordToken("password")
>
> as I'm doing this in spark-notebook, I'd been clicking the execute button
> and moving on because I wasn't seeing an error. what I forgot was that
> notebook is going to do what spark-shell will do when you leave off a
> closing ')' -- *it will wait forever for you to add it*. so the error was
> the result of the 'setConnectorInfo' method never getting executed.
>
> unfortunately, I'm still unable to shove the accumulo table data into an
> RDD that's useable to me. when I execute
>
> rddX.count
>
> I get back
>
> res15: Long = 1
>
> which is the correct response - there are 10,000 rows of data in the table
> I pointed to. however, when I try to grab the first element of data thusly:
>
> rddX.first
>
> I get the following error:
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task
> 0.0 in stage 0.0 (TID 0) had a not serializable result:
> org.apache.accumulo.core.data.Key
>
> any thoughts on where to go from here?
>  DAVID HOLIDAY
>  Software Engineer
>  760 607 3300 | Office
>  312 758 8385 | Mobile
>  dav...@annaisystems.com 
>
>
>
> www.AnnaiSystems.com
>
>  On Mar 26, 2015, at 8:35 AM, David Holiday 
> wrote:
>
>  hi Nick
>
>  Unfortunately the Accumulo docs are woefully inadequate, and in some
> places, flat wrong. I'm not sure if this is a case where the docs are 'flat
> wrong', or if there's some wrinke with spark-notebook in the mix that's
> messing everything up. I've been working with some people on stack overflow
> on this same issue (including one of the people from the spark-notebook
> team):
>
>
> http://stackoverflow.com/questions/29244530/how-do-i-create-a-spark-rdd-from-accumulo-1-6-in-spark-notebook?noredirect=1#comment46755938_29244530
>
>  if you click the link you can see the entire thread of code, responses
> from notebook, etc. I'm going to try invoking the same techniques both from
> within a stand-alone scala problem and from the shell itself to see if I
> can get some traction. I'll report back when I have more data.
>
>  cheers (and thx!)
>
>
>
> DAVID HOLIDAY
>  Software Engineer
>  760 607 3300 | Office
>  312 758 8385 | Mobile
>  dav...@annaisystems.com 
>
>
> 
> www.AnnaiSystems.com 
>
>  On Mar 25, 2015, at 11:43 PM, Nick Pentreath 
> wrote:
>
>  From a quick look at this link -
> http://accumulo.apache.org/1.6/accumulo_user_manual.html#_mapreduce - it
> seems you need to call some static methods on AccumuloInputFormat in order
> to set the auth, table, and range settings. Try setting these config
> options first and then call newAPIHadoopRDD?
>
> On Thu, Mar 26, 2015 at 2:34 AM, David Holiday 
> wrote:
>
>> hi Irfan,
>>
>>  thanks for getting back to me - i'll try the accumulo list to be sure.
>> what is the normal use case for spark though? I'm surprised that hooking it
>> into something as common and popular as accumulo isn't more of an every-day
>> task.
>>
>> DAVID HOLIDAY
>>  Software Engineer
>>  760 607 3300 | Office
>>  312 758 8385 | Mobile
>>  dav...@annaisystems.com 
>>
>>
>> 
>> www.AnnaiSystems.com 
>>
>>On Mar 25, 2015, at 5:27 PM, Irfan Ahmad 
>> wrote:
>>
>>Hmmm this seems very accumulo-specific, doesn't it? Not sure how
>> to help with that.
>>
>>
>>  *Irfan Ahmad*
>> CTO | Co-Founder | *CloudPhysics* 
>> Best of VMworld Finalist
>>  Best Cloud Management Award
>>  NetworkWorld 10 Startups to Watch
>> EMA Most Notable Vendor
>>
>>   On Tue, Mar 24, 2015 at 4:09 PM, David Holiday > > wrote:
>>
>>>  hi all,
>>>

Re: RDD to DataFrame for using ALS under org.apache.spark.ml.recommendation.ALS

2015-03-26 Thread Chang Lim
After creating the SQLContext:
   val sqlContext = new org.apache.spark.sql.SQLContext(sc)
You need to add this line:
   import sqlContext.implicits._  // this is used to implicitly convert an RDD to a
DataFrame.

Hope this helps
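
A minimal end-to-end sketch (the Rating case class and the ratings RDD below
are made-up examples, not part of the original post):

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

case class Rating(user: Int, item: Int, rating: Float)   // illustrative schema

// ratings: an RDD[String] of "user,item,rating" lines (illustrative input)
val df = ratings.map(_.split(","))
  .map(a => Rating(a(0).toInt, a(1).toInt, a(2).toFloat))
  .toDF()
df.printSchema()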



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/RDD-to-DataFrame-for-using-ALS-under-org-apache-spark-ml-recommendation-ALS-tp22083p22247.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: iPython Notebook + Spark + Accumulo -- best practice?

2015-03-26 Thread David Holiday
 progress!

i was able to figure out why the 'input INFO not set' error was occurring. the 
eagle-eyed among you will no doubt see the following code is missing a closing 
')'

AbstractInputFormat.setConnectorInfo(jobConf, "root", new 
PasswordToken("password")

as I'm doing this in spark-notebook, I'd been clicking the execute button and 
moving on because I wasn't seeing an error. what I forgot was that notebook is 
going to do what spark-shell will do when you leave off a closing ')' -- it 
will wait forever for you to add it. so the error was the result of the 
'setConnectorInfo' method never getting executed.

unfortunately, I'm still unable to shove the accumulo table data into an RDD 
that's useable to me. when I execute

rddX.count

I get back

res15: Long = 10000

which is the correct response - there are 10,000 rows of data in the table I 
pointed to. however, when I try to grab the first element of data thusly:

rddX.first

I get the following error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in 
stage 0.0 (TID 0) had a not serializable result: 
org.apache.accumulo.core.data.Key

any thoughts on where to go from here?

DAVID HOLIDAY
Software Engineer
760 607 3300 | Office
312 758 8385 | Mobile
dav...@annaisystems.com


www.AnnaiSystems.com

On Mar 26, 2015, at 8:35 AM, David Holiday 
mailto:dav...@annaisystems.com>> wrote:

hi Nick

Unfortunately the Accumulo docs are woefully inadequate, and in some places, 
flat wrong. I'm not sure if this is a case where the docs are 'flat wrong', or 
if there's some wrinkle with spark-notebook in the mix that's messing everything 
up. I've been working with some people on stack overflow on this same issue 
(including one of the people from the spark-notebook team):

http://stackoverflow.com/questions/29244530/how-do-i-create-a-spark-rdd-from-accumulo-1-6-in-spark-notebook?noredirect=1#comment46755938_29244530

if you click the link you can see the entire thread of code, responses from 
notebook, etc. I'm going to try invoking the same techniques both from within a 
stand-alone scala problem and from the shell itself to see if I can get some 
traction. I'll report back when I have more data.

cheers (and thx!)



DAVID HOLIDAY
Software Engineer
760 607 3300 | Office
312 758 8385 | Mobile
dav...@annaisystems.com



www.AnnaiSystems.com

On Mar 25, 2015, at 11:43 PM, Nick Pentreath 
mailto:nick.pentre...@gmail.com>> wrote:

From a quick look at this link - 
http://accumulo.apache.org/1.6/accumulo_user_manual.html#_mapreduce - it seems 
you need to call some static methods on AccumuloInputFormat in order to set the 
auth, table, and range settings. Try setting these config options first and 
then call newAPIHadoopRDD?

On Thu, Mar 26, 2015 at 2:34 AM, David Holiday 
mailto:dav...@annaisystems.com>> wrote:
hi Irfan,

thanks for getting back to me - i'll try the accumulo list to be sure. what is 
the normal use case for spark though? I'm surprised that hooking it into 
something as common and popular as accumulo isn't more of an every-day task.

DAVID HOLIDAY
Software Engineer
760 607 3300 | Office
312 758 8385 | Mobile
dav...@annaisystems.com



www.AnnaiSystems.com

On Mar 25, 2015, at 5:27 PM, Irfan Ahmad 
mailto:ir...@cloudphysics.com>> wrote:

Hmmm this seems very accumulo-specific, doesn't it? Not sure how to help 
with that.


Irfan Ahmad
CTO | Co-Founder | CloudPhysics
Best of VMworld Finalist
Best Cloud Management Award
NetworkWorld 10 Startups to Watch
EMA Most Notable Vendor

On Tue, Mar 24, 2015 at 4:09 PM, David Holiday 
mailto:dav...@annaisystems.com>> wrote:
hi all,

got a vagrant image with spark notebook, spark, accumulo, and hadoop all 
running. from notebook I can manually create a scanner and pull test data from 
a table I created using one of the accumulo examples:

val instanceNameS = "accumulo"
val zooServersS = "localhost:2181"
val instance: Instance = new ZooKeeperInstance(instanceNameS, zooServersS)
val connector: Connector = instance.getConnector( "root", new 
PasswordToken("password"))
val auths = new Authorizations("exampleVis")
val scanner = connector.createScanner("batchtest1", auths)

scanner.setRange(new Range("row_00", "row_10"))

for(entry: Entry[Key, Value] <- scanner) {
  println(entry.getKey + " is " + entry.getValue)
}

will give the first ten rows of table data. when I try to create the RDD thusly:

val rdd2 =
  sparkContext.newAPIHadoopRDD (
new Configuration(),
classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat],
classOf[org.apache.accumulo.core.data.Key],
classOf[org.apache.accumulo.core.data.Value]
  )

I get an RDD returned to me that I can't do much with due 

Re: python : Out of memory: Kill process

2015-03-26 Thread Davies Liu
Could you narrow down to a step which causes the OOM, something like:

log2= self.sqlContext.jsonFile(path)
log2.count()
...
out.count()
...

On Thu, Mar 26, 2015 at 10:34 AM, Eduardo Cusa
 wrote:
> the last try was without log2.cache() and still getting out of memory
>
> I using the following conf, maybe help:
>
>
>
>   conf = (SparkConf()
>   .setAppName("LoadS3")
>   .set("spark.executor.memory", "13g")
>   .set("spark.driver.memory", "13g")
>   .set("spark.driver.maxResultSize","2g")
>   .set("spark.default.parallelism","200")
>   .set("spark.kryoserializer.buffer.mb","512"))
>   sc = SparkContext(conf=conf )
>   sqlContext = SQLContext(sc)
>
>
>
>
>
> On Thu, Mar 26, 2015 at 2:29 PM, Davies Liu  wrote:
>>
>> Could you try to remove the line `log2.cache()` ?
>>
>> On Thu, Mar 26, 2015 at 10:02 AM, Eduardo Cusa
>>  wrote:
>> > I running on ec2 :
>> >
>> > 1 Master : 4 CPU 15 GB RAM  (2 GB swap)
>> >
>> > 2 Slaves  4 CPU 15 GB RAM
>> >
>> >
>> > the uncompressed dataset size is 15 GB
>> >
>> >
>> >
>> >
>> > On Thu, Mar 26, 2015 at 10:41 AM, Eduardo Cusa
>> >  wrote:
>> >>
>> >> Hi Davies, I upgrade to 1.3.0 and still getting Out of Memory.
>> >>
>> >> I ran the same code as before, I need to make any changes?
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> On Wed, Mar 25, 2015 at 4:00 PM, Davies Liu 
>> >> wrote:
>> >>>
>> >>> With batchSize = 1, I think it will become even worse.
>> >>>
>> >>> I'd suggest to go with 1.3, have a taste for the new DataFrame API.
>> >>>
>> >>> On Wed, Mar 25, 2015 at 11:49 AM, Eduardo Cusa
>> >>>  wrote:
>> >>> > Hi Davies, I running 1.1.0.
>> >>> >
>> >>> > Now I'm following this thread that recommend use batchsize parameter
>> >>> > =
>> >>> > 1
>> >>> >
>> >>> >
>> >>> >
>> >>> >
>> >>> > http://apache-spark-user-list.1001560.n3.nabble.com/pySpark-memory-usage-td3022.html
>> >>> >
>> >>> > if this does not work I will install  1.2.1 or  1.3
>> >>> >
>> >>> > Regards
>> >>> >
>> >>> >
>> >>> >
>> >>> >
>> >>> >
>> >>> >
>> >>> > On Wed, Mar 25, 2015 at 3:39 PM, Davies Liu 
>> >>> > wrote:
>> >>> >>
>> >>> >> What's the version of Spark you are running?
>> >>> >>
>> >>> >> There is a bug in SQL Python API [1], it's fixed in 1.2.1 and 1.3,
>> >>> >>
>> >>> >> [1] https://issues.apache.org/jira/browse/SPARK-6055
>> >>> >>
>> >>> >> On Wed, Mar 25, 2015 at 10:33 AM, Eduardo Cusa
>> >>> >>  wrote:
>> >>> >> > Hi Guys, I running the following function with spark-submmit and
>> >>> >> > de
>> >>> >> > SO
>> >>> >> > is
>> >>> >> > killing my process :
>> >>> >> >
>> >>> >> >
>> >>> >> >   def getRdd(self,date,provider):
>> >>> >> > path='s3n://'+AWS_BUCKET+'/'+date+'/*.log.gz'
>> >>> >> > log2= self.sqlContext.jsonFile(path)
>> >>> >> > log2.registerTempTable('log_test')
>> >>> >> > log2.cache()
>> >>> >>
>> >>> >> You only visit the table once, cache does not help here.
>> >>> >>
>> >>> >> > out=self.sqlContext.sql("SELECT user, tax from log_test where
>> >>> >> > provider =
>> >>> >> > '"+provider+"'and country <> ''").map(lambda row: (row.user,
>> >>> >> > row.tax))
>> >>> >> > print "out1"
>> >>> >> > return  map((lambda (x,y): (x, list(y))),
>> >>> >> > sorted(out.groupByKey(2000).collect()))
>> >>> >>
>> >>> >> 100 partitions (or less) will be enough for 2G dataset.
>> >>> >>
>> >>> >> >
>> >>> >> >
>> >>> >> > The input dataset has 57 zip files (2 GB)
>> >>> >> >
>> >>> >> > The same process with a smaller dataset completed successfully
>> >>> >> >
>> >>> >> > Any ideas to debug is welcome.
>> >>> >> >
>> >>> >> > Regards
>> >>> >> > Eduardo
>> >>> >> >
>> >>> >> >
>> >>> >
>> >>> >
>> >>
>> >>
>> >
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: HQL function Rollup and Cube

2015-03-26 Thread Chang Lim
Solved.  In the IDE, the project settings were missing the dependent lib jars (jar
files under spark-xx/lib). When these jars are not set, I got a class-not-found
error about datanucleus classes (compared to an out-of-memory error in the Spark
shell).

In the context of the Spark shell, these dependent jars need to be passed in on
the spark-shell command line.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/HQL-function-Rollup-and-Cube-tp22241p22246.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



EsHadoopSerializationException: java.net.SocketTimeoutException: Read timed out

2015-03-26 Thread Adrian Mocanu
Hi
I need help fixing a time out exception thrown from ElasticSearch Hadoop. The 
ES cluster is up all the time.
I use ElasticSearch Hadoop to read data from ES into RDDs. I get a collection 
of these RDDs, which I traverse (with foreachRDD) and create more RDDs from 
each RDD in the collection. The resulting RDDs I put in a Queue, from which I 
create a DStream.
After about 10 minutes of running, the program's debug output hangs for a bit 
then throws:
org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: 
java.net.SocketTimeoutException: Read timed out

This is the output:
 [data from elastic search like the next line (in green)]
13:55:26.620 [Executor task launch worker-0] DEBUG httpclient.wire.content - << 
"toyota sprinter","toyota crown","toyota tundra","toyota prius","toyota 
aa","toyota stout","toyota camry","toyota vista","toyota","toyota 
classic","toyota sprinter","toyota crown","toyota tundra","toyota 
prius","toyota aa","toyota stout","toyota camry","toyota 
vista","toyota","toyota classic","toyota sprinter","toyota crown","toyota 
tundra","toyota prius","toyota aa","toyota stout","toyota camry","toyota 
vista","toyota","toyota classic","toyota sprinter","toyota crown","toyota 
tundra","toyota prius","toyota aa","toyota stout","toyota camry","toyota 
vista"],"timestamp":[1373976139000],"links.entity.rank":[0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9]}}]}}"
13:55:26.620 [Executor task launch worker-0] DEBUG 
o.a.c.httpclient.HttpMethodBase - Resorting to protocol version default close 
connection policy
13:55:26.620 [Executor task launch worker-0] DEBUG 
o.a.c.httpclient.HttpMethodBase - Should NOT close connection, using HTTP/1.1
13:55:26.620 [Executor task launch worker-0] DEBUG 
o.a.c.httpclient.HttpConnection - Releasing connection back to connection 
manager.
13:55:26.631 [Executor task launch worker-0] ERROR 
org.apache.spark.executor.Executor - Exception in task 1.0 in stage 4.0 (TID 10)
org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: 
java.net.SocketTimeoutException: Read timed out
at 
org.elasticsearch.hadoop.serialization.json.JacksonJsonParser.nextToken(JacksonJsonParser.java:86)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.serialization.ScrollReader.list(ScrollReader.java:245) 
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:203) 
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:277) 
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:200) 
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:156)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:102) 
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:81) 
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:314) 
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:76) 
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:46)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
~[scala-library.jar:na]
at 
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
~[scala-library.jar:na]
at 
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)
 ~[spark-core_2.10-1.1.0.jar:1.1.0]
at 
org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58) 
~[spark-core_2.10-1.1.0.jar:1.1.0]
at 
org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:55)
 ~[spark-core_2.10-1.1.0.jar:1.1.0]
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) 
~[spark-core_2.10-1.1.0.jar:1.1.0]
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
~[spark-core_2.10-1.1.0.jar:1.1.0]
at org.apache.spark.scheduler.Task.run(Task.scala:54) 
~[spark-core_2.10-1.1.0.jar:1.1.0]
at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) 
~[spark-core_2.10-1.1.0.jar:1.1.0]
at 
java.util.conc

Re: python : Out of memory: Kill process

2015-03-26 Thread Eduardo Cusa
The last try was without log2.cache() and I am still getting out of memory.

I am using the following conf; maybe it helps:



  conf = (SparkConf()
  .setAppName("LoadS3")
  .set("spark.executor.memory", "13g")
  .set("spark.driver.memory", "13g")
  .set("spark.driver.maxResultSize","2g")
  .set("spark.default.parallelism","200")
  .set("spark.kryoserializer.buffer.mb","512"))
  sc = SparkContext(conf=conf )
  sqlContext = SQLContext(sc)





On Thu, Mar 26, 2015 at 2:29 PM, Davies Liu  wrote:

> Could you try to remove the line `log2.cache()` ?
>
> On Thu, Mar 26, 2015 at 10:02 AM, Eduardo Cusa
>  wrote:
> > I running on ec2 :
> >
> > 1 Master : 4 CPU 15 GB RAM  (2 GB swap)
> >
> > 2 Slaves  4 CPU 15 GB RAM
> >
> >
> > the uncompressed dataset size is 15 GB
> >
> >
> >
> >
> > On Thu, Mar 26, 2015 at 10:41 AM, Eduardo Cusa
> >  wrote:
> >>
> >> Hi Davies, I upgrade to 1.3.0 and still getting Out of Memory.
> >>
> >> I ran the same code as before, I need to make any changes?
> >>
> >>
> >>
> >>
> >>
> >>
> >> On Wed, Mar 25, 2015 at 4:00 PM, Davies Liu 
> wrote:
> >>>
> >>> With batchSize = 1, I think it will become even worse.
> >>>
> >>> I'd suggest to go with 1.3, have a taste for the new DataFrame API.
> >>>
> >>> On Wed, Mar 25, 2015 at 11:49 AM, Eduardo Cusa
> >>>  wrote:
> >>> > Hi Davies, I running 1.1.0.
> >>> >
> >>> > Now I'm following this thread that recommend use batchsize parameter
> =
> >>> > 1
> >>> >
> >>> >
> >>> >
> >>> >
> http://apache-spark-user-list.1001560.n3.nabble.com/pySpark-memory-usage-td3022.html
> >>> >
> >>> > if this does not work I will install  1.2.1 or  1.3
> >>> >
> >>> > Regards
> >>> >
> >>> >
> >>> >
> >>> >
> >>> >
> >>> >
> >>> > On Wed, Mar 25, 2015 at 3:39 PM, Davies Liu 
> >>> > wrote:
> >>> >>
> >>> >> What's the version of Spark you are running?
> >>> >>
> >>> >> There is a bug in SQL Python API [1], it's fixed in 1.2.1 and 1.3,
> >>> >>
> >>> >> [1] https://issues.apache.org/jira/browse/SPARK-6055
> >>> >>
> >>> >> On Wed, Mar 25, 2015 at 10:33 AM, Eduardo Cusa
> >>> >>  wrote:
> >>> >> > Hi Guys, I running the following function with spark-submmit and
> de
> >>> >> > SO
> >>> >> > is
> >>> >> > killing my process :
> >>> >> >
> >>> >> >
> >>> >> >   def getRdd(self,date,provider):
> >>> >> > path='s3n://'+AWS_BUCKET+'/'+date+'/*.log.gz'
> >>> >> > log2= self.sqlContext.jsonFile(path)
> >>> >> > log2.registerTempTable('log_test')
> >>> >> > log2.cache()
> >>> >>
> >>> >> You only visit the table once, cache does not help here.
> >>> >>
> >>> >> > out=self.sqlContext.sql("SELECT user, tax from log_test where
> >>> >> > provider =
> >>> >> > '"+provider+"'and country <> ''").map(lambda row: (row.user,
> >>> >> > row.tax))
> >>> >> > print "out1"
> >>> >> > return  map((lambda (x,y): (x, list(y))),
> >>> >> > sorted(out.groupByKey(2000).collect()))
> >>> >>
> >>> >> 100 partitions (or less) will be enough for 2G dataset.
> >>> >>
> >>> >> >
> >>> >> >
> >>> >> > The input dataset has 57 zip files (2 GB)
> >>> >> >
> >>> >> > The same process with a smaller dataset completed successfully
> >>> >> >
> >>> >> > Any ideas to debug is welcome.
> >>> >> >
> >>> >> > Regards
> >>> >> > Eduardo
> >>> >> >
> >>> >> >
> >>> >
> >>> >
> >>
> >>
> >
>


Re: Handling Big data for interactive BI tools

2015-03-26 Thread Denny Lee
BTW, a tool that I have been using to help do the preaggregation of data
using hyperloglog in combination with Spark is atscale (http://atscale.com/).
It builds the aggregations and makes use of the speed of SparkSQL - all
within the context of a model that is accessible by Tableau or Qlik.
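
A hand-rolled version of the same pre-aggregation idea looks roughly like this
(table and column names are only illustrative):

import org.apache.spark.sql.functions.sum

val events = sqlContext.parquetFile("s3n://my-bucket/events")    // illustrative source
val rollup = events.groupBy("day", "country", "product")
  .agg(sum("revenue"))
rollup.saveAsParquetFile("s3n://my-bucket/daily_rollup")         // small table the BI tool queries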

On Thu, Mar 26, 2015 at 8:55 AM Jörn Franke  wrote:

> As I wrote previously - indexing is not your only choice, you can
> preaggregate data during load or depending on your needs you  need to think
> about other data structures, such as graphs, hyperloglog, bloom filters
> etc. (challenge to integrate in standard bi tools)
> On 26 Mar 2015 13:34, "kundan kumar"  wrote:
>
> I was looking for some options and came across JethroData.
>>
>> http://www.jethrodata.com/
>>
>> This stores the data maintaining indexes over all the columns seems good
>> and claims to have better performance than Impala.
>>
>> Earlier I had tried Apache Phoenix because of its secondary indexing
>> feature. But the major challenge I faced there was, secondary indexing was
>> not supported for bulk loading process.
>> Only the sequential loading process supported the secondary indexes,
>> which took longer time.
>>
>>
>> Any comments on this ?
>>
>>
>>
>>
>> On Thu, Mar 26, 2015 at 5:59 PM, kundan kumar 
>> wrote:
>>
>>> I looking for some options and came across
>>>
>>> http://www.jethrodata.com/
>>>
>>> On Thu, Mar 26, 2015 at 5:47 PM, Jörn Franke 
>>> wrote:
>>>
 You can also preaggregate results for the queries by the user -
 depending on what queries they use this might be necessary for any
 underlying technology
 On 26 Mar 2015 11:27, "kundan kumar"  wrote:

 Hi,
>
> I need to store terabytes of data which will be used for BI tools like
> qlikview.
>
> The queries can be on the basis of filter on any column.
>
> Currently, we are using redshift for this purpose.
>
> I am trying to explore things other than the redshift .
>
> Is it possible to gain better performance in spark as compared to
> redshift ?
>
> If yes, please suggest what is the best way to achieve this.
>
>
> Thanks!!
> Kundan
>

>>>
>>


Re: python : Out of memory: Kill process

2015-03-26 Thread Davies Liu
Could you try to remove the line `log2.cache()` ?

On Thu, Mar 26, 2015 at 10:02 AM, Eduardo Cusa
 wrote:
> I running on ec2 :
>
> 1 Master : 4 CPU 15 GB RAM  (2 GB swap)
>
> 2 Slaves  4 CPU 15 GB RAM
>
>
> the uncompressed dataset size is 15 GB
>
>
>
>
> On Thu, Mar 26, 2015 at 10:41 AM, Eduardo Cusa
>  wrote:
>>
>> Hi Davies, I upgrade to 1.3.0 and still getting Out of Memory.
>>
>> I ran the same code as before, I need to make any changes?
>>
>>
>>
>>
>>
>>
>> On Wed, Mar 25, 2015 at 4:00 PM, Davies Liu  wrote:
>>>
>>> With batchSize = 1, I think it will become even worse.
>>>
>>> I'd suggest to go with 1.3, have a taste for the new DataFrame API.
>>>
>>> On Wed, Mar 25, 2015 at 11:49 AM, Eduardo Cusa
>>>  wrote:
>>> > Hi Davies, I running 1.1.0.
>>> >
>>> > Now I'm following this thread that recommend use batchsize parameter =
>>> > 1
>>> >
>>> >
>>> >
>>> > http://apache-spark-user-list.1001560.n3.nabble.com/pySpark-memory-usage-td3022.html
>>> >
>>> > if this does not work I will install  1.2.1 or  1.3
>>> >
>>> > Regards
>>> >
>>> >
>>> >
>>> >
>>> >
>>> >
>>> > On Wed, Mar 25, 2015 at 3:39 PM, Davies Liu 
>>> > wrote:
>>> >>
>>> >> What's the version of Spark you are running?
>>> >>
>>> >> There is a bug in SQL Python API [1], it's fixed in 1.2.1 and 1.3,
>>> >>
>>> >> [1] https://issues.apache.org/jira/browse/SPARK-6055
>>> >>
>>> >> On Wed, Mar 25, 2015 at 10:33 AM, Eduardo Cusa
>>> >>  wrote:
>>> >> > Hi Guys, I running the following function with spark-submmit and de
>>> >> > SO
>>> >> > is
>>> >> > killing my process :
>>> >> >
>>> >> >
>>> >> >   def getRdd(self,date,provider):
>>> >> > path='s3n://'+AWS_BUCKET+'/'+date+'/*.log.gz'
>>> >> > log2= self.sqlContext.jsonFile(path)
>>> >> > log2.registerTempTable('log_test')
>>> >> > log2.cache()
>>> >>
>>> >> You only visit the table once, cache does not help here.
>>> >>
>>> >> > out=self.sqlContext.sql("SELECT user, tax from log_test where
>>> >> > provider =
>>> >> > '"+provider+"'and country <> ''").map(lambda row: (row.user,
>>> >> > row.tax))
>>> >> > print "out1"
>>> >> > return  map((lambda (x,y): (x, list(y))),
>>> >> > sorted(out.groupByKey(2000).collect()))
>>> >>
>>> >> 100 partitions (or less) will be enough for 2G dataset.
>>> >>
>>> >> >
>>> >> >
>>> >> > The input dataset has 57 zip files (2 GB)
>>> >> >
>>> >> > The same process with a smaller dataset completed successfully
>>> >> >
>>> >> > Any ideas to debug is welcome.
>>> >> >
>>> >> > Regards
>>> >> > Eduardo
>>> >> >
>>> >> >
>>> >
>>> >
>>
>>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Combining Many RDDs

2015-03-26 Thread Noorul Islam K M
sparkx  writes:

> Hi,
>
> I have a Spark job and a dataset of 0.5 Million items. Each item performs
> some sort of computation (joining a shared external dataset, if that does
> matter) and produces an RDD containing 20-500 result items. Now I would like
> to combine all these RDDs and perform a next job. What I have found out is
> that the computation itself is quite fast, but combining these RDDs takes
> much longer time.
>
> val result = data// 0.5M data items
>   .map(compute(_))   // Produces an RDD - fast
>   .reduce(_ ++ _)  // Combining RDDs - slow
>
> I have also tried to collect results from compute(_) and use a flatMap, but
> that is also slow.
>
> Is there a way to efficiently do this? I'm thinking about writing this
> result to HDFS and reading from disk for the next job, but am not sure if
> that's a preferred way in Spark.
>

Are you looking for SparkContext.union() [1] ?

This is not performing well with spark cassandra connector. I am not
sure whether this will help you.
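
If so, it would look roughly like this with the names from your mail (assuming
data is a local collection and compute returns an RDD):

val rdds = data.map(compute(_))   // one small RDD per input item
val result = sc.union(rdds)       // a single n-way union instead of 0.5M pairwise ++ reduces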

Thanks and Regards
Noorul

[1] 
http://spark.apache.org/docs/1.3.0/api/scala/index.html#org.apache.spark.SparkContext

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: python : Out of memory: Kill process

2015-03-26 Thread Eduardo Cusa
I am running on EC2:

1 Master : 4 CPU 15 GB RAM  (2 GB swap)

2 Slaves  4 CPU 15 GB RAM


the uncompressed dataset size is 15 GB




On Thu, Mar 26, 2015 at 10:41 AM, Eduardo Cusa <
eduardo.c...@usmediaconsulting.com> wrote:

> Hi Davies, I upgrade to 1.3.0 and still getting Out of Memory.
>
> I ran the same code as before, I need to make any changes?
>
>
>
>
>
>
> On Wed, Mar 25, 2015 at 4:00 PM, Davies Liu  wrote:
>
>> With batchSize = 1, I think it will become even worse.
>>
>> I'd suggest to go with 1.3, have a taste for the new DataFrame API.
>>
>> On Wed, Mar 25, 2015 at 11:49 AM, Eduardo Cusa
>>  wrote:
>> > Hi Davies, I running 1.1.0.
>> >
>> > Now I'm following this thread that recommend use batchsize parameter = 1
>> >
>> >
>> >
>> http://apache-spark-user-list.1001560.n3.nabble.com/pySpark-memory-usage-td3022.html
>> >
>> > if this does not work I will install  1.2.1 or  1.3
>> >
>> > Regards
>> >
>> >
>> >
>> >
>> >
>> >
>> > On Wed, Mar 25, 2015 at 3:39 PM, Davies Liu 
>> wrote:
>> >>
>> >> What's the version of Spark you are running?
>> >>
>> >> There is a bug in SQL Python API [1], it's fixed in 1.2.1 and 1.3,
>> >>
>> >> [1] https://issues.apache.org/jira/browse/SPARK-6055
>> >>
>> >> On Wed, Mar 25, 2015 at 10:33 AM, Eduardo Cusa
>> >>  wrote:
>> >> > Hi Guys, I running the following function with spark-submmit and de
>> SO
>> >> > is
>> >> > killing my process :
>> >> >
>> >> >
>> >> >   def getRdd(self,date,provider):
>> >> > path='s3n://'+AWS_BUCKET+'/'+date+'/*.log.gz'
>> >> > log2= self.sqlContext.jsonFile(path)
>> >> > log2.registerTempTable('log_test')
>> >> > log2.cache()
>> >>
>> >> You only visit the table once, cache does not help here.
>> >>
>> >> > out=self.sqlContext.sql("SELECT user, tax from log_test where
>> >> > provider =
>> >> > '"+provider+"'and country <> ''").map(lambda row: (row.user,
>> row.tax))
>> >> > print "out1"
>> >> > return  map((lambda (x,y): (x, list(y))),
>> >> > sorted(out.groupByKey(2000).collect()))
>> >>
>> >> 100 partitions (or less) will be enough for 2G dataset.
>> >>
>> >> >
>> >> >
>> >> > The input dataset has 57 zip files (2 GB)
>> >> >
>> >> > The same process with a smaller dataset completed successfully
>> >> >
>> >> > Any ideas to debug is welcome.
>> >> >
>> >> > Regards
>> >> > Eduardo
>> >> >
>> >> >
>> >
>> >
>>
>
>


Re: What is the ideal method to interact with Spark Cluster from a Cloud App?

2015-03-26 Thread Noorul Islam K M

Today I found one answer in this thread [1] which seems to be worth
exploring.

Michael, if you are reading this, it will be helpful if you could share
more about your spark deployment in production.

Thanks and Regards
Noorul

[1] 
http://apache-spark-user-list.1001560.n3.nabble.com/How-do-you-run-your-spark-app-tp7935p7958.html


Noorul Islam K M  writes:

> Hi all,
>
> We have a cloud application, to which we are adding a reporting service.
> For this we have narrowed down to use Cassandra + Spark for data store
> and processing respectively.
>
> Since cloud application is separate from Cassandra + Spark deployment,
> what is ideal method to interact with Spark Master from the application?
>
> We have been evaluating spark-job-server [1], which is an RESTful layer
> on top of Spark.
>
> Are there any other such tools? Or are there any other better approach
> which can be explored?
>
> We are evaluating following requirements against spark-job-server,
>
>1. Provide a platform for applications to submit jobs
>2. Provide RESTful APIs using which applications will interact with the 
>server
>   - Upload jar for running jobs
>   - Submit job
>   - Get job list
>   - Get job status
>   - Get job result
>3. Provide support for kill/restart job
>   - Kill job
>   - Restart job
>4. Support job priority
>5. Queue up job submissions if resources not available
>6. Troubleshoot job execution
>   - Failure – job logs
>   - Measure performance
>7. Manage cluster deployment
>   - Bootstrap, scale up/down (add, remove, replace nodes)
>8. Monitor cluster deployment
>   - Health report: Report metrics – CPU, Memory, - of jobs, spark 
>   processes
>   - Alert DevOps about threshold limit of these metrics
>   - Alert DevOps about job failures
>   - Self healing?
>9. Security
>   - AAA job submissions
>10. High availability/Redundancy
>   - This is for the spark-jobserver component itself
>
> Any help is appreciated!
>
> Thanks and Regards
> Noorul

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: HQL function Rollup and Cube

2015-03-26 Thread Chang Lim
Clarification on how the HQL was invoked:

  hiveContext.sql("select a, b, count(*) from t group by a, b with rollup")

Thanks,
Chang



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/HQL-function-Rollup-and-Cube-tp22241p22244.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Combining Many RDDs

2015-03-26 Thread sparkx
Hi,

I have a Spark job and a dataset of 0.5 Million items. Each item performs
some sort of computation (joining a shared external dataset, if that does
matter) and produces an RDD containing 20-500 result items. Now I would like
to combine all these RDDs and perform a next job. What I have found out is
that the computation itself is quite fast, but combining these RDDs takes
much longer time.

val result = data// 0.5M data items
  .map(compute(_))   // Produces an RDD - fast
  .reduce(_ ++ _)  // Combining RDDs - slow

I have also tried to collect results from compute(_) and use a flatMap, but
that is also slow.

Is there a way to efficiently do this? I'm thinking about writing this
result to HDFS and reading from disk for the next job, but am not sure if
that's a preferred way in Spark.

Thank you.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Combining-Many-RDDs-tp22243.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to get rdd count() without double evaluation of the RDD?

2015-03-26 Thread Mark Hamstra
You can also always take the more extreme approach of using
SparkContext#runJob (or submitJob) to write a custom Action that does what
you want in one pass.  Usually that's not worth the extra effort.

On Thu, Mar 26, 2015 at 9:27 AM, Sean Owen  wrote:

> To avoid computing twice you need to persist the RDD but that need not be
> in memory. You can persist to disk with persist().
> On Mar 26, 2015 4:11 PM, "Wang, Ningjun (LNG-NPV)" <
> ningjun.w...@lexisnexis.com> wrote:
>
>>  I have a rdd that is expensive to compute. I want to save it as object
>> file and also print the count. How can I avoid double computation of the
>> RDD?
>>
>>
>>
>> val rdd = sc.textFile(someFile).map(line => expensiveCalculation(line))
>>
>>
>>
>> val count = rdd.count()  // this force computation of the rdd
>>
>> println(count)
>>
>> rdd.saveAsObjectFile(file2) // this compute the RDD again
>>
>>
>>
>> I can avoid double computation by using cache
>>
>>
>>
>> val rdd = sc.textFile(someFile).map(line => expensiveCalculation(line))
>>
>> rdd.cache()
>>
>> val count = rdd.count()
>>
>> println(count)
>>
>> rdd.saveAsObjectFile(file2) // this compute the RDD again
>>
>>
>>
>> This only compute rdd once. However the rdd has millions of items and
>> will cause out of memory.
>>
>>
>>
>> Question: how can I avoid double computation without using cache?
>>
>>
>>
>>
>>
>> Ningjun
>>
>


Re: How to get rdd count() without double evaluation of the RDD?

2015-03-26 Thread Sean Owen
To avoid computing twice you need to persist the RDD but that need not be
in memory. You can persist to disk with persist().
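
I.e., with the code from your mail (sketch only):

import org.apache.spark.storage.StorageLevel

val rdd = sc.textFile(someFile).map(line => expensiveCalculation(line))
rdd.persist(StorageLevel.DISK_ONLY)   // spill to local disk instead of caching in memory

val count = rdd.count()               // materializes the RDD once
println(count)
rdd.saveAsObjectFile(file2)           // re-reads the persisted partitions, no recomputation
rdd.unpersist()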
On Mar 26, 2015 4:11 PM, "Wang, Ningjun (LNG-NPV)" <
ningjun.w...@lexisnexis.com> wrote:

>  I have a rdd that is expensive to compute. I want to save it as object
> file and also print the count. How can I avoid double computation of the
> RDD?
>
>
>
> val rdd = sc.textFile(someFile).map(line => expensiveCalculation(line))
>
>
>
> val count = rdd.count()  // this force computation of the rdd
>
> println(count)
>
> rdd.saveAsObjectFile(file2) // this compute the RDD again
>
>
>
> I can avoid double computation by using cache
>
>
>
> val rdd = sc.textFile(someFile).map(line => expensiveCalculation(line))
>
> rdd.cache()
>
> val count = rdd.count()
>
> println(count)
>
> rdd.saveAsObjectFile(file2) // this compute the RDD again
>
>
>
> This only compute rdd once. However the rdd has millions of items and will
> cause out of memory.
>
>
>
> Question: how can I avoid double computation without using cache?
>
>
>
>
>
> Ningjun
>


DataFrame GroupBy

2015-03-26 Thread gtanguy
Hello everybody,

I am trying to do a simple groupBy : 

*Code:*
val df  = hiveContext.sql("SELECT * FROM table1")
df .printSchema()
df .groupBy("customer_id").count().show(5)

*Stacktrace* :
root
 |-- customer_id: string (nullable = true)
 |-- rank: string (nullable = true)
 |-- reco_material_id: string (nullable = true)
 |-- score: string (nullable = true)
 |-- category: string (nullable = true)
 |-- is_achat: string (nullable = true)

15/03/26 17:19:29 INFO HiveMetaStore: 0: get_table : db=default tbl=table1
15/03/26 17:19:29 INFO audit: ugi=spark ip=unknown-ip-addr  cmd=get_table :
db=default tbl=table1   
Exception in thread "main" java.util.NoSuchElementException: key not found:
customer_id#0
at scala.collection.MapLike$class.default(MapLike.scala:228)
at
org.apache.spark.sql.catalyst.expressions.AttributeMap.default(AttributeMap.scala:29)
at scala.collection.MapLike$class.apply(MapLike.scala:141)
at
org.apache.spark.sql.catalyst.expressions.AttributeMap.apply(AttributeMap.scala:29)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at
org.apache.spark.sql.hive.execution.HiveTableScan.(HiveTableScan.scala:53)
at
org.apache.spark.sql.hive.HiveStrategies$HiveTableScans$$anonfun$15.apply(HiveStrategies.scala:216)
at
org.apache.spark.sql.hive.HiveStrategies$HiveTableScans$$anonfun$15.apply(HiveStrategies.scala:216)
at
org.apache.spark.sql.SQLContext$SparkPlanner.pruneFilterProject(SQLContext.scala:1034)
at
org.apache.spark.sql.hive.HiveStrategies$HiveTableScans$.apply(HiveStrategies.scala:212)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
at
org.apache.spark.sql.execution.SparkStrategies$HashAggregation$.apply(SparkStrategies.scala:152)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
at
org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:290)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
at
org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:1081)
at
org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:1079)
at
org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:1085)
at
org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:1085)
at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:815)
at org.apache.spark.sql.DataFrame.head(DataFrame.scala:758)
at org.apache.spark.sql.DataFrame.take(DataFrame.scala:809)
at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:178)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:314)


Does anyone have an idea?

Regards,

Germain Tanguy.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/DataFrame-GroupBy-tp22242.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



HQL function Rollup and Cube

2015-03-26 Thread Chang Lim
Has anyone been able to use Hive 0.13 ROLLUP and CUBE functions in Spark
1.3's Hive Context?  According to
https://issues.apache.org/jira/browse/SPARK-2663, this has been resolved in
Spark 1.3.

I created an in-memory temp table (t) and tried to execute a ROLLUP(and
CUBE) function:

 select a, b, count(*) from t group by a, b with rollup

Got the error that "with rollup" is an invalid function.  Am I missing
something?

Thanks,
Chang



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/HQL-function-Rollup-and-Cube-tp22241.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark-core and guava

2015-03-26 Thread Stevo Slavić
Thanks for the heads up, Sean!
On Mar 26, 2015 1:30 PM, "Sean Owen"  wrote:

> This is a long and complicated story. In short, Spark shades Guava 14
> except for a few classes that were accidentally used in a public API
> (Optional and a few more it depends on). So "provided" is more of a
> Maven workaround to achieve a desired effect. It's not "provided" in
> the usual sense.
>
> On Thu, Mar 26, 2015 at 12:24 PM, Stevo Slavić  wrote:
> > Hello Apache Spark community,
> >
> > spark-core 1.3.0 has guava 14.0.1 as provided dependency (see
> >
> http://repo1.maven.org/maven2/org/apache/spark/spark-core_2.10/1.3.0/spark-core_2.10-1.3.0.pom
> > )
> >
> > What is supposed to provide guava, and that specific version?
> >
> > Kind regards,
> > Stevo Slavic.
>
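
One practical consequence of the "provided" scope Sean describes, sketched as a hedged build.sbt fragment: a provided dependency is not passed on transitively, so an application that uses Guava classes directly has to declare its own Guava dependency alongside spark-core (the 14.0.1 below only mirrors the version in the POM, it is not a recommendation):

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.3.0",
  // the application's own Guava, since Spark's is "provided" and mostly shaded
  "com.google.guava" % "guava" % "14.0.1"
)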


How to get rdd count() without double evaluation of the RDD?

2015-03-26 Thread Wang, Ningjun (LNG-NPV)
I have an RDD that is expensive to compute. I want to save it as an object file and
also print the count. How can I avoid double computation of the RDD?

val rdd = sc.textFile(someFile).map(line => expensiveCalculation(line))

val count = rdd.count()  // this forces computation of the RDD
println(count)
rdd.saveAsObjectFile(file2) // this computes the RDD again

I can avoid double computation by using cache

val rdd = sc.textFile(someFile).map(line => expensiveCalculation(line))
rdd.cache()
val count = rdd.count()
println(count)
rdd.saveAsObjectFile(file2) // the RDD is read from the cache here, no recomputation

This only computes the RDD once. However, the RDD has millions of items and will
cause an out-of-memory error.

Question: how can I avoid double computation without using cache?


Ningjun


Parallel actions from driver

2015-03-26 Thread Aram Mkrtchyan
Hi.

I'm trying to trigger DataFrame's save method in parallel from my driver.
For that purpose I use an ExecutorService and Futures; here's my code:


val futures = Seq(1, 2, 3).map(t => pool.submit(new Runnable {
  override def run(): Unit = {
    val commons = events.filter(_._1 == t).map(_._2.common)
    saveAsParquetFile(sqlContext, commons, s"$t/common")
    EventTypes.all.foreach { et =>
      val eventData = events.filter(ev => ev._1 == t && ev._2.eventType == et).map(_._2.data)
      saveAsParquetFile(sqlContext, eventData, s"$t/$et")
    }
  }
}))
futures.foreach(_.get)

It throws "Task is not Serializable" exception. Is it legal to use threads
in driver to trigger actions?
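
For reference, a minimal self-contained sketch of the same idea using scala.concurrent.Future and a toy RDD, with saveAsTextFile standing in for the Parquet save above: Spark does support submitting independent actions from multiple driver threads, so a serialization error usually points at something captured by the closure rather than at the threading itself. The class name, data and output path below are made up for the sketch.

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

import org.apache.spark.{SparkConf, SparkContext}

object ParallelActions {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("parallel-actions"))

    // Stand-in for the partitioned event data in the question.
    val events = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c"))).cache()

    // Each Future runs on a driver thread and submits its own Spark job;
    // the Spark closures capture only the Int `t`, which serializes fine.
    val futures = Seq(1, 2, 3).map { t =>
      Future {
        events.filter(_._1 == t).map(_._2).saveAsTextFile(s"/tmp/parallel-actions/$t")
      }
    }
    futures.foreach(f => Await.result(f, Duration.Inf))
    sc.stop()
  }
}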


Re: Handling Big data for interactive BI tools

2015-03-26 Thread Jörn Franke
As I wrote previously, indexing is not your only choice: you can
preaggregate data during load or, depending on your needs, consider other
data structures, such as graphs, HyperLogLog sketches, or Bloom filters
(though these can be a challenge to integrate with standard BI tools).
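
As a rough illustration of the preaggregation idea, a minimal Spark 1.3 sketch for the spark-shell (where sc is predefined), using a hypothetical raw_events Parquet directory with customer_id, category and amount columns: the raw data is rolled up once at load time and the BI tool only ever queries the much smaller summary.

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
val raw = sqlContext.parquetFile("hdfs:///data/raw_events")  // hypothetical input path
// Preaggregate once; the summary has one row per (customer_id, category).
val summary = raw.groupBy("customer_id", "category").agg(Map("amount" -> "sum"))
summary.saveAsParquetFile("hdfs:///data/daily_summary")      // hypothetical output path
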
On 26 Mar 2015 13:34, "kundan kumar"  wrote:

> I was looking for some options and came across JethroData.
>
> http://www.jethrodata.com/
>
> This stores the data while maintaining indexes over all the columns, which seems good
> and claims to have better performance than Impala.
>
> Earlier I had tried Apache Phoenix because of its secondary indexing
> feature. But the major challenge I faced there was that secondary indexing was
> not supported for the bulk loading process.
> Only the sequential loading process supported secondary indexes, which
> took longer.
>
>
> Any comments on this ?
>
>
>
>
> On Thu, Mar 26, 2015 at 5:59 PM, kundan kumar 
> wrote:
>
>> I was looking for some options and came across
>>
>> http://www.jethrodata.com/
>>
>> On Thu, Mar 26, 2015 at 5:47 PM, Jörn Franke 
>> wrote:
>>
>>> You can also preaggregate results for the queries issued by the users -
>>> depending on what queries they run, this may be necessary regardless of the
>>> underlying technology
>>> On 26 Mar 2015 11:27, "kundan kumar"  wrote:
>>>
>>> Hi,

 I need to store terabytes of data which will be used by BI tools like
 QlikView.

 The queries can filter on any column.

 Currently, we are using Redshift for this purpose.

 I am trying to explore options other than Redshift.

 Is it possible to gain better performance in Spark as compared to
 Redshift?

 If yes, please suggest what is the best way to achieve this.


 Thanks!!
 Kundan

>>>
>>
>


Re: iPython Notebook + Spark + Accumulo -- best practice?

2015-03-26 Thread David Holiday
hi Nick

Unfortunately the Accumulo docs are woefully inadequate, and in some places, 
flat wrong. I'm not sure if this is a case where the docs are 'flat wrong', or 
if there's some wrinkle with spark-notebook in the mix that's messing everything
up. I've been working with some people on stack overflow on this same issue 
(including one of the people from the spark-notebook team):

http://stackoverflow.com/questions/29244530/how-do-i-create-a-spark-rdd-from-accumulo-1-6-in-spark-notebook?noredirect=1#comment46755938_29244530

if you click the link you can see the entire thread of code, responses from 
notebook, etc. I'm going to try invoking the same techniques both from within a 
stand-alone Scala program and from the shell itself to see if I can get some
traction. I'll report back when I have more data.

cheers (and thx!)



DAVID HOLIDAY
Software Engineer
760 607 3300 | Office
312 758 8385 | Mobile
dav...@annaisystems.com


www.AnnaiSystems.com

On Mar 25, 2015, at 11:43 PM, Nick Pentreath <nick.pentre...@gmail.com> wrote:

From a quick look at this link - 
http://accumulo.apache.org/1.6/accumulo_user_manual.html#_mapreduce - it seems 
you need to call some static methods on AccumuloInputFormat in order to set the 
auth, table, and range settings. Try setting these config options first and 
then call newAPIHadoopRDD?
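
A hedged sketch of what that configuration might look like against the Accumulo 1.6 MapReduce API, reusing the instance, table, authorizations and range from the scanner snippet quoted below (and assuming Hadoop 2's Job.getInstance()); the exact setter names should be double-checked against the AccumuloInputFormat javadoc for the deployed version:

import org.apache.accumulo.core.client.ClientConfiguration
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat
import org.apache.accumulo.core.client.security.tokens.PasswordToken
import org.apache.accumulo.core.data.{Key, Range, Value}
import org.apache.accumulo.core.security.Authorizations
import org.apache.hadoop.mapreduce.Job
import scala.collection.JavaConverters._

// The static setters write the connection details into the Hadoop configuration
// that AccumuloInputFormat later validates in getSplits.
val job = Job.getInstance()
AccumuloInputFormat.setConnectorInfo(job, "root", new PasswordToken("password"))
AccumuloInputFormat.setZooKeeperInstance(job,
  ClientConfiguration.loadDefault().withInstance("accumulo").withZkHosts("localhost:2181"))
AccumuloInputFormat.setInputTableName(job, "batchtest1")
AccumuloInputFormat.setScanAuthorizations(job, new Authorizations("exampleVis"))
AccumuloInputFormat.setRanges(job, Seq(new Range("row_00", "row_10")).asJava)

// Hand the populated configuration to Spark instead of an empty new Configuration().
val rdd2 = sparkContext.newAPIHadoopRDD(
  job.getConfiguration,
  classOf[AccumuloInputFormat],
  classOf[Key],
  classOf[Value])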

On Thu, Mar 26, 2015 at 2:34 AM, David Holiday <dav...@annaisystems.com> wrote:
hi Irfan,

Thanks for getting back to me - I'll try the Accumulo list to be sure. What is
the normal use case for Spark though? I'm surprised that hooking it into
something as common and popular as Accumulo isn't more of an everyday task.

DAVID HOLIDAY
Software Engineer
760 607 3300 | Office
312 758 8385 | Mobile
dav...@annaisystems.com



www.AnnaiSystems.com

On Mar 25, 2015, at 5:27 PM, Irfan Ahmad <ir...@cloudphysics.com> wrote:

Hmmm this seems very accumulo-specific, doesn't it? Not sure how to help 
with that.


Irfan Ahmad
CTO | Co-Founder | CloudPhysics
Best of VMworld Finalist
Best Cloud Management Award
NetworkWorld 10 Startups to Watch
EMA Most Notable Vendor

On Tue, Mar 24, 2015 at 4:09 PM, David Holiday <dav...@annaisystems.com> wrote:
Hi all,

I've got a Vagrant image with spark-notebook, Spark, Accumulo, and Hadoop all
running. From the notebook I can manually create a scanner and pull test data from
a table I created using one of the Accumulo examples:

val instanceNameS = "accumulo"
val zooServersS = "localhost:2181"
val instance: Instance = new ZooKeeperInstance(instanceNameS, zooServersS)
val connector: Connector = instance.getConnector( "root", new 
PasswordToken("password"))
val auths = new Authorizations("exampleVis")
val scanner = connector.createScanner("batchtest1", auths)

scanner.setRange(new Range("row_00", "row_10"))

for(entry: Entry[Key, Value] <- scanner) {
  println(entry.getKey + " is " + entry.getValue)
}

will give the first ten rows of table data. When I try to create the RDD thusly:

val rdd2 =
  sparkContext.newAPIHadoopRDD (
new Configuration(),
classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat],
classOf[org.apache.accumulo.core.data.Key],
classOf[org.apache.accumulo.core.data.Value]
  )

I get an RDD returned to me that I can't do much with due to the following 
error:

java.io.IOException: Input info has not been set.
at org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator.validateOptions(InputConfigurator.java:630)
at org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.validateOptions(AbstractInputFormat.java:343)
at org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.getSplits(AbstractInputFormat.java:538)
at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:98)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:220)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1367)
at org.apache.spark.rdd.RDD.count(RDD.scala:927)

which totally makes sense in light of the fact that I haven't specified any 
parameters as to which table to connect with, what the auths are, etc.

So my question is: what do I need to do from here to get those first ten rows
of table data into my RDD?



DAVID HOLIDAY
Software Engineer
760 607 3300 | Office
312 758 8385 | Mobile
dav...@annaisystems.com



www.AnnaiSystems.com

On Mar 19, 2015, at 11:25 AM, David Holiday 
mailto:dav...@annaisystems.com>> wrote:

kk - I'll put something together and get back to you with more :-)

DAVID 

Re: Hive Table not found from Spark SQL

2015-03-26 Thread ๏̯͡๏
Stack Trace:

15/03/26 08:25:42 INFO ql.Driver: OK
15/03/26 08:25:42 INFO log.PerfLogger: 
15/03/26 08:25:42 INFO log.PerfLogger: 
15/03/26 08:25:42 INFO log.PerfLogger: 
15/03/26 08:25:42 INFO metastore.HiveMetaStore: 0: get_tables: db=default
pat=.*
15/03/26 08:25:42 INFO HiveMetaStore.audit: ugi=dvasthimal
ip=unknown-ip-addr cmd=get_tables: db=default pat=.*
15/03/26 08:25:43 INFO parse.ParseDriver: Parsing command: insert overwrite
table sojsuccessevents2_spark select
guid,sessionKey,sessionStartDate,sojDataDate,seqNum,eventTimestamp,siteId,successEventType,sourceType,itemId,
shopCartId,b.transaction_Id as transactionId,offerId,b.bdr_id as
userId,priorPage1SeqNum,priorPage1PageId,exclWMSearchAttemptSeqNum,exclPriorSearchPageId,
exclPriorSearchSeqNum,exclPriorSearchCategory,exclPriorSearchL1,exclPriorSearchL2,currentImpressionId,sourceImpressionId,exclPriorSearchSqr,exclPriorSearchSort,
isDuplicate,b.bid_date as
transactionDate,auctionTypeCode,isBin,leafCategoryId,itemSiteId,b.qty_bid
as bidQuantity, b.bid_amt_unit_lstg_curncy * b.bid_exchng_rate as
 
bidAmtUsd,offerQuantity,offerAmountUsd,offerCreateDate,buyerSegment,buyerCountryId,sellerId,sellerCountryId,
sellerStdLevel,cssSellerLevel,a.experimentChannel from sojsuccessevents1 a
join dw_bid b  on a.itemId = b.item_id  and  a.transactionId =
 b.transaction_id  where b.auct_end_dt >= '2015-02-16' AND b.bid_dt >=
'2015-02-16'  AND b.bid_type_code IN (1,9) AND b.bdr_id > 0 AND (
b.bid_flags & 32) = 0 and lower(a.successEventType) IN ('bid','bin')
15/03/26 08:25:43 INFO parse.ParseDriver: Parse Completed
15/03/26 08:25:43 INFO metastore.HiveMetaStore: 0: get_table : db=default
tbl=sojsuccessevents2_spark
15/03/26 08:25:43 INFO HiveMetaStore.audit: ugi=dvasthimal
ip=unknown-ip-addr cmd=get_table : db=default tbl=sojsuccessevents2_spark
15/03/26 08:25:44 INFO metastore.HiveMetaStore: 0: get_table : db=default
tbl=dw_bid
15/03/26 08:25:44 INFO HiveMetaStore.audit: ugi=dvasthimal
ip=unknown-ip-addr cmd=get_table : db=default tbl=dw_bid
15/03/26 08:25:44 ERROR metadata.Hive:
NoSuchObjectException(message:default.dw_bid table not found)
at
org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_table(HiveMetaStore.java:1560)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:105)
at com.sun.proxy.$Proxy31.get_table(Unknown Source)
at
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getTable(HiveMetaStoreClient.java:997)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:89)
at com.sun.proxy.$Proxy32.getTable(Unknown Source)
at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:976)
at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:950)
at
org.apache.spark.sql.hive.HiveMetastoreCatalog.lookupRelation(HiveMetastoreCatalog.scala:180)
at org.apache.spark.sql.hive.HiveContext$$anon$1.org
$apache$spark$sql$catalyst$analysis$OverrideCatalog$$super$lookupRelation(HiveContext.scala:252)
at
org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:161)
at
org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:161)
at scala.Option.getOrElse(Option.scala:120)
at
org.apache.spark.sql.catalyst.analysis.OverrideCatalog$class.lookupRelation(Catalog.scala:161)
at
org.apache.spark.sql.hive.HiveContext$$anon$1.lookupRelation(HiveContext.scala:252)
at
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.getTable(Analyzer.scala:175)
at
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$6.applyOrElse(Analyzer.scala:187)
at
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$6.applyOrElse(Analyzer.scala:182)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:187)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:187)
at
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:50)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:186)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:207)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plu
