Re: Re: Spark streaming doesn't print output when working with standalone master

2015-02-20 Thread Akhil Das
local[3] spawns 3 threads on 1 core :)
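
As a rough illustration of the point above, here is a sketch that uses only the JVM's standard thread pool rather than Spark itself (the object and method names are made up for the example). A thread count is independent of the core count, so three threads can be alive at the same time on a single core, with the OS time-slicing between them. With local[3], one thread can therefore serve the socket receiver while the others process batches.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

object ThreadsVsCores {
  // Returns true if `n` tasks were all alive at the same time on `n` threads,
  // regardless of how many cores the machine has.
  def allRunConcurrently(n: Int): Boolean = {
    val pool = Executors.newFixedThreadPool(n)
    val allStarted = new CountDownLatch(n)
    try {
      (1 to n).foreach { _ =>
        pool.execute(new Runnable {
          def run(): Unit = {
            allStarted.countDown()
            // Stay alive until every other task has started too.
            allStarted.await(5, TimeUnit.SECONDS)
          }
        })
      }
      allStarted.await(5, TimeUnit.SECONDS)
    } finally {
      pool.shutdown()
    }
  }

  def main(args: Array[String]): Unit = {
    println(s"cores visible to the JVM: ${Runtime.getRuntime.availableProcessors}")
    println(s"3 tasks ran concurrently: ${allRunConcurrently(3)}")
  }
}
```

On a one-core virtual machine the first line would report a single core, yet the three tasks still overlap in time; they just do not run in parallel.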

Thanks
Best Regards

On Fri, Feb 20, 2015 at 12:50 PM, bit1...@163.com bit1...@163.com wrote:

 Thanks Akhil, you are right.
 I checked and found that only 1 core is allocated to the program.
 I am running on a virtual machine and allocated only one processor to it (1
 core per processor), so even though I specified --total-executor-cores 3
 in the submit script, the application is still allocated only one
 processor.

 This leads me to another question:
 Although I have only one core, I specified the master and executor options
 as --master local[3] --executor-memory 512M --total-executor-cores 3. Since
 I have only one core, why does this work?

 --
 bit1...@163.com


 *From:* Akhil Das ak...@sigmoidanalytics.com
 *Date:* 2015-02-20 15:13
 *To:* bit1...@163.com
 *CC:* user user@spark.apache.org
 *Subject:* Re: Spark streaming doesn't print output when working with
 standalone master
 While running the program, go to your cluster's web UI (it runs on port 8080,
 probably at hadoop.master:8080) and see how many cores are allocated to the
 program. It should be >= 2 for the stream to get processed.


 [image: Inline image 1]



 Thanks
 Best Regards

 On Fri, Feb 20, 2015 at 9:29 AM, bit1...@163.com bit1...@163.com wrote:

 Hi,
 I am trying the Spark Streaming log analysis reference application
 provided by Databricks at
 https://github.com/databricks/reference-apps/tree/master/logs_analyzer
 When I deploy the code to the standalone cluster with the following shell
 script, there is no output at all, which means the windowDStream has 0
 RDDs:
 ./spark-submit --deploy-mode client --name LogAnalyzerStreaming --master
 spark://hadoop.master:7077 --executor-memory 512M --total-executor-cores 3
 --class
 spark.examples.databricks.reference.apps.loganalysis.LogAnalyzerStreaming
 LogApp.jar

 But when I change --master to --master local[3], the program starts
 to work fine. Does anyone have any advice? Thanks!
 ./spark-submit --deploy-mode client --name LogAnalyzerStreaming --master
 local[3] --executor-memory 512M --total-executor-cores 3 --class
 spark.examples.databricks.reference.apps.loganalysis.LogAnalyzerStreaming 
 LogApp.jar


 object LogAnalyzerStreaming {

   val WINDOW_LENGTH = new Duration(12 * 1000)
   val SLIDE_INTERVAL = new Duration(6 * 1000)

   def main(args: Array[String]) {
     val sparkConf = new SparkConf().setAppName("Log Analyzer Streaming in Scala")
     val sc = new SparkContext(sparkConf)
     val streamingContext = new StreamingContext(sc, SLIDE_INTERVAL)

     // The port argument is missing from the archived message.
     val logLinesDStream = streamingContext.socketTextStream("localhost", )

     val accessLogsDStream =
       logLinesDStream.map(ApacheAccessLog.parseLogLine).cache()
     val windowDStream = accessLogsDStream.window(WINDOW_LENGTH, SLIDE_INTERVAL)

     windowDStream.foreachRDD(accessLogs => {
       if (accessLogs.count() == 0) {
         println("No access com.databricks.app.logs received in this time interval")
       } else {
         // Calculate statistics based on the content size.
         val contentSizes = accessLogs.map(log => log.contentSize).cache()
         println("Content Size Avg: %s, Min: %s, Max: %s".format(
           contentSizes.reduce(_ + _) / contentSizes.count,
           contentSizes.min,
           contentSizes.max
         ))
         // (the remaining statistics were omitted in the original message)
       }
     })

     streamingContext.start()
     streamingContext.awaitTermination()
   }
 }




Regarding shuffle data file format

2015-02-20 Thread twinkle sachdeva
Hi,

What file format is used to write files during shuffle write?
Does it depend on the Spark shuffle manager or on the output format?

Is it possible to change the file format used for shuffle, irrespective of the
output format of the file?

Thanks,
Twinkle


Re: RDD Partition number

2015-02-20 Thread Alessandro Lulli
Hi All,

Thanks for your answers.
I have one more detail to point out.

It is now clear how the number of partitions is defined for an HDFS file.

However, suppose my dataset is replicated on all the machines at the same
absolute path, and each machine uses, for instance, an ext3 filesystem.

If I load the file into an RDD, how many partitions are defined in this case,
and why?

I found that Spark defines some number of partitions, say K. If I force the
number of partitions to be <= K, my parameter is ignored.
If I set a value K* >= K, then Spark sets K* partitions.

Thanks for your help
Alessandro
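
One way to read this behaviour, offered as a sketch rather than Spark's actual code: for file-based RDDs, the second argument of sc.textFile is passed to Hadoop's input format as a hint for the minimum number of splits, and the split size is then capped by the filesystem's block size. Asking for fewer splits than the block size already yields therefore changes nothing, while asking for more shrinks the splits. The function below mirrors that rule. The 32 MB block size for a local filesystem and the 320 MB file size are assumptions for illustration, and all names are hypothetical.

```scala
object SplitEstimate {
  val MB: Long = 1024L * 1024L

  // Split-size rule used by Hadoop's old-API FileInputFormat:
  //   splitSize = max(minSize, min(totalSize / requestedSplits, blockSize))
  def splitSize(totalBytes: Long, requestedSplits: Int,
                blockBytes: Long, minBytes: Long = 1L): Long = {
    val goalSize = totalBytes / math.max(requestedSplits, 1)
    math.max(minBytes, math.min(goalSize, blockBytes))
  }

  // Approximate partition count: file size divided by split size, rounded up.
  // (The real implementation also lets the last split run slightly over.)
  def estimatePartitions(totalBytes: Long, requestedSplits: Int,
                         blockBytes: Long): Long = {
    val size = splitSize(totalBytes, requestedSplits, blockBytes)
    (totalBytes + size - 1) / size
  }

  def main(args: Array[String]): Unit = {
    val fileSize = 320 * MB  // assumed file size
    val blockSize = 32 * MB  // assumed block size of the local filesystem
    for (requested <- Seq(2, 5, 20)) {
      println(s"requested=$requested -> " +
        estimatePartitions(fileSize, requested, blockSize))
    }
  }
}
```

Under these assumptions, requesting 2 or 5 partitions still gives 10 (the K above), while requesting 20 gives 20.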


On Thu, Feb 19, 2015 at 6:27 PM, Ted Yu yuzhih...@gmail.com wrote:

 bq. *blocks being 64MB by default in HDFS*


 *In hadoop 2.1+, default block size has been increased.*
 See https://issues.apache.org/jira/browse/HDFS-4053

 Cheers

 On Thu, Feb 19, 2015 at 8:32 AM, Ted Yu yuzhih...@gmail.com wrote:

 What file system are you using ?

 If you use hdfs, the documentation you cited is pretty clear on how
 partitions are determined.

 bq. file X replicated on 4 machines

 I don't think replication factor plays a role w.r.t. partitions.

 On Thu, Feb 19, 2015 at 8:05 AM, Alessandro Lulli lu...@di.unipi.it
 wrote:

 Hi All,

 Could you please help me understanding how Spark defines the number of
 partitions of the RDDs if not specified?

 I found the following in the documentation for file loaded from HDFS:
 *The textFile method also takes an optional second argument for
 controlling the number of partitions of the file. By default, Spark creates
 one partition for each block of the file (blocks being 64MB by default in
 HDFS), but you can also ask for a higher number of partitions by passing a
 larger value. Note that you cannot have fewer partitions than blocks*

 What is the rule for files loaded from the file system?
 For instance, I have a file X replicated on 4 machines. If I load the
 file X into an RDD, how many partitions are defined, and why?

 Thanks for your help on this
 Alessandro






Re: Spark on Mesos: Multiple Users with iPython Notebooks

2015-02-20 Thread Iulian Dragoș
On Thu, Feb 19, 2015 at 2:49 PM, John Omernik j...@omernik.com wrote:

 I am running Spark on Mesos and it works quite well.  I have three
 users, all who setup iPython notebooks to instantiate a spark instance
 to work with on the notebooks. I love it so far.

 Since I am auto instantiating (I don't want a user to have to
 think about instantiating and submitting a spark app to do adhoc
 analysis, I want the environment setup ahead of time) this is done
 whenever an iPython notebook is opened. So far it's working pretty
 well, save one issue:

 Every notebook is a new driver, i.e. every time they open a notebook,
 a new spark-submit is called and the driver resources are allocated,
 regardless of whether they are used. Yes, it's only the driver, but
 I find that even that starts slowing down my queries for the notebooks
 that are using Spark. (I am running in Mesos fine-grained mode.)


 I have three users on my system, ideally, I would love to find a way
 so that on the first notebook being opened, a driver is started for
 that user, and then can be used for any notebook the user has open. So
 if they open a new notebook, I can check that yes, the user has a
 spark driver running, and thus, that notebook, if there is a query,
 will run it through that driver. That allows me to understand the
 resource allocation better, and it limits users from running 10
 notebooks and having a lot of resources.

 The other thing I was wondering is whether the driver could actually be run on
 the Mesos cluster. Right now, I have an edge node as an iPython
 server, and the drivers all live on that server, so as I get more and
 more drivers, the box's local resources get depleted by unused
 drivers. Obviously, if I could reuse the drivers per user on that
 box, that would be a great first step, but if I could reuse drivers and run
 them on the cluster, that would be ideal. Looking through the docs, I
 was not clear on those options. If anyone could point me in the right
 direction, I would greatly appreciate it!


Cluster mode support for Spark on Mesos is tracked under SPARK-5338
(https://issues.apache.org/jira/browse/SPARK-5338). I know Tim Chen is
working on it, so there will be progress soon.

iulian



 John

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 

--
Iulian Dragos

--
Reactive Apps on the JVM
www.typesafe.com


Re: Spark Performance on Yarn

2015-02-20 Thread Sean Owen
None of this really points to the problem. These messages indicate that
workers died, but not why. I'd first locate the executor logs, which should
reveal more about what's happening. It sounds like a harder type of failure,
such as a JVM crash, running out of file handles, or GC thrashing.

On Fri, Feb 20, 2015 at 4:51 AM, lbierman leebier...@gmail.com wrote:
 I'm a bit new to Spark and have a question about performance. I suspect a lot of
 my issue is due to tuning and parameters. I have a Hive external table on
 this data, and queries against it run in minutes.

 The Job:
 + 40 GB of Avro events on HDFS (100 million+ Avro events)
 + Read in the files from HDFS and dedupe events by key (mapToPair then a
 reduceByKey)
 + RDD returned and persisted (disk and memory)
 + Then passed to a job that takes the RDD, does a mapToPair to new object data,
 then a reduceByKey, and then does the work in foreachPartition

 The issue:
 When I run this in my environment on YARN, it takes 20+ hours. Running on
 YARN, we see the first stage run to build the deduped RDD, but when
 the next stage starts, things fail and data is lost. This results in stage 0
 starting over and over, which just drags things out.

 Errors I see in the driver logs:
 ERROR cluster.YarnClientClusterScheduler: Lost executor 1 on X: remote
 Akka client disassociated

 15/02/20 00:27:36 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 3.1
 (TID 1335,): FetchFailed(BlockManagerId(3, i, 33958), shuffleId=1,
 mapId=162, reduceId=0, message=
 org.apache.spark.shuffle.FetchFailedException: Failed to connect
 toX/X:33958

 Also we see this, but I'm suspecting this is because the previous stage
 fails and the next one starts:
 org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
 location for shuffle 1

 Cluster:
 5 machines, each with 2 cores and 8 GB of memory

 Spark-submit command:
  spark-submit --class com.myco.SparkJob \
 --master yarn \
 /tmp/sparkjob.jar \

 Any thoughts on where to look, how to start approaching this problem, or
 which additional data points to present would be appreciated.

 Thanks..

 Code for the job:
  JavaRDD<AnalyticsEvent> events = ((JavaRDD<AvroKey<AnalyticsEvent>>)
      context.newAPIHadoopRDD(
          context.hadoopConfiguration(),
          AvroKeyInputFormat.class,
          AvroKey.class,
          NullWritable.class
      ).keys())
      .map(event -> AnalyticsEvent.newBuilder(event.datum()).build())
      .filter(key -> { return
          Optional.ofNullable(key.getStepEventKey()).isPresent(); })
      .mapToPair(event -> new Tuple2<AnalyticsEvent, Integer>(event, 1))
      .reduceByKey((analyticsEvent1, analyticsEvent2) -> analyticsEvent1)
      .map(tuple -> tuple._1());

  events.persist(StorageLevel.MEMORY_AND_DISK_2());
  events.mapToPair(event -> {
      return new Tuple2<T, RunningAggregates>(
          keySelector.select(event),
          new RunningAggregates(
              Optional.ofNullable(event.getVisitors()).orElse(0L),
              Optional.ofNullable(event.getImpressions()).orElse(0L),
              Optional.ofNullable(event.getAmount()).orElse(0.0D),
              Optional.ofNullable(event.getAmountSumOfSquares()).orElse(0.0D)));
      })
      .reduceByKey((left, right) -> { return left.add(right); })
      .foreachPartition(dostuff)






 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Performance-on-Yarn-tp21729.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.




Accumulator in SparkUI for streaming

2015-02-20 Thread Tim Smith
On Spark 1.2:

I am trying to capture the number of records read from a Kafka topic:

val inRecords = ssc.sparkContext.accumulator(0, "InRecords")

..

kInStreams.foreach( k =>
{

 k.foreachRDD ( rdd => inRecords += rdd.count().toInt )
 inRecords.value


The question is: how do I get the accumulator to show up in the UI? I tried
inRecords.value, but that didn't help. I'm pretty sure it isn't showing up in
the stage metrics.

What's the trick here? collect?

Thanks,

Tim


DataFrame: Enable zipWithUniqueId

2015-02-20 Thread Dima Zhiyanov
Hello

Question regarding the new DataFrame API introduced here
https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html

I often use the zipWithUniqueId method of SchemaRDD (as an RDD) to
replace string keys with more efficient long keys. Would it be possible to
use the same method on the new DataFrame class?

It looks like, unlike SchemaRDD, DataFrame does not extend RDD.

Thanks
Dima




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/DataFrame-Enable-zipWithUniqueId-tp21733.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Streaming Linear Regression

2015-02-20 Thread Emre Sevinc
Hello Baris,

Sharing your complete source code (if it is not very long, or maybe via
https://gist.github.com/) would be more helpful.

Also, telling us which Spark version you use, on which file system, and how you
run your application, together with any log / output info it produces,
might make collective debugging easier.

--
Emre Sevinç
http://www.bigindustries.be/



On Thu, Feb 19, 2015 at 9:01 PM, barisak baris.akg...@gmail.com wrote:

 Hi

 I tried to run Streaming Linear Regression on my local machine.

 val trainingData =

 ssc.textFileStream("/home/barisakgu/Desktop/Spark/train").map(LabeledPoint.parse)

 textFileStream is not seeing the new files. I searched the Internet and
 saw that somebody had the same issue, but no solution was found.

 Does anyone have an opinion on this? Has anybody managed to run
 streaming linear regression?

 Thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Linear-Regression-tp21726.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





-- 
Emre Sevinc


Re: using a database connection pool to write data into an RDBMS from a Spark application

2015-02-20 Thread Sean Owen
Although I don't know if it's related, the Class.forName() method of
loading drivers is very old. You should be using DataSource and
javax.sql; this has been the usual practice since about Java 1.4.

Why do you say a different driver is being loaded? That's not the error here.

Try instantiating the driver directly to test whether it's available
in the classpath. Otherwise you would have to check whether the jar
exists, the class exists in it, and it's really on your classpath.
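
A minimal sketch of such a check, assuming reflection is an acceptable stand-in for writing `new org.postgresql.Driver()` directly (reflection lets the snippet compile even when the driver jar is absent). The object and method names are hypothetical. To test the executors' classpath rather than the driver program's, the same call would have to run inside the foreachPartition block.

```scala
object DriverCheck {
  // Returns true if the named class can be loaded and instantiated with a
  // no-argument constructor from the current classpath.
  def canInstantiate(className: String): Boolean =
    try {
      Class.forName(className).getDeclaredConstructor().newInstance()
      true
    } catch {
      case _: ReflectiveOperationException => false // class or constructor missing
      case _: LinkageError => false                 // class found, but a dependency is missing
    }

  def main(args: Array[String]): Unit = {
    println("org.postgresql.Driver available: " +
      canInstantiate("org.postgresql.Driver"))
  }
}
```

If this prints false, the jar is not visible to that JVM, which would point to a classpath problem rather than a driver version mismatch.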

On Fri, Feb 20, 2015 at 5:27 AM, Mohammed Guller moham...@glassbeam.com wrote:
 Hi Kelvin,



 Yes. I am creating an uber jar with the Postgres driver included, but
 nevertheless tried both the --jars and --driver-class-path flags. It didn’t help.



 Interestingly, I can’t use BoneCP even in the driver program when I run my
 application with spark-submit. I am getting the same exception when the
 application initializes BoneCP before creating SparkContext. It looks like
 Spark is loading a different version of the Postgres JDBC driver than the
 one that I am linking.



 Mohammed



 From: Kelvin Chu [mailto:2dot7kel...@gmail.com]
 Sent: Thursday, February 19, 2015 7:56 PM
 To: Mohammed Guller
 Cc: user@spark.apache.org
 Subject: Re: using a database connection pool to write data into an RDBMS
 from a Spark application



 Hi Mohammed,



 Did you use --jars to specify your jdbc driver when you submitted your job?
 Take a look at this link:
 http://spark.apache.org/docs/1.2.0/submitting-applications.html



 Hope this helps!



 Kelvin



 On Thu, Feb 19, 2015 at 7:24 PM, Mohammed Guller moham...@glassbeam.com
 wrote:

 Hi –

 I am trying to use BoneCP (a database connection pooling library) to write
 data from my Spark application to an RDBMS. The database inserts are inside
 a foreachPartition code block. I am getting this exception when the code
 tries to insert data using BoneCP:



 java.sql.SQLException: No suitable driver found for
 jdbc:postgresql://hostname:5432/dbname



 I tried explicitly loading the Postgres driver on the worker nodes by adding
 the following line inside the foreachPartition code block:



 Class.forName("org.postgresql.Driver")



 It didn’t help.



 Has anybody been able to get a database connection pool library to work with
 Spark? If you got it working, can you please share the steps?



 Thanks,

 Mohammed








Can you add Big Industries to the Powered by Spark page?

2015-02-20 Thread Emre Sevinc
Hello,

Could you please add Big Industries to the Powered by Spark page at
https://cwiki.apache.org/confluence/display/SPARK/Powered+By+Spark ?


Company Name: Big Industries

URL:  http://www.bigindustries.be/

Spark Components: Spark Streaming

Use Case: Big Content Platform

Summary: The Big Content Platform is a business-to-business content asset
management service providing a searchable, aggregated source of live news
feeds, public domain media and archives of content.

The platform is founded on Apache Hadoop, uses the HDFS filesystem, Apache
Spark, Titan Distributed Graph Database, HBase, and Solr. Additionally, the
platform leverages public datasets like Freebase, DBpedia, Wiktionary, and
Geonames to support semantic text enrichment.



Kind regards,

Emre Sevinç
http://www.bigindustries.be/


Re: Re: Spark streaming doesn't print output when working with standalone master

2015-02-20 Thread bit1...@163.com
Thanks Akhil.




 
From: Akhil Das
Date: 2015-02-20 16:29
To: bit1...@163.com
CC: user
Subject: Re: Re: Spark streaming doesn't print output when working with 
standalone master
local[3] spawns 3 threads on 1 core :)

Thanks
Best Regards

On Fri, Feb 20, 2015 at 12:50 PM, bit1...@163.com bit1...@163.com wrote:
Thanks Akhil, you are right.
I checked and found that only 1 core is allocated to the program.
I am running on a virtual machine and allocated only one processor to it (1 core
per processor), so even though I specified --total-executor-cores 3 in the
submit script, the application is still allocated only one processor.

This leads me to another question:
Although I have only one core, I specified the master and executor options as
--master local[3] --executor-memory 512M --total-executor-cores 3. Since I have
only one core, why does this work?



bit1...@163.com
 
From: Akhil Das
Date: 2015-02-20 15:13
To: bit1...@163.com
CC: user
Subject: Re: Spark streaming doesn't print output when working with standalone 
master
While running the program, go to your cluster's web UI (it runs on port 8080, probably
at hadoop.master:8080) and see how many cores are allocated to the program. It
should be >= 2 for the stream to get processed.






Thanks
Best Regards

On Fri, Feb 20, 2015 at 9:29 AM, bit1...@163.com bit1...@163.com wrote:
Hi,
I am trying the Spark Streaming log analysis reference application provided by
Databricks at
https://github.com/databricks/reference-apps/tree/master/logs_analyzer
When I deploy the code to the standalone cluster with the following shell script,
there is no output at all, which means the windowDStream has 0 RDDs:
./spark-submit --deploy-mode client --name LogAnalyzerStreaming --master 
spark://hadoop.master:7077 --executor-memory 512M --total-executor-cores 3 
--class 
spark.examples.databricks.reference.apps.loganalysis.LogAnalyzerStreaming 
LogApp.jar

But when I change --master to --master local[3], the program starts to work
fine. Does anyone have any advice? Thanks!
./spark-submit --deploy-mode client --name LogAnalyzerStreaming --master 
local[3] --executor-memory 512M --total-executor-cores 3 --class 
spark.examples.databricks.reference.apps.loganalysis.LogAnalyzerStreaming 
LogApp.jar


object LogAnalyzerStreaming {

  val WINDOW_LENGTH = new Duration(12 * 1000)
  val SLIDE_INTERVAL = new Duration(6 * 1000)

  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Log Analyzer Streaming in Scala")
    val sc = new SparkContext(sparkConf)
    val streamingContext = new StreamingContext(sc, SLIDE_INTERVAL)

    // The port argument is missing from the archived message.
    val logLinesDStream = streamingContext.socketTextStream("localhost", )

    val accessLogsDStream =
      logLinesDStream.map(ApacheAccessLog.parseLogLine).cache()
    val windowDStream = accessLogsDStream.window(WINDOW_LENGTH, SLIDE_INTERVAL)

    windowDStream.foreachRDD(accessLogs => {
      if (accessLogs.count() == 0) {
        println("No access com.databricks.app.logs received in this time interval")
      } else {
        // Calculate statistics based on the content size.
        val contentSizes = accessLogs.map(log => log.contentSize).cache()
        println("Content Size Avg: %s, Min: %s, Max: %s".format(
          contentSizes.reduce(_ + _) / contentSizes.count,
          contentSizes.min,
          contentSizes.max
        ))
        // (the remaining statistics were omitted in the original message)
      }
    })

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}






Setting the number of executors in standalone mode

2015-02-20 Thread Yiannis Gkoufas
Hi there,

I am trying to increase the number of executors per worker in standalone mode,
but I have not managed to do so.
I loosely followed the instructions in this thread:
http://stackoverflow.com/questions/26645293/spark-configuration-memory-instance-cores

and set:
spark.executor.memory 1g
SPARK_WORKER_MEMORY=8g

hoping to get 8 executors per worker, but it's still 1.
And the num-executors option is not available in standalone mode.

Thanks a lot!


Where to look for potential causes for Akka timeout errors in a Spark Streaming Application?

2015-02-20 Thread Emre Sevinc
Hello,

We are building a Spark Streaming application that listens to a directory
on HDFS and uses the SolrJ library to send newly detected files to a Solr
server. When we put 10,000 files into the directory it is listening to, it
starts to process them by sending the files to our Solr server, but after
a few thousand files the Spark Streaming application dies.

Before the application dies, it gives some TimeoutException errors related
to Akka, such as:

  util.AkkaUtils: Error sending message in 1 attempts
  java.util.concurrent.TimeoutException: Futures timed out after [30
seconds]
  akka.pattern.AskTimeoutException: Timed out

Any ideas on how to deal with this? Should we add/change/tweak some Spark
configuration parameters?

We're using Spark 1.2.0 on a YARN cluster, and we're giving 4 cores and 2GB
of memory to that application when invoking it via spark-submit command.

Below you can read the last few lines of the log file, showing what our
Spark Streaming application logged just before it died:


15/02/20 13:28:25 INFO rdd.NewHadoopRDD: Input split:
hdfs://node01.demo.hadoop:8020/user/bjorn/spark-belga/dbpedia-translator-out/2fdf95f1-67d6-40b7-9345-fe129e38a2d9.json:0+2620
15/02/20 13:28:25 INFO broadcast.TorrentBroadcast: Started reading
broadcast variable 3004
15/02/20 13:28:32 INFO storage.MemoryStore: ensureFreeSpace(20996) called
with curMem=31171148, maxMem=794647
15/02/20 13:28:32 INFO storage.MemoryStore: Block broadcast_3004_piece0
stored as bytes in memory (estimated size 20.5 KB, free 1030.5 MB)
15/02/20 13:28:33 INFO storage.BlockManagerMaster: Updated info of block
broadcast_3004_piece0
15/02/20 13:28:33 INFO broadcast.TorrentBroadcast: Reading broadcast
variable 3004 took 7897 ms
15/02/20 13:28:33 INFO storage.MemoryStore: ensureFreeSpace(347363) called
with curMem=31192144, maxMem=794647
15/02/20 13:28:33 INFO storage.MemoryStore: Block broadcast_3004 stored as
values in memory (estimated size 339.2 KB, free 1030.2 MB)
15/02/20 13:28:33 INFO storage.MemoryStore: ensureFreeSpace(2627) called
with curMem=31539507, maxMem=794647
15/02/20 13:28:33 INFO storage.MemoryStore: Block rdd_3659_3 stored as
bytes in memory (estimated size 2.6 KB, free 1030.2 MB)
15/02/20 13:28:34 INFO storage.BlockManagerMaster: Updated info of block
rdd_3659_3
15/02/20 13:28:34 INFO impl.HttpClientUtil: Creating new http client,
config:maxConnections=128maxConnectionsPerHost=32followRedirects=false
15/02/20 13:28:36 INFO storage.MemoryStore: ensureFreeSpace(5) called with
curMem=31542134, maxMem=794647
15/02/20 13:28:36 INFO storage.MemoryStore: Block rdd_3660_3 stored as
bytes in memory (estimated size 5.0 B, free 1030.2 MB)
15/02/20 13:28:40 INFO storage.BlockManagerMaster: Updated info of block
rdd_3660_3
15/02/20 13:28:40 INFO executor.Executor: Finished task 3.0 in stage 245.0
(TID 3455). 2516 bytes result sent to driver
15/02/20 13:29:07 WARN util.AkkaUtils: Error sending message in 1 attempts
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187)
at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:398)
15/02/20 13:29:08 ERROR executor.CoarseGrainedExecutorBackend: Driver
Disassociated [akka.tcp://sparkexecu...@node08.demo.hadoop:48042] -
[akka.tcp://sparkdri...@node07.demo.hadoop:56535] disassociated! Shutting
down.

LogType: stdout
LogLength: 0
Log Contents:



Container: container_1422006251277_0837_01_04 on node08.demo.hadoop_8041
==
LogType: stderr
LogLength: 2952
Log Contents:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/mnt/disk1/cm/parcels/CDH-5.3.0-1.cdh5.3.0.p0.30/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/mnt/disk3/yarn/nm/usercache/bjorn/filecache/354/bigcontent-1.0-SNAPSHOT.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
15/02/20 13:29:26 INFO executor.CoarseGrainedExecutorBackend: Registered
signal handlers for [TERM, HUP, INT]
15/02/20 13:29:27 INFO spark.SecurityManager: Changing view acls to:
yarn,bjorn
15/02/20 13:29:27 INFO spark.SecurityManager: Changing modify acls to:
yarn,bjorn
15/02/20 13:29:27 INFO spark.SecurityManager: SecurityManager:
authentication disabled; ui acls disabled; users with view permissions:
Set(yarn, bjorn); users with modify permissions: 

GSOC2015

2015-02-20 Thread magellane a
Hi
Since we're approaching the GSOC2015 application process I have some
questions:

1) Will your organization be a part of GSOC2015 and what are the projects
that you will be interested in?
2) Since I'm not a contributor to apache spark, what are some starter tasks
I can work on to gain facility with the code base?
3) Am I posting this to the right mailing list or should I post to the
dev-mailing-list?


Thanks a lot

Regards,


Re: storing MatrixFactorizationModel (pyspark)

2015-02-20 Thread Antony Mayi
Well, I understand the math (having two vectors), but the Python
MatrixFactorizationModel object seems to be just a wrapper around a Java class, so
I am not sure how to extract the two RDDs. Thanks, Antony.

 On Thursday, 19 February 2015, 16:32, Ilya Ganelin ilgan...@gmail.com 
wrote:
   
 

 Yep, the matrix model has two RDDs of vectors representing the decomposed matrix.
You can save these to disk and reuse them.
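
To make the "two vectors" idea concrete, here is a small sketch in plain Scala (no Spark, and not the MLlib implementation): once the user and product factors have been saved and reloaded, a predicted rating is the dot product of the two matching vectors. The maps, IDs, and numbers are made up for illustration; in the Scala API the factors would come from the model's userFeatures and productFeatures RDDs.

```scala
object FactorPredict {
  // Stand-in for a reloaded factor RDD: id -> latent feature vector.
  type Factors = Map[Int, Array[Double]]

  def dot(a: Array[Double], b: Array[Double]): Double =
    a.zip(b).map { case (x, y) => x * y }.sum

  // Predicted rating, or None if either id has no stored factors.
  def predict(userFactors: Factors, productFactors: Factors,
              user: Int, product: Int): Option[Double] =
    for {
      u <- userFactors.get(user)
      p <- productFactors.get(product)
    } yield dot(u, p)

  def main(args: Array[String]): Unit = {
    val users: Factors = Map(1 -> Array(1.0, 2.0))       // hypothetical values
    val products: Factors = Map(10 -> Array(0.5, 1.5))   // hypothetical values
    println(predict(users, products, 1, 10))
  }
}
```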
On Thu, Feb 19, 2015 at 2:19 AM Antony Mayi antonym...@yahoo.com.invalid 
wrote:

Hi,
when getting the model out of ALS.train, it would be beneficial to store it (to
disk) so the model can be reused later for any subsequent predictions. I am
using pyspark and had no luck pickling it, either with the standard pickle module
or even with dill.
Does anyone have a solution for this (note that it is pyspark)?
Thank you, Antony.


 
   

Re: Why is RDD lookup slow?

2015-02-20 Thread shahab
Thank you all. Just changing the RDD to a Map structure saved me approx. 1
second.

Yes, I will check out IndexedRDD to see if it has better performance.

best,
/Shahab

On Thu, Feb 19, 2015 at 6:38 PM, Burak Yavuz brk...@gmail.com wrote:

 If your dataset is large, there is a Spark Package called IndexedRDD
 optimized for lookups. Feel free to check that out.

 Burak
 On Feb 19, 2015 7:37 AM, Ilya Ganelin ilgan...@gmail.com wrote:

 Hi Shahab - if your data structures are small enough a broadcasted Map is
 going to provide faster lookup. Lookup within an RDD is an O(m) operation
 where m is the size of the partition. For RDDs with multiple partitions,
 executors can operate on it in parallel so you get some improvement for
 larger RDDs.
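
 A minimal sketch of the broadcast approach described above, assuming Spark 1.2 on the classpath and a data set small enough to collect to the driver. The object name, sample data, and local[2] master are made up for the example.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // implicit pair-RDD functions (needed in Spark 1.2)

object BroadcastLookup {
  // Resolves each key against a broadcast copy of the (small) pair data.
  def resolve(sc: SparkContext, keys: Seq[Int]): Seq[(Int, Option[String])] = {
    // Small key/value data set standing in for the cached RDD[(Int, String)].
    val pairs = sc.parallelize(Seq(1 -> "one", 2 -> "two", 3 -> "three"))

    // Collect once on the driver, then ship one read-only copy to each executor.
    val table = sc.broadcast(pairs.collectAsMap())

    // Each lookup is now a local hash-map access instead of a Spark job.
    sc.parallelize(keys).map(k => k -> table.value.get(k)).collect().toSeq
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("BroadcastLookup").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      resolve(sc, Seq(2, 3, 4)).foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

 By contrast, every call to lookup on an RDD is an action that schedules a job, which would be consistent with the latency reported in this thread even for a few MB of data.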
 On Thu, Feb 19, 2015 at 7:31 AM shahab shahab.mok...@gmail.com wrote:

 Hi,

 I am doing lookups on cached RDDs of type RDD[(Int, String)], and I noticed that
 each lookup is relatively slow, 30-100 ms. I even tried this on one machine
 with a single partition, but it made no difference.

 The RDDs are not large at all, 3-30 MB.

 Is this expected behaviour? Should I use another data structure, such as a
 HashMap, to keep the data and look it up there, and use Broadcast to send a copy
 to all machines?

 best,
 /Shahab





Re: Spark job fails on cluster but works fine on a single machine

2015-02-20 Thread Pavel Velikhov
I definitely delete the file on the right HDFS, I only have one HDFS instance.

The problem seems to be in the CassandraRDD - reading always fails in some way 
when run on the cluster, but single-machine reads are okay.



 On Feb 20, 2015, at 4:20 AM, Ilya Ganelin ilgan...@gmail.com wrote:
 
 The stupid question is whether you're deleting the file from hdfs on the 
 right node?
 On Thu, Feb 19, 2015 at 11:31 AM Pavel Velikhov pavel.velik...@gmail.com wrote:
 Yeah, I do manually delete the files, but it still fails with this error.
 
 On Feb 19, 2015, at 8:16 PM, Ganelin, Ilya ilya.gane...@capitalone.com wrote:
 
 When writing to hdfs Spark will not overwrite existing files or directories. 
 You must either manually delete these or use Java's Hadoop FileSystem class 
 to remove them.
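
 A minimal sketch of the FileSystem approach mentioned above, assuming the Hadoop client libraries are on the classpath. The object and method names are hypothetical.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object OutputCleaner {
  // Deletes `dir` recursively if it exists.
  // Returns true only if something was actually deleted.
  def deleteIfExists(dir: String, conf: Configuration): Boolean = {
    val path = new Path(dir)
    // Resolves the filesystem (hdfs://, file://, ...) from the path's scheme,
    // falling back to the default filesystem in `conf`.
    val fs = path.getFileSystem(conf)
    fs.exists(path) && fs.delete(path, true)
  }
}
```

 In a Spark job this could be called just before saveAsTextFile, for example as OutputCleaner.deleteIfExists("/tmp/pavel/CassandraPipeTest.txt", sc.hadoopConfiguration), so the check runs against the same HDFS instance the job writes to.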
 
 
 
 
 
 -Original Message-
 From: Pavel Velikhov [pavel.velik...@gmail.com]
 Sent: Thursday, February 19, 2015 11:32 AM Eastern Standard Time
 To: user@spark.apache.org
 Subject: Spark job fails on cluster but works fine on a single machine
 
 I have a simple Spark job that goes out to Cassandra, runs a pipe and stores 
 results:
 
 val sc = new SparkContext(conf)
 val rdd = sc.cassandraTable("keyspace", "table")
   .map(r => r.getInt("column") + "\t" +
     write(get_lemmas(r.getString("tags"))))
   .pipe("python3 /tmp/scripts_and_models/scripts/run.py")
   .map(r => convertStr(r))
   .coalesce(1, true)
   .saveAsTextFile("/tmp/pavel/CassandraPipeTest.txt")
   //.saveToCassandra("keyspace", "table", SomeColumns("id", "data"))
 
 When run on a single machine, everything is fine if I save to an hdfs file 
 or save to Cassandra.
 When run in cluster neither works:
 
  - When saving to file, I get an exception: User class threw exception: 
 Output directory hdfs://hadoop01:54310/tmp/pavel/CassandraPipeTest.txt  
 already exists
  - When saving to Cassandra, only 4 rows are updated with empty data (I test 
 on a 4-machine Spark cluster)
 
 Any hints on how to debug this and where the problem could be?
 
 - I delete the hdfs file before running
 - Would really like the output to hdfs to work, so I can debug
 - Then it would be nice to save to Cassandra
 
 



Re: Streaming Linear Regression

2015-02-20 Thread Emre Sevinc
Baris,

I've tried the following piece of code:

https://gist.github.com/emres/10c509c1d69264fe6fdb

and built it using

sbt package

and then submitted it via

  spark-submit --class
org.apache.spark.examples.mllib.StreamingLinearRegression --master local[4]
target/scala-2.10/streaminglinearregression_2.10-1.0.jar

And once it started to run, I've waited for a few seconds, and then I've
copied a few files to

   /home/emre/data/train

And observed the log output on my console:

 15/02/20 13:08:35 INFO FileInputDStream: Finding new files took 29 ms
15/02/20 13:08:35 INFO FileInputDStream: New files at time 1424434115000 ms:
file:/home/emre/data/train/newsMessageNL14.json
file:/home/emre/data/train/newsMessageNL11.json
file:/home/emre/data/train/newsMessageNL10.json
file:/home/emre/data/train/newsMessageNL6.json
file:/home/emre/data/train/newsMessageNL8.json
file:/home/emre/data/train/newsMessageNL5.json
file:/home/emre/data/train/newsMessageNL1.json
file:/home/emre/data/train/newsMessageNL9.json
file:/home/emre/data/train/newsMessageNL2.json
file:/home/emre/data/train/newsMessageNL16.json
file:/home/emre/data/train/newsMessageNL20.json
file:/home/emre/data/train/newsMessageNL12.json
file:/home/emre/data/train/newsMessageNL4.json
file:/home/emre/data/train/newsMessageNL19.json
file:/home/emre/data/train/newsMessageNL7.json
file:/home/emre/data/train/newsMessageNL17.json
file:/home/emre/data/train/newsMessageNL18.json
file:/home/emre/data/train/newsMessageNL3.json
file:/home/emre/data/train/newsMessageNL13.json
file:/home/emre/data/train/newsMessageNL15.json
15/02/20 13:08:35 INFO MemoryStore: ensureFreeSpace(214074) called with
curMem=0, maxMem=278019440

The contents of JSON files of course don't make sense in this context
(building a linear regression model), I've used them only to check whether
the system detects new files, and as can be seen above, it does.

You can start from the source code I've shared, which is detecting new
files, and continue to build your particular streaming linear regression
application.
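
For anyone who wants to see the "detect new files" idea in isolation, here is a minimal sketch in plain Python. It is not Spark's implementation; `find_new_files` is a hypothetical helper that polls a directory and reports only files it has not seen before, which matches what I did above (start the job first, then copy files in).

```python
import os
import tempfile


def find_new_files(directory, seen):
    """Return files in `directory` that are not yet in `seen`, and record them."""
    current = {
        os.path.join(directory, name)
        for name in os.listdir(directory)
        if not name.startswith(".")  # skip hidden/temporary files
    }
    new_files = sorted(current - seen)
    seen.update(new_files)
    return new_files


if __name__ == "__main__":
    train_dir = tempfile.mkdtemp()
    # A file that is already there before monitoring starts.
    open(os.path.join(train_dir, "old.json"), "w").close()
    seen = set()
    find_new_files(train_dir, seen)  # first scan marks existing files as seen
    # A file copied in after monitoring has started.
    open(os.path.join(train_dir, "newsMessageNL1.json"), "w").close()
    print(find_new_files(train_dir, seen))  # only the newly copied file
```

If files copied in before the first scan never show up in such a loop, that is by design in this sketch; whether the same explains the original problem is an assumption worth checking.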

--
Emre Sevinç
http://www.bigindustries.be



On Thu, Feb 19, 2015 at 9:01 PM, barisak baris.akg...@gmail.com wrote:

 Hi

 I tried to run Streaming Linear Regression on my local machine.

 val trainingData =
   ssc.textFileStream("/home/barisakgu/Desktop/Spark/train").map(LabeledPoint.parse)

 textFileStream is not seeing the new files. I searched the Internet and saw
 that somebody else has the same issue, but no solution was found.

 Does anyone have an idea about this? Has anybody managed to get streaming
 linear regression running?

 Thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Linear-Regression-tp21726.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: Spark on Mesos: Multiple Users with iPython Notebooks

2015-02-20 Thread John Omernik
Awesome! This is exactly what I'd need.  Unfortunately, I am not a
programmer of any talent or skill, but how could I assist with this
JIRA? From a User perspective, this is really the next step for my org
taking our Mesos cluster to user land with Spark. I don't want to be
pushy, but is there any sort of time frame I could possibly
communicate to my team? Anything I can do?

Thanks!

On Fri, Feb 20, 2015 at 4:36 AM, Iulian Dragoș
iulian.dra...@typesafe.com wrote:


 On Thu, Feb 19, 2015 at 2:49 PM, John Omernik j...@omernik.com wrote:

 I am running Spark on Mesos and it works quite well.  I have three
 users, all of whom set up iPython notebooks to instantiate a Spark instance
 to work with on the notebooks. I love it so far.

 Since I am auto-instantiating (I don't want a user to have to
 think about instantiating and submitting a Spark app to do ad hoc
 analysis; I want the environment set up ahead of time), this is done
 whenever an iPython notebook is opened.  So far it's working pretty
 well, save one issue:

 Every notebook is a new driver. I.e., every time they open a notebook,
 a new spark-submit is called, and the driver resources are allocated,
 regardless of whether they are used or not.  Yes, it's only the driver, but
 even that, I find, starts slowing down my queries for the notebooks that
 are using Spark.  (I am running in Mesos fine-grained mode.)


 I have three users on my system, ideally, I would love to find a way
 so that on the first notebook being opened, a driver is started for
 that user, and then can be used for any notebook the user has open. So
 if they open a new notebook, I can check that yes, the user has a
 spark driver running, and thus, that notebook, if there is a query,
 will run it through that driver. That allows me to understand the
 resource allocation better, and it limits users from running 10
 notebooks and having a lot of resources.

 The other thing I was wondering is could the driver actually be run on
 the mesos cluster? Right now, I have an edge node as an iPython
 server, the drivers all exist on that server, so as I get more and
 more drivers, the box's local resources get depleted with unused
 drivers.  Obviously if I could reuse the drivers per user, on that
 box, that is great first step, but if I could reuse drivers, and run
 them on the cluster, that would be ideal.  looking through the docs I
 was not clear on those options. If anyone could point me in the right
 direction, I would greatly appreciate it!


 Cluster mode support for Spark is tracked under
 [SPARK-5338](https://issues.apache.org/jira/browse/SPARK-5338). I know Tim
 Chen is working on it, so there will be progress soon.

 iulian



 John





 --

 --
 Iulian Dragos

 --
 Reactive Apps on the JVM
 www.typesafe.com





what does Submitting ... missing tasks from Stage mean?

2015-02-20 Thread shahab
Hi,

This is probably a silly question, but I couldn't find any clear
documentation explaining why one sees "Submitting ... missing tasks from
Stage ..." in the logs.

Especially in my case, where I do not have any failures in job execution, I
wonder why this should happen.
Does it have any relation to lazy evaluation?

best,
/Shahab


Re: Where to look for potential causes for Akka timeout errors in a Spark Streaming Application?

2015-02-20 Thread Todd Nist
Hi Emre,

Have you tried adjusting these:

.set("spark.akka.frameSize", "500")
.set("spark.akka.askTimeout", "30")
.set("spark.core.connection.ack.wait.timeout", "600")

-Todd
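
If changing the code is inconvenient, the same three properties can also be passed on the command line with spark-submit's `--conf` flag. Below is a minimal sketch in plain Python that only renders the arguments; `to_submit_args` and `AKKA_SETTINGS` are hypothetical names, and the values are simply the ones suggested above, not tuned recommendations.

```python
def to_submit_args(settings):
    """Render a dict of Spark properties as spark-submit --conf arguments."""
    args = []
    for key, value in sorted(settings.items()):
        args += ["--conf", f"{key}={value}"]
    return args


# Property names and values taken from the suggestion above.
AKKA_SETTINGS = {
    "spark.akka.frameSize": 500,
    "spark.akka.askTimeout": 30,
    "spark.core.connection.ack.wait.timeout": 600,
}

if __name__ == "__main__":
    # Prints the flags to append to an existing spark-submit command line.
    print(" ".join(to_submit_args(AKKA_SETTINGS)))
```

Keeping the settings in one dict makes it easier to try different values between runs without rebuilding the jar.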

On Fri, Feb 20, 2015 at 8:14 AM, Emre Sevinc emre.sev...@gmail.com wrote:

 Hello,

 We are building a Spark Streaming application that listens to a directory
 on HDFS, and uses the SolrJ library to send newly detected files to a Solr
 server. When we put 10,000 files into the directory it is listening to, it
 starts to process them by sending the files to our Solr server, but after
 a few thousand files the Spark Streaming application dies.

 Before the application dies, it gives some TimeoutException errors related
 to Akka, such as:

   util.AkkaUtils: Error sending message in 1 attempts
   java.util.concurrent.TimeoutException: Futures timed out after [30
 seconds]
   akka.pattern.AskTimeoutException: Timed out

 Any ideas on how to deal with this? Should we add/change/tweak some Spark
 configuration parameters?

 We're using Spark 1.2.0 on a YARN cluster, and we're giving 4 cores and
 2GB of memory to that application when invoking it via spark-submit command.

 Below you can read the last few lines of the log file, showing what our
 Spark Streaming application logged just before it died:


 15/02/20 13:28:25 INFO rdd.NewHadoopRDD: Input split:
 hdfs://node01.demo.hadoop:8020/user/bjorn/spark-belga/dbpedia-translator-out/2fdf95f1-67d6-40b7-9345-fe129e38a2d9.json:0+2620
 15/02/20 13:28:25 INFO broadcast.TorrentBroadcast: Started reading
 broadcast variable 3004
 15/02/20 13:28:32 INFO storage.MemoryStore: ensureFreeSpace(20996) called
 with curMem=31171148, maxMem=794647
 15/02/20 13:28:32 INFO storage.MemoryStore: Block broadcast_3004_piece0
 stored as bytes in memory (estimated size 20.5 KB, free 1030.5 MB)
 15/02/20 13:28:33 INFO storage.BlockManagerMaster: Updated info of block
 broadcast_3004_piece0
 15/02/20 13:28:33 INFO broadcast.TorrentBroadcast: Reading broadcast
 variable 3004 took 7897 ms
 15/02/20 13:28:33 INFO storage.MemoryStore: ensureFreeSpace(347363) called
 with curMem=31192144, maxMem=794647
 15/02/20 13:28:33 INFO storage.MemoryStore: Block broadcast_3004 stored as
 values in memory (estimated size 339.2 KB, free 1030.2 MB)
 15/02/20 13:28:33 INFO storage.MemoryStore: ensureFreeSpace(2627) called
 with curMem=31539507, maxMem=794647
 15/02/20 13:28:33 INFO storage.MemoryStore: Block rdd_3659_3 stored as
 bytes in memory (estimated size 2.6 KB, free 1030.2 MB)
 15/02/20 13:28:34 INFO storage.BlockManagerMaster: Updated info of block
 rdd_3659_3
 15/02/20 13:28:34 INFO impl.HttpClientUtil: Creating new http client,
 config:maxConnections=128maxConnectionsPerHost=32followRedirects=false
 15/02/20 13:28:36 INFO storage.MemoryStore: ensureFreeSpace(5) called with
 curMem=31542134, maxMem=794647
 15/02/20 13:28:36 INFO storage.MemoryStore: Block rdd_3660_3 stored as
 bytes in memory (estimated size 5.0 B, free 1030.2 MB)
 15/02/20 13:28:40 INFO storage.BlockManagerMaster: Updated info of block
 rdd_3660_3
 15/02/20 13:28:40 INFO executor.Executor: Finished task 3.0 in stage 245.0
 (TID 3455). 2516 bytes result sent to driver
 15/02/20 13:29:07 WARN util.AkkaUtils: Error sending message in 1 attempts
 java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
 at
 scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
 at
 scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
 at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
 at
 scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
 at scala.concurrent.Await$.result(package.scala:107)
 at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187)
 at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:398)
 15/02/20 13:29:08 ERROR executor.CoarseGrainedExecutorBackend: Driver
 Disassociated [akka.tcp://sparkexecu...@node08.demo.hadoop:48042] -
 [akka.tcp://sparkdri...@node07.demo.hadoop:56535] disassociated! Shutting
 down.

 LogType: stdout
 LogLength: 0
 Log Contents:



 Container: container_1422006251277_0837_01_04 on
 node08.demo.hadoop_8041

 ==
 LogType: stderr
 LogLength: 2952
 Log Contents:
 SLF4J: Class path contains multiple SLF4J bindings.
 SLF4J: Found binding in
 [jar:file:/mnt/disk1/cm/parcels/CDH-5.3.0-1.cdh5.3.0.p0.30/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
 SLF4J: Found binding in
 [jar:file:/mnt/disk3/yarn/nm/usercache/bjorn/filecache/354/bigcontent-1.0-SNAPSHOT.jar!/org/slf4j/impl/StaticLoggerBinder.class]
 SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
 explanation.
 SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
 15/02/20 13:29:26 INFO executor.CoarseGrainedExecutorBackend: Registered
 signal handlers for [TERM, HUP, INT]
 15/02/20 13:29:27 INFO 

Re: Spark Streaming and message ordering

2015-02-20 Thread Cody Koeninger
For a given batch, for a given partition, the messages will be processed in
order by the executor that is running that partition.  That's because
messages for the given offset range are pulled by the executor, not pushed
from some other receiver.

If you have speculative execution, yes, another executor may be running
that partition.

If your job is lagging behind in processing such that the next batch starts
executing before the last batch is finished processing, yes it is possible
for some other executor to start working on messages from that same kafka
partition.

The obvious solution here seems to be turn off speculative execution and
adjust your batch interval / sizes such that they can comfortably finish
processing :)

If your processing time is sufficiently non-linear with regard to the
number of messages, yes you might be able to do something with overriding
dstream.compute.  Unfortunately the new kafka dstream implementation is
private, so it's not straightforward to subclass it.  I'd like to get a
solution in place for people who need to be able to tune the batch
generation policy (I need to as well, for unrelated reasons).  Maybe you
can say a little more about your use case.

But regardless of the technology you're using to read from kafka (spark,
storm, whatever), kafka only gives you ordering as to a particular
partition.  So you're going to need to do some kind of downstream sorting
if you really care about a global order.
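
To make the "downstream sorting" point concrete, here is a minimal sketch in plain Python, with no Kafka or Spark involved. It assumes each message carries a `timestamp` field (hypothetical, not something Kafka ordering gives you by itself) and that within one partition timestamps increase with the offset. Under those assumptions the per-partition streams can be merged into one global order.

```python
import heapq
from collections import namedtuple

# Hypothetical message shape; only per-partition offset order comes from Kafka.
Message = namedtuple("Message", ["timestamp", "partition", "offset", "payload"])


def merge_partitions(partitions):
    """Merge per-partition message lists into one timestamp-ordered list.

    Each input list must already be ordered by timestamp. Ties are broken
    by partition and offset so the result is deterministic.
    """
    return list(
        heapq.merge(*partitions, key=lambda m: (m.timestamp, m.partition, m.offset))
    )


if __name__ == "__main__":
    p0 = [Message(1, 0, 0, "a"), Message(4, 0, 1, "d")]
    p1 = [Message(2, 1, 0, "b"), Message(3, 1, 1, "c")]
    for msg in merge_partitions([p0, p1]):
        print(msg.timestamp, msg.partition, msg.offset, msg.payload)
```

A merge like this only works on a bounded set of messages (for example one batch), so late arrivals from a slow partition would still need a policy of their own.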

On Fri, Feb 20, 2015 at 1:43 AM, Neelesh neele...@gmail.com wrote:

 Even with the new direct streams in 1.3,  isn't it the case that the job
 *scheduling* follows the partition order, rather than job *execution*? Or
 is it the case that the stream listens to job completion event (using a
 streamlistener) before scheduling the next batch?  To compare with storm
 from a message ordering point of view, unless a tuple is fully processed by
 the DAG (as defined by spout+bolts), the next tuple does not enter the DAG.


 On Thu, Feb 19, 2015 at 9:47 PM, Cody Koeninger c...@koeninger.org
 wrote:

 Kafka ordering is guaranteed on a per-partition basis.

 The high-level consumer api as used by the spark kafka streams prior to
 1.3 will consume from multiple kafka partitions, thus not giving any
 ordering guarantees.

 The experimental direct stream in 1.3 uses the simple consumer api, and
 there is a 1:1 correspondence between spark partitions and kafka
 partitions.  So you will get deterministic ordering, but only on a
 per-partition basis.

 On Thu, Feb 19, 2015 at 11:31 PM, Neelesh neele...@gmail.com wrote:

 I had a chance to talk to TD today at the Strata+Hadoop Conf in San
 Jose. We talked a bit about this after his presentation about this - the
 short answer is spark streaming does not guarantee any sort of ordering
 (within batches, across batches).  One would have to use updateStateByKey
 to collect the events and sort them based on some attribute of the event.
 But TD said message ordering is a frequently asked feature recently and is
 getting on his radar.

 I went through the source code and there does not seem to be any
 architectural/design limitation to support this.  (JobScheduler,
 JobGenerator are a good starting point to see how stuff works under the
 hood).  Overriding DStream#compute and using streaminglistener looks like a
 simple way of ensuring ordered execution of batches within a stream. But
 this would be a partial solution, since ordering within a batch needs some
 more work that I don't understand fully yet.

 Side note :  My custom receiver polls the metricsservlet once in a while
 to decide whether jobs are getting done fast enough and throttle/relax
 pushing data in to receivers based on the numbers provided by
 metricsservlet. I had to do this because out-of-the-box rate limiting right
 now is static and cannot adapt to the state of the cluster

 thnx
 -neelesh

 On Wed, Feb 18, 2015 at 4:13 PM, jay vyas jayunit100.apa...@gmail.com
 wrote:

 This is a *fantastic* question.  The idea of how we identify individual
 things in multiple  DStreams is worth looking at.

 The reason being that you can then fine-tune your streaming job based
 on the RDD identifiers (i.e., are the timestamps from the producer
 correlating closely to the order in which RDD elements are being produced?).
 If *NO*, then you need to (1) dial up throughput on producer sources or
 else (2) increase cluster size so that Spark is capable of evenly handling
 load.

 You can't decide to do (1) or (2) unless you can track when the
 streaming elements are being converted to RDDs by Spark itself.



 On Wed, Feb 18, 2015 at 6:54 PM, Neelesh neele...@gmail.com wrote:

 There does not seem to be a definitive answer on this. Every time I
 google for message ordering, the only relevant thing that comes up is this:
 http://samza.apache.org/learn/documentation/0.8/comparisons/spark-streaming.html

 With a kafka receiver that pulls data from a single kafka partition of
 a kafka topic, 

How Spark and Flink are shaping the future of Hadoop?

2015-02-20 Thread Slim Baltagi
Hi

1. *To get a taste* of my talk at the 2015 Hadoop Summit, please find below a
few links to a similar talk that I gave at the Chicago Hadoop Users Group on
'*Transitioning Compute Models: Apache MapReduce to Spark*' on February 12,
2015 in front of 185 attendees:

- Video Recording: http://goo.gl/f30eEn
- Slides: http://goo.gl/Ikx4Ud
- Blog Entry: http://goo.gl/Pc6qiz 

2. To *vote* for my proposal at the 2015 Hadoop Summit on '*How Spark and
Flink are shaping the future of Hadoop?*', simply visit http://goo.gl/qfqSR9,
click on 'Vote', pick 3 votes and enter your name and email. Done in less
than 30 seconds!

Thanks in advance for your help

Slim Baltagi
Sr. Big Data Architect
http://ww.SparkBigData.com



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-Spark-and-Flink-are-shaping-the-future-of-Hadoop-tp21743.html



Re: loads of memory still GC overhead limit exceeded

2015-02-20 Thread Xiangrui Meng
Hi Antony, Is it easy for you to try Spark 1.3.0 or master? The ALS
performance should be improved in 1.3.0. -Xiangrui

On Fri, Feb 20, 2015 at 1:32 PM, Antony Mayi
antonym...@yahoo.com.invalid wrote:
 Hi Ilya,

 thanks for your insight, this was the right clue. I had default parallelism
 already set but it was quite low (hundreds) and moreover the number of
 partitions of the input RDD was low as well so the chunks were really too
 big. Increased parallelism and repartitioning seems to be helping...

 Thanks!
 Antony.


 On Thursday, 19 February 2015, 16:45, Ilya Ganelin ilgan...@gmail.com
 wrote:



 Hi Antony - you are seeing a problem that I ran into. The underlying issue
 is your default parallelism setting. What's happening is that within ALS
 certain RDD operations end up changing the number of partitions you have of
 your data. For example if you start with an RDD of 300 partitions, unless
 default parallelism is set while the algorithm executes you'll eventually
 get an RDD with something like 20 partitions. Consequently, your giant data
 set is now stored across a much smaller number of partitions so each
 partition is huge. Then, when a shuffle requires serialization you run out
 of heap space trying to serialize it. The solution should be as simple as
 setting the default parallelism setting.

 This is referenced in a JIRA I can't find at the moment.
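
 A back-of-the-envelope sketch of the effect described above, in plain Python. The 300 and 20 partition counts are the ones from the example; the total data size is a made-up figure for illustration only, and `partition_size_mb` is a hypothetical helper that assumes the data is spread evenly.

```python
def partition_size_mb(total_mb, num_partitions):
    """Average partition size in MB for a data set split evenly."""
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    return total_mb / num_partitions


if __name__ == "__main__":
    total_mb = 60000  # hypothetical data set size, not a figure from this thread
    for partitions in (300, 20):  # partition counts from the example above
        size = partition_size_mb(total_mb, partitions)
        print(f"{partitions} partitions -> {size:.0f} MB each")
```

 Whatever the real total is, going from 300 partitions to 20 makes each partition 15 times larger, which is the part that matters for the heap during serialization.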
 On Thu, Feb 19, 2015 at 5:10 AM Antony Mayi antonym...@yahoo.com.invalid
 wrote:

 now with reverted spark.shuffle.io.preferDirectBufs (to true) getting again
 GC overhead limit exceeded:

 === spark stdout ===
 15/02/19 12:08:08 WARN scheduler.TaskSetManager: Lost task 7.0 in stage 18.0
 (TID 5329, 192.168.1.93): java.lang.OutOfMemoryError: GC overhead limit
 exceeded
 at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
 at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
 at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
 at
 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)

 === yarn log (same) ===
 15/02/19 12:08:08 ERROR executor.Executor: Exception in task 7.0 in stage
 18.0 (TID 5329)
 java.lang.OutOfMemoryError: GC overhead limit exceeded
 at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
 at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
 at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
 at
 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)

 === yarn nodemanager ===
 2015-02-19 12:08:13,758 INFO
 org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
 Memory usage of ProcessTree 19014 for container-id
 container_1424204221358_0013_01_12: 29.8 GB of 32 GB physical memory
 used; 31.7 GB of 67.2 GB virtual memory used
 2015-02-19 12:08:13,778 INFO
 org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
 Memory usage of ProcessTree 19013 for container-id
 container_1424204221358_0013_01_08: 1.2 MB of 32 GB physical memory
 used; 103.6 MB of 67.2 GB virtual memory used
 2015-02-19 12:08:14,455 WARN
 org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit
 code from container container_1424204221358_0013_01_08 is : 143
 2015-02-19 12:08:14,455 INFO
 org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container:
 Container container_1424204221358_0013_01_08 transitioned from RUNNING
 to EXITED_WITH_FAILURE
 2015-02-19 12:08:14,455 INFO
 org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch:
 Cleaning up container container_1424204221358_0013_01_08

 Antony.




 On Thursday, 19 February 2015, 11:54, Antony Mayi
 antonym...@yahoo.com.INVALID wrote:



 it is from within the ALS.trainImplicit() call. btw. the exception varies
 between this GC overhead limit exceeded and Java heap space (which I
 guess is just different outcome of same problem).

 just tried another run and here are the logs (filtered) - note I tried this
 run with spark.shuffle.io.preferDirectBufs=false so this might be slightly
 different issue from my previous case (going to revert now):

 === spark stdout ===
 15/02/19 10:15:05 WARN storage.BlockManagerMasterActor: Removing
 BlockManager BlockManagerId(6, 192.168.1.92, 54289) with no recent heart
 beats: 50221ms exceeds 45000ms
 15/02/19 10:16:05 WARN storage.BlockManagerMasterActor: Removing
 BlockManager BlockManagerId(13, 192.168.1.90, 56768) with no recent heart
 

randomSplit instead of a huge map reduce ?

2015-02-20 Thread shlomib
Hi,

I am new to Spark and I think I missed something very basic.

I have the following use case (I use Java and run Spark locally on my
laptop):


I have a JavaRDD<String[]>

- The RDD contains around 72,000 arrays of strings (String[])

- Each array contains 80 words (on average).


What I want to do is to convert each array into a new array/list of pairs,
for example:

Input: String[] words = ['a', 'b', 'c']

Output: List[String, String] pairs = [('a', 'b'), ('a', 'c'), ('b', 'c')]

and then I want to count the number of times each pair appeared, so my final
output should be something like:

Output: List[String, String, Integer] result = [('a', 'b', 3), ('a', 'c',
8), ('b', 'c', 10)]


The problem:

Since each array contains around 80 words, it returns around 3,200 pairs, so
after "mapping" my entire RDD I get 3,200 * 72,000 = *230,400,000* pairs to
reduce, which requires way too much memory.

(I know I have only around *20,000,000* unique pairs!)

I already modified my code and used 'mapPartitions' instead of 'map'. It
definitely improved the performance, but I still feel I'm doing something
completely wrong.


I was wondering if this is the right 'Spark way' to solve this kind of
problem, or maybe I should do something like splitting my original RDD into
smaller parts (by using randomSplit), then iterate over each part, aggregate
the results into some result RDD (by using 'union') and move on to the next
part.


Can anyone please explain to me which solution is better?


Thank you very much,

Shlomi.
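
To make the use case above concrete, here is a minimal sketch in plain Python (not Spark, and not the Java code I am actually running). It counts pairs inside each chunk of arrays first and only then merges the partial counts, so the full list of 230,400,000 pairs is never held at once. `count_pairs` and `merge_counts` are hypothetical helper names, and sorting the words so that ('a', 'b') and ('b', 'a') count as the same pair is an assumption about what I want.

```python
from collections import Counter
from itertools import combinations


def count_pairs(arrays):
    """Count unordered word pairs within each array, aggregating as we go."""
    counts = Counter()
    for words in arrays:
        # Sorting fixes the order inside each pair, so ('a', 'b') is one key.
        counts.update(combinations(sorted(words), 2))
    return counts


def merge_counts(partials):
    """Combine per-chunk counters into a single result."""
    total = Counter()
    for partial in partials:
        total.update(partial)
    return total


if __name__ == "__main__":
    chunks = [
        [["a", "b", "c"]],
        [["a", "b"], ["b", "c"]],
    ]
    partials = [count_pairs(chunk) for chunk in chunks]
    result = merge_counts(partials)
    for pair in sorted(result):
        print(pair, result[pair])
```

With this shape the memory needed is driven by the number of distinct pairs (around 20,000,000 in my data) rather than by the total number of generated pairs.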




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/randomSplit-instead-of-a-huge-map-reduce-tp21744.html



Re: output worker stdout to one place

2015-02-20 Thread Anny Chen
Thanks Marcelo! I will try to change the log4j.properties

On Fri, Feb 20, 2015 at 11:37 AM, Marcelo Vanzin van...@cloudera.com
wrote:

 Hi Anny,

 You could play with creating your own log4j.properties that will write
 the output somewhere else (e.g. to some remote mount, or remote
 syslog). Sorry, but I don't have an example handy.

 Alternatively, if you can use Yarn, it will collect all logs after the
 job is finished and make them available as a single file using the
 yarn logs command.


 On Fri, Feb 20, 2015 at 11:31 AM, anny9699 anny9...@gmail.com wrote:
  Hi,
 
  I am wondering if there's some way to direct some of the worker stdout
  to one place instead of to each worker's stdout. For example, I have the
  following code
 
  RDD.foreach { line =>
    try {
      // do something
    } catch {
      case e: Exception => println(line)
    }
  }
 
  Every time I want to check what's causing the exception, I have to check one
  worker after another in the UI, because I don't know which worker will be
  dealing with the exception case. Is there a way that the println could
  print to one place instead of separate worker stdout so that I only need to
  check one place?
 
  Thanks a lot!
  Anny
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/output-worker-stdout-to-one-place-tp21742.html
 



 --
 Marcelo



Re: using hivecontext with sparksql on cdh 5.3

2015-02-20 Thread Sourigna Phetsarath
Correction,

should be

  HADOOP_CONF_DIR=/etc/hive/conf spark-shell \
    --driver-class-path '/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/*' \
    --driver-java-options '-Dspark.executor.extraClassPath=/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/*'

On Fri, Feb 20, 2015 at 3:48 PM, Sourigna Phetsarath 
gna.phetsar...@teamaol.com wrote:

 Correction,

 should be  HADOOP_CONF_DIR=/etc/hive/conf --driver-class-path
 '/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/*'
 --driver-java-options '-Dspark.executor.extraClassPath=/opt/cloudera/
 parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/*'

 On Fri, Feb 20, 2015 at 3:43 PM, Sourigna Phetsarath 
 gna.phetsar...@teamaol.com wrote:

 Also, you might want to add the hadoop configs:

 HADOOP_CONF_DIR=/etc/hadoop/conf:/etc/hive/conf --driver-class-path
 '/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/*'
 --driver-java-options '-Dspark.executor.extraClassPath=/opt/cloudera/
 parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/*'

 Ok. It needs the CDH configs for hive and hadoop.  Hopefully this works
 for you.



 On Fri, Feb 20, 2015 at 3:41 PM, chirag lakhani chirag.lakh...@gmail.com
  wrote:

 Thanks!  I am able to login to Spark now but I am still getting the same
 error

 scala> sqlContext.sql("FROM analytics.trainingdatafinal SELECT *").collect().foreach(println)
 15/02/20 14:40:22 INFO ParseDriver: Parsing command: FROM
 analytics.trainingdatafinal SELECT *
 15/02/20 14:40:22 INFO ParseDriver: Parse Completed
 15/02/20 14:40:23 INFO HiveMetaStore: 0: Opening raw store with
 implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
 15/02/20 14:40:23 INFO ObjectStore: ObjectStore, initialize called
 15/02/20 14:40:23 WARN General: Plugin (Bundle)
 org.datanucleus.api.jdo is already registered. Ensure you dont have
 multiple JAR versions of the same plugin in the classpath. The URL
 file:/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/jars/datanucleus-api-jdo-3.2.6.jar
 is already registered, and you are trying to register an identical plugin
 located at URL
 file:/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/datanucleus-api-jdo-3.2.6.jar.
 15/02/20 14:40:23 WARN General: Plugin (Bundle)
 org.datanucleus.store.rdbms is already registered. Ensure you dont have
 multiple JAR versions of the same plugin in the classpath. The URL
 file:/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/jars/datanucleus-rdbms-3.2.9.jar
 is already registered, and you are trying to register an identical plugin
 located at URL
 file:/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/datanucleus-rdbms-3.2.9.jar.
 15/02/20 14:40:23 WARN General: Plugin (Bundle) org.datanucleus is
 already registered. Ensure you dont have multiple JAR versions of the same
 plugin in the classpath. The URL
 file:/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/jars/datanucleus-core-3.2.10.jar
 is already registered, and you are trying to register an identical plugin
 located at URL
 file:/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/datanucleus-core-3.2.10.jar.
 15/02/20 14:40:23 INFO Persistence: Property datanucleus.cache.level2
 unknown - will be ignored
 15/02/20 14:40:23 INFO Persistence: Property
 hive.metastore.integral.jdo.pushdown unknown - will be ignored
 15/02/20 14:40:25 INFO ObjectStore: Setting MetaStore object pin classes
 with
 hive.metastore.cache.pinobjtypes=Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order
 15/02/20 14:40:25 INFO MetaStoreDirectSql: MySQL check failed, assuming
 we are not on mysql: Lexical error at line 1, column 5.  Encountered: @
 (64), after : .
 15/02/20 14:40:27 INFO Datastore: The class
 org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as
 embedded-only so does not have its own datastore table.
 15/02/20 14:40:27 INFO Datastore: The class
 org.apache.hadoop.hive.metastore.model.MOrder is tagged as
 embedded-only so does not have its own datastore table.
 15/02/20 14:40:28 INFO Datastore: The class
 org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as
 embedded-only so does not have its own datastore table.
 15/02/20 14:40:28 INFO Datastore: The class
 org.apache.hadoop.hive.metastore.model.MOrder is tagged as
 embedded-only so does not have its own datastore table.
 15/02/20 14:40:28 INFO Query: Reading in results for query
 org.datanucleus.store.rdbms.query.SQLQuery@0 since the connection
 used is closing
 15/02/20 14:40:28 INFO ObjectStore: Initialized ObjectStore
 15/02/20 14:40:28 INFO HiveMetaStore: Added admin role in metastore
 15/02/20 14:40:28 INFO HiveMetaStore: Added public role in metastore
 15/02/20 14:40:29 INFO HiveMetaStore: No user is added in admin role,
 since config is empty
 15/02/20 14:40:29 INFO SessionState: No Tez session required at this
 point. hive.execution.engine=mr.
 15/02/20 14:40:29 INFO HiveMetaStore: 0: get_table : db=analytics
 tbl=trainingdatafinal

Re: using hivecontext with sparksql on cdh 5.3

2015-02-20 Thread Sourigna Phetsarath
Correction,

should be  HADOOP_CONF_DIR=/etc/hive/conf --driver-class-path
'/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/*'
--driver-java-options
'-Dspark.executor.extraClassPath=/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/*'

On Fri, Feb 20, 2015 at 3:43 PM, Sourigna Phetsarath 
gna.phetsar...@teamaol.com wrote:

 Also, you might want to add the hadoop configs:

 HADOOP_CONF_DIR=/etc/hadoop/conf:/etc/hive/conf --driver-class-path
 '/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/*'
 --driver-java-options '-Dspark.executor.extraClassPath=/opt/cloudera/
 parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/*'

 Ok. It needs the CDH configs for hive and hadoop.  Hopefully this works
 for you.



 On Fri, Feb 20, 2015 at 3:41 PM, chirag lakhani chirag.lakh...@gmail.com
 wrote:

 Thanks!  I am able to login to Spark now but I am still getting the same
 error

 scala> sqlContext.sql("FROM analytics.trainingdatafinal SELECT *").collect().foreach(println)
 15/02/20 14:40:22 INFO ParseDriver: Parsing command: FROM
 analytics.trainingdatafinal SELECT *
 15/02/20 14:40:22 INFO ParseDriver: Parse Completed
 15/02/20 14:40:23 INFO HiveMetaStore: 0: Opening raw store with
 implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
 15/02/20 14:40:23 INFO ObjectStore: ObjectStore, initialize called
 15/02/20 14:40:23 WARN General: Plugin (Bundle) org.datanucleus.api.jdo
 is already registered. Ensure you dont have multiple JAR versions of the
 same plugin in the classpath. The URL
 file:/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/jars/datanucleus-api-jdo-3.2.6.jar
 is already registered, and you are trying to register an identical plugin
 located at URL
 file:/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/datanucleus-api-jdo-3.2.6.jar.
 15/02/20 14:40:23 WARN General: Plugin (Bundle)
 org.datanucleus.store.rdbms is already registered. Ensure you dont have
 multiple JAR versions of the same plugin in the classpath. The URL
 file:/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/jars/datanucleus-rdbms-3.2.9.jar
 is already registered, and you are trying to register an identical plugin
 located at URL
 file:/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/datanucleus-rdbms-3.2.9.jar.
 15/02/20 14:40:23 WARN General: Plugin (Bundle) org.datanucleus is
 already registered. Ensure you dont have multiple JAR versions of the same
 plugin in the classpath. The URL
 file:/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/jars/datanucleus-core-3.2.10.jar
 is already registered, and you are trying to register an identical plugin
 located at URL
 file:/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/datanucleus-core-3.2.10.jar.
 15/02/20 14:40:23 INFO Persistence: Property datanucleus.cache.level2
 unknown - will be ignored
 15/02/20 14:40:23 INFO Persistence: Property
 hive.metastore.integral.jdo.pushdown unknown - will be ignored
 15/02/20 14:40:25 INFO ObjectStore: Setting MetaStore object pin classes
 with
 hive.metastore.cache.pinobjtypes=Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order
 15/02/20 14:40:25 INFO MetaStoreDirectSql: MySQL check failed, assuming
 we are not on mysql: Lexical error at line 1, column 5.  Encountered: @
 (64), after : .
 15/02/20 14:40:27 INFO Datastore: The class
 org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as
 embedded-only so does not have its own datastore table.
 15/02/20 14:40:27 INFO Datastore: The class
 org.apache.hadoop.hive.metastore.model.MOrder is tagged as
 embedded-only so does not have its own datastore table.
 15/02/20 14:40:28 INFO Datastore: The class
 org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as
 embedded-only so does not have its own datastore table.
 15/02/20 14:40:28 INFO Datastore: The class
 org.apache.hadoop.hive.metastore.model.MOrder is tagged as
 embedded-only so does not have its own datastore table.
 15/02/20 14:40:28 INFO Query: Reading in results for query
 org.datanucleus.store.rdbms.query.SQLQuery@0 since the connection used
 is closing
 15/02/20 14:40:28 INFO ObjectStore: Initialized ObjectStore
 15/02/20 14:40:28 INFO HiveMetaStore: Added admin role in metastore
 15/02/20 14:40:28 INFO HiveMetaStore: Added public role in metastore
 15/02/20 14:40:29 INFO HiveMetaStore: No user is added in admin role,
 since config is empty
 15/02/20 14:40:29 INFO SessionState: No Tez session required at this
 point. hive.execution.engine=mr.
 15/02/20 14:40:29 INFO HiveMetaStore: 0: get_table : db=analytics
 tbl=trainingdatafinal
 15/02/20 14:40:29 INFO audit: ugi=hdfs ip=unknown-ip-addr cmd=get_table
 : db=analytics tbl=trainingdatafinal
 15/02/20 14:40:29 ERROR Hive:
 NoSuchObjectException(message:analytics.trainingdatafinal table not found)
 at
 org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_table(HiveMetaStore.java:1569)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 

Re: high GC in the Kmeans algorithm

2015-02-20 Thread Xiangrui Meng
A single vector of size 10^7 won't hit that bound. How many clusters
did you set? The broadcast variable size is 10^7 * k and you can
calculate the amount of memory it needs. Try to reduce the number of
tasks and see whether it helps. -Xiangrui
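
To put rough numbers on the 10^7 * k estimate above, here is a minimal sketch. It assumes the centers are dense vectors of 8-byte doubles, which the thread does not state; the function name and the sample values of k are illustrative only.

```python
# Rough size of the k-means cluster centers that are shipped to the tasks.
# ASSUMPTION: each center is a dense vector of 8-byte doubles.
BYTES_PER_DOUBLE = 8


def centers_bytes(dimension, k):
    """Approximate number of bytes needed to hold k dense centers."""
    return dimension * k * BYTES_PER_DOUBLE


dimension = 10**7
for k in (1, 10, 100):  # hypothetical cluster counts
    gib = centers_bytes(dimension, k) / 2**30
    print(f"k={k}: about {gib:.2f} GiB for the centers")
```

Under that assumption a single 10^7 vector is about 80 MB, so the total only becomes large once k grows into the tens.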

On Tue, Feb 17, 2015 at 7:20 PM, lihu lihu...@gmail.com wrote:
 Thanks for your answer. Yes, I cached the data; I can see from the
 WebUI that all of the data is cached in memory.

 What worries me is the dimension, not the total size.

 Sean Owen once told me that a broadcast supports a maximum array size of
 2GB, so is 10^7 a little too large?

 On Wed, Feb 18, 2015 at 5:43 AM, Xiangrui Meng men...@gmail.com wrote:

 Did you cache the data? Was it fully cached? The k-means
 implementation doesn't create many temporary objects. I guess you need
 more RAM to avoid GC triggered frequently. Please monitor the memory
 usage using YourKit or VisualVM. -Xiangrui

 On Wed, Feb 11, 2015 at 1:35 AM, lihu lihu...@gmail.com wrote:
  I just want to make the best use of CPU,  and test the performance of
  spark
  if there is a lot of task in a single node.
 
  On Wed, Feb 11, 2015 at 5:29 PM, Sean Owen so...@cloudera.com wrote:
 
  Good, worth double-checking that's what you got. That's barely 1GB per
  task though. Why run 48 if you have 24 cores?
 
  On Wed, Feb 11, 2015 at 9:03 AM, lihu lihu...@gmail.com wrote:
   I give 50GB to the executor,  so it seem that  there is no reason the
   memory
   is not enough.
  
   On Wed, Feb 11, 2015 at 4:50 PM, Sean Owen so...@cloudera.com
   wrote:
  
   Meaning, you have 128GB per machine but how much memory are you
   giving
   the executors?
  
   On Wed, Feb 11, 2015 at 8:49 AM, lihu lihu...@gmail.com wrote:
What do you mean? Yes, I can see from the web UI that some data is put
in memory.
   
On Wed, Feb 11, 2015 at 4:25 PM, Sean Owen so...@cloudera.com
wrote:
   
Are you actually using that memory for executors?
   
On Wed, Feb 11, 2015 at 8:17 AM, lihu lihu...@gmail.com wrote:
 Hi,
 I run k-means (MLlib) on a cluster with 12 workers. Every worker has
 128G RAM and 24 cores. I run 48 tasks on one machine. The total data is
 just 40GB.

 When the dimension of the data set is about 10^7, the duration of every
 task is about 30s, but the cost of GC is about 20s.

 When I reduce the dimension to 10^4, the GC time is small.

 So why is GC so high when the dimension is larger? Or is this caused
 by MLlib?




   
   
   
   
--
Best Wishes!
   
Li Hu(李浒) | Graduate Student
Institute for Interdisciplinary Information Sciences(IIIS)
Tsinghua University, China
   
Email: lihu...@gmail.com
Homepage: http://iiis.tsinghua.edu.cn/zh/lihu/
   
   
  
  
  
  

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: loads of memory still GC overhead limit exceeded

2015-02-20 Thread Ilya Ganelin
No problem, Antony. MLlib is tricky! I'd love to chat with you about your
use case - sounds like we're working on similar problems/scales.
On Fri, Feb 20, 2015 at 1:55 PM Xiangrui Meng men...@gmail.com wrote:

 Hi Antony, Is it easy for you to try Spark 1.3.0 or master? The ALS
 performance should be improved in 1.3.0. -Xiangrui

 On Fri, Feb 20, 2015 at 1:32 PM, Antony Mayi
 antonym...@yahoo.com.invalid wrote:
  Hi Ilya,
 
  thanks for your insight, this was the right clue. I had default parallelism
  already set but it was quite low (hundreds) and moreover the number of
  partitions of the input RDD was low as well so the chunks were really too
  big. Increased parallelism and repartitioning seems to be helping...
 
  Thanks!
  Antony.
 
 
  On Thursday, 19 February 2015, 16:45, Ilya Ganelin ilgan...@gmail.com
  wrote:
 
 
 
  Hi Antony - you are seeing a problem that I ran into. The underlying issue
  is your default parallelism setting. What's happening is that within ALS
  certain RDD operations end up changing the number of partitions you have of
  your data. For example, if you start with an RDD of 300 partitions, unless
  default parallelism is set, while the algorithm executes you'll eventually
  get an RDD with something like 20 partitions. Consequently, your giant data
  set is now stored across a much smaller number of partitions, so each
  partition is huge. Then, when a shuffle requires serialization, you run out
  of heap space trying to serialize it. The solution should be as simple as
  setting the default parallelism setting.
 
  This is referenced in a JIRA I can't find at the moment.
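
The effect described above can be made concrete with a little arithmetic. This is a minimal sketch that assumes the data is spread evenly across partitions; the 40 GB data size and the 128 MB target are made-up figures for illustration, while the 300 and 20 partition counts are the ones from the message.

```python
import math


def partition_size_mb(total_gb, num_partitions):
    """Average partition size in MB, assuming an even spread."""
    return total_gb * 1024 / num_partitions


def partitions_for_target(total_gb, target_mb):
    """Smallest partition count that keeps the average at or below target_mb."""
    return math.ceil(total_gb * 1024 / target_mb)


total_gb = 40.0  # hypothetical data set size, not taken from the thread

print(partition_size_mb(total_gb, 300))      # average size with 300 partitions
print(partition_size_mb(total_gb, 20))       # average size after collapsing to 20
print(partitions_for_target(total_gb, 128))  # partitions needed for a 128 MB target
```

Going from 300 to 20 partitions makes each partition 15 times larger, which is why a shuffle that has to serialize a whole partition can exhaust the heap.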
  On Thu, Feb 19, 2015 at 5:10 AM Antony Mayi antonym...@yahoo.com.invalid
 
  wrote:
 
  now with reverted spark.shuffle.io.preferDirectBufs (to true) getting
 again
  GC overhead limit exceeded:
 
  === spark stdout ===
  15/02/19 12:08:08 WARN scheduler.TaskSetManager: Lost task 7.0 in stage
 18.0
  (TID 5329, 192.168.1.93): java.lang.OutOfMemoryError: GC overhead limit
  exceeded
  at
  java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
  at
  java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
  at
  java.io.ObjectInputStream.readOrdinaryObject(
 ObjectInputStream.java:1801)
  at
  java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.
 java:371)
  at
  org.apache.spark.serializer.JavaDeserializationStream.
 readObject(JavaSerializer.scala:62)
 
  === yarn log (same) ===
  15/02/19 12:08:08 ERROR executor.Executor: Exception in task 7.0 in stage
  18.0 (TID 5329)
  java.lang.OutOfMemoryError: GC overhead limit exceeded
  at
  java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
  at
  java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
  at
  java.io.ObjectInputStream.readOrdinaryObject(
 ObjectInputStream.java:1801)
  at
  java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.
 java:371)
  at
  org.apache.spark.serializer.JavaDeserializationStream.
 readObject(JavaSerializer.scala:62)
 
  === yarn nodemanager ===
  2015-02-19 12:08:13,758 INFO
  org.apache.hadoop.yarn.server.nodemanager.containermanager.
 monitor.ContainersMonitorImpl:
  Memory usage of ProcessTree 19014 for container-id
  container_1424204221358_0013_01_12: 29.8 GB of 32 GB physical memory
  used; 31.7 GB of 67.2 GB virtual memory used
  2015-02-19 12:08:13,778 INFO
  org.apache.hadoop.yarn.server.nodemanager.containermanager.
 monitor.ContainersMonitorImpl:
  Memory usage of ProcessTree 19013 for container-id
  container_1424204221358_0013_01_08: 1.2 MB of 32 GB physical memory
  used; 103.6 MB of 67.2 GB virtual memory used
  2015-02-19 12:08:14,455 WARN
  org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit
  code from container container_1424204221358_0013_01_08 is : 143
  2015-02-19 12:08:14,455 INFO
  org.apache.hadoop.yarn.server.nodemanager.containermanager.
 container.Container:
  Container container_1424204221358_0013_01_08 transitioned from
 RUNNING
  to EXITED_WITH_FAILURE
  2015-02-19 12:08:14,455 INFO
  org.apache.hadoop.yarn.server.nodemanager.containermanager.
 launcher.ContainerLaunch:
  Cleaning up container container_1424204221358_0013_01_08
 
  Antony.
 
 
 
 
  On Thursday, 19 February 2015, 11:54, Antony Mayi
  antonym...@yahoo.com.INVALID wrote:
 
 
 
  it is from within the ALS.trainImplicit() call. btw. the exception varies
  between this GC overhead limit exceeded and Java heap space (which I
  guess is just different outcome of same problem).
 
  just tried another run and here are the logs (filtered) - note I tried
 this
  run with spark.shuffle.io.preferDirectBufs=false so this might be
 slightly
  different issue from my previous case (going to 

Re: using hivecontext with sparksql on cdh 5.3

2015-02-20 Thread chirag lakhani
That worked perfectly...thanks so much!

On Fri, Feb 20, 2015 at 3:49 PM, Sourigna Phetsarath 
gna.phetsar...@teamaol.com wrote:

 Correction,

 should be  HADOOP_CONF_DIR=/etc/hive/conf spark-shell --driver-class-path
 '/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/*'
 --driver-java-options
 '-Dspark.executor.extraClassPath=/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/*'

 On Fri, Feb 20, 2015 at 3:48 PM, Sourigna Phetsarath 
 gna.phetsar...@teamaol.com wrote:

 Correction,

 should be  HADOOP_CONF_DIR=/etc/hive/conf --driver-class-path
 '/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/*'
 --driver-java-options '-Dspark.executor.extraClassPath=/opt/cloudera/
 parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/*'

 On Fri, Feb 20, 2015 at 3:43 PM, Sourigna Phetsarath 
 gna.phetsar...@teamaol.com wrote:

 Also, you might want to add the hadoop configs:

 HADOOP_CONF_DIR=/etc/hadoop/conf:/etc/hive/conf --driver-class-path
 '/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/*'
 --driver-java-options '-Dspark.executor.extraClassPath=/opt/cloudera/
 parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/*'

 Ok. It needs the CDH configs for hive and hadoop.  Hopefully this works
 for you.



 On Fri, Feb 20, 2015 at 3:41 PM, chirag lakhani 
 chirag.lakh...@gmail.com wrote:

 Thanks!  I am able to login to Spark now but I am still getting the
 same error

 scala> sqlContext.sql("FROM analytics.trainingdatafinal SELECT *").collect().foreach(println)
 15/02/20 14:40:22 INFO ParseDriver: Parsing command: FROM
 analytics.trainingdatafinal SELECT *
 15/02/20 14:40:22 INFO ParseDriver: Parse Completed
 15/02/20 14:40:23 INFO HiveMetaStore: 0: Opening raw store with
 implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
 15/02/20 14:40:23 INFO ObjectStore: ObjectStore, initialize called
 15/02/20 14:40:23 WARN General: Plugin (Bundle)
 org.datanucleus.api.jdo is already registered. Ensure you dont have
 multiple JAR versions of the same plugin in the classpath. The URL
 file:/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/jars/datanucleus-api-jdo-3.2.6.jar
 is already registered, and you are trying to register an identical plugin
 located at URL
 file:/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/datanucleus-api-jdo-3.2.6.jar.
 15/02/20 14:40:23 WARN General: Plugin (Bundle)
 org.datanucleus.store.rdbms is already registered. Ensure you dont have
 multiple JAR versions of the same plugin in the classpath. The URL
 file:/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/jars/datanucleus-rdbms-3.2.9.jar
 is already registered, and you are trying to register an identical plugin
 located at URL
 file:/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/datanucleus-rdbms-3.2.9.jar.
 15/02/20 14:40:23 WARN General: Plugin (Bundle) org.datanucleus is
 already registered. Ensure you dont have multiple JAR versions of the same
 plugin in the classpath. The URL
 file:/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/jars/datanucleus-core-3.2.10.jar
 is already registered, and you are trying to register an identical plugin
 located at URL
 file:/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/datanucleus-core-3.2.10.jar.
 15/02/20 14:40:23 INFO Persistence: Property datanucleus.cache.level2
 unknown - will be ignored
 15/02/20 14:40:23 INFO Persistence: Property
 hive.metastore.integral.jdo.pushdown unknown - will be ignored
 15/02/20 14:40:25 INFO ObjectStore: Setting MetaStore object pin
 classes with
 hive.metastore.cache.pinobjtypes=Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order
 15/02/20 14:40:25 INFO MetaStoreDirectSql: MySQL check failed, assuming
 we are not on mysql: Lexical error at line 1, column 5.  Encountered: @
 (64), after : .
 15/02/20 14:40:27 INFO Datastore: The class
 org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as
 embedded-only so does not have its own datastore table.
 15/02/20 14:40:27 INFO Datastore: The class
 org.apache.hadoop.hive.metastore.model.MOrder is tagged as
 embedded-only so does not have its own datastore table.
 15/02/20 14:40:28 INFO Datastore: The class
 org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as
 embedded-only so does not have its own datastore table.
 15/02/20 14:40:28 INFO Datastore: The class
 org.apache.hadoop.hive.metastore.model.MOrder is tagged as
 embedded-only so does not have its own datastore table.
 15/02/20 14:40:28 INFO Query: Reading in results for query
 org.datanucleus.store.rdbms.query.SQLQuery@0 since the connection
 used is closing
 15/02/20 14:40:28 INFO ObjectStore: Initialized ObjectStore
 15/02/20 14:40:28 INFO HiveMetaStore: Added admin role in metastore
 15/02/20 14:40:28 INFO HiveMetaStore: Added public role in metastore
 15/02/20 14:40:29 INFO HiveMetaStore: No user is added in admin role,
 since config is empty
 15/02/20 14:40:29 INFO SessionState: No Tez session 

Re: Spark Performance on Yarn

2015-02-20 Thread Kelvin Chu
Hi Sandy,

I appreciate your clear explanation. Let me try again. It's the best way to
confirm I understand.

spark.executor.memory + spark.yarn.executor.memoryOverhead = the memory
that YARN allocates for the container running the executor JVM

spark.executor.memory = the memory I can actually use in my JVM application
= part of it (spark.storage.memoryFraction) is reserved for caching + part
of it (spark.shuffle.memoryFraction) is reserved for shuffling + the
remainder is for bookkeeping and UDFs

If I am correct above, then one implication from them is:

(spark.executor.memory + spark.yarn.executor.memoryOverhead) * number of
executors per machine should be configured smaller than a single machine
physical memory

Right? Again, thanks!

Kelvin
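
The relationships in this message can be written down as a small calculation. This is only a sketch: the function names and the sample sizes are hypothetical, the fractions are the defaults quoted by Sandy below, and any additional safety margins Spark may apply inside each region are ignored.

```python
def container_mb(executor_memory_mb, overhead_mb):
    """Memory requested from YARN for one executor: heap plus overhead."""
    return executor_memory_mb + overhead_mb


def heap_regions_mb(executor_memory_mb, storage_fraction=0.6, shuffle_fraction=0.2):
    """Split the heap into (storage, shuffle, remainder) using the two fractions."""
    storage = executor_memory_mb * storage_fraction
    shuffle = executor_memory_mb * shuffle_fraction
    remainder = executor_memory_mb - storage - shuffle
    return storage, shuffle, remainder


def fits_on_machine(executor_memory_mb, overhead_mb, executors_per_machine, machine_mb):
    """True if all executor containers together stay within the machine's memory."""
    total = container_mb(executor_memory_mb, overhead_mb) * executors_per_machine
    return total <= machine_mb


# Hypothetical numbers: 8 GB heap, 512 MB overhead, 64 GB machine.
print(container_mb(8192, 512))
print(heap_regions_mb(8192))
print(fits_on_machine(8192, 512, 4, 65536))
print(fits_on_machine(8192, 512, 8, 65536))
```

In practice the bound is likely to be the memory the NodeManager is configured to hand out rather than the full physical memory, so the last argument should be read as "memory available to YARN on that machine".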

On Fri, Feb 20, 2015 at 11:50 AM, Sandy Ryza sandy.r...@cloudera.com
wrote:

 Hi Kelvin,

 spark.executor.memory controls the size of the executor heaps.

 spark.yarn.executor.memoryOverhead is the amount of memory to request from
 YARN beyond the heap size.  This accounts for the fact that JVMs use some
 non-heap memory.

 The Spark heap is divided into spark.storage.memoryFraction (default 0.6)
 and spark.shuffle.memoryFraction (default 0.2), and the rest is for basic
 Spark bookkeeping and anything the user does inside UDFs.

 -Sandy



 On Fri, Feb 20, 2015 at 11:44 AM, Kelvin Chu 2dot7kel...@gmail.com
 wrote:

 Hi Sandy,

 I am also doing memory tuning on YARN. Just want to confirm, is it
 correct to say:

 spark.executor.memory - spark.yarn.executor.memoryOverhead = the memory
 I can actually use in my jvm application

 If it is not, what is the correct relationship? Any other variables or
 config parameters in play? Thanks.

 Kelvin

 On Fri, Feb 20, 2015 at 9:45 AM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 If that's the error you're hitting, the fix is to boost
 spark.yarn.executor.memoryOverhead, which will put some extra room in
 between the executor heap sizes and the amount of memory requested for them
 from YARN.

 -Sandy
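
As a rough check of this advice against the container log quoted further down in the thread, here is a small sketch. The heap size comes from the -Xmx2400m flag in that log; the 384 MB overhead is an assumption about the default in use at the time, not something the thread states, and the 1024 MB value is just an example of a boosted setting.

```python
def container_limit_gb(heap_mb, overhead_mb):
    """Physical memory limit YARN would enforce: heap plus overhead, in GB."""
    return (heap_mb + overhead_mb) / 1024


heap_mb = 2400       # from -Xmx2400m in the container log
overhead_mb = 384    # ASSUMPTION: overhead in effect for that run
used_gb = 2.8        # "Current usage" reported in the log

limit_gb = container_limit_gb(heap_mb, overhead_mb)
print(f"limit with {overhead_mb} MB overhead: {limit_gb:.2f} GB")
print("over the limit:", used_gb > limit_gb)

# Hypothetical boosted overhead, to see how much headroom it would add.
boosted_gb = container_limit_gb(heap_mb, 1024)
print(f"limit with 1024 MB overhead: {boosted_gb:.2f} GB")
print("over the limit:", used_gb > boosted_gb)
```

Under that assumption the computed limit is close to the 2.7 GB shown in the log, which is consistent with the container being killed for non-heap usage rather than for the heap itself.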

 On Fri, Feb 20, 2015 at 9:40 AM, lbierman leebier...@gmail.com wrote:

 A bit more context on this issue. From the container logs on the
 executor

 Given my cluster specs above what would be appropriate parameters to
 pass
 into :
 --num-executors --num-cores --executor-memory

 I had tried it with --executor-memory 2500MB

 2015-02-20 06:50:09,056 WARN

 org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
 Container
 [pid=23320,containerID=container_1423083596644_0238_01_004160] is
 running beyond physical memory limits. Current usage: 2.8 GB of 2.7 GB
 physical memory used; 4.4 GB of 5.8 GB virtual memory used. Killing
 container.
 Dump of the process-tree for container_1423083596644_0238_01_004160 :
 |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
 SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
 |- 23320 23318 23320 23320 (bash) 0 0 108650496 305 /bin/bash -c
 /usr/java/latest/bin/java -server -XX:OnOutOfMemoryError='kill %p'
 -Xms2400m
 -Xmx2400m

 -Djava.io.tmpdir=/dfs/yarn/nm/usercache/root/appcache/application_1423083596644_0238/container_1423083596644_0238_01_004160/tmp

 -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160
 org.apache.spark.executor.CoarseGrainedExecutorBackend
 akka.tcp://sparkDriver@ip-10-168-86-13.ec2.internal
 :42535/user/CoarseGrainedScheduler
 8 ip-10-99-162-56.ec2.internal 1 application_1423083596644_0238 1

 /var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160/stdout
 2

 /var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160/stderr
 |- 23323 23320 23320 23320 (java) 922271 12263 461976 724218
 /usr/java/latest/bin/java -server -XX:OnOutOfMemoryError=kill %p
 -Xms2400m
 -Xmx2400m

 -Djava.io.tmpdir=/dfs/yarn/nm/usercache/root/appcache/application_1423083596644_0238/container_1423083596644_0238_01_004160/tmp

 -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160
 org.apache.spark.executor.CoarseGrainedExecutorBackend
 akka.tcp://sparkDriver@ip-10-168-86-13.ec2.internal:42535/user/Coarse




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Performance-on-Yarn-tp21729p21739.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.








Re: using hivecontext with sparksql on cdh 5.3

2015-02-20 Thread Sourigna Phetsarath
Try it without

--master yarn-cluster

if you are trying to run a spark-shell. :)

On Fri, Feb 20, 2015 at 3:18 PM, chirag lakhani chirag.lakh...@gmail.com
wrote:

 I tried

 spark-shell --master yarn-cluster --driver-class-path
 '/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/*'
 --driver-java-options
 '-Dspark.executor.extraClassPath=/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/*'

 and I get the following error

 Error: Cluster deploy mode is not applicable to Spark shells.
 Run with --help for usage help or --verbose for debug output



 On Fri, Feb 20, 2015 at 2:52 PM, Sourigna Phetsarath 
 gna.phetsar...@teamaol.com wrote:

 Chirag,

 This worked for us:

 spark-submit --master yarn-cluster --driver-class-path
 '/opt/cloudera/parcels/CDH/lib/hive/lib/*' --driver-java-options
 '-Dspark.executor.extraClassPath=/opt/cloudera/parcels/CDH/lib/hive/lib/*'
 ...

 Let me know, if you have any issues.

 On Fri, Feb 20, 2015 at 2:43 PM, chirag lakhani chirag.lakh...@gmail.com
  wrote:

 I am trying to access a hive table using spark sql but I am having
 trouble.  I followed the instructions in a cloudera community board which
 stated

 1) Import hive jars into the class path

 export SPARK_CLASSPATH=$(find
 /data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/ -name
 '*.jar' -print0 | sed 's/\x0/:/g')

 2) start the spark shell

 spark-shell

 3) created a hive context

 val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

 4) then run query

 sqlContext.sql("FROM analytics.trainingdatafinal SELECT *").collect().foreach(println)


 When I do this it seems that it cannot find the table in the hive
 metastore, I have put all of my cloudera parcels in the partition starting
 with /data as opposed to the default location used by cloudera.  Any
 suggestions on what can be done?  I am putting the error below


 15/02/20 13:43:01 ERROR Hive:
 NoSuchObjectException(message:analytics.trainingdatafinal table not found)
 at
 org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_table(HiveMetaStore.java:1569)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:106)
 at com.sun.proxy.$Proxy24.get_table(Unknown Source)
 at
 org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getTable(HiveMetaStoreClient.java:1008)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:90)
 at com.sun.proxy.$Proxy25.getTable(Unknown Source)
 at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:1000)
 at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:974)
 at
 org.apache.spark.sql.hive.HiveMetastoreCatalog.lookupRelation(HiveMetastoreCatalog.scala:70)
 at org.apache.spark.sql.hive.HiveContext$$anon$2.org
 $apache$spark$sql$catalyst$analysis$OverrideCatalog$$super$lookupRelation(HiveContext.scala:253)
 at
 org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:141)
 at
 org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:141)
 at scala.Option.getOrElse(Option.scala:120)
 at
 org.apache.spark.sql.catalyst.analysis.OverrideCatalog$class.lookupRelation(Catalog.scala:141)
 at
 org.apache.spark.sql.hive.HiveContext$$anon$2.lookupRelation(HiveContext.scala:253)
 at
 org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:143)
 at
 org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:138)
 at
 org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144)
 at
 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:162)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at
 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
 at
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
 at
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
 at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
 at scala.collection.AbstractIterator.to(Iterator.scala:1157)
 at
 scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
 at 

Re: Spark 1.3 SQL Programming Guide and sql._ / sql.types._

2015-02-20 Thread Denny Lee
Oh no worries at all. If you want, I'd be glad to make updates and PR for
anything I find, eh?!
On Fri, Feb 20, 2015 at 12:18 Michael Armbrust mich...@databricks.com
wrote:

 Yeah, sorry.  The programming guide has not been updated for 1.3.  I'm
 hoping to get to that this weekend / next week.

 On Fri, Feb 20, 2015 at 9:55 AM, Denny Lee denny.g@gmail.com wrote:

 Quickly reviewing the latest SQL Programming Guide
 https://github.com/apache/spark/blob/master/docs/sql-programming-guide.md
 (in github) I had a couple of quick questions:

 1) Do we need to instantiate the SQLContext as per
 // sc is an existing SparkContext.
 val sqlContext = new org.apache.spark.sql.SQLContext(sc)

 Within the Spark 1.3 shell the sqlContext is already available, so we
 probably do not need to make this call.

 2) Importing org.apache.spark.sql._ should bring in both SQL data types,
 struct types, and row
 // Import Spark SQL data types and Row.
 import org.apache.spark.sql._

 Currently with Spark 1.3 RC1, it appears org.apache.spark.sql._ only
 brings in row.

 scala> import org.apache.spark.sql._
 import org.apache.spark.sql._

 scala> val schema =
      |   StructType(
      |     schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
 <console>:25: error: not found: value StructType
          StructType(

 But if I also import org.apache.spark.sql.types._

 scala> import org.apache.spark.sql.types._
 import org.apache.spark.sql.types._

 scala> val schema =
      |   StructType(
      |     schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
 schema: org.apache.spark.sql.types.StructType =
 StructType(StructField(DeviceMake,StringType,true),
 StructField(Country,StringType,true))

 Wondering if this is by design or perhaps a quick documentation / package
 update is warranted.








Spark performance tuning

2015-02-20 Thread java8964
Hi, 
I am new to Spark, and I am trying to test Spark SQL performance against Hive. I
set up a standalone box with 24 cores and 64G memory.
We have one SQL query in mind to test. Here is the basic setup on this one box
for the query we are trying to run:
1) Dataset1, a 6.6G AVRO file with snappy compression, which contains a nested
   structure of 3 arrays of structs in AVRO
2) Dataset2, a 5G AVRO file with snappy compression
3) Dataset3, a 2.3M AVRO file with snappy compression
The basic structure of the query is like this:

(select xxx from dataset1
   lateral view outer explode(struct1)
   lateral view outer explode(struct2)
 where x )
left outer join
(select  from dataset2 lateral view explode(xxx) where )
on
left outer join
(select xxx from dataset3 where )
on x
So overall it does 2 outer explodes on dataset1, a left outer join with the
explode of dataset2, and finally a left outer join with dataset3.
On this standalone box, I installed Hadoop 2.2, Hive 0.12, and Spark 1.2.0.
As a baseline, the above query finishes in around 50 minutes in Hive 0.12, with 6
mappers and 3 reducers, each with 1G max heap, in 3 rounds of MR jobs.
This is a very expensive query that runs in our production every day, of course
with a much bigger data set. Now I want to see how fast Spark can run the
same query.
I am using the following settings, based on my understanding of Spark, for a
fair test between it and Hive:
export SPARK_WORKER_MEMORY=32g
export SPARK_DRIVER_MEMORY=2g
--executor-memory 9g --total-executor-cores 9
I am trying to run one executor with 9 cores and a max 9G heap, to make Spark
use almost the same resources we gave to MapReduce. Here is the result without
any additional configuration changes, running under Spark 1.2.0, using
HiveContext in Spark SQL to run exactly the same query:
Spark SQL generated 5 stages of tasks, shown below:
4   collect at SparkPlan.scala:84        2015/02/20 10:48:46   26 s      200/200
3   mapPartitions at Exchange.scala:64   2015/02/20 10:32:07   16 min    200/200   1112.3 MB
2   mapPartitions at Exchange.scala:64   2015/02/20 10:22:06   9 min     40/40     4.7 GB   22.2 GB
1   mapPartitions at Exchange.scala:64   2015/02/20 10:22:06   1.9 min   50/50     6.2 GB   2.8 GB
0   mapPartitions at Exchange.scala:64   2015/02/20 10:22:06   6 s       2/2       2.3 MB   156.6 KB
So the wall time of the whole query is 26s + 16m + 9m + 2m + 6s, around 28 minutes.
That is about 56% of the original time, which is not bad. But I want to know
whether any Spark tuning can make it even faster.
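The wall-time estimate above can be cross-checked in two ways. The sketch below is only an illustration built from the numbers in the stage table: it sums the rounded per-stage durations (which assumes the stages ran back to back), and separately derives the elapsed time from the submission timestamps (which does not make that assumption, since stages 0 to 2 were submitted at the same moment). The variable names are made up for this example.

```python
from datetime import datetime, timedelta

FMT = "%Y/%m/%d %H:%M:%S"

# Estimate 1: add up the rounded stage durations (stage 4 down to stage 0).
stage_seconds = [26, 16 * 60, 9 * 60, 1.9 * 60, 6]
summed_minutes = sum(stage_seconds) / 60.0

# Estimate 2: first submission time to the end of the last stage.
first_submit = datetime.strptime("2015/02/20 10:22:06", FMT)  # stages 0-2
last_submit = datetime.strptime("2015/02/20 10:48:46", FMT)   # stage 4
last_duration = timedelta(seconds=26)
elapsed = (last_submit + last_duration) - first_submit
elapsed_minutes = elapsed.total_seconds() / 60.0

hive_minutes = 50.0  # Hive baseline quoted in the post
print("summed stage durations: %.1f min" % summed_minutes)
print("elapsed from timestamps: %.1f min" % elapsed_minutes)
print("share of Hive baseline: %.0f%%" % (100 * elapsed_minutes / hive_minutes))
```

Both estimates land between 27 and 28 minutes, so the conclusion in the post holds either way.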
For stages 2 and 3, I observed that GC time gets more and more expensive,
especially in stage 3, shown below:
For stage 3:
Metric          Min      25th percentile   Median   75th percentile   Max
Duration        20 s     30 s              35 s     39 s              2.4 min
GC Time         9 s      17 s              20 s     25 s              2.2 min
Shuffle Write   4.7 MB   4.9 MB            5.2 MB   6.1 MB            8.3 MB
So at the median, GC took 20s/35s = 57% of the task time.
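The same ratio can be computed for every column of the stage 3 summary. This is a rough sketch only: the duration and GC percentiles are not guaranteed to come from the same task, so each ratio is an indication rather than an exact per-task figure. The dictionary keys are names chosen for this example.

```python
# Stage 3 task metrics in seconds, copied from the summary above.
duration = {"min": 20, "p25": 30, "median": 35, "p75": 39, "max": 2.4 * 60}
gc_time = {"min": 9, "p25": 17, "median": 20, "p75": 25, "max": 2.2 * 60}

# Share of task time spent in GC at each summary point.
gc_share = {k: gc_time[k] / float(duration[k]) for k in duration}

for k in ["min", "p25", "median", "p75", "max"]:
    print("%-6s %.0f%%" % (k, gc_share[k] * 100))
```

The slowest tasks stand out: at the max column nearly all of the task time is GC, which points at memory pressure rather than at the work itself.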
The first change I made was to add the following line to spark-defaults.conf:
spark.serializer org.apache.spark.serializer.KryoSerializer
My assumption was that using KryoSerializer instead of the default Java
serializer would lower the memory footprint and therefore the GC pressure at
runtime. I know that I changed the correct spark-defaults.conf, because if I add
spark.executor.extraJavaOptions -verbose:gc -XX:+PrintGCDetails
-XX:+PrintGCTimeStamps to the same file, I see the GC usage in the stdout
file. Of course, in this test I didn't add that, as I want to make only one
change at a time.
The result is almost the same as with the standard Java serializer.
The wall time is still 28 minutes, and in stage 3 GC still took around 50
to 60% of the time, with almost the same min, median, and max values and
no noticeable performance gain.
Next, based on my understanding, I think the default
spark.storage.memoryFraction is too high for this query: there is no reason
to reserve so much memory for caching data, because we don't reuse any dataset
in this one query. So I added --conf spark.storage.memoryFraction=0.3 at the
end of the spark-shell command, to reserve only half as much memory for
caching as in the first run. Of course, this time I rolled back the first
change (KryoSerializer).
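To put numbers on that change, here is a minimal sketch of the nominal cache reservation for the 9g executor heap used in this test. It assumes the default spark.storage.memoryFraction of 0.6 (the value quoted elsewhere in this digest) and ignores any additional internal margins Spark may apply, so the figures are upper bounds. The helper name is hypothetical.

```python
def storage_share_gb(heap_gb, memory_fraction):
    """Nominal part of the executor heap reserved for cached data."""
    return heap_gb * memory_fraction

heap_gb = 9.0  # --executor-memory 9g

default_gb = storage_share_gb(heap_gb, 0.6)  # default fraction
reduced_gb = storage_share_gb(heap_gb, 0.3)  # value used in the second run
freed_gb = default_gb - reduced_gb

print("cache reservation at 0.6: %.1f GB" % default_gb)
print("cache reservation at 0.3: %.1f GB" % reduced_gb)
print("heap no longer reserved for cache: %.1f GB" % freed_gb)
```

Under these assumptions the change moves about 2.7 GB of the heap out of the cache reservation.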
The result looks almost the same. The whole query finished in around 28s +
14m + 9.6m + 1.9m + 6s = 26 minutes.
It looks like Spark is faster than Hive, but are there any steps I can take to
make it even faster? Why does using KryoSerializer make no difference? If I
want to use the same resources as now, is there anything I can do to speed it
up further, especially to lower the GC time?
Thanks
Yong
  

Re: using hivecontext with sparksql on cdh 5.3

2015-02-20 Thread chirag lakhani
Thanks!  I am able to log in to Spark now, but I am still getting the same
error:

scala> sqlContext.sql("FROM analytics.trainingdatafinal SELECT *").collect().foreach(println)
15/02/20 14:40:22 INFO ParseDriver: Parsing command: FROM
analytics.trainingdatafinal SELECT *
15/02/20 14:40:22 INFO ParseDriver: Parse Completed
15/02/20 14:40:23 INFO HiveMetaStore: 0: Opening raw store with
implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
15/02/20 14:40:23 INFO ObjectStore: ObjectStore, initialize called
15/02/20 14:40:23 WARN General: Plugin (Bundle) org.datanucleus.api.jdo
is already registered. Ensure you dont have multiple JAR versions of the
same plugin in the classpath. The URL
file:/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/jars/datanucleus-api-jdo-3.2.6.jar
is already registered, and you are trying to register an identical plugin
located at URL
file:/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/datanucleus-api-jdo-3.2.6.jar.
15/02/20 14:40:23 WARN General: Plugin (Bundle)
org.datanucleus.store.rdbms is already registered. Ensure you dont have
multiple JAR versions of the same plugin in the classpath. The URL
file:/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/jars/datanucleus-rdbms-3.2.9.jar
is already registered, and you are trying to register an identical plugin
located at URL
file:/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/datanucleus-rdbms-3.2.9.jar.
15/02/20 14:40:23 WARN General: Plugin (Bundle) org.datanucleus is
already registered. Ensure you dont have multiple JAR versions of the same
plugin in the classpath. The URL
file:/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/jars/datanucleus-core-3.2.10.jar
is already registered, and you are trying to register an identical plugin
located at URL
file:/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/datanucleus-core-3.2.10.jar.
15/02/20 14:40:23 INFO Persistence: Property datanucleus.cache.level2
unknown - will be ignored
15/02/20 14:40:23 INFO Persistence: Property
hive.metastore.integral.jdo.pushdown unknown - will be ignored
15/02/20 14:40:25 INFO ObjectStore: Setting MetaStore object pin classes
with
hive.metastore.cache.pinobjtypes=Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order
15/02/20 14:40:25 INFO MetaStoreDirectSql: MySQL check failed, assuming we
are not on mysql: Lexical error at line 1, column 5.  Encountered: @
(64), after : .
15/02/20 14:40:27 INFO Datastore: The class
org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as
embedded-only so does not have its own datastore table.
15/02/20 14:40:27 INFO Datastore: The class
org.apache.hadoop.hive.metastore.model.MOrder is tagged as
embedded-only so does not have its own datastore table.
15/02/20 14:40:28 INFO Datastore: The class
org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as
embedded-only so does not have its own datastore table.
15/02/20 14:40:28 INFO Datastore: The class
org.apache.hadoop.hive.metastore.model.MOrder is tagged as
embedded-only so does not have its own datastore table.
15/02/20 14:40:28 INFO Query: Reading in results for query
org.datanucleus.store.rdbms.query.SQLQuery@0 since the connection used is
closing
15/02/20 14:40:28 INFO ObjectStore: Initialized ObjectStore
15/02/20 14:40:28 INFO HiveMetaStore: Added admin role in metastore
15/02/20 14:40:28 INFO HiveMetaStore: Added public role in metastore
15/02/20 14:40:29 INFO HiveMetaStore: No user is added in admin role, since
config is empty
15/02/20 14:40:29 INFO SessionState: No Tez session required at this point.
hive.execution.engine=mr.
15/02/20 14:40:29 INFO HiveMetaStore: 0: get_table : db=analytics
tbl=trainingdatafinal
15/02/20 14:40:29 INFO audit: ugi=hdfs ip=unknown-ip-addr cmd=get_table :
db=analytics tbl=trainingdatafinal
15/02/20 14:40:29 ERROR Hive:
NoSuchObjectException(message:analytics.trainingdatafinal table not found)
at
org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_table(HiveMetaStore.java:1569)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:106)
at com.sun.proxy.$Proxy24.get_table(Unknown Source)
at
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getTable(HiveMetaStoreClient.java:1008)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:90)
at com.sun.proxy.$Proxy25.getTable(Unknown Source)
at 

Re: using hivecontext with sparksql on cdh 5.3

2015-02-20 Thread Sourigna Phetsarath
Also, you might want to add the hadoop configs:

HADOOP_CONF_DIR=/etc/hadoop/conf:/etc/hive/conf --driver-class-path
'/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/*'
--driver-java-options '-Dspark.executor.extraClassPath=/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/*'

Ok. It needs the CDH configs for hive and hadoop.  Hopefully this works for
you.



On Fri, Feb 20, 2015 at 3:41 PM, chirag lakhani chirag.lakh...@gmail.com
wrote:

 Thanks!  I am able to login to Spark now but I am still getting the same
 error

 scala> sqlContext.sql("FROM analytics.trainingdatafinal SELECT *").collect().foreach(println)

Re: Spark Performance on Yarn

2015-02-20 Thread Sandy Ryza
That's all correct.

-Sandy

On Fri, Feb 20, 2015 at 1:23 PM, Kelvin Chu 2dot7kel...@gmail.com wrote:

 Hi Sandy,

 I appreciate your clear explanation. Let me try again. It's the best way
 to confirm I understand.

 spark.executor.memory + spark.yarn.executor.memoryOverhead = the memory
 that YARN allocates to the container in which the JVM runs

 spark.executor.memory = the memory I can actually use in my JVM
 application = part of it (spark.storage.memoryFraction) is reserved for
 caching + part of it (spark.shuffle.memoryFraction) is reserved for
 shuffling + the remainder is for bookkeeping and UDFs

 If I am correct above, then one implication is:

 (spark.executor.memory + spark.yarn.executor.memoryOverhead) * number of
 executors per machine should be configured to be smaller than a single
 machine's physical memory

 Right? Again, thanks!

 Kelvin
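The per-machine rule in this message can be written as a small check. This is a sketch with hypothetical numbers (an 8 GB node, a 2400 MB heap, a 384 MB overhead); none of them is a recommendation, and the function name is invented for the example. In practice the bound is the memory YARN is allowed to hand out on the node, which is normally less than the physical total.

```python
def fits_on_node(executor_memory_mb, overhead_mb, executors_per_node, node_memory_mb):
    """Return (total requested MB, whether it stays below the node's memory)."""
    requested = (executor_memory_mb + overhead_mb) * executors_per_node
    return requested, requested < node_memory_mb

# Hypothetical node: 8 GB of memory available for containers.
node_mb = 8 * 1024

two = fits_on_node(2400, 384, 2, node_mb)    # two executors per node
three = fits_on_node(2400, 384, 3, node_mb)  # three executors per node

print("2 executors: %d MB requested, fits=%s" % two)
print("3 executors: %d MB requested, fits=%s" % three)
```

With these numbers two executors fit and three do not, even though three heaps alone (7200 MB) would appear to fit; the overhead is what pushes the request over the limit.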

 On Fri, Feb 20, 2015 at 11:50 AM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Hi Kelvin,

 spark.executor.memory controls the size of the executor heaps.

 spark.yarn.executor.memoryOverhead is the amount of memory to request
 from YARN beyond the heap size.  This accounts for the fact that JVMs use
 some non-heap memory.

 The Spark heap is divided into spark.storage.memoryFraction (default 0.6)
 and spark.shuffle.memoryFraction (default 0.2), and the rest is for basic
 Spark bookkeeping and anything the user does inside UDFs.

 -Sandy
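As a worked example of the split described above, the sketch below divides an executor heap using the two default fractions quoted in the message (0.6 and 0.2). The 2400 MB heap is taken from the -Xmx2400m setting in the container log quoted in this thread; the function name is hypothetical, and the result is a nominal split that ignores any extra safety margins Spark may apply internally.

```python
def heap_regions_mb(executor_memory_mb, storage_fraction=0.6, shuffle_fraction=0.2):
    """Nominal split of the executor heap into storage, shuffle, and the rest."""
    storage = executor_memory_mb * storage_fraction
    shuffle = executor_memory_mb * shuffle_fraction
    other = executor_memory_mb - storage - shuffle  # bookkeeping and user code
    return {"storage": storage, "shuffle": shuffle, "other": other}

regions = heap_regions_mb(2400)
for name in ["storage", "shuffle", "other"]:
    print("%-8s %6.0f MB" % (name, regions[name]))
```

For a 2400 MB heap this leaves roughly 480 MB for everything that is neither cache nor shuffle, which is the part user code in UDFs has to live in.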



 On Fri, Feb 20, 2015 at 11:44 AM, Kelvin Chu 2dot7kel...@gmail.com
 wrote:

 Hi Sandy,

 I am also doing memory tuning on YARN. Just want to confirm, is it
 correct to say:

 spark.executor.memory - spark.yarn.executor.memoryOverhead = the memory
 I can actually use in my jvm application

 If it is not, what is the correct relationship? Any other variables or
 config parameters in play? Thanks.

 Kelvin

 On Fri, Feb 20, 2015 at 9:45 AM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 If that's the error you're hitting, the fix is to boost
 spark.yarn.executor.memoryOverhead, which will put some extra room in
 between the executor heap sizes and the amount of memory requested for them
 from YARN.

 -Sandy
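The numbers in the container log quoted further down in this thread illustrate the point. The sketch below assumes an overhead of 384 MB, chosen only because, added to the 2400 MB heap from -Xmx2400m, it reproduces the 2.7 GB limit in the log; check the value actually in effect on your cluster. The larger overhead of 768 MB is a hypothetical value, not a recommendation, and YARN may additionally round requests up to its allocation increment.

```python
def container_limit_gb(heap_mb, overhead_mb):
    """Memory requested from YARN for one executor container, in GB."""
    return (heap_mb + overhead_mb) / 1024.0

heap_mb = 2400   # from -Xmx2400m in the process dump
used_gb = 2.8    # "Current usage: 2.8 GB" in the log

current_limit = container_limit_gb(heap_mb, 384)  # assumed overhead
raised_limit = container_limit_gb(heap_mb, 768)   # hypothetical larger overhead

print("limit with 384 MB overhead: %.2f GB" % current_limit)
print("limit with 768 MB overhead: %.2f GB" % raised_limit)
print("killed at current limit: %s" % (used_gb > current_limit))
print("killed at raised limit:  %s" % (used_gb > raised_limit))
```

The heap size is unchanged in both cases; only the headroom between the heap and the container limit grows.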

 On Fri, Feb 20, 2015 at 9:40 AM, lbierman leebier...@gmail.com wrote:

 A bit more context on this issue. From the container logs on the
 executor

 Given my cluster specs above what would be appropriate parameters to
 pass
 into :
 --num-executors --num-cores --executor-memory

 I had tried it with --executor-memory 2500MB

 2015-02-20 06:50:09,056 WARN

 org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
 Container
 [pid=23320,containerID=container_1423083596644_0238_01_004160] is
 running beyond physical memory limits. Current usage: 2.8 GB of 2.7 GB
 physical memory used; 4.4 GB of 5.8 GB virtual memory used. Killing
 container.
 Dump of the process-tree for container_1423083596644_0238_01_004160 :
 |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
 SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
 |- 23320 23318 23320 23320 (bash) 0 0 108650496 305 /bin/bash
 -c
 /usr/java/latest/bin/java -server -XX:OnOutOfMemoryError='kill %p'
 -Xms2400m
 -Xmx2400m

 -Djava.io.tmpdir=/dfs/yarn/nm/usercache/root/appcache/application_1423083596644_0238/container_1423083596644_0238_01_004160/tmp

 -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160
 org.apache.spark.executor.CoarseGrainedExecutorBackend
 akka.tcp://sparkDriver@ip-10-168-86-13.ec2.internal
 :42535/user/CoarseGrainedScheduler
 8 ip-10-99-162-56.ec2.internal 1 application_1423083596644_0238 1

 /var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160/stdout
 2

 /var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160/stderr
 |- 23323 23320 23320 23320 (java) 922271 12263 461976
 724218
 /usr/java/latest/bin/java -server -XX:OnOutOfMemoryError=kill %p
 -Xms2400m
 -Xmx2400m

 -Djava.io.tmpdir=/dfs/yarn/nm/usercache/root/appcache/application_1423083596644_0238/container_1423083596644_0238_01_004160/tmp

 -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160
 org.apache.spark.executor.CoarseGrainedExecutorBackend
 akka.tcp://sparkDriver@ip-10-168-86-13.ec2.internal:42535/user/Coarse











Re: loads of memory still GC overhead limit exceeded

2015-02-20 Thread Antony Mayi
Hi Ilya,
thanks for your insight, this was the right clue. I had default parallelism
already set, but it was quite low (hundreds), and the number of
partitions of the input RDD was low as well, so the chunks were really too big.
Increasing parallelism and repartitioning seem to be helping...
Thanks!
Antony.

 On Thursday, 19 February 2015, 16:45, Ilya Ganelin ilgan...@gmail.com 
wrote:
   
 

 Hi Antony - you are seeing a problem that I ran into. The underlying issue is
your default parallelism setting. What's happening is that within ALS certain
RDD operations end up changing the number of partitions of your data.
For example, if you start with an RDD of 300 partitions, then unless default
parallelism is set you'll eventually get, while the algorithm executes, an RDD
with something like 20 partitions. Consequently, your giant data set is now
stored across a much smaller number of partitions, so each partition is huge.
Then, when a shuffle requires serialization, you run out of heap space trying to
serialize it. The solution should be as simple as setting the default
parallelism.
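A quick calculation shows why the drop from 300 to 20 partitions matters. The sketch below uses a hypothetical 30 GB data set and assumes the data is spread evenly across partitions; only the partition counts come from the message, and the function name is invented for the example.

```python
def partition_size_mb(dataset_mb, num_partitions):
    """Average partition size, assuming an even spread of the data."""
    return dataset_mb / float(num_partitions)

dataset_mb = 30 * 1024  # hypothetical 30 GB data set

before_mb = partition_size_mb(dataset_mb, 300)  # partitions at the start
after_mb = partition_size_mb(dataset_mb, 20)    # partitions after the drop
growth = after_mb / before_mb

print("300 partitions: %.1f MB each" % before_mb)
print(" 20 partitions: %.1f MB each" % after_mb)
print("each partition is %.0fx larger" % growth)
```

The growth factor depends only on the ratio of partition counts, so each task has to serialize about 15 times as much data regardless of the data set size.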

This is referenced in a JIRA I can't find at the moment. 
On Thu, Feb 19, 2015 at 5:10 AM Antony Mayi antonym...@yahoo.com.invalid 
wrote:

now with spark.shuffle.io.preferDirectBufs reverted (to true), I am again
getting "GC overhead limit exceeded":
=== spark stdout ===
15/02/19 12:08:08 WARN scheduler.TaskSetManager: Lost task 7.0 in stage 18.0 (TID 5329, 192.168.1.93): java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
=== yarn log (same) ===
15/02/19 12:08:08 ERROR executor.Executor: Exception in task 7.0 in stage 18.0 (TID 5329)
java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
=== yarn nodemanager ===
2015-02-19 12:08:13,758 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 19014 for container-id container_1424204221358_0013_01_12: 29.8 GB of 32 GB physical memory used; 31.7 GB of 67.2 GB virtual memory used
2015-02-19 12:08:13,778 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 19013 for container-id container_1424204221358_0013_01_08: 1.2 MB of 32 GB physical memory used; 103.6 MB of 67.2 GB virtual memory used
2015-02-19 12:08:14,455 WARN org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit code from container container_1424204221358_0013_01_08 is : 143
2015-02-19 12:08:14,455 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container: Container container_1424204221358_0013_01_08 transitioned from RUNNING to EXITED_WITH_FAILURE
2015-02-19 12:08:14,455 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch: Cleaning up container container_1424204221358_0013_01_08
Antony.

 

 On Thursday, 19 February 2015, 11:54, Antony Mayi 
antonym...@yahoo.com.INVALID wrote:
   
 

 it is from within the ALS.trainImplicit() call. btw. the exception varies
between this "GC overhead limit exceeded" and "Java heap space" (which I guess
is just a different outcome of the same problem).
just tried another run and here are the logs (filtered) - note I tried this run
with spark.shuffle.io.preferDirectBufs=false so this might be a slightly
different issue from my previous case (going to revert now):
=== spark stdout ===
15/02/19 10:15:05 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(6, 192.168.1.92, 54289) with no recent heart beats: 50221ms exceeds 45000ms
15/02/19 10:16:05 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(13, 192.168.1.90, 56768) with no recent heart beats: 54749ms exceeds 45000ms
15/02/19 10:16:44 ERROR cluster.YarnClientClusterScheduler: Lost executor 6 on 192.168.1.92: remote Akka client disassociated
15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 57.0 in stage 18.0 (TID 5379,

Re: No executors allocated on yarn with latest master branch

2015-02-20 Thread Sandy Ryza
Are you using the capacity scheduler or fifo scheduler without multi
resource scheduling by any chance?

On Thu, Feb 12, 2015 at 1:51 PM, Anders Arpteg arp...@spotify.com wrote:

 The NM logs only seem to contain entries similar to the following. Nothing else
 in the same time range. Any help?

 2015-02-12 20:47:31,245 WARN
 org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:
 Event EventType: KILL_CONTAINER sent to absent container
 container_1422406067005_0053_01_02
 2015-02-12 20:47:31,246 WARN
 org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:
 Event EventType: KILL_CONTAINER sent to absent container
 container_1422406067005_0053_01_12
 2015-02-12 20:47:31,246 WARN
 org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:
 Event EventType: KILL_CONTAINER sent to absent container
 container_1422406067005_0053_01_22
 2015-02-12 20:47:31,246 WARN
 org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:
 Event EventType: KILL_CONTAINER sent to absent container
 container_1422406067005_0053_01_32
 2015-02-12 20:47:31,246 WARN
 org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:
 Event EventType: KILL_CONTAINER sent to absent container
 container_1422406067005_0053_01_42
 2015-02-12 21:24:30,515 WARN
 org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:
 Event EventType: FINISH_APPLICATION sent to absent application
 application_1422406067005_0053

 On Thu, Feb 12, 2015 at 10:38 PM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 It seems unlikely to me that it would be a 2.2 issue, though not entirely
 impossible.  Are you able to find any of the container logs?  Is the
 NodeManager launching containers and reporting some exit code?

 -Sandy

 On Thu, Feb 12, 2015 at 1:21 PM, Anders Arpteg arp...@spotify.com
 wrote:

 No, not submitting from windows, from a debian distribution. Had a quick
 look at the rm logs, and it seems some containers are allocated but then
 released again for some reason. Not easy to make sense of the logs, but
 here is a snippet from the logs (from a test in our small test cluster) if
 you'd like to have a closer look: http://pastebin.com/8WU9ivqC

 Sandy, sounds like it could possibly be a 2.2 issue then, or what do you
 think?

 Thanks,
 Anders

 On Thu, Feb 12, 2015 at 3:11 PM, Aniket Bhatnagar 
 aniket.bhatna...@gmail.com wrote:

 This is tricky to debug. Check the logs of the YARN node manager and resource
 manager to see if you can trace the error. In the past I have had to look
 closely at the arguments getting passed to the YARN container (they get logged
 before attempting to launch containers). If I still didn't get a clue, I had to
 check the script generated by YARN to execute the container, and even run it
 manually to trace at which line the error occurred.

 BTW are you submitting the job from windows?

 On Thu, Feb 12, 2015, 3:34 PM Anders Arpteg arp...@spotify.com wrote:

 Interesting to hear that it works for you. Are you using Yarn 2.2 as
 well? No strange log messages during startup, and I can't see any other log
 messages since no executor gets launched. It does not seem to work in
 yarn-client mode either, failing with the exception below.

 Exception in thread "main" org.apache.spark.SparkException: Yarn
 application has already ended! It might have been killed or unable to
 launch application master.
 at
 org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:119)
 at
 org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:59)
 at
 org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:141)
 at org.apache.spark.SparkContext.<init>(SparkContext.scala:370)
 at
 com.spotify.analytics.AnalyticsSparkContext.<init>(AnalyticsSparkContext.scala:8)
 at
 com.spotify.analytics.DataSampler$.main(DataSampler.scala:42)
 at com.spotify.analytics.DataSampler.main(DataSampler.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
 at java.lang.reflect.Method.invoke(Method.java:597)
 at
 org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:551)
 at
 org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:155)
 at
 org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:178)
 at
 org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:99)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

 /Anders


 On Thu, Feb 12, 2015 at 1:33 AM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Hi Anders,

 I just tried this out and was able to 

Re: what does Submitting ... missing tasks from Stage mean?

2015-02-20 Thread Imran Rashid
Yeah, this is just the totally normal message when Spark executes
something.  The first time something is run, all of its tasks are
"missing".  I would not worry about cases when not all tasks are "missing"
if you're new to Spark; it's probably an advanced concept that you don't
care about (and it would take me some time to explain :)

On Fri, Feb 20, 2015 at 8:20 AM, shahab shahab.mok...@gmail.com wrote:

 Hi,

 Probably this is a silly question, but I couldn't find any clear
 documentation explaining why one should see "Submitting ... missing tasks from
 Stage ..." in the logs.

 Especially in my case, where I do not have any failure in job execution, I
 wonder why this should happen.
 Does it have any relation to lazy evaluation?

 best,
 /Shahab



Re: Which OutputCommitter to use for S3?

2015-02-20 Thread Mingyu Kim
I didn’t get any response. It’d be really appreciated if anyone using a special 
OutputCommitter for S3 can comment on this!

Thanks,
Mingyu

From: Mingyu Kim <m...@palantir.com>
Date: Monday, February 16, 2015 at 1:15 AM
To: user@spark.apache.org
Subject: Which OutputCommitter to use for S3?

Hi all,

The default OutputCommitter used by RDD, which is FileOutputCommitter, seems to
require moving files at the commit step, which is not a constant-time operation
in S3, as discussed in
http://mail-archives.apache.org/mod_mbox/spark-user/201410.mbox/%3c543e33fa.2000...@entropy.be%3E.
People seem to develop their own NullOutputCommitter implementation or use
DirectFileOutputCommitter (as mentioned in SPARK-3595,
https://issues.apache.org/jira/browse/SPARK-3595),
but I wanted to check if there is a de facto standard, publicly available
OutputCommitter to use for S3 in conjunction with Spark.

Thanks,
Mingyu


Re: Spark Performance on Yarn

2015-02-20 Thread Lee Bierman
Thanks for the suggestions.
I'm experimenting with different values for spark.yarn.executor.memoryOverhead
and explicitly giving the executors more memory, but I still have not found the
happy medium that gets it to finish in a proper time frame.

Is my cluster massively undersized at 5 boxes, 8gb 2cpu ?
Trying to figure out a memory setting and executor setting so it runs on
many containers in parallel.

I'm still struggling, as Pig jobs and Hive jobs on the same whole data set
don't take as long. I'm also wondering if the logic in our code is just
doing something silly that causes multiple reads of all the data.


On Fri, Feb 20, 2015 at 9:45 AM, Sandy Ryza sandy.r...@cloudera.com wrote:

 If that's the error you're hitting, the fix is to boost
 spark.yarn.executor.memoryOverhead, which will put some extra room in
 between the executor heap sizes and the amount of memory requested for them
 from YARN.

 -Sandy

 On Fri, Feb 20, 2015 at 9:40 AM, lbierman leebier...@gmail.com wrote:

 A bit more context on this issue. From the container logs on the executor

 Given my cluster specs above what would be appropriate parameters to pass
 into :
 --num-executors --num-cores --executor-memory

 I had tried it with --executor-memory 2500MB

 2015-02-20 06:50:09,056 WARN

 org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
 Container [pid=23320,containerID=container_1423083596644_0238_01_004160]
 is
 running beyond physical memory limits. Current usage: 2.8 GB of 2.7 GB
 physical memory used; 4.4 GB of 5.8 GB virtual memory used. Killing
 container.
 Dump of the process-tree for container_1423083596644_0238_01_004160 :
 |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
 SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
 |- 23320 23318 23320 23320 (bash) 0 0 108650496 305 /bin/bash -c
 /usr/java/latest/bin/java -server -XX:OnOutOfMemoryError='kill %p'
 -Xms2400m
 -Xmx2400m

 -Djava.io.tmpdir=/dfs/yarn/nm/usercache/root/appcache/application_1423083596644_0238/container_1423083596644_0238_01_004160/tmp

 -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160
 org.apache.spark.executor.CoarseGrainedExecutorBackend
 akka.tcp://sparkDriver@ip-10-168-86-13.ec2.internal
 :42535/user/CoarseGrainedScheduler
 8 ip-10-99-162-56.ec2.internal 1 application_1423083596644_0238 1

 /var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160/stdout
 2

 /var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160/stderr
 |- 23323 23320 23320 23320 (java) 922271 12263 461976 724218
 /usr/java/latest/bin/java -server -XX:OnOutOfMemoryError=kill %p -Xms2400m
 -Xmx2400m

 -Djava.io.tmpdir=/dfs/yarn/nm/usercache/root/appcache/application_1423083596644_0238/container_1423083596644_0238_01_004160/tmp

 -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160
 org.apache.spark.executor.CoarseGrainedExecutorBackend
 akka.tcp://sparkDriver@ip-10-168-86-13.ec2.internal:42535/user/Coarse




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Performance-on-Yarn-tp21729p21739.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Which OutputCommitter to use for S3?

2015-02-20 Thread Josh Rosen
We (Databricks) use our own DirectOutputCommitter implementation, which is
a couple tens of lines of Scala code.  The class would almost entirely be a
no-op except we took some care to properly handle the _SUCCESS file.
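To make the idea concrete, here is a toy model of a "direct" committer in plain Python. It is only a sketch of the concept described above: it is not Hadoop's OutputCommitter interface and not the Databricks implementation, and the class and method names are made up for illustration.

```python
import os
import tempfile

class DirectCommitter:
    """Toy stand-in for a 'direct' committer: tasks write straight to the final directory."""

    def __init__(self, output_dir):
        self.output_dir = output_dir

    def setup_job(self):
        os.makedirs(self.output_dir, exist_ok=True)

    def write_task_output(self, task_id, lines):
        # No temporary attempt directory: the part file is created in its final location.
        path = os.path.join(self.output_dir, "part-%05d" % task_id)
        with open(path, "w") as f:
            f.write("\n".join(lines))
        return path

    def commit_task(self, task_id):
        # Nothing to move or rename, so this is a no-op.
        pass

    def commit_job(self):
        # The one non-trivial step: mark the whole job as complete.
        open(os.path.join(self.output_dir, "_SUCCESS"), "w").close()

if __name__ == "__main__":
    out = os.path.join(tempfile.mkdtemp(), "out")
    committer = DirectCommitter(out)
    committer.setup_job()
    for task in range(2):
        committer.write_task_output(task, ["row from task %d" % task])
        committer.commit_task(task)
    committer.commit_job()
    print(sorted(os.listdir(out)))
```

The trade-off of skipping the rename step is that output from a failed or duplicated task attempt is already visible in the final location, so this style only makes sense when that is acceptable.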

On Fri, Feb 20, 2015 at 3:52 PM, Mingyu Kim m...@palantir.com wrote:

  I didn’t get any response. It’d be really appreciated if anyone using a
 special OutputCommitter for S3 can comment on this!

  Thanks,
 Mingyu

   From: Mingyu Kim m...@palantir.com
 Date: Monday, February 16, 2015 at 1:15 AM
 To: user@spark.apache.org user@spark.apache.org
 Subject: Which OutputCommitter to use for S3?

   Hi all,

  The default OutputCommitter used by RDD, which is FileOutputCommitter,
 seems to require moving files at the commit step, which is not a constant
 operation in S3, as discussed in
 http://mail-archives.apache.org/mod_mbox/spark-user/201410.mbox/%3c543e33fa.2000...@entropy.be%3E.
 People seem to develop their own NullOutputCommitter implementation or use
 DirectFileOutputCommitter (as mentioned in SPARK-3595,
 https://issues.apache.org/jira/browse/SPARK-3595),
 but I wanted to check if there is a de facto standard, publicly available
 OutputCommitter to use for S3 in conjunction with Spark.

  Thanks,
 Mingyu



Re: randomSplit instead of a huge map reduce ?

2015-02-20 Thread Ashish Rangole
Is there a check you can put in place to not create pairs that aren't in
your set of 20M pairs? Additionally, once you have your arrays converted to
pairs you can do aggregateByKey with each pair being the key.
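A minimal sketch of that suggestion in plain Python, standing in for the Spark API: count pairs inside each partition into a map (the role mapPartitions or aggregateByKey would play) and then merge the per-partition maps, so the full list of pairs is never materialised. It assumes pairs are unordered and that a pair is counted once per array; the `allowed` set is a hypothetical name for the known set of valid pairs.

```python
from collections import Counter
from itertools import combinations

def count_pairs_in_partition(arrays, allowed=None):
    """Count unordered word pairs for one partition without building a pair list."""
    counts = Counter()
    for words in arrays:
        # sorted(set(...)) makes ('a', 'b') and ('b', 'a') the same key.
        for pair in combinations(sorted(set(words)), 2):
            if allowed is None or pair in allowed:
                counts[pair] += 1
    return counts

def merge_counts(left, right):
    """Combine two per-partition results (Counter.update adds the counts)."""
    left.update(right)
    return left

if __name__ == "__main__":
    partitions = [
        [["a", "b", "c"], ["a", "b"]],
        [["b", "c"]],
    ]
    total = Counter()
    for part in partitions:
        merge_counts(total, count_pairs_in_partition(part))
    print(dict(total))
```

Memory use is then bounded by the number of distinct pairs seen per partition rather than by the total number of pairs generated.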
On Feb 20, 2015 1:57 PM, shlomib shl...@summerhq.com wrote:

 Hi,

 I am new to Spark and I think I missed something very basic.

 I have the following use case (I use Java and run Spark locally on my
 laptop):


 I have a JavaRDD<String[]>

 - The RDD contains around 72,000 arrays of strings (String[])

 - Each array contains 80 words (on average).


 What I want to do is to convert each array into a new array/list of pairs,
 for example:

 Input: String[] words = ['a', 'b', 'c']

 Output: List[String, String] pairs = [('a', 'b'), ('a', 'c'), ('b', 'c')]

 and then I want to count the number of times each pair appeared, so my
 final
 output should be something like:

 Output: List[String, String, Integer] result = [('a', 'b', 3), ('a', 'c',
 8), ('b', 'c', 10)]


 The problem:

 Since each array contains around 80 words, it yields around 3,200 pairs
 (80 * 79 / 2 = 3,160), so
 after “mapping” my entire RDD I get 3,200 * 72,000 = *230,400,000* pairs to
 reduce, which requires way too much memory.

 (I know I have only around *20,000,000* unique pairs!)

 I already modified my code and used 'mapPartitions' instead of 'map'. It
 definitely improved the performance, but I still feel I'm doing something
 completely wrong.


 I was wondering if this is the right 'Spark way' to solve this kind of
 problem, or maybe I should do something like splitting my original RDD into
 smaller parts (by using randomSplit), then iterate over each part,
 aggregate
 the results into some result RDD (by using 'union') and move on to the next
 part.


 Can anyone please explain to me which solution is better?


 Thank you very much,

 Shlomi.




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/randomSplit-instead-of-a-huge-map-reduce-tp21744.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





About FlumeUtils.createStream

2015-02-20 Thread bit1...@163.com

Hi,
In the spark streaming application, I write the code, 
FlumeUtils.createStream(ssc,localhost,),which means spark will listen on 
the  port, and wait for Flume Sink to write to it.
My question is:  when I submit the application to the Spark Standalone cluster, 
will  be opened only on the Driver Machine or all the workers will also 
open the  port and wait for the Flume data? 






Force RDD evaluation

2015-02-20 Thread pnpritchard
Is there a technique for forcing the evaluation of an RDD?

I have used actions to do so but even the most basic count has a
non-negligible cost (even on a cached RDD, repeated calls to count take
time).

My use case is for logging the execution time of the major components in my
application. At the end of each component I have a statement like
rdd.cache().count() and time how long it takes.

Thanks in advance for any advice!
Nick
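The measurement pattern described above can be sketched independently of Spark. In this plain-Python illustration a generator stands in for a lazy RDD and exhausting it stands in for an action; `timed` and `force` are hypothetical helper names, not Spark APIs.

```python
import logging
import time
from collections import deque
from contextlib import contextmanager

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("timing")

@contextmanager
def timed(label, results=None):
    """Log (and optionally record) the wall-clock time spent inside the block."""
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        log.info("%s took %.3f s", label, elapsed)
        if results is not None:
            results[label] = elapsed

def force(iterable):
    """Consume a lazy iterable without keeping or counting its elements."""
    deque(iterable, maxlen=0)

if __name__ == "__main__":
    timings = {}
    lazy = (x * x for x in range(1000000))  # nothing is computed yet
    with timed("squares", timings):
        force(lazy)                          # the work happens here
```

Whatever forcing step is used, the timer measures that step too, so the forcing operation should be as cheap as possible relative to the computation being timed.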



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Force-RDD-evaluation-tp21748.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Shuffle Spill

2015-02-20 Thread Thomas Gerber
Hello,

I have a stage with lots of tasks, a few of which have a large amount
of shuffle spill.

I searched the web to understand shuffle spill, and I did not find any
simple explanation of the spill mechanism. What I put together is:

1. Shuffle spill can happen when the shuffle is written to disk (i.e.
by the last map stage, as opposed to when the shuffle is read by the
reduce stage).
2. It happens when a task has a lot to write to the shuffle: since that
shuffle needs to be sorted by key, spilling to disk is what allows
Spark to do that.

I am unclear, however, whether a large task will systematically lead to shuffle
spill, or whether the number of keys (for the next reduce stage) that particular
task encounters also has an impact.

Concretely:
Let's say I have:
val ab = RDD[(a,b)]
val ac = RDD[(a,c)]
val bd = RDD[(b,d)]

and I do:
val bc = ab.join(ac).values // we investigate this task, triggered by values
val cd = bc.join(bd).values

The task we investigate reads from a previous shuffle, and will write to
another shuffle to prepare for the second join. I know that I have data
skew on a key on a, meaning a few tasks are expected to be large and I
have stragglers.

Now, is that the cause of the shuffle spill, or is it because those
straggler tasks also happen to have in their midst a very large amount of
distinct bs?
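To make the two candidate causes easier to tell apart, here is a toy in-memory version of the two joins in plain Python. It does not model Spark's spill mechanism at all; it only shows, for made-up data with one skewed key, how many records and how many distinct b keys each a-key contributes to the second shuffle.

```python
from collections import defaultdict

def join(left, right):
    """Inner join of two lists of (key, value) pairs, like a pair-RDD join."""
    index = defaultdict(list)
    for k, v in right:
        index[k].append(v)
    return [(k, (lv, rv)) for k, lv in left for rv in index.get(k, [])]

def values(pairs):
    """Drop the keys, like .values on a pair RDD."""
    return [v for _, v in pairs]

def output_profile(ab, ac):
    """For each key a: (records emitted, distinct b keys emitted) by the first join."""
    profile = {}
    for a, (b, _c) in join(ab, ac):
        n, bs = profile.get(a, (0, set()))
        bs.add(b)
        profile[a] = (n + 1, bs)
    return {a: (n, len(bs)) for a, (n, bs) in profile.items()}

if __name__ == "__main__":
    ab = [("a1", "b1"), ("a1", "b2"), ("a1", "b3"), ("a2", "b1")]  # a1 is the skewed key
    ac = [("a1", "c1"), ("a1", "c2"), ("a2", "c3")]
    bd = [("b1", "d1"), ("b2", "d2")]
    bc = values(join(ab, ac))   # records keyed by b, written to the second shuffle
    cd = values(join(bc, bd))
    print(output_profile(ab, ac))
    print(len(bc), len(cd))
```

In this toy data the skewed key produces both more records and more distinct b keys, which is why the two explanations are hard to separate from task size alone; comparing a straggler's record count against its distinct-key count on real data would be one way to distinguish them.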

Thanks


RE: using a database connection pool to write data into an RDBMS from a Spark application

2015-02-20 Thread Mohammed Guller
Sean,
I know that Class.forName has not been required since Java 1.4 :-) It was just a
desperate attempt to make sure that the Postgres driver is getting loaded.
Since Class.forName("org.postgresql.Driver") is not throwing an exception, I
assume that the driver is available in the classpath. Is that not true?

I did some more troubleshooting and here is what I found:
1) The Hive libraries used by Spark use BoneCP 0.7.1.
2) When the Spark master is started, it initializes BoneCP, which does not load any
database driver at that point (that makes sense).
3) When my application initializes BoneCP, it thinks it is already initialized
and does not load the Postgres driver (this is a known bug in 0.7.1). This bug
is fixed in the BoneCP 0.8.0 release.

So I linked my app with the BoneCP 0.8.0 release, but when I run my app using
spark-submit, Spark continues to use BoneCP 0.7.1. How do I override that
behavior? How do I make the spark-submit script unload BoneCP 0.7.1 and load BoneCP
0.8.0? I tried the --jars and --driver-classpath flags, but they didn't help.

Thanks,
Mohammed


-Original Message-
From: Sean Owen [mailto:so...@cloudera.com] 
Sent: Friday, February 20, 2015 2:06 AM
To: Mohammed Guller
Cc: Kelvin Chu; user@spark.apache.org
Subject: Re: using a database connection pool to write data into an RDBMS from 
a Spark application

Although I don't know if it's related, the Class.forName() method of loading 
drivers is very old. You should be using DataSource and javax.sql; this has 
been the usual practice since about Java 1.4.

Why do you say a different driver is being loaded? That's not the error here.

Try instantiating the driver directly to test whether it's available in the 
classpath. Otherwise you would have to check whether the jar exists, the class 
exists in it, and it's really on your classpath.

On Fri, Feb 20, 2015 at 5:27 AM, Mohammed Guller moham...@glassbeam.com wrote:
 Hi Kelvin,



 Yes. I am creating an uber jar with the Postgres driver included, but 
 nevertheless tried both the --jars and --driver-classpath flags. It didn’t help.



 Interestingly, I can’t use BoneCP even in the driver program when I 
 run my application with spark-submit. I am getting the same exception 
 when the application initializes BoneCP before creating SparkContext. 
 It looks like Spark is loading a different version of the Postgres 
 JDBC driver than the one that I am linking.



 Mohammed



 From: Kelvin Chu [mailto:2dot7kel...@gmail.com]
 Sent: Thursday, February 19, 2015 7:56 PM
 To: Mohammed Guller
 Cc: user@spark.apache.org
 Subject: Re: using a database connection pool to write data into an 
 RDBMS from a Spark application



 Hi Mohammed,



 Did you use --jars to specify your jdbc driver when you submitted your job?
 Take a look at this link:
 http://spark.apache.org/docs/1.2.0/submitting-applications.html



 Hope this helps!



 Kelvin



 On Thu, Feb 19, 2015 at 7:24 PM, Mohammed Guller 
 moham...@glassbeam.com
 wrote:

 Hi –

 I am trying to use BoneCP (a database connection pooling library) to 
 write data from my Spark application to an RDBMS. The database inserts 
 are inside a foreachPartition code block. I am getting this exception 
 when the code tries to insert data using BoneCP:



 java.sql.SQLException: No suitable driver found for 
 jdbc:postgresql://hostname:5432/dbname



 I tried explicitly loading the Postgres driver on the worker nodes by 
 adding the following line inside the foreachPartition code block:



 Class.forName("org.postgresql.Driver")



 It didn’t help.



 Has anybody been able to get a database connection pool library to work
 with Spark? If you got it working, can you please share the steps?



 Thanks,

 Mohammed






Re: Spark Streaming and message ordering

2015-02-20 Thread Jörn Franke
You may also want to consider whether your use case really needs a very strict
order, because configuring Spark to support such a strict order means
giving up most of its benefits (failure handling, parallelism, etc.).
Usually, in a distributed setting you can order events, but this also means
that you may need to wait for an unlimited time to be sure that you have
received all events before ordering them. This is impractical, so people
implement timeouts, which may lead to lost events.
The best approach would be to partition the data and require an order only
within each partition (ordering across partitions is a different story...).
All in all, implementing ordering in Spark depends on your requirements, and
depending on them it can be easy or very difficult. You may
also consider writing your own framework for Mesos or YARN to better meet
the requirements and keep your Spark cluster config clean (what happens if
there are Spark jobs that do not require an order? They would be slowed down).
So you need to think about: by which criteria can I order events? Do I
accept loss of events? Do I need a global order over all events, or is it
only relevant for subsets (partitions)? What is the impact of not ordering?
What is the impact of losing events?
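As a small illustration of why ordering within a partition is the cheap case, here is a plain-Python sketch that groups events by partition and sorts each group independently. The `sequence` field is a hypothetical per-partition ordering criterion (for example a Kafka offset); nothing here is Spark API.

```python
from collections import defaultdict

def order_within_partitions(events):
    """Order events inside each partition; no ordering is imposed across partitions.

    events: iterable of (partition, sequence, payload) in arbitrary arrival order.
    Returns {partition: [payload, ...]} with payloads sorted by sequence.
    """
    by_partition = defaultdict(list)
    for partition, sequence, payload in events:
        by_partition[partition].append((sequence, payload))
    return {
        partition: [payload for _, payload in sorted(items, key=lambda item: item[0])]
        for partition, items in by_partition.items()
    }

if __name__ == "__main__":
    arrived = [(1, 2, "p1-second"), (0, 1, "p0-second"),
               (1, 1, "p1-first"), (0, 0, "p0-first")]
    print(order_within_partitions(arrived))
```

Each partition can be sorted on its own, so the work parallelises and no partition has to wait for events from another one; a global order would need a single sort over everything.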
On Feb 20, 2015 at 18:01, Cody Koeninger c...@koeninger.org wrote:

 There is typically some slack between when a batch finishes executing and
 when the next batch is scheduled.  You should be able to arrange your batch
 sizes / cluster resources to ensure that.  If there isn't slack, your
 overall delay is going to keep increasing indefinitely.

 If you're inserting into mysql, you're probably going to be much better
 off doing bulk inserts anyway, and transaction ordering is going to stop a
 lot of overlap that might otherwise happen.  In pseudocode:

 stream.foreachRDD { rdd =>
   rdd.foreachPartition { iter =>
     val bulk = iter.filter(matchEvent).toList
     transaction { insert(bulk) }
   }
 }

 You may already know this, but getting jdbc to do true bulk inserts to
 mysql requires a bit of hoop jumping, so turn on query logging during
 development to make sure you aren't getting individual inserts.

 Also be aware that output actions aren't guaranteed to happen exactly
 once, so you'll need to store unique offset ids in mysql or otherwise deal
 with the possibility of executor failures.


 On Fri, Feb 20, 2015 at 10:39 AM, Neelesh neele...@gmail.com wrote:

 Thanks for the detailed response Cody. Our use case is to  do some
 external lookups (cached and all) for every event, match the event against
 the looked up data, decide whether to write an entry in mysql and write it
 in the order in which the events arrived within a kafka partition.

 We don't need global ordering. Message ordering within a batch can be
 achieved either by waiting for 1.3 to be released (the behavior you
 described works very well for us, within a batch) , or by using
 updateStateByKey and sorting. Speculative execution is turned off as
 well (I think it's off by default).

 But here is what I see in the JobScheduler/JobGenerator. Within
 each stream, jobs are generated every 'n' milliseconds (batch duration)
 and submitted for execution. Since job generation in a stream is temporal,
 it's guaranteed that the jobs are submitted in the order of event arrival
 within a stream. And since we have one stream per kafka partition, this
 translates to sequentially generated, sequentially scheduled
 batches within a kafka partition. But since the *execution* of jobs
 itself is in parallel, it's probable that back-to-back batches in a stream
 are submitted one after the other but execute concurrently. If this
 understanding of mine is correct, it breaks our requirement that messages
 be executed in order within a partition.

 Thanks!






 On Fri, Feb 20, 2015 at 7:03 AM, Cody Koeninger c...@koeninger.org
 wrote:

 For a given batch, for a given partition, the messages will be processed
 in order by the executor that is running that partition.  That's because
 messages for the given offset range are pulled by the executor, not pushed
 from some other receiver.

 If you have speculative execution, yes, another executor may be running
 that partition.

 If your job is lagging behind in processing such that the next batch
 starts executing before the last batch is finished processing, yes it is
 possible for some other executor to start working on messages from that
 same kafka partition.

 The obvious solution here seems to be turn off speculative execution and
 adjust your batch interval / sizes such that they can comfortably finish
 processing :)

 If your processing time is sufficiently non-linear with regard to the
 number of messages, yes you might be able to do something with overriding
 dstream.compute.  Unfortunately the new kafka dstream implementation is
 private, so it's not straightforward to subclass it.  I'd like to get a
 solution in place for people 

Re: Spark Streaming and message ordering

2015-02-20 Thread Neelesh
Thanks Jorn. Indeed, we do not need global ordering, since our data is
partitioned well. We do not need ordering based on wall-clock time either;
that would require waiting indefinitely. All we need is for the execution of
batches (not just job submission) to happen in the same order in which they
are generated, which does not appear to be enforced; the current in-order
behavior looks more like a side effect of how job submission happens as of
now. Cody's suggestions are useful for our case, though I need to take a
closer look at how job executions happen within a stream. Loss of
parallelism and failure handling are an issue mainly for global ordering.
Global ordering is a much harder problem and relevant only
for a small set of use cases, in my opinion. Data is almost always
partitioned in some way, and any specific ordering behavior is typically
constrained to within a partition.

So for us: loss of events is unacceptable, events must be executed
in order within a partition (strictly speaking, a 1-1 mapping with kafka
partitions), and our execution logic is idempotent. All of these seem to
be possible with 1.3, with some minor tweaks.

thnx!

On Fri, Feb 20, 2015 at 9:24 AM, Jörn Franke jornfra...@gmail.com wrote:

 You may think as well if your use case really needs a very strict order,
 because configuring spark that it supports such a strict order means
 rendering most of benefits useless (failure handling,  parallelism etc.).
 Usually, in a distributed setting you can order events, but this also means
 that you may need to wait for an unlimited time to be sure that you receive
 all events to order them. This is impractical, so people implement
 timeouts, which may lead to the case that you lose events etc.
 The optimal thing would be to partition the data and that there needs to
 be an order within the partition (across is a different story...).
 All in all implementing order in Spark depends on your requirements for
 ordering and depending on this it can  be easy or very difficult. You may
 also consider writing your own framework for mesos or yarn to better meet
 the requirements and keep your spark cluster config clean (what happens if
 there are spark jobs not requiring an order? They would be slowed down)
 So you need to think about: by which criteria can I order events, do I
 accept loss of events?, do I need a global order over all events or is it
 only relevant for subsets (partitions), what is the impact of not ordering?,
 what is the impact of loss of events,...
 On Feb 20, 2015 at 18:01, Cody Koeninger c...@koeninger.org wrote:

 There is typically some slack between when a batch finishes executing and
 when the next batch is scheduled.  You should be able to arrange your batch
 sizes / cluster resources to ensure that.  If there isn't slack, your
 overall delay is going to keep increasing indefinitely.

 If you're inserting into mysql, you're probably going to be much better
 off doing bulk inserts anyway, and transaction ordering is going to stop a
 lot of overlap that might otherwise happen.  In pseudocode:

 stream.foreachRDD { rdd =>
   rdd.foreachPartition { iter =>
     val bulk = iter.filter(matchEvent).toList
     transaction { insert(bulk) }
   }
 }

 You may already know this, but getting jdbc to do true bulk inserts to
 mysql requires a bit of hoop jumping, so turn on query logging during
 development to make sure you aren't getting individual inserts.

 Also be aware that output actions aren't guaranteed to happen exactly
 once, so you'll need to store unique offset ids in mysql or otherwise deal
 with the possibility of executor failures.


 On Fri, Feb 20, 2015 at 10:39 AM, Neelesh neele...@gmail.com wrote:

 Thanks for the detailed response Cody. Our use case is to  do some
 external lookups (cached and all) for every event, match the event against
 the looked up data, decide whether to write an entry in mysql and write it
 in the order in which the events arrived within a kafka partition.

 We don't need global ordering. Message ordering within a batch can be
 achieved either by waiting for 1.3 to be released (the behavior you
 described works very well for us, within a batch) , or by using
 updateStateByKey and sorting. Speculative execution is turned off as
 well (I think it's off by default).

 But here is what I see in the JobScheduler/JobGenerator. Within
 each stream, jobs are generated every 'n' milliseconds (batch duration)
 and submitted for execution. Since job generation in a stream is temporal,
 it's guaranteed that the jobs are submitted in the order of event arrival
 within a stream. And since we have one stream per kafka partition, this
 translates to sequentially generated, sequentially scheduled
 batches within a kafka partition. But since the *execution* of jobs
 itself is in parallel, it's probable that back-to-back batches in a stream
 are submitted one after the other but execute concurrently. If this
 understanding of mine is correct, it breaks our requirement that messages
 be executed 

Re: Spark Streaming and message ordering

2015-02-20 Thread Neelesh
Thanks for the detailed response, Cody. Our use case is to do some external
lookups (cached and all) for every event, match the event against the
looked-up data, decide whether to write an entry in MySQL, and write it in
the order in which the events arrived within a Kafka partition.

We don't need global ordering. Message ordering within a batch can be
achieved either by waiting for 1.3 to be released (the behavior you
described works very well for us, within a batch), or by using
updateStateByKey and sorting. Speculative execution is turned off as
well (I think it's off by default).

But here is what I see in the JobScheduler/JobGenerator. Within
each stream, jobs are generated every 'n' milliseconds (batch duration)
and submitted for execution. Since job generation in a stream is temporal,
it's guaranteed that the jobs are submitted in the order of event arrival
within a stream. And since we have one stream per Kafka partition, this
translates to sequentially generated, sequentially scheduled
batches within a Kafka partition. But since the *execution* of jobs itself
is in parallel, it's probable that back-to-back batches in a stream are
submitted one after the other but execute concurrently. If this
understanding of mine is correct, it breaks our requirement that messages
be executed in order within a partition.
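The concern can be illustrated with a generic thread pool in plain Python. This is only an analogy under stated assumptions: the pool stands in for whatever executes the submitted jobs, `workers` is a hypothetical knob, and none of it reflects how Spark's JobScheduler is actually configured.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def run_batches(batch_ids, workers, work_seconds=0.01):
    """Submit batches in order; return ("start"/"end", batch_id) events as they occurred."""
    events = []  # list.append is atomic in CPython, so the worker threads can share it

    def process(batch_id):
        events.append(("start", batch_id))
        time.sleep(work_seconds)          # simulated processing time
        events.append(("end", batch_id))

    with ThreadPoolExecutor(max_workers=workers) as pool:
        for batch_id in batch_ids:        # submission order == generation order
            pool.submit(process, batch_id)
    return events

def overlapped(events):
    """True if some batch started while an earlier batch was still running."""
    running = 0
    for kind, _ in events:
        running += 1 if kind == "start" else -1
        if running > 1:
            return True
    return False

if __name__ == "__main__":
    print("1 worker overlaps:", overlapped(run_batches(range(4), workers=1)))
    print("2 workers overlap:", overlapped(run_batches(range(4), workers=2)))
```

With a single worker, in-order submission implies in-order, non-overlapping execution; with more than one worker, batches submitted in order can still run at the same time, which is exactly the case described above.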

Thanks!






On Fri, Feb 20, 2015 at 7:03 AM, Cody Koeninger c...@koeninger.org wrote:

 For a given batch, for a given partition, the messages will be processed
 in order by the executor that is running that partition.  That's because
 messages for the given offset range are pulled by the executor, not pushed
 from some other receiver.

 If you have speculative execution, yes, another executor may be running
 that partition.

 If your job is lagging behind in processing such that the next batch
 starts executing before the last batch is finished processing, yes it is
 possible for some other executor to start working on messages from that
 same kafka partition.

 The obvious solution here seems to be turn off speculative execution and
 adjust your batch interval / sizes such that they can comfortably finish
 processing :)

 If your processing time is sufficiently non-linear with regard to the
 number of messages, yes you might be able to do something with overriding
 dstream.compute.  Unfortunately the new kafka dstream implementation is
 private, so it's not straightforward to subclass it.  I'd like to get a
 solution in place for people who need to be able to tune the batch
 generation policy (I need to as well, for unrelated reasons).  Maybe you
 can say a little more about your use case.

 But regardless of the technology you're using to read from kafka (spark,
 storm, whatever), kafka only gives you ordering as to a particular
 partition.  So you're going to need to do some kind of downstream sorting
 if you really care about a global order.

 On Fri, Feb 20, 2015 at 1:43 AM, Neelesh neele...@gmail.com wrote:

 Even with the new direct streams in 1.3,  isn't it the case that the job
 *scheduling* follows the partition order, rather than job *execution*?
 Or is it the case that the stream listens to job completion event (using a
 streamlistener) before scheduling the next batch?  To compare with storm
 from a message ordering point of view, unless a tuple is fully processed by
 the DAG (as defined by spout+bolts), the next tuple does not enter the DAG.


 On Thu, Feb 19, 2015 at 9:47 PM, Cody Koeninger c...@koeninger.org
 wrote:

 Kafka ordering is guaranteed on a per-partition basis.

 The high-level consumer api as used by the spark kafka streams prior to
 1.3 will consume from multiple kafka partitions, thus not giving any
 ordering guarantees.

 The experimental direct stream in 1.3 uses the simple consumer api,
 and there is a 1:1 correspondence between spark partitions and kafka
 partitions.  So you will get deterministic ordering, but only on a
 per-partition basis.

 On Thu, Feb 19, 2015 at 11:31 PM, Neelesh neele...@gmail.com wrote:

 I had a chance to talk to TD today at the Strata+Hadoop Conf in San
 Jose. We talked a bit about this after his presentation about this - the
 short answer is spark streaming does not guarantee any sort of ordering
 (within batches, across batches).  One would have to use updateStateByKey
 to collect the events and sort them based on some attribute of the event.
 But TD said message ordering is a frequently asked feature recently and is
 getting on his radar.

 I went through the source code and there does not seem to be any
 architectural/design limitation to support this.  (JobScheduler,
 JobGenerator are a good starting point to see how stuff works under the
 hood).  Overriding DStream#compute and using streaminglistener looks like a
 simple way of ensuring ordered execution of batches within a stream. But
 this would be a partial solution, since ordering within a batch needs some
 more work that I don't understand fully 

Re: Spark Streaming and message ordering

2015-02-20 Thread Cody Koeninger
There is typically some slack between when a batch finishes executing and
when the next batch is scheduled.  You should be able to arrange your batch
sizes / cluster resources to ensure that.  If there isn't slack, your
overall delay is going to keep increasing indefinitely.

If you're inserting into mysql, you're probably going to be much better off
doing bulk inserts anyway, and transaction ordering is going to stop a lot
of overlap that might otherwise happen.  In pseudocode:

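// matchEvent, transaction and insert are placeholders for application code
stream.foreachRDD { rdd =>
  rdd.foreachPartition { iter =>
    // collect the matching events of this partition and write them in one go
    val bulk = iter.filter(matchEvent).toList
    transaction { insert(bulk) }
  }
}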

You may already know this, but getting jdbc to do true bulk inserts to
mysql requires a bit of hoop jumping, so turn on query logging during
development to make sure you aren't getting individual inserts.

Also be aware that output actions aren't guaranteed to happen exactly once,
so you'll need to store unique offset ids in mysql or otherwise deal with
the possibility of executor failures.
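A minimal sketch of the last two points (one transaction per partition, and a unique key so that replays are harmless), using Python's built-in sqlite3 as a stand-in for MySQL. The table layout and function names are assumptions made for illustration; `INSERT OR IGNORE` is SQLite syntax, and MySQL spells the equivalent `INSERT IGNORE`.

```python
import sqlite3

def make_db():
    """In-memory SQLite database standing in for MySQL."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE events ("
        " kafka_partition INTEGER NOT NULL,"
        " kafka_offset INTEGER NOT NULL,"
        " payload TEXT,"
        " PRIMARY KEY (kafka_partition, kafka_offset))"
    )
    return conn

def insert_bulk(conn, partition, rows):
    """Insert one partition's (offset, payload) rows in a single transaction.

    Rows whose (partition, offset) key already exists are skipped, so replaying
    the same batch after a failure does not create duplicates.
    """
    with conn:  # commits on success, rolls back if an exception is raised
        conn.executemany(
            "INSERT OR IGNORE INTO events VALUES (?, ?, ?)",
            [(partition, offset, payload) for offset, payload in rows],
        )

def row_count(conn):
    return conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]

if __name__ == "__main__":
    conn = make_db()
    insert_bulk(conn, 0, [(1, "a"), (2, "b")])
    insert_bulk(conn, 0, [(2, "b"), (3, "c")])  # offset 2 is replayed
    print(row_count(conn))
```

Keying the table on (partition, offset) is what turns an at-least-once output action into an effectively-once write: a retried task re-sends the same keys and the database discards them.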


On Fri, Feb 20, 2015 at 10:39 AM, Neelesh neele...@gmail.com wrote:

 Thanks for the detailed response Cody. Our use case is to  do some
 external lookups (cached and all) for every event, match the event against
 the looked up data, decide whether to write an entry in mysql and write it
 in the order in which the events arrived within a kafka partition.

 We don't need global ordering. Message ordering within a batch can be
 achieved either by waiting for 1.3 to be released (the behavior you
 described works very well for us, within a batch) , or by using
 updateStateByKey and sorting. Speculative execution is turned off as
 well (I think it's off by default).

 But here is what I see in the JobScheduler/JobGenerator. Within
 each stream, jobs are generated every 'n' milliseconds (batch duration)
 and submitted for execution. Since job generation in a stream is temporal,
 it's guaranteed that the jobs are submitted in the order of event arrival
 within a stream. And since we have one stream per kafka partition, this
 translates to sequentially generated, sequentially scheduled
 batches within a kafka partition. But since the *execution* of jobs
 itself is in parallel, it's probable that back-to-back batches in a stream
 are submitted one after the other but execute concurrently. If this
 understanding of mine is correct, it breaks our requirement that messages
 be executed in order within a partition.

 Thanks!






 On Fri, Feb 20, 2015 at 7:03 AM, Cody Koeninger c...@koeninger.org
 wrote:

 For a given batch, for a given partition, the messages will be processed
 in order by the executor that is running that partition.  That's because
 messages for the given offset range are pulled by the executor, not pushed
 from some other receiver.

 If you have speculative execution, yes, another executor may be running
 that partition.

 If your job is lagging behind in processing such that the next batch
 starts executing before the last batch is finished processing, yes it is
 possible for some other executor to start working on messages from that
 same kafka partition.

 The obvious solution here seems to be turn off speculative execution and
 adjust your batch interval / sizes such that they can comfortably finish
 processing :)

 If your processing time is sufficiently non-linear with regard to the
 number of messages, yes you might be able to do something with overriding
 dstream.compute.  Unfortunately the new kafka dstream implementation is
 private, so it's not straightforward to subclass it.  I'd like to get a
 solution in place for people who need to be able to tune the batch
 generation policy (I need to as well, for unrelated reasons).  Maybe you
 can say a little more about your use case.

 But regardless of the technology you're using to read from kafka (spark,
 storm, whatever), kafka only gives you ordering as to a particular
 partition.  So you're going to need to do some kind of downstream sorting
 if you really care about a global order.
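
 A minimal sketch of such downstream sorting, assuming every message carries
 a producer-assigned sequence number. That field, like the Event class and
 the sample values, is hypothetical and not something kafka or spark
 provides. Offsets only order messages within one partition, so the sort key
 has to come from the message itself.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class DownstreamSortSketch {

    /** A consumed message; sequence is a hypothetical global ordering key set by the producer. */
    static final class Event {
        final int partition;
        final long offset;
        final long sequence;

        Event(int partition, long offset, long sequence) {
            this.partition = partition;
            this.offset = offset;
            this.sequence = sequence;
        }
    }

    /** Returns the sequence numbers in global order, regardless of arrival order. */
    static List<Long> globalOrder(List<Event> arrived) {
        List<Event> sorted = new ArrayList<>(arrived);
        sorted.sort(Comparator.comparingLong((Event e) -> e.sequence));
        List<Long> result = new ArrayList<>();
        for (Event e : sorted) {
            result.add(e.sequence);
        }
        return result;
    }

    public static void main(String[] args) {
        // Each partition is ordered by offset, but the two partitions interleave.
        List<Event> arrived = Arrays.asList(
                new Event(1, 0, 2), new Event(1, 1, 4),
                new Event(0, 0, 1), new Event(0, 1, 3));
        System.out.println(globalOrder(arrived));
    }
}
```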

 On Fri, Feb 20, 2015 at 1:43 AM, Neelesh neele...@gmail.com wrote:

 Even with the new direct streams in 1.3,  isn't it the case that the job
 *scheduling* follows the partition order, rather than job *execution*?
 Or is it the case that the stream listens to job completion event (using a
 streamlistener) before scheduling the next batch?  To compare with storm
 from a message ordering point of view, unless a tuple is fully processed by
 the DAG (as defined by spout+bolts), the next tuple does not enter the DAG.


 On Thu, Feb 19, 2015 at 9:47 PM, Cody Koeninger c...@koeninger.org
 wrote:

 Kafka ordering is guaranteed on a per-partition basis.

 The high-level consumer api as used by the spark kafka streams prior to
 1.3 will consume from multiple kafka partitions, thus not giving any
 ordering guarantees.

 The experimental direct stream in 1.3 uses the simple consumer api,
 and there is a 1:1 correspondence between spark 

Re: GraphX:java.lang.NoSuchMethodError:org.apache.spark.graphx.Graph$.apply

2015-02-20 Thread jwm
Has anyone found a solution to this? I was able to reproduce it here:
http://stackoverflow.com/questions/28576439/getting-nosuchmethoderror-when-setting-up-spark-graphx-graph
but I'm unable to resolve it.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-java-lang-NoSuchMethodError-org-apache-spark-graphx-Graph-apply-tp19958p21736.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Saving Spark RDD to Avro with spark.api.python.Converter

2015-02-20 Thread daria
Hi!
I am trying to persist an RDD in Avro format with the Spark API. I wonder if
anyone has any experience or suggestions.
My converter, with an example, can be viewed here:
https://github.com/daria-sukhareva/spark/commit/2ba7b213572d6ce2056cfc2536b701ae689c7f98
and the relevant question is here:
http://stackoverflow.com/questions/28368694/saving-spark-rdd-to-avro-with-spark-api-python-converter

Thanks,
Daria 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Saving-Spark-RDD-to-Avro-with-spark-api-python-Converter-tp21738.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Spark Performance on Yarn

2015-02-20 Thread lbierman
A bit more context on this issue. From the container logs on the executor 

Given my cluster specs above what would be appropriate parameters to pass
into :
--num-executors --num-cores --executor-memory 

I had tried it with --executor-memory 2500MB

2015-02-20 06:50:09,056 WARN
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
Container [pid=23320,containerID=container_1423083596644_0238_01_004160] is
running beyond physical memory limits. Current usage: 2.8 GB of 2.7 GB
physical memory used; 4.4 GB of 5.8 GB virtual memory used. Killing
container.
Dump of the process-tree for container_1423083596644_0238_01_004160 :
|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
|- 23320 23318 23320 23320 (bash) 0 0 108650496 305 /bin/bash -c
/usr/java/latest/bin/java -server -XX:OnOutOfMemoryError='kill %p' -Xms2400m
-Xmx2400m 
-Djava.io.tmpdir=/dfs/yarn/nm/usercache/root/appcache/application_1423083596644_0238/container_1423083596644_0238_01_004160/tmp
-Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160
org.apache.spark.executor.CoarseGrainedExecutorBackend
akka.tcp://sparkDriver@ip-10-168-86-13.ec2.internal:42535/user/CoarseGrainedScheduler
8 ip-10-99-162-56.ec2.internal 1 application_1423083596644_0238 1
/var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160/stdout
2
/var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160/stderr
|- 23323 23320 23320 23320 (java) 922271 12263 461976 724218
/usr/java/latest/bin/java -server -XX:OnOutOfMemoryError=kill %p -Xms2400m
-Xmx2400m
-Djava.io.tmpdir=/dfs/yarn/nm/usercache/root/appcache/application_1423083596644_0238/container_1423083596644_0238_01_004160/tmp
-Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160
org.apache.spark.executor.CoarseGrainedExecutorBackend
akka.tcp://sparkDriver@ip-10-168-86-13.ec2.internal:42535/user/Coarse




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Performance-on-Yarn-tp21729p21739.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Spark Performance on Yarn

2015-02-20 Thread Sandy Ryza
Are you specifying the executor memory, cores, or number of executors
anywhere?  If not, you won't be taking advantage of the full resources on
the cluster.

-Sandy

On Fri, Feb 20, 2015 at 2:41 AM, Sean Owen so...@cloudera.com wrote:

 None of this really points to the problem. These indicate that workers
 died but not why. I'd first go locate executor logs that reveal more
 about what's happening. It sounds like a harder type of failure, like a
 JVM crash, running out of file handles, or GC thrashing.

 On Fri, Feb 20, 2015 at 4:51 AM, lbierman leebier...@gmail.com wrote:
  I'm a bit new to Spark, but had a question on performance. I suspect a lot
  of my issue is due to tuning and parameters. I have a Hive external table on
  this data, and queries against it run in minutes.
 
  The Job:
  + 40gb of avro events on HDFS (100 million+ avro events)
  + Read in the files from HDFS and dedupe events by key (mapToPair then a
  reduceByKey)
  + RDD returned and persisted (disk and memory)
  + Then passed to a job that takes the RDD, does a mapToPair to new object
  data, then a reduceByKey, and does the work in foreachPartition
 
  The issue:
  When I run this in my environment on Yarn, it takes 20+ hours. Running on
  Yarn, we see the first stage run to build the deduped RDD, but when the
  next stage starts, things fail and data is lost. This results in stage 0
  starting over and over and just dragging it out.
 
  Errors I see in the driver logs:
  ERROR cluster.YarnClientClusterScheduler: Lost executor 1 on X:
 remote
  Akka client disassociated
 
  15/02/20 00:27:36 WARN scheduler.TaskSetManager: Lost task 0.0 in stage
 3.1
  (TID 1335,): FetchFailed(BlockManagerId(3, i, 33958),
 shuffleId=1,
  mapId=162, reduceId=0, message=
  org.apache.spark.shuffle.FetchFailedException: Failed to connect
  toX/X:33958
 
  Also we see this, but I'm suspecting this is because the previous stage
  fails and the next one starts:
  org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
  location for shuffle 1
 
  Cluster:
  5 machines, each with 2 cores and 8 GB of memory
 
  Spark-submit command:
   spark-submit --class com.myco.SparkJob \
  --master yarn \
  /tmp/sparkjob.jar \
 
  Any thoughts or where to look or how to start approaching this problem or
  more data points to present.
 
  Thanks..
 
  Code for the job:
   // Read Avro keys from HDFS and rebuild each event from its datum.
   JavaRDD<AnalyticsEvent> events = ((JavaRDD<AvroKey<AnalyticsEvent>>)
       context.newAPIHadoopRDD(
           context.hadoopConfiguration(),
           AvroKeyInputFormat.class,
           AvroKey.class,
           NullWritable.class
       ).keys())
       .map(event -> AnalyticsEvent.newBuilder(event.datum()).build())
       // Drop events that have no step event key.
       .filter(key -> {
           return Optional.ofNullable(key.getStepEventKey()).isPresent();
       })
       // Dedupe: key by the event itself and keep one event per key.
       .mapToPair(event -> new Tuple2<AnalyticsEvent, Integer>(event, 1))
       .reduceByKey((analyticsEvent1, analyticsEvent2) -> analyticsEvent1)
       .map(tuple -> tuple._1());

   // Keep the deduped events in memory and on disk, replicated twice.
   events.persist(StorageLevel.MEMORY_AND_DISK_2());

   // Aggregate per selected key, then do the work per partition.
   events.mapToPair(event -> {
           return new Tuple2<T, RunningAggregates>(
               keySelector.select(event),
               new RunningAggregates(
                   Optional.ofNullable(event.getVisitors()).orElse(0L),
                   Optional.ofNullable(event.getImpressions()).orElse(0L),
                   Optional.ofNullable(event.getAmount()).orElse(0.0D),
                   Optional.ofNullable(event.getAmountSumOfSquares()).orElse(0.0D)));
       })
       .reduceByKey((left, right) -> { return left.add(right); })
       .foreachPartition(dostuff);
 
 
 
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Performance-on-Yarn-tp21729.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 




Re: using a database connection pool to write data into an RDBMS from a Spark application

2015-02-20 Thread Sean Owen
Have a look at spark.yarn.user.classpath.first and
spark.files.userClassPathFirst for a possible way to give your copy of
the libs precedence.
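
To see which copy of a library actually wins, it can help to ask the JVM
where a class was loaded from. The sketch below uses only JDK reflection and
java.sql calls. The class name ClasspathProbe and the marker strings are
made up for illustration; the two probed class names are the usual entry
points of BoneCP and the Postgres JDBC driver, which is an assumption about
the jars in question.

```java
import java.security.CodeSource;
import java.sql.Driver;
import java.sql.DriverManager;
import java.util.Collections;

public class ClasspathProbe {

    /** Returns the jar or directory a class was loaded from, or a short marker if unavailable. */
    static String locate(String className) {
        try {
            Class<?> cls = Class.forName(className);
            CodeSource src = cls.getProtectionDomain().getCodeSource();
            // Core JDK classes typically report no code source.
            if (src == null || src.getLocation() == null) {
                return "(no code source)";
            }
            return src.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not found)";
        }
    }

    public static void main(String[] args) {
        // Run this inside the same JVM as the application, for example from the driver program.
        System.out.println("BoneCP:          " + locate("com.jolbox.bonecp.BoneCP"));
        System.out.println("Postgres driver: " + locate("org.postgresql.Driver"));
        // Drivers that DriverManager will consider when resolving a JDBC URL.
        for (Driver d : Collections.list(DriverManager.getDrivers())) {
            System.out.println("registered JDBC driver: " + d.getClass().getName());
        }
    }
}
```

If the BoneCP line points at a jar shipped with Spark rather than at the
application jar, the application's copy is not taking precedence.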

On Fri, Feb 20, 2015 at 5:20 PM, Mohammed Guller moham...@glassbeam.com wrote:
 Sean,
 I know that Class.forName has not been required since Java 1.4 :-) It was just a
 desperate attempt to make sure that the Postgres driver is getting loaded.
 Since Class.forName("org.postgresql.Driver") is not throwing an exception, I
 assume that the driver is available in the classpath. Is that not true?

 I did some more troubleshooting and here is what I found:
 1) The hive libraries used by Spark use BoneCP 0.7.1
 2) When Spark master is started, it initializes BoneCP, which will not load 
 any database driver at that point (that makes sense)
 3) When my application initializes BoneCP, it thinks it is already
 initialized and does not load the Postgres driver (this is a known bug in
 0.7.1). This bug is fixed in the BoneCP 0.8.0 release.

 So I linked my app with BoneCP 0.8.0 release, but when I run my app using 
 spark-submit, Spark continues to use BoneCP 0.7.1. How do I override that 
 behavior? How do I make spark-submit script unload BoneCP 0.7.1 and load 
 BoneCP 0.8.0? I tried the --jars and --driver-classpath flags, but it didn't 
 help.

 Thanks,
 Mohammed


 -Original Message-
 From: Sean Owen [mailto:so...@cloudera.com]
 Sent: Friday, February 20, 2015 2:06 AM
 To: Mohammed Guller
 Cc: Kelvin Chu; user@spark.apache.org
 Subject: Re: using a database connection pool to write data into an RDBMS 
 from a Spark application

 Although I don't know if it's related, the Class.forName() method of loading 
 drivers is very old. You should be using DataSource and javax.sql; this has 
 been the usual practice since about Java 1.4.

 Why do you say a different driver is being loaded? that's not the error here.

 Try instantiating the driver directly to test whether it's available in the 
 classpath. Otherwise you would have to check whether the jar exists, the 
 class exists in it, and it's really on your classpath.

 On Fri, Feb 20, 2015 at 5:27 AM, Mohammed Guller moham...@glassbeam.com 
 wrote:
 Hi Kelvin,



 Yes. I am creating an uber jar with the Postgres driver included, but
 nevertheless tried both --jars and --driver-classpath flags. It didn’t help.



 Interestingly, I can’t use BoneCP even in the driver program when I
 run my application with spark-submit. I am getting the same exception
 when the application initializes BoneCP before creating SparkContext.
 It looks like Spark is loading a different version of the Postgres
 JDBC driver than the one that I am linking.



 Mohammed



 From: Kelvin Chu [mailto:2dot7kel...@gmail.com]
 Sent: Thursday, February 19, 2015 7:56 PM
 To: Mohammed Guller
 Cc: user@spark.apache.org
 Subject: Re: using a database connection pool to write data into an
 RDBMS from a Spark application



 Hi Mohammed,



 Did you use --jars to specify your jdbc driver when you submitted your job?
 Take a look at this link:
 http://spark.apache.org/docs/1.2.0/submitting-applications.html



 Hope this helps!



 Kelvin



 On Thu, Feb 19, 2015 at 7:24 PM, Mohammed Guller
 moham...@glassbeam.com
 wrote:

 Hi –

 I am trying to use BoneCP (a database connection pooling library) to
 write data from my Spark application to an RDBMS. The database inserts
 are inside a foreachPartition code block. I am getting this exception
 when the code tries to insert data using BoneCP:



 java.sql.SQLException: No suitable driver found for
 jdbc:postgresql://hostname:5432/dbname



 I tried explicitly loading the Postgres driver on the worker nodes by
 adding the following line inside the foreachPartition code block:



 Class.forName("org.postgresql.Driver")



 It didn’t help.



 Has anybody been able to get a database connection pool library to work
 with Spark? If you got it working, can you please share the steps?



 Thanks,

 Mohammed








Re: Setting the number of executors in standalone mode

2015-02-20 Thread Yiannis Gkoufas
Hi Mohammed,

thanks a lot for the reply.
Ok, so from what I understand I cannot control the number of executors per
worker in standalone cluster mode.
Is that correct?

BR

On 20 February 2015 at 17:46, Mohammed Guller moham...@glassbeam.com
wrote:

  SPARK_WORKER_MEMORY=8g

 Will allocate 8GB memory to Spark on each worker node. Nothing to do with
 # of executors.





 Mohammed



 *From:* Yiannis Gkoufas [mailto:johngou...@gmail.com]
 *Sent:* Friday, February 20, 2015 4:55 AM
 *To:* user@spark.apache.org
 *Subject:* Setting the number of executors in standalone mode



 Hi there,



 I am trying to increase the number of executors per worker in standalone
 mode and have failed to achieve that.

 I loosely followed the instructions in this thread:
 http://stackoverflow.com/questions/26645293/spark-configuration-memory-instance-cores



 and did that:

 spark.executor.memory   1g

 SPARK_WORKER_MEMORY=8g



 hoping to get 8 executors per worker, but it's still 1.

 And the option num-executors is not available in the standalone mode.



 Thanks a lot!



Re: Spark Performance on Yarn

2015-02-20 Thread Sandy Ryza
If that's the error you're hitting, the fix is to boost
spark.yarn.executor.memoryOverhead, which will put some extra room in
between the executor heap sizes and the amount of memory requested for them
from YARN.
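
A rough worked example of that arithmetic, using the 2400 MB heap from the
-Xmx2400m flag in the container log. The 384 MB overhead is an assumption
about the default in Spark releases of that time (it is not stated in this
thread), and 768 is just a hypothetical larger value. YARN may also round
the request up to its allocation increment, which this sketch ignores.

```java
public class YarnContainerSizing {

    /** Memory requested from YARN for one executor: heap plus overhead allowance, in MB. */
    static int containerRequestMb(int executorHeapMb, int overheadMb) {
        return executorHeapMb + overheadMb;
    }

    public static void main(String[] args) {
        int heapMb = 2400;     // from -Xmx2400m in the container log
        int overheadMb = 384;  // assumed default overhead, not confirmed by the thread

        int requestMb = containerRequestMb(heapMb, overheadMb);
        // 2784 MB is roughly the 2.7 GB limit reported in the log.
        System.out.printf("default overhead: %d MB (%.2f GB)%n", requestMb, requestMb / 1024.0);

        // Hypothetical setting: spark.yarn.executor.memoryOverhead=768
        System.out.printf("boosted overhead: %d MB%n", containerRequestMb(heapMb, 768));
    }
}
```

Under these assumptions the heap alone already takes 2400 of the 2784 MB,
which leaves little room for off-heap usage before YARN kills the container.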

-Sandy





RE: Setting the number of executors in standalone mode

2015-02-20 Thread Mohammed Guller
SPARK_WORKER_MEMORY=8g
Will allocate 8GB memory to Spark on each worker node. Nothing to do with # of 
executors.


Mohammed



PySpark Cassandra forked

2015-02-20 Thread Rumph, Frens Jan
Hi all,

I wanted to let you know that I've forked PySpark Cassandra at
https://github.com/TargetHolding/pyspark-cassandra. Unfortunately the
original code didn't work for me and I couldn't figure out how it could
work. But it inspired me, so I rewrote the majority of the project.

The rewrite makes full use of
https://github.com/datastax/spark-cassandra-connector and brings much of
its goodness to PySpark!

Hope that some of you are able to put this to good use. And feedback, pull
requests, etc. are more than welcome!

Best regards,
Frens Jan


Stopping a Custom Receiver

2015-02-20 Thread pnpritchard
Hi,

I have a use case for creating a DStream from a single file. I have created
a custom receiver that reads the file, calls 'store' with the contents, then
calls 'stop'. However, I'm second guessing if this is the correct approach
due to the spark logs I see.

I always see these logs, and the 'ERROR' and 'WARN' level makes me feel
uneasy:

19:27:21,161 ERROR ReceiverTracker:75 - Deregistered receiver for stream
2: Finished reading file from /etc/tercel/PipelineTemplate.json
19:27:21,221  WARN ReceiverSupervisorImpl:71 - Stopped executor without
error

In some situations (e.g. on a server instead of a laptop), I get this fatal
error (spark shuts down):

19:35:08,213 ERROR DAGSchedulerActorSupervisor:96 - eventProcesserActor
failed; shutting down SparkContext
org.apache.spark.SparkException: Attempted to use BlockRDD[3] at
receiverStream at BootstrapMetadata.scala:26 after its blocks have been
removed!
at org.apache.spark.rdd.BlockRDD.assertValid(BlockRDD.scala:83)
at 
org.apache.spark.rdd.BlockRDD.getPreferredLocations(BlockRDD.scala:56)
at
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:234)
at
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:234)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:233)
...

FYI, BootstrapMetadata.scala:26 is where I've declared the receiverStream.

I am wondering if it is incorrect to call stop within the custom receiver,
and if this is the reason why the blocks are removed.

Thanks,
Nick


P.S. This is the receiver implementation:

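import scala.io.{BufferedSource, Source}

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class FileReceiver(path: String)
  extends Receiver[String](StorageLevel.MEMORY_ONLY) {

  // Handle to the open file; stays null until read() opens it.
  private var source: BufferedSource = null

  def onStart(): Unit = {
    read()
  }

  def onStop(): Unit = {
    // Guard against onStop() running before the file was opened.
    if (source != null) source.close()
  }

  // Reads the whole file, stores it as a single record, then stops the receiver.
  private def read(): Unit = {
    try {
      source = Source.fromFile(path)
      val content = source.getLines().mkString
      store(content)
      stop(s"Finished reading file from $path")
    } catch {
      case e: Exception =>
        stop(s"Error reading file from $path", e)
    }
  }

}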





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Stopping-a-Custom-Receiver-tp21740.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




RE: Setting the number of executors in standalone mode

2015-02-20 Thread Mohammed Guller
AFAIK, in standalone mode, each Spark application gets one executor on each
worker. You could run multiple workers on a machine though.
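
A minimal sketch of what running multiple workers per machine could look
like, assuming the standalone settings SPARK_WORKER_INSTANCES,
SPARK_WORKER_CORES and SPARK_WORKER_MEMORY in conf/spark-env.sh. The helper
below only does the division and prints the lines; its name is made up, the
8 cores per machine are hypothetical, and the 8 GB comes from the question.

```java
public class WorkerSplitSketch {

    /** Builds spark-env.sh lines that split one machine's cores and memory evenly across workers. */
    static String sparkEnvLines(int workers, int totalCores, int totalMemoryGb) {
        if (workers <= 0 || totalCores < workers || totalMemoryGb < workers) {
            throw new IllegalArgumentException("need at least 1 core and 1 GB per worker");
        }
        // Cores and memory are set per worker, so divide the machine totals.
        return "SPARK_WORKER_INSTANCES=" + workers + "\n"
                + "SPARK_WORKER_CORES=" + (totalCores / workers) + "\n"
                + "SPARK_WORKER_MEMORY=" + (totalMemoryGb / workers) + "g\n";
    }

    public static void main(String[] args) {
        // Hypothetical machine: 8 cores, 8 GB, split across 2 workers.
        System.out.print(sparkEnvLines(2, 8, 8));
    }
}
```

With one executor per worker per application, two workers per machine would
give an application up to two executors on that machine, each limited by
its worker's share.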

Mohammed




RE: using a database connection pool to write data into an RDBMS from a Spark application

2015-02-20 Thread Mohammed Guller
SPARK_CLASSPATH has been deprecated since 1.0. In any case, I tried it and it
didn't work, since it appends to the classpath. I need something that prepends
to the classpath.

Mohammed


-Original Message-
From: Sean Owen [mailto:so...@cloudera.com] 
Sent: Friday, February 20, 2015 10:08 AM
To: Mohammed Guller
Cc: Kelvin Chu; user@spark.apache.org
Subject: Re: using a database connection pool to write data into an RDBMS from 
a Spark application

Hm, others can correct me if I'm wrong, but is this what SPARK_CLASSPATH is for?

On Fri, Feb 20, 2015 at 6:04 PM, Mohammed Guller moham...@glassbeam.com wrote:
 It looks like spark.files.userClassPathFirst gives precedence to user 
 libraries only on the worker nodes. Is there something similar to achieve the 
 same behavior on the master?

 BTW, I am running Spark in stand-alone mode.

 Mohammed



Spark 1.3 SQL Programming Guide and sql._ / sql.types._

2015-02-20 Thread Denny Lee
Quickly reviewing the latest SQL Programming Guide
https://github.com/apache/spark/blob/master/docs/sql-programming-guide.md
(in github) I had a couple of quick questions:

1) Do we need to instantiate the SQLContext as per
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

Within Spark 1.3 the sqlContext is already available, so we probably do not
need to make this call.

2) Importing org.apache.spark.sql._ should bring in SQL data types,
struct types, and Row
// Import Spark SQL data types and Row.
import org.apache.spark.sql._

Currently with Spark 1.3 RC1, it appears org.apache.spark.sql._ only brings
in Row.

scala> import org.apache.spark.sql._
import org.apache.spark.sql._

scala> val schema =
     |   StructType(
     |     schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
<console>:25: error: not found: value StructType
         StructType(

But if I also import org.apache.spark.sql.types._

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> val schema =
     |   StructType(
     |     schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
schema: org.apache.spark.sql.types.StructType =
StructType(StructField(DeviceMake,StringType,true),
StructField(Country,StringType,true))

Wondering if this is by design or perhaps a quick documentation / package
update is warranted.


RE: using a database connection pool to write data into an RDBMS from a Spark application

2015-02-20 Thread Mohammed Guller
It looks like spark.files.userClassPathFirst gives precedence to user libraries 
only on the worker nodes. Is there something similar to achieve the same 
behavior on the master? 

BTW, I am running Spark in stand-alone mode.

Mohammed


-Original Message-
From: Sean Owen [mailto:so...@cloudera.com] 
Sent: Friday, February 20, 2015 9:42 AM
To: Mohammed Guller
Cc: Kelvin Chu; user@spark.apache.org
Subject: Re: using a database connection pool to write data into an RDBMS from 
a Spark application

Have a look at spark.yarn.user.classpath.first and 
spark.files.userClassPathFirst for a possible way to give your copy of the libs 
precedence.

On Fri, Feb 20, 2015 at 5:20 PM, Mohammed Guller moham...@glassbeam.com wrote:
 Sean,
 I know that Class.forName has not been required since Java 1.4 :-) It was just a
 desperate attempt to make sure that the Postgres driver is getting loaded.
 Since Class.forName("org.postgresql.Driver") is not throwing an exception, I
 assume that the driver is available in the classpath. Is that not true?

 I did some more troubleshooting and here is what I found:
 1) The hive libraries used by Spark use BoneCP 0.7.1
 2) When Spark master is started, it initializes BoneCP, which will not 
 load any database driver at that point (that makes sense)
 3) When my application initializes BoneCP, it thinks it is already
 initialized and does not load the Postgres driver (this is a known bug in
 0.7.1). This bug is fixed in the BoneCP 0.8.0 release.

 So I linked my app with BoneCP 0.8.0 release, but when I run my app using 
 spark-submit, Spark continues to use BoneCP 0.7.1. How do I override that 
 behavior? How do I make spark-submit script unload BoneCP 0.7.1 and load 
 BoneCP 0.8.0? I tried the --jars and --driver-classpath flags, but it didn't 
 help.

 Thanks,
 Mohammed


 -Original Message-
 From: Sean Owen [mailto:so...@cloudera.com]
 Sent: Friday, February 20, 2015 2:06 AM
 To: Mohammed Guller
 Cc: Kelvin Chu; user@spark.apache.org
 Subject: Re: using a database connection pool to write data into an 
 RDBMS from a Spark application

 Although I don't know if it's related, the Class.forName() method of loading 
 drivers is very old. You should be using DataSource and javax.sql; this has 
 been the usual practice since about Java 1.4.

 Why do you say a different driver is being loaded? That's not the error here.

 Try instantiating the driver directly to test whether it's available in the 
 classpath. Otherwise you would have to check whether the jar exists, the 
 class exists in it, and it's really on your classpath.
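
A related check, sketched here with only the JDK's java.sql API, is to list the drivers that DriverManager has registered and see whether any of them accepts the JDBC URL. DriverManager only reports drivers visible to the caller's class loader, so an empty result is consistent with the "No suitable driver found" exception even when the jar is present somewhere else. The object and method names are hypothetical.

```scala
import java.sql.{Driver, DriverManager}

object DriverCheck {
  /** Class names of registered JDBC drivers that accept the given URL. */
  def driversFor(url: String): List[String] = {
    val registered = DriverManager.getDrivers // a java.util.Enumeration[Driver]
    var accepting = List.empty[String]
    while (registered.hasMoreElements) {
      val driver: Driver = registered.nextElement()
      if (driver.acceptsURL(url)) accepting = driver.getClass.getName :: accepting
    }
    accepting.reverse
  }

  def main(args: Array[String]): Unit = {
    val url = "jdbc:postgresql://hostname:5432/dbname" // URL from the exception
    val found = driversFor(url)
    if (found.isEmpty) println("No registered driver accepts " + url)
    else println("Drivers accepting " + url + ": " + found.mkString(", "))
  }
}
```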

 On Fri, Feb 20, 2015 at 5:27 AM, Mohammed Guller moham...@glassbeam.com 
 wrote:
 Hi Kelvin,

 Yes. I am creating an uber jar with the Postgres driver included, but
 nevertheless tried both the --jars and --driver-classpath flags. It didn’t help.

 Interestingly, I can’t use BoneCP even in the driver program when I
 run my application with spark-submit. I get the same exception
 when the application initializes BoneCP before creating the SparkContext.
 It looks like Spark is loading a different version of the Postgres
 JDBC driver than the one that I am linking.

 Mohammed

 From: Kelvin Chu [mailto:2dot7kel...@gmail.com]
 Sent: Thursday, February 19, 2015 7:56 PM
 To: Mohammed Guller
 Cc: user@spark.apache.org
 Subject: Re: using a database connection pool to write data into an
 RDBMS from a Spark application

 Hi Mohammed,

 Did you use --jars to specify your JDBC driver when you submitted your job?
 Take a look at this link:
 http://spark.apache.org/docs/1.2.0/submitting-applications.html

 Hope this helps!

 Kelvin

 On Thu, Feb 19, 2015 at 7:24 PM, Mohammed Guller
 moham...@glassbeam.com
 wrote:

 Hi,

 I am trying to use BoneCP (a database connection pooling library) to
 write data from my Spark application to an RDBMS. The database
 inserts are inside a foreachPartition code block. I am getting this
 exception when the code tries to insert data using BoneCP:

 java.sql.SQLException: No suitable driver found for
 jdbc:postgresql://hostname:5432/dbname

 I tried explicitly loading the Postgres driver on the worker nodes by
 adding the following line inside the foreachPartition code block:

 Class.forName("org.postgresql.Driver")

 It didn’t help.

 Has anybody been able to get a database connection pool library to work
 with Spark? If you got it working, can you please share the steps?

 Thanks,
 Mohammed


Re: using a database connection pool to write data into an RDBMS from a Spark application

2015-02-20 Thread Sean Owen
Hm, others can correct me if I'm wrong, but is this what SPARK_CLASSPATH is for?

On Fri, Feb 20, 2015 at 6:04 PM, Mohammed Guller moham...@glassbeam.com wrote:
 It looks like spark.files.userClassPathFirst gives precedence to user 
 libraries only on the worker nodes. Is there something similar to achieve the 
 same behavior on the master?

 BTW, I am running Spark in stand-alone mode.

 Mohammed



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Setting the number of executors in standalone mode

2015-02-20 Thread Kelvin Chu
Hi,

Currently, there is only one executor per worker. There is a JIRA ticket to
relax this:

https://issues.apache.org/jira/browse/SPARK-1706

But if you want to use more cores, you could try increasing
SPARK_WORKER_INSTANCES, which increases the number of workers per machine.
Take a look here:
http://spark.apache.org/docs/1.2.0/spark-standalone.html
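
As a rough sketch of the arithmetic behind this suggestion: with one executor per worker, getting N executors on a machine means running N workers and dividing the machine's cores and memory among them. The code assumes that SPARK_WORKER_CORES and SPARK_WORKER_MEMORY apply to each worker instance, as the standalone documentation describes. The helper names and the 16-core figure are made up; the 8 GB and 8 executors come from the original question.

```scala
object WorkerPlan {
  /** Per-worker values for spark-env.sh; memory is in megabytes. */
  case class Settings(workerInstances: Int, coresPerWorker: Int, memoryPerWorkerMb: Int) {
    def toEnv: Seq[String] = Seq(
      "SPARK_WORKER_INSTANCES=" + workerInstances,
      "SPARK_WORKER_CORES=" + coresPerWorker,
      "SPARK_WORKER_MEMORY=" + memoryPerWorkerMb + "m")
  }

  /** Split one machine evenly into `executorsWanted` workers (integer division). */
  def plan(machineCores: Int, machineMemoryMb: Int, executorsWanted: Int): Settings = {
    require(executorsWanted > 0, "need at least one worker")
    require(machineCores >= executorsWanted, "fewer cores than workers")
    Settings(executorsWanted, machineCores / executorsWanted, machineMemoryMb / executorsWanted)
  }

  def main(args: Array[String]): Unit =
    // 16 cores is a made-up figure; 8 GB and 8 executors come from the thread.
    plan(machineCores = 16, machineMemoryMb = 8192, executorsWanted = 8).toEnv.foreach(println)
}
```

With these inputs each worker would get 2 cores and 1024m, which leaves room for a spark.executor.memory of 1g per executor.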

Hope this helps!
Kelvin


On Fri, Feb 20, 2015 at 10:08 AM, Mohammed Guller moham...@glassbeam.com
wrote:

 AFAIK, in standalone mode, each Spark application gets one executor on
 each worker. You could run multiple workers on a machine though.



 Mohammed



 *From:* Yiannis Gkoufas [mailto:johngou...@gmail.com]
 *Sent:* Friday, February 20, 2015 9:48 AM
 *To:* Mohammed Guller
 *Cc:* user@spark.apache.org
 *Subject:* Re: Setting the number of executors in standalone mode



 Hi Mohammed,



 thanks a lot for the reply.

 Ok, so from what I understand I cannot control the number of executors per
 worker in standalone cluster mode.

 Is that correct?



 BR



 On 20 February 2015 at 17:46, Mohammed Guller moham...@glassbeam.com
 wrote:

 SPARK_WORKER_MEMORY=8g

 Will allocate 8GB memory to Spark on each worker node. Nothing to do with
 # of executors.





 Mohammed



 *From:* Yiannis Gkoufas [mailto:johngou...@gmail.com]
 *Sent:* Friday, February 20, 2015 4:55 AM
 *To:* user@spark.apache.org
 *Subject:* Setting the number of executors in standalone mode



 Hi there,

 I am trying to increase the number of executors per worker in standalone
 mode, but I have not managed to.

 I loosely followed the instructions in this thread:
 http://stackoverflow.com/questions/26645293/spark-configuration-memory-instance-cores

 and set the following:

 spark.executor.memory   1g

 SPARK_WORKER_MEMORY=8g

 hoping to get 8 executors per worker, but it's still 1.

 And the num-executors option is not available in standalone mode.

 Thanks a lot!





Re: output worker stdout to one place

2015-02-20 Thread Marcelo Vanzin
Hi Anny,

You could play with creating your own log4j.properties that will write
the output somewhere else (e.g. to some remote mount, or remote
syslog). Sorry, but I don't have an example handy.

Alternatively, if you can use Yarn, it will collect all logs after the
job is finished and make them available as a single file using the
yarn logs command.


-- 
Marcelo




output worker stdout to one place

2015-02-20 Thread anny9699
Hi,

I am wondering if there's some way to direct the worker stdout to one place
rather than to each worker's own stdout. For example, I have the
following code

RDD.foreach { line =>
  try {
    // do something
  } catch {
    case e: Exception => println(line)
  }
}

Every time I want to check what's causing the exception, I have to check one
worker after another in the UI, because I don't know which worker will be
dealing with the exception case. Is there a way that the println could
print to one place instead of separate worker stdout so that I only need to
check one place?
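
One alternative to printing on the workers, sketched here on a plain Scala collection, is to keep the failing lines as data and look at them in one place. This is a different technique from redirecting stdout and is not something proposed in the thread. The names are hypothetical, and on an RDD the same filter followed by collect() would be expected to bring the failing lines back to the driver, assuming there are few enough of them to fit in driver memory.

```scala
import scala.util.Try

object FailedLines {
  /** Keep only the inputs for which `process` throws a non-fatal exception. */
  def failing[A](lines: Seq[String])(process: String => A): Seq[String] =
    lines.filter(line => Try(process(line)).isFailure)

  def main(args: Array[String]): Unit = {
    val sample = Seq("1", "oops", "3")        // made-up input
    failing(sample)(_.toInt).foreach(println) // prints: oops
  }
}
```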

Thanks a lot!
Anny



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/output-worker-stdout-to-one-place-tp21742.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Use Spark Streaming for Batch?

2015-02-20 Thread craigv
We have a sophisticated Spark Streaming application that we have been using
successfully in production for over a year to process a time series of
events.  Our application makes novel use of updateStateByKey() for state
management.

We now have the need to perform exactly the same processing on input data
that's not real-time, but has been persisted to disk.  We do not want to
rewrite our Spark Streaming app unless we have to.

Might it be possible to perform large-batch processing of HDFS time
series data using Spark Streaming?

1. I understand that there is not currently an InputDStream that could do
what's needed.  I would have to create such a thing.
2. Time is a problem.  I would have to use the timestamps on our events for
any time-based logic and state management.
3. The batch duration would become meaningless in this scenario.  Could I
just set it to something really small (say 1 second) and then let it fall
behind, processing the data as quickly as it could?

It all seems possible.  But could Spark Streaming work this way?  If I
created a DStream that delivered (say) months of events, could Spark
Streaming effectively process this in a batch fashion?
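
Point 2 above can be illustrated without Spark. The sketch below is a pure function with the same shape as an updateStateByKey update function, (Seq[V], Option[S]) => Option[S], that derives all of its time logic from event timestamps rather than from the batch time. The Event and State types and the gap-tracking logic are made up for illustration and are not the poster's application.

```scala
object EventTimeState {
  case class Event(timestampMs: Long, payload: String)            // hypothetical event
  case class State(lastEventMs: Long, count: Long, maxGapMs: Long) // hypothetical state

  /** Fold a batch of events into the state, in event-time order. */
  def update(events: Seq[Event], previous: Option[State]): Option[State] =
    events.sortBy(_.timestampMs).foldLeft(previous) { (state, e) =>
      state match {
        case None => Some(State(e.timestampMs, 1L, 0L))
        case Some(s) =>
          // The gap is measured between event timestamps, never wall-clock time.
          val gap = math.max(0L, e.timestampMs - s.lastEventMs)
          Some(State(math.max(s.lastEventMs, e.timestampMs), s.count + 1,
            math.max(s.maxGapMs, gap)))
      }
    }

  def main(args: Array[String]): Unit = {
    val first = update(Seq(Event(3000L, "b"), Event(1000L, "a")), None)
    val second = update(Seq(Event(10000L, "c")), first)
    println(second)
  }
}
```

Because the function never consults the clock, the result depends only on the events it is given, whether they arrive live or are replayed from disk faster than real time.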

Any and all comments/ideas welcome!






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Use-Spark-Streaming-for-Batch-tp21745.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




using hivecontext with sparksql on cdh 5.3

2015-02-20 Thread chirag lakhani
I am trying to access a hive table using spark sql but I am having
trouble.  I followed the instructions in a cloudera community board which
stated

1) Import hive jars into the class path

export SPARK_CLASSPATH=$(find
/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/ -name
'*.jar' -print0 | sed 's/\x0/:/g')

2) start the spark shell

spark-shell

3) create a Hive context

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

4) then run the query

sqlContext.sql("FROM analytics.trainingdatafinal SELECT *").collect().foreach(println)
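
For step 1, a quick way to see what the classpath string would contain is to build it in Scala. This is only a sketch: unlike find, it does not recurse into subdirectories, and the object and method names are made up for illustration.

```scala
import java.io.File

object JarClasspath {
  /** Join every *.jar directly under `dir` with the platform path separator. */
  def of(dir: File): String = {
    // listFiles returns null when the directory is missing or unreadable.
    val files = Option(dir.listFiles).getOrElse(Array.empty[File])
    files.filter(f => f.isFile && f.getName.endsWith(".jar"))
      .map(_.getAbsolutePath)
      .sorted
      .mkString(File.pathSeparator)
  }

  def main(args: Array[String]): Unit =
    // Directory taken from the export command in step 1.
    println(of(new File("/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/")))
}
```

An empty result would mean the directory does not exist at that path or holds no jars.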


When I do this, it seems that it cannot find the table in the Hive
metastore. I have put all of my Cloudera parcels in the partition starting
with /data, as opposed to the default location used by Cloudera.  Any
suggestions on what can be done?  I am putting the error below.


15/02/20 13:43:01 ERROR Hive: NoSuchObjectException(message:analytics.trainingdatafinal table not found)
    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_table(HiveMetaStore.java:1569)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:106)
    at com.sun.proxy.$Proxy24.get_table(Unknown Source)
    at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getTable(HiveMetaStoreClient.java:1008)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:90)
    at com.sun.proxy.$Proxy25.getTable(Unknown Source)
    at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:1000)
    at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:974)
    at org.apache.spark.sql.hive.HiveMetastoreCatalog.lookupRelation(HiveMetastoreCatalog.scala:70)
    at org.apache.spark.sql.hive.HiveContext$$anon$2.org$apache$spark$sql$catalyst$analysis$OverrideCatalog$$super$lookupRelation(HiveContext.scala:253)
    at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:141)
    at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:141)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$class.lookupRelation(Catalog.scala:141)
    at org.apache.spark.sql.hive.HiveContext$$anon$2.lookupRelation(HiveContext.scala:253)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:143)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:138)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:162)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:191)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:147)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:162)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at

Re: Spark Performance on Yarn

2015-02-20 Thread Kelvin Chu
Hi Sandy,

I am also doing memory tuning on YARN. Just want to confirm, is it correct
to say:

spark.executor.memory - spark.yarn.executor.memoryOverhead = the memory I
can actually use in my jvm application

If it is not, what is the correct relationship? Any other variables or
config parameters in play? Thanks.

Kelvin

On Fri, Feb 20, 2015 at 9:45 AM, Sandy Ryza sandy.r...@cloudera.com wrote:

 If that's the error you're hitting, the fix is to boost
 spark.yarn.executor.memoryOverhead, which will put some extra room in
 between the executor heap sizes and the amount of memory requested for them
 from YARN.

 -Sandy

 On Fri, Feb 20, 2015 at 9:40 AM, lbierman leebier...@gmail.com wrote:

 A bit more context on this issue. From the container logs on the executor

 Given my cluster specs above what would be appropriate parameters to pass
 into :
 --num-executors --num-cores --executor-memory

 I had tried it with --executor-memory 2500MB

 2015-02-20 06:50:09,056 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
 Container [pid=23320,containerID=container_1423083596644_0238_01_004160] is
 running beyond physical memory limits. Current usage: 2.8 GB of 2.7 GB
 physical memory used; 4.4 GB of 5.8 GB virtual memory used. Killing container.
 Dump of the process-tree for container_1423083596644_0238_01_004160 :
 |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
 |- 23320 23318 23320 23320 (bash) 0 0 108650496 305 /bin/bash -c /usr/java/latest/bin/java -server -XX:OnOutOfMemoryError='kill %p' -Xms2400m -Xmx2400m -Djava.io.tmpdir=/dfs/yarn/nm/usercache/root/appcache/application_1423083596644_0238/container_1423083596644_0238_01_004160/tmp -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160 org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://sparkDriver@ip-10-168-86-13.ec2.internal:42535/user/CoarseGrainedScheduler 8 ip-10-99-162-56.ec2.internal 1 application_1423083596644_0238 1> /var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160/stdout 2> /var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160/stderr
 |- 23323 23320 23320 23320 (java) 922271 12263 461976 724218 /usr/java/latest/bin/java -server -XX:OnOutOfMemoryError=kill %p -Xms2400m -Xmx2400m -Djava.io.tmpdir=/dfs/yarn/nm/usercache/root/appcache/application_1423083596644_0238/container_1423083596644_0238_01_004160/tmp -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160 org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://sparkDriver@ip-10-168-86-13.ec2.internal:42535/user/Coarse




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Performance-on-Yarn-tp21729p21739.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.






Re: Spark Performance on Yarn

2015-02-20 Thread Sandy Ryza
Hi Kelvin,

spark.executor.memory controls the size of the executor heaps.

spark.yarn.executor.memoryOverhead is the amount of memory to request from
YARN beyond the heap size.  This accounts for the fact that JVMs use some
non-heap memory.

The Spark heap is divided into spark.storage.memoryFraction (default 0.6)
and spark.shuffle.memoryFraction (default 0.2), and the rest is for basic
Spark bookkeeping and anything the user does inside UDFs.
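
To put numbers on the description above, here is a small sketch of the arithmetic. By that description the overhead is added on top of the heap rather than subtracted from it. The 2400 MB heap is taken from the -Xmx2400m in the container log quoted in this thread; the 384 MB overhead is an assumed value, and the object and method names are made up for illustration.

```scala
object YarnMemory {
  case class HeapSplit(storageMb: Long, shuffleMb: Long, otherMb: Long)

  /** Memory requested from YARN per executor: heap plus overhead. */
  def containerRequestMb(executorMemoryMb: Long, overheadMb: Long): Long =
    executorMemoryMb + overheadMb

  /** Rough division of the heap using the two fractions named above. */
  def split(executorMemoryMb: Long,
            storageFraction: Double = 0.6,
            shuffleFraction: Double = 0.2): HeapSplit = {
    val storage = math.round(executorMemoryMb * storageFraction)
    val shuffle = math.round(executorMemoryMb * shuffleFraction)
    HeapSplit(storage, shuffle, executorMemoryMb - storage - shuffle)
  }

  def main(args: Array[String]): Unit = {
    // -Xmx2400m is from the container log; 384 MB of overhead is an assumption.
    println(containerRequestMb(2400, 384)) // 2784
    println(split(2400))                   // HeapSplit(1440,480,480)
  }
}
```

Under these assumptions the request comes to 2784 MB, roughly 2.7 GB, which is in line with the physical memory limit reported in the log.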

-Sandy






Re: using hivecontext with sparksql on cdh 5.3

2015-02-20 Thread Sourigna Phetsarath
Chirag,

This worked for us:

spark-submit --master yarn-cluster --driver-class-path
'/opt/cloudera/parcels/CDH/lib/hive/lib/*' --driver-java-options
'-Dspark.executor.extraClassPath=/opt/cloudera/parcels/CDH/lib/hive/lib/*'
...

Let me know if you have any issues.


Re: Spark 1.3 SQL Programming Guide and sql._ / sql.types._

2015-02-20 Thread Michael Armbrust
Yeah, sorry.  The programming guide has not been updated for 1.3.  I'm
hoping to get to that this weekend / next week.

On Fri, Feb 20, 2015 at 9:55 AM, Denny Lee denny.g@gmail.com wrote:

 Quickly reviewing the latest SQL Programming Guide
 https://github.com/apache/spark/blob/master/docs/sql-programming-guide.md
 (in github) I had a couple of quick questions:

 1) Do we need to instantiate the SQLContext as per
 // sc is an existing SparkContext.
 val sqlContext = new org.apache.spark.sql.SQLContext(sc)

 Within Spark 1.3 the sqlContext is already available, so we probably do not
 need to make this call.

 2) Importing org.apache.spark.sql._ should bring in SQL data types,
 struct types, and Row
 // Import Spark SQL data types and Row.
 import org.apache.spark.sql._

 Currently with Spark 1.3 RC1, it appears org.apache.spark.sql._ only
 brings in Row.

 scala> import org.apache.spark.sql._
 import org.apache.spark.sql._

 scala> val schema =
      |   StructType(
      |     schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
 <console>:25: error: not found: value StructType
          StructType(

 But if I also import org.apache.spark.sql.types._

 scala> import org.apache.spark.sql.types._
 import org.apache.spark.sql.types._

 scala> val schema =
      |   StructType(
      |     schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
 schema: org.apache.spark.sql.types.StructType =
 StructType(StructField(DeviceMake,StringType,true),
 StructField(Country,StringType,true))

 Wondering if this is by design or perhaps a quick documentation / package
 update is warranted.







Re: using hivecontext with sparksql on cdh 5.3

2015-02-20 Thread chirag lakhani
I tried

spark-shell --master yarn-cluster --driver-class-path
'/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/*'
--driver-java-options
'-Dspark.executor.extraClassPath=/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/*'

and I get the following error

Error: Cluster deploy mode is not applicable to Spark shells.
Run with --help for usage help or --verbose for debug output
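
The error above arises because an interactive shell needs its driver on the local machine. On Spark 1.x the usual alternative is the yarn-client master. The following is only a sketch under assumptions: it reuses the flags from the command above, and it assumes the Hive jars sit under the same /data path on every node (the single HIVE_LIB variable is introduced here for illustration). It prints the command instead of launching it.

```shell
# Assumption: the Hive jars live under this parcel path on the driver and the executors.
HIVE_LIB='/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/*'

# Collect the arguments first so they can be inspected before launching.
# yarn-client keeps the driver local, which spark-shell requires.
set -- --master yarn-client \
  --driver-class-path "$HIVE_LIB" \
  --driver-java-options "-Dspark.executor.extraClassPath=$HIVE_LIB"

# Record and print the command line; run spark-shell "$@" to actually start the shell.
LAUNCH_CMD="spark-shell $*"
echo "$LAUNCH_CMD"
```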



On Fri, Feb 20, 2015 at 2:52 PM, Sourigna Phetsarath 
gna.phetsar...@teamaol.com wrote:

 Chirag,

 This worked for us:

 spark-submit --master yarn-cluster --driver-class-path
 '/opt/cloudera/parcels/CDH/lib/hive/lib/*' --driver-java-options
 '-Dspark.executor.extraClassPath=/opt/cloudera/parcels/CDH/lib/hive/lib/*'
 ...

 Let me know if you have any issues.

 On Fri, Feb 20, 2015 at 2:43 PM, chirag lakhani chirag.lakh...@gmail.com
 wrote:

 I am trying to access a Hive table using Spark SQL but I am having
 trouble.  I followed the instructions on a Cloudera community board, which
 stated:

 1) Import hive jars into the class path

 export SPARK_CLASSPATH=$(find
 /data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/ -name
 '*.jar' -print0 | sed 's/\x0/:/g')

 2) start the spark shell

 spark-shell

 3) create a Hive context

 val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

 4) run the query

 sqlContext.sql("FROM analytics.trainingdatafinal SELECT *").collect().foreach(println)
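
 The find/sed pipeline in step 1 depends on a sed escape that may not be portable beyond GNU sed, and it leaves a trailing colon on the result. A minimal sketch of another way to build the same colon-separated list follows. The temporary directory and the two jar names are hypothetical stand-ins for the real Hive lib directory.

```shell
# Hypothetical stand-in for the Hive lib directory; point HIVE_LIB at the real one instead.
HIVE_LIB=$(mktemp -d)
touch "$HIVE_LIB/hive-exec.jar" "$HIVE_LIB/hive-metastore.jar"

# List the jars one per line, sort for a stable order, then join the lines with ':'.
CP=$(find "$HIVE_LIB" -name '*.jar' | sort | paste -s -d: -)

# Export under the same variable name used in step 1.
export SPARK_CLASSPATH="$CP"
echo "$SPARK_CLASSPATH"
```

 This form assumes the jar paths contain no newlines, which holds for typical parcel layouts.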


 When I do this, it seems that it cannot find the table in the Hive
 metastore.  I have put all of my Cloudera parcels in the partition starting
 with /data, as opposed to the default location used by Cloudera.  Any
 suggestions on what can be done?  I am putting the error below.


 15/02/20 13:43:01 ERROR Hive:
 NoSuchObjectException(message:analytics.trainingdatafinal table not found)
 at
 org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_table(HiveMetaStore.java:1569)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:106)
 at com.sun.proxy.$Proxy24.get_table(Unknown Source)
 at
 org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getTable(HiveMetaStoreClient.java:1008)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:90)
 at com.sun.proxy.$Proxy25.getTable(Unknown Source)
 at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:1000)
 at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:974)
 at
 org.apache.spark.sql.hive.HiveMetastoreCatalog.lookupRelation(HiveMetastoreCatalog.scala:70)
 at org.apache.spark.sql.hive.HiveContext$$anon$2.org
 $apache$spark$sql$catalyst$analysis$OverrideCatalog$$super$lookupRelation(HiveContext.scala:253)
 at
 org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:141)
 at
 org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:141)
 at scala.Option.getOrElse(Option.scala:120)
 at
 org.apache.spark.sql.catalyst.analysis.OverrideCatalog$class.lookupRelation(Catalog.scala:141)
 at
 org.apache.spark.sql.hive.HiveContext$$anon$2.lookupRelation(HiveContext.scala:253)
 at
 org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:143)
 at
 org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:138)
 at
 org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144)
 at
 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:162)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at
 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
 at
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
 at
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
 at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
 at scala.collection.AbstractIterator.to(Iterator.scala:1157)
 at
 scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
 at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
 at
 scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
 at