Re: Persist streams to text files

2014-11-21 Thread Prannoy
Hi ,

You can use the FileUtil.copyMerge API and point it at the folder where
saveAsTextFile saved the part-* text files.

Suppose your directory is /a/b/c/

use FileUtil.copyMerge(FileSystem of source, new Path("/a/b/c"), FileSystem of
destination, path to the merged file, e.g. new Path("/a/b/c.txt"), true (to delete the
original dir), the Hadoop Configuration, null)
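
For reference, a minimal sketch of that call against Hadoop 2.x, reusing the /a/b/c
example above (the FileSystem/Configuration wiring is an assumption about your setup):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

val conf = new Configuration()             // or sc.hadoopConfiguration
val fs = FileSystem.get(conf)
FileUtil.copyMerge(
  fs, new Path("/a/b/c"),                  // directory holding the part-* files
  fs, new Path("/a/b/c.txt"),              // single merged output file
  true,                                    // delete the source directory afterwards
  conf,
  null)                                    // no separator string between parts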

Thanks.

On Fri, Nov 21, 2014 at 11:31 AM, Jishnu Prathap [via Apache Spark User
List] ml-node+s1001560n19449...@n3.nabble.com wrote:

  Hi I am also having similar problem.. any fix suggested..



 *Originally Posted by GaganBM*

 Hi,

 I am trying to persist the DStreams to text files. When I use the inbuilt
 API 'saveAsTextFiles' as :

 stream.saveAsTextFiles(resultDirectory)

 this creates a number of subdirectories, for each batch, and within each
 sub directory, it creates bunch of text files for each RDD (I assume).

 I am wondering if I can have single text files for each batch. Is there
 any API for that ? Or else, a single output file for the entire stream ?

 I tried to manually write from each RDD stream to a text file as :

 stream.foreachRDD(rdd => {
   rdd.foreach(element => {
     fileWriter.write(element)
   })
 })

 where 'fileWriter' simply makes use of a Java BufferedWriter to write
 strings to a file. However, this fails with exception :

 DStreamCheckpointData.writeObject used
 java.io.BufferedWriter
 java.io.NotSerializableException: java.io.BufferedWriter
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
 at
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 .

 Any help on how to proceed with this ?








--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Re-Persist-streams-to-text-files-tp19449p19457.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: beeline via spark thrift doesn't retain cache

2014-11-21 Thread Yanbo Liang
1) Make sure your beeline client is connected to the HiveServer2 of Spark SQL.
You can find the execution logs of HiveServer2 in the environment where
start-thriftserver.sh was run.
2) What is the scale of your data? If you cache a small dataset, it may take
more time to schedule the workload between different executors.
Also check the configuration of the Spark execution environment: whether there is
enough memory for RDD storage; if not, it will take some time to
serialize/deserialize data between memory and disk.
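
One quick sanity check, sketched below over JDBC from Scala: run the cache statement
against the thrift server endpoint. The host, port and empty credentials are assumptions;
use whatever start-thriftserver.sh was started with. On a plain Hive HiveServer2 this is
exactly the statement that produces the ParseException quoted below.

import java.sql.DriverManager

Class.forName("org.apache.hive.jdbc.HiveDriver")
val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000", "", "")
val stmt = conn.createStatement()
stmt.execute("CACHE TABLE hivesampletable")   // only the Spark SQL thrift server understands this
stmt.close()
conn.close()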

2014-11-21 11:06 GMT+08:00 Judy Nash judyn...@exchange.microsoft.com:

  Hi friends,



 I have successfully setup thrift server and execute beeline on top.



 Beeline can handle select queries just fine, but it cannot seem to do any
 kind of caching/RDD operations.



 i.e.

 1)  Command “cache table” doesn’t work. See error:

 Error: Error while processing statement: FAILED: ParseException line 1:0
 cannot

 recognize input near 'cache' 'table' 'hivesampletable'
 (state=42000,code=4)



 2)  Re-run SQL commands do not have any performance improvements.



 By comparison, Spark-SQL shell can execute “cache table” command and
 rerunning SQL command has a huge performance boost.



 Am I missing something, or is this expected when executing through the Spark
 thrift server?



 Thanks!

 Judy





RE: Persist streams to text files

2014-11-21 Thread jishnu.prathap
Hi
Thank you ☺ Akhil, it worked like a charm.
I had used the file writer outside rdd.foreach; that was probably the reason for the
non-serializable exception.

Thanks & Regards
Jishnu Menath Prathap
From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
Sent: Friday, November 21, 2014 1:15 PM
To: Jishnu Menath Prathap (WT01 - BAS)
Cc: u...@spark.incubator.apache.org
Subject: Re: Persist streams to text files

Here's a quick version to store (append) in your local machine

val tweets = TwitterUtils.createStream(ssc, None)

val hashTags = tweets.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))


hashTags.foreachRDD(rdds => {

  rdds.foreach(rdd => {
    val fw = new FileWriter("/home/akhld/tags.txt", true)
    println("HashTag => " + rdd)
    fw.write(rdd + "\n")
    fw.close()
  })

})

Thanks
Best Regards
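
If opening a FileWriter per record gets slow, one variant of the same idea (not from this
thread, and still assuming local mode since the path is on the local filesystem) is one
writer per partition:

import java.io.FileWriter

hashTags.foreachRDD(rdd => {
  rdd.foreachPartition(partition => {
    val fw = new FileWriter("/home/akhld/tags.txt", true)   // same illustrative path as above
    partition.foreach(tag => fw.write(tag + "\n"))
    fw.close()
  })
})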

On Fri, Nov 21, 2014 at 12:12 PM, jishnu.prat...@wipro.com wrote:
Hi Akhil
Thanks for reply
But it creates different directories. I tried using a FileWriter but it shows a
non-serializable error.
val stream = TwitterUtils.createStream(ssc, None) //, filters)

val statuses = stream.map(
  status => sentimentAnalyzer.findSentiment({
    status.getText().replaceAll("[^A-Za-z0-9 \\#]", "")
  })
)

val line = statuses.foreachRDD(
  rdd => {
    rdd.foreach(
      tweetWithSentiment => {
        if (!tweetWithSentiment.getLine().isEmpty())
          println(tweetWithSentiment.getCssClass() + " for line := " +
            tweetWithSentiment.getLine()) // Now I print to the console, but I need to write it to a file on the local machine
      })
  })

Thanks & Regards
Jishnu Menath Prathap
From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
Sent: Friday, November 21, 2014 11:48 AM
To: Jishnu Menath Prathap (WT01 - BAS)
Cc: u...@spark.incubator.apache.org
Subject: Re: Persist streams to text files


To have a single text file output for each batch you can repartition it to 1 
and then call the saveAsTextFiles

stream.repartition(1).saveAsTextFiles(location)
On 21 Nov 2014 11:28, jishnu.prat...@wipro.com wrote:
Hi I am also having similar problem.. any fix suggested..

Originally Posted by GaganBM
Hi,

I am trying to persist the DStreams to text files. When I use the inbuilt API 
'saveAsTextFiles' as :

stream.saveAsTextFiles(resultDirectory)

this creates a number of subdirectories, for each batch, and within each sub 
directory, it creates bunch of text files for each RDD (I assume).

I am wondering if I can have single text files for each batch. Is there any API 
for that ? Or else, a single output file for the entire stream ?

I tried to manually write from each RDD stream to a text file as :

stream.foreachRDD(rdd => {
  rdd.foreach(element => {
    fileWriter.write(element)
  })
})

where 'fileWriter' simply makes use of a Java BufferedWriter to write strings 
to a file. However, this fails with exception :

DStreamCheckpointData.writeObject used
java.io.BufferedWriter
java.io.NotSerializableException: java.io.BufferedWriter
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
.

Any help on how to proceed with this ?




Re: processing files

2014-11-21 Thread phiroc
Hi Simon,

no, I don't need to run the tasks on multiple machines for now.

I will therefore stick to Makefile + shell or Java programs as Spark appears 
not to be the right tool for the tasks I am trying to accomplish.

Thank you for your input.

Philippe



- Original message -
From: Simon Hafner reactorm...@gmail.com
To: Philippe de Rochambeau phi...@free.fr
Sent: Friday, 21 November 2014 09:47:25
Subject: Re: processing files

2014-11-21 1:46 GMT-06:00 Philippe de Rochambeau phi...@free.fr:
 - reads xml files in thousands of directories, two levels down, from year x 
 to year y

You could try

sc.parallelize(new File(dirWithXML)).flatMap(sc.wholeTextFiles(_))

... not guaranteed to work.
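
A variant I would expect to behave better, assuming the layout really is two directory
levels of XML files (the glob and the regex are purely illustrative), is to let
wholeTextFiles expand a glob on its own:

// Each element is (fullPath, fileContent); the glob is expanded by the underlying FileInputFormat.
val xmlFiles = sc.wholeTextFiles("/archive/*/*/*.xml")
val imageTags = xmlFiles.flatMap { case (path, content) =>
  "<image[^>]*>".r.findAllIn(content).map(tag => (path, tag))
}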

 - extracts data from image tags in those files and stores them in a Sql or 
 NoSql database

From what I understand, spark expects no side effects from the
functions you pass to map(). So that's probably not that good of an
idea if you don't want duplicated records.

 - generates ImageMagick commands based on the extracted data to generate 
 images

data transformation, easy. collect() and save.

 - generates curl commands to index the image files with Solr

same as imagemagick.

 Does Spark provide any tools/features to facilitate and automate (batchify) 
 the above tasks?

Sure, but I wouldn't run the commands with spark. They might be run
twice or more.

 I can do all of the above with one or several Java programs, but I wondered 
 if using Spark would be of any use in such an endeavour.

Personally, I'd use a Makefile, xmlstarlet for the xml parsing, and
store the image paths to plaintext instead of a database, and get
parallelization via -j X. You could also run the imagemagick and curl
commands from there. But that naturally doesn't scale to multiple
machines.

Do you have more than one machine available to run this one? Do you
need to run it on more than one machine, because it takes too long on
just one? That's what spark excels at.




Re: Spark serialization issues with third-party libraries

2014-11-21 Thread Sean Owen
You are probably casually sending UIMA objects from the driver to
executors in a closure. You'll have to design your program so that you
do not need to ship these objects to or from the remote task workers.
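
A sketch of the usual way to follow that advice; the factory and process() calls are
hypothetical stand-ins for your UIMA setup:

val processed = documents.mapPartitions { docs =>
  // Built on the executor, once per partition, so it never needs to be serialized.
  val engine = createUimaAnalysisEngine()   // hypothetical helper you provide
  docs.map(doc => engine.process(doc))      // hypothetical call
}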

On Fri, Nov 21, 2014 at 8:39 AM, jatinpreet jatinpr...@gmail.com wrote:
 Hi,

 I am planning to use UIMA library to process data in my RDDs. I have had bad
 experiences while using third party libraries inside worker tasks. The
 system gets plagued with Serialization issues. But as UIMA classes are not
 necessarily Serializable, I am not sure if it will work.

 Please explain which classes need to be Serializable and which of them can
 be left as they are? A clear understanding will help me a lot.

 Thanks,
 Jatin



 -
 Novice Big Data Programmer
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-serialization-issues-with-third-party-libraries-tp19454.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.




How to get applicationId for yarn mode(both yarn-client and yarn-cluster mode)

2014-11-21 Thread Earthson
Is there any way to get the yarn application_id inside the program?





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-get-applicationId-for-yarn-mode-both-yarn-client-and-yarn-cluster-mode-tp19462.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.





spark code style

2014-11-21 Thread Kevin Jung
Hi all.
Here are two code snippets.
And they will produce the same result.

1.
rdd.map( function )

2.
rdd.map( function1 ).map( function2 ).map( function3 )

What are the pros and cons of these two methods?

Regards
Kevin



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-code-style-tp19463.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Is there a way to turn on spark eventLog on the worker node?

2014-11-21 Thread Xuelin Cao

Hi,

I'm going to debug some Spark applications on our testing platform, and
it would be helpful if we could see the event log on the worker node.

I've tried turning on spark.eventLog.enabled and setting
spark.eventLog.dir on the worker node. However, it doesn't work.

I do have event logs on my driver node, and I know how to turn them on.
However, the same settings don't work on the worker node.

Can anyone help clarify whether the event log is only available on the
driver node?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-a-way-to-turn-on-spark-eventLog-on-the-worker-node-tp19464.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Determine number of running executors

2014-11-21 Thread Yanbo Liang
You can get parameters such as spark.executor.memory, but you cannot get the
executor or core counts, because executors and cores are parameters of the
Spark deploy environment, not of the Spark context.

val conf = new SparkConf().set("spark.executor.memory", "2G")
val sc = new SparkContext(conf)

sc.getConf.get("spark.executor.memory")
conf.get("spark.executor.memory")
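
A rough driver-side workaround (an approximation, not an official executor-count API) is
to count the registered block managers. On YARN executors can register a little after the
SparkContext starts, so the number may be low right at startup; rdd below stands in for
whatever you want to repartition:

// One entry per block manager, including the driver itself.
val executorCount = sc.getExecutorMemoryStatus.size - 1
val coresPerExecutor = sc.getConf.getInt("spark.executor.cores", 1)   // falls back to 1 if unset
val targetPartitions = math.max(executorCount * coresPerExecutor * 3, 1)
rdd.repartition(targetPartitions)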

2014-11-21 15:35 GMT+08:00 Tobias Pfeiffer t...@preferred.jp:

 Hi,

 when running on YARN, is there a way for the Spark driver to know how many
 executors, cores per executor etc. there are? I want to know this so I can
 repartition to a good number.

 Thanks
 Tobias



short-circuit local reads cannot be used

2014-11-21 Thread Daniel Haviv
Hi,
Every time I start the spark-shell I encounter this message:
14/11/18 00:27:43 WARN hdfs.BlockReaderLocal: The short-circuit local reads
feature cannot be used because libhadoop cannot be loaded.

Any idea how to overcome it?
The short-circuit feature is a big performance boost I don't want to lose.

Thanks,
Daniel
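
In case it helps, that warning usually means the Hadoop native libraries are not on the
library path of the JVMs Spark starts. One way to point at them (the path below is an
assumption; use wherever libhadoop.so lives on your nodes) is via the extra library path
settings, which can equally go into spark-defaults.conf:

val conf = new SparkConf()
  .set("spark.driver.extraLibraryPath", "/usr/lib/hadoop/lib/native")
  .set("spark.executor.extraLibraryPath", "/usr/lib/hadoop/lib/native")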


Re: spark code style

2014-11-21 Thread Gerard Maas
I suppose that here function(x) = function3(function2(function1(x)))

In that case, the difference will be modularity and readability of your
program.
If function{1,2,3} are logically different steps and potentially reusable
somewhere else, I'd keep them separate.

A sequence of map transformations will be pipelined by Spark with little
overhead.

-kr, Gerard.
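
If the three steps are ordinary Function1 values, you can even keep them separate and
still write a single map; a small sketch with stand-in names:

// f1, f2 and f3 stand in for function1/2/3 from the question.
val combined = f1 andThen f2 andThen f3
rdd.map(combined)   // same result as rdd.map(f1).map(f2).map(f3), still one pipelined pass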

On Fri, Nov 21, 2014 at 10:20 AM, Kevin Jung itsjb.j...@samsung.com wrote:

 Hi all.
 Here are two code snippets.
 And they will produce the same result.

 1.
 rdd.map( function )

 2.
 rdd.map( function1 ).map( function2 ).map( function3 )

 What are the pros and cons of these two methods?

 Regards
 Kevin



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/spark-code-style-tp19463.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: How to get applicationId for yarn mode(both yarn-client and yarn-cluster mode)

2014-11-21 Thread Earthson
Finally, I've found two ways:

1. search the driver output for a line like "Submitted application
application_1416319392519_0115"
2. use a specific AppName; we can then query YARN for the ApplicationId by that name
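
If you are on a recent enough Spark (1.2+, if I remember correctly, so treat the version
as an assumption), the driver can also ask for the id directly instead of scraping logs:

// On YARN this comes back as something like application_1416319392519_0115.
val appId = sc.applicationId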





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-get-applicationId-for-yarn-mode-both-yarn-client-and-yarn-cluster-mode-tp19462p19466.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Another accumulator question

2014-11-21 Thread Sean Owen
This sounds more like a use case for reduce? or fold? it sounds like
you're kind of cobbling together the same function on accumulators,
when reduce/fold are simpler and have the behavior you suggest.

On Fri, Nov 21, 2014 at 5:46 AM, Nathan Kronenfeld
nkronenf...@oculusinfo.com wrote:
 I think I understand what is going on here, but I was hoping someone could
 confirm (or explain reality if I don't) what I'm seeing.

 We are collecting data using a rather sizable accumulator - essentially, an
 array of tens of thousands of entries.  All told, about 1.3m of data.

 If I understand things correctly, it looks to me like, when our job is done,
 a copy of this array is retrieved from each individual task, all at once,
 for combination on the client - which means, with 400 tasks to the job, each
 collection is using up half a gig of memory on the client.

 Is this true?  If so, does anyone know a way to get accumulators to
 accumulate as results collect, rather than all at once at the end, so we
 only have to hold a few in memory at a time, rather than all 400?

 Thanks,
   -Nathan


 --
 Nathan Kronenfeld
 Senior Visualization Developer
 Oculus Info Inc
 2 Berkeley Street, Suite 600,
 Toronto, Ontario M5A 4J5
 Phone:  +1-416-203-3003 x 238
 Email:  nkronenf...@oculusinfo.com




How to deal with BigInt in my case class for RDD = SchemaRDD convertion

2014-11-21 Thread Jianshi Huang
Hi,

I got an error during rdd.registerTempTable(...) saying scala.MatchError:
scala.BigInt

Looks like BigInt cannot be used in SchemaRDD, is that correct?

So what would you recommend to deal with it?

Thanks,
-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: Spark Streaming Metrics

2014-11-21 Thread Gerard Maas
Looks like metrics are not a hot topic to discuss - yet so important to
sleep well when jobs are running in production.

I've created Spark-4537 https://issues.apache.org/jira/browse/SPARK-4537
to track this issue.

-kr, Gerard.

On Thu, Nov 20, 2014 at 9:25 PM, Gerard Maas gerard.m...@gmail.com wrote:

 As the Spark Streaming tuning guide indicates, the key indicators of a
 healthy streaming job are:
 - Processing Time
 - Total Delay

 The Spark UI page for the Streaming job [1] shows these two indicators but
 the metrics source for Spark Streaming (StreamingSource.scala)  [2] does
 not.

 Any reasons for that? I would like to monitor job performance through an
 external monitor (Ganglia in our case) and I've connected already the
 currently published metrics.

 -kr,  Gerard.


 [1]
 https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala#L127

 [2]
 https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala



Re: Why is ALS class serializable ?

2014-11-21 Thread Hao Ren
It makes sense.

Thx. =)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Why-is-ALS-class-serializable-tp19262p19472.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Spark: Simple local test failed depending on memory settings

2014-11-21 Thread rzykov
Dear all,

We encountered problems of failed jobs with huge amount of data.

A simple local test was prepared for this question at
https://gist.github.com/copy-of-rezo/6a137e13a1e4f841e7eb
It generates 2 sets of key-value pairs, join them, selects distinct values
and counts data finally.

object Spill {
  def generate = {
    for {
      j <- 1 to 10
      i <- 1 to 200
    } yield (j, i)
  }

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName(getClass.getSimpleName)
    conf.set("spark.shuffle.spill", "true")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)
    println(generate)

    val dataA = sc.parallelize(generate)
    val dataB = sc.parallelize(generate)
    val dst = dataA.join(dataB).distinct().count()
    println(dst)
  }
}

We compiled it locally and run 3 times with different settings of memory:
1) *--executor-memory 10M --driver-memory 10M --num-executors 1
--executor-cores 1*
It fails with java.lang.OutOfMemoryError: GC overhead limit exceeded at
.
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)

2) *--executor-memory 20M --driver-memory 20M --num-executors 1
--executor-cores 1*
It works OK

3)  *--executor-memory 10M --driver-memory 10M --num-executors 1
--executor-cores 1* But let's make less data for i from 200 to 100. It
reduces input data in 2 times and joined data in 4 times

  def generate = {
    for {
      j <- 1 to 10
      i <- 1 to 100   // previous value was 200
    } yield (j, i)
  }
This code works OK. 

We don't understand why 10M is not enough for such a simple operation on
approximately 32000 bytes of ints (2 * 10 * 200 * 2 * 4). 10M of RAM works
if we reduce the data volume by a factor of 2 (2000 records of (int, int)).
Why doesn't spilling to disk cover this case?





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Simple-local-test-failed-depending-on-memory-settings-tp19473.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Cores on Master

2014-11-21 Thread Prannoy
Hi,

You can also set the cores in the spark application itself .

http://spark.apache.org/docs/1.0.1/spark-standalone.html
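
Setting it in the application caps the total cores that one application takes from a
standalone cluster (spark.cores.max); note this is not a per-machine limit, which is what
SPARK_WORKER_CORES / --cores on the worker control. A small sketch with an illustrative value:

val conf = new SparkConf()
  .setAppName("MyApp")
  .set("spark.cores.max", "4")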

On Wed, Nov 19, 2014 at 6:11 AM, Pat Ferrel-2 [via Apache Spark User List] 
ml-node+s1001560n19238...@n3.nabble.com wrote:

 OK hacking the start-slave.sh did it

 On Nov 18, 2014, at 4:12 PM, Pat Ferrel [hidden email]
 http://user/SendEmail.jtp?type=nodenode=19238i=0 wrote:

 This seems to work only on a ‘worker’ not the master? So I’m back to
 having no way to control cores on the master?

 On Nov 18, 2014, at 3:24 PM, Pat Ferrel [hidden email]
 http://user/SendEmail.jtp?type=nodenode=19238i=1 wrote:

 Looks like I can do this by not using start-all.sh but starting each
 worker separately passing in a '--cores n' to the master? No config/env
 way?

 On Nov 18, 2014, at 3:14 PM, Pat Ferrel [hidden email]
 http://user/SendEmail.jtp?type=nodenode=19238i=2 wrote:

 I see the default and max cores settings but these seem to control total
 cores per cluster.

 My cobbled together home cluster needs the Master to not use all its cores
 or it may lock up (it does other things). Is there a way to control max
 cores used for a particular cluster machine in standalone mode?





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Cores-on-Master-tp19230p19475.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Slow performance in spark streaming

2014-11-21 Thread Prannoy
Hi,

Spark running in local mode is generally slower than on a cluster. Cluster machines
usually have higher configurations, and the tasks are distributed across
workers in order to get a faster result. So you will always find a
difference in speed between running locally and running on a cluster. Try
running the same job on a cluster and evaluate the speed there.

Thanks

On Thu, Nov 20, 2014 at 6:52 PM, Blackeye [via Apache Spark User List] 
ml-node+s1001560n1937...@n3.nabble.com wrote:

 I am using spark streaming 1.1.0 locally (not in a cluster). I created a
 simple app that parses the data (about 10.000 entries), stores it in a
 stream and then makes some transformations on it. Here is the code:

 def main(args: Array[String]) {
   val master = "local[8]"
   val conf = new SparkConf().setAppName("Tester").setMaster(master)
   val sc = new StreamingContext(conf, Milliseconds(110000))
   val stream = sc.receiverStream(new MyReceiver("localhost" /*, port elided in the archive */))
   val parsedStream = parse(stream)
   parsedStream.foreachRDD(rdd =>
     println(rdd.first() + "\nRULE STARTS " + System.currentTimeMillis()))
   val result1 = parsedStream.filter(entry =>
     entry.symbol.contains("walking") && entry.symbol.contains("true") &&
     entry.symbol.contains("id0")).map(_.time)
   val result2 = parsedStream.filter(entry =>
     entry.symbol == "disappear" && entry.symbol.contains("id0")).map(_.time)
   val result3 = result1
     .transformWith(result2, (rdd1, rdd2: RDD[Int]) => rdd1.subtract(rdd2))
   result3.foreachRDD(rdd =>
     println(rdd.first() + "\nRULE ENDS " + System.currentTimeMillis()))
   sc.start()
   sc.awaitTermination()
 }

 def parse(stream: DStream[String]) = {
   stream.flatMap { line =>
     val entries = line.split("assert").filter(entry => !entry.isEmpty)
     entries.map { tuple =>
       val pattern = """\s*[(](.+)[,]\s*([0-9]+)+\s*[)]\s*[)]\s*[,|\.]\s*""".r
       tuple match {
         case pattern(symbol, time) => new Data(symbol, time.toInt)
       }
     }
   }
 }

 case class Data(symbol: String, time: Int)

 I have a batch duration of 110,000 milliseconds in order to receive all
 the data in one batch. I believed that, even locally, Spark is very
 fast. In this case, it takes about 3.5 sec to execute the rule (between
 RULE STARTS and RULE ENDS). Am I doing something wrong, or is this the
 expected time? Any advice?






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Slow-performance-in-spark-streaming-tp19371p19476.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Parsing a large XML file using Spark

2014-11-21 Thread Prannoy
Hi,

Parallel processing of XML files may be an issue due to the tags in the XML
file. The XML file has to be intact, because the parser matches the start
and end entities; if the file is distributed in parts to the workers, a worker
may not find both the start and end tags of an element within its part, which
will give an exception.

Thanks.

On Wed, Nov 19, 2014 at 6:26 AM, ssimanta [via Apache Spark User List] 
ml-node+s1001560n19239...@n3.nabble.com wrote:

 If there a one big XML file (e.g., Wikipedia dump 44GB or the larger dump
 that all revision information also) that is stored in HDFS, is it possible
 to parse it in parallel/faster using Spark? Or do we have to use something
 like a PullParser or Iteratee?

 My current solution is to read the single XML file in the first pass -
 write it to HDFS and then read the small files in parallel on the Spark
 workers.

 Thanks
 -Soumya










--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Parsing-a-large-XML-file-using-Spark-tp19239p19477.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: How can I read this avro file using spark scala?

2014-11-21 Thread thomas j
Thanks for the pointer Michael.

I've downloaded spark 1.2.0 from
https://people.apache.org/~pwendell/spark-1.2.0-snapshot1/ and clone and
built the spark-avro repo you linked to.

When I run it against the example avro file linked to in the documentation
it works. However, when I try to load my avro file (linked to in my
original question) I receive the following error:

java.lang.RuntimeException: Unsupported type LONG
at scala.sys.package$.error(package.scala:27)
at com.databricks.spark.avro.AvroRelation.com
$databricks$spark$avro$AvroRelation$$toSqlType(AvroRelation.scala:116)
at
com.databricks.spark.avro.AvroRelation$$anonfun$5.apply(AvroRelation.scala:97)
at
com.databricks.spark.avro.AvroRelation$$anonfun$5.apply(AvroRelation.scala:96)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
...

If this is useful I'm happy to try loading the various different avro files
I have to try to battle-test spark-avro.

Thanks

On Thu, Nov 20, 2014 at 6:30 PM, Michael Armbrust mich...@databricks.com
wrote:

 One option (starting with Spark 1.2, which is currently in preview) is to
 use the Avro library for Spark SQL.  This is very new, but we would love to
 get feedback: https://github.com/databricks/spark-avro

 On Thu, Nov 20, 2014 at 10:19 AM, al b beanb...@googlemail.com wrote:

 I've read several posts of people struggling to read avro in spark. The
 examples I've tried don't work. When I try this solution (
 https://stackoverflow.com/questions/23944615/how-can-i-load-avros-in-spark-using-the-schema-on-board-the-avro-files)
 I get errors:

 spark java.io.NotSerializableException:
 org.apache.avro.mapred.AvroWrapper

 How can I read the following sample file in spark using scala?

 http://www.4shared.com/file/SxnYcdgJce/sample.html

 Thomas





Re: How can I read this avro file using spark scala?

2014-11-21 Thread thomas j
I've been able to load a different avro file based on GenericRecord with:

val person = sqlContext.avroFile("/tmp/person.avro")

When I try to call `first()` on it, I get NotSerializableException
exceptions again:

person.first()

...
14/11/21 12:59:17 ERROR Executor: Exception in task 0.0 in stage 14.0 (TID
20)
java.io.NotSerializableException: org.apache.avro.generic.GenericData$Record
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
...

Apart from this I want to transform the records into pairs of (user_id,
record). I can do this by specifying the offset of the user_id column with
something like this:

person.map(r => (r.getInt(2), r)).take(4).collect()

Is there any way to be able to specify the column name (user_id) instead
of needing to know/calculate the offset somehow?

Thanks again
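
One way to avoid hard-coding the offset, assuming the Avro field names survive into the
SchemaRDD's schema, is to look the index up once:

val userIdIdx = person.schema.fieldNames.indexOf("user_id")
val byUser = person.map(r => (r.getInt(userIdIdx), r))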


On Fri, Nov 21, 2014 at 11:48 AM, thomas j beanb...@googlemail.com wrote:

 Thanks for the pointer Michael.

 I've downloaded spark 1.2.0 from
 https://people.apache.org/~pwendell/spark-1.2.0-snapshot1/ and clone and
 built the spark-avro repo you linked to.

 When I run it against the example avro file linked to in the documentation
 it works. However, when I try to load my avro file (linked to in my
 original question) I receive the following error:

 java.lang.RuntimeException: Unsupported type LONG
 at scala.sys.package$.error(package.scala:27)
 at com.databricks.spark.avro.AvroRelation.com
 $databricks$spark$avro$AvroRelation$$toSqlType(AvroRelation.scala:116)
 at
 com.databricks.spark.avro.AvroRelation$$anonfun$5.apply(AvroRelation.scala:97)
 at
 com.databricks.spark.avro.AvroRelation$$anonfun$5.apply(AvroRelation.scala:96)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 ...

 If this is useful I'm happy to try loading the various different avro
 files I have to try to battle-test spark-avro.

 Thanks

 On Thu, Nov 20, 2014 at 6:30 PM, Michael Armbrust mich...@databricks.com
 wrote:

 One option (starting with Spark 1.2, which is currently in preview) is to
 use the Avro library for Spark SQL.  This is very new, but we would love to
 get feedback: https://github.com/databricks/spark-avro

 On Thu, Nov 20, 2014 at 10:19 AM, al b beanb...@googlemail.com wrote:

 I've read several posts of people struggling to read avro in spark. The
 examples I've tried don't work. When I try this solution (
 https://stackoverflow.com/questions/23944615/how-can-i-load-avros-in-spark-using-the-schema-on-board-the-avro-files)
 I get errors:

 spark java.io.NotSerializableException:
 org.apache.avro.mapred.AvroWrapper

 How can I read the following sample file in spark using scala?

 http://www.4shared.com/file/SxnYcdgJce/sample.html

 Thomas






RE: tableau spark sql cassandra

2014-11-21 Thread jererc
Hi!

Sure, I'll post the info I grabbed once the cassandra tables values appear
in Tableau.

Best,
Jerome



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/tableau-spark-sql-cassandra-tp19282p19480.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Setup Remote HDFS for Spark

2014-11-21 Thread EH
Hi,

Is there any way that I can set up a remote HDFS for Spark (more specifically,
for Spark Streaming checkpoints)?  The reason I'm asking is that our Spark
and HDFS do not run on the same machines.  I've looked around but still have
no clue so far.

Thanks,
EH



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Setup-Remote-HDFS-for-Spark-tp19481.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Execute Spark programs from local machine on Yarn-hadoop cluster

2014-11-21 Thread Prannoy
Hi Naveen,

I don't think this is possible. If you are setting the master with your
cluster details, you cannot execute any job from your local machine. You
have to execute the jobs inside your YARN machine so that SparkConf is able
to connect with all the provided details.

If this is not the case, give a detailed explanation of what exactly
you are trying to do :)

Thanks.

On Fri, Nov 21, 2014 at 8:11 PM, Naveen Kumar Pokala [via Apache Spark User
List] ml-node+s1001560n19482...@n3.nabble.com wrote:

 Hi,



 I am executing my spark jobs on yarn cluster by forming conf object in the
 following way.



 SparkConf conf = new SparkConf().setAppName("NewJob").setMaster("yarn-cluster");



 Now I want to execute Spark jobs from my local machine; how do I do that?

 What I mean is: is there a way to give the IP address, port and all the details
 needed to connect to a master (YARN) on some other network from my local Spark program?



 -Naveen







--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Execute-Spark-programs-from-local-machine-on-Yarn-hadoop-cluster-tp19482p19484.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Setup Remote HDFS for Spark

2014-11-21 Thread EH
Unfortunately whether it is possible to have both Spark and HDFS running on
the same machine is not under our control.  :(  Right now we have Spark and
HDFS running in different machines.  In this case, is it still possible to
hook up a remote HDFS with Spark so that we can use Spark Streaming
checkpoints?  Thank you for your help.

Best,
EH
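
As a sketch, pointing the checkpoint directory at a fully-qualified HDFS URI is usually
enough, as long as the Spark machines can reach the remote namenode and datanodes over
the network (host, port and path below are placeholders):

val ssc = new StreamingContext(conf, Seconds(10))   // conf: your SparkConf; 10s batch is illustrative
ssc.checkpoint("hdfs://namenode.example.com:8020/user/spark/streaming-checkpoints")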



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Setup-Remote-HDFS-for-Spark-tp19481p19485.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: RDD data checkpoint cleaning

2014-11-21 Thread Luis Ángel Vicente Sánchez
I have seen the same behaviour while testing the latest spark 1.2.0
snapshot.

I'm trying the ReliableKafkaReceiver and it works quite well but the
checkpoints folder is always increasing in size. The receivedMetaData
folder remains almost constant in size but the receivedData folder is
always increasing in size even if I set spark.cleaner.ttl to 300 seconds.

Regards,

Luis

2014-09-23 22:47 GMT+01:00 RodrigoB rodrigo.boav...@aspect.com:

 Just a follow-up.

 Just to make sure about the RDDs not being cleaned up, I just replayed the
 app both on the windows remote laptop and then on the linux machine and at
 the same time was observing the RDD folders in HDFS.

 Confirming the observed behavior: running on the laptop I could see the
 RDDs
 continuously increasing. When I ran on linux, only two RDD folders were
 there and continuously being recycled.

 Metadata checkpoints were being cleaned on both scenarios.

 tnks,
 Rod




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/RDD-data-checkpoint-cleaning-tp14847p14939.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: How to deal with BigInt in my case class for RDD = SchemaRDD convertion

2014-11-21 Thread Yin Huai
Hello Jianshi,

The reason of that error is that we do not have a Spark SQL data type for
Scala BigInt. You can use Decimal for your case.

Thanks,

Yin
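
A minimal sketch of that workaround, assuming the createSchemaRDD implicits are in scope
as in your registerTempTable call; the case class and field names are made up:

// BigDecimal maps to Spark SQL's DecimalType; BigInt has no mapping.
case class Record(id: String, amount: BigDecimal)

val converted = rdd.map(r => Record(r.id, BigDecimal(r.amount)))   // r.amount assumed to be the BigInt field
converted.registerTempTable("records")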

On Fri, Nov 21, 2014 at 5:11 AM, Jianshi Huang jianshi.hu...@gmail.com
wrote:

 Hi,

 I got an error during rdd.registerTempTable(...) saying scala.MatchError:
 scala.BigInt

 Looks like BigInt cannot be used in SchemaRDD, is that correct?

 So what would you recommend to deal with it?

 Thanks,
 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github & Blog: http://huangjs.github.com/



Lots of small input files

2014-11-21 Thread Pat Ferrel
I have a job that searches for input recursively and creates a string of 
pathnames to treat as one input. 

The files are part-x files and they are fairly small. The job seems to take 
a long time to complete considering the size of the total data (150m) and only 
runs on the master machine. The job only does rdd.map type operations.

1) Why doesn’t it use the other workers in the cluster?
2) Is there a downside to using a lot of small part files? Should I coalesce 
them into one input file?
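
On (2), a sketch with made-up paths: textFile accepts globs and comma-separated
directories, and coalescing right after the read cuts the per-file task overhead without
a shuffle (the partition count is illustrative):

val input = sc.textFile("/data/run1/part-*,/data/run2/part-*")
val compacted = input.coalesce(16)        // roughly the number of cores in the cluster
compacted.saveAsTextFile("/data/combined")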



Re: How can I read this avro file using spark scala?

2014-11-21 Thread Simone Franzini
I have also been struggling with reading avro. Very glad to hear that there
is a new avro library coming in Spark 1.2 (which by the way, seems to have
a lot of other very useful improvements).

In the meanwhile, I have been able to piece together several snippets/tips
that I found from various sources and I am now able to read/write avro
correctly. From my understanding, you basically need 3 pieces:
1. Use the kryo serializer.
2. Register your avro classes. I have done this using twitter chill 0.4.0.
3. Read/write avro with a snippet of code like the one you posted.

Here is relevant code (hopefully all of it).

// All of the following are needed in order to read/write AVRO files
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.fs.{ FileSystem, Path }
// Uncomment the following line if you want to use generic AVRO, I am using
specific
//import org.apache.avro.generic.GenericData
import org.apache.avro.Schema
import org.apache.avro.mapreduce.{ AvroJob, AvroKeyInputFormat,
AvroKeyOutputFormat }
import org.apache.avro.mapred.AvroKey
// Kryo/avro serialization stuff
import com.esotericsoftware.kryo.Kryo
import com.twitter.chill.avro.AvroSerializer
import org.apache.spark.serializer.{ KryoSerializer, KryoRegistrator }

object MyApp {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("MyApp").setMaster("local[*]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "MyRegistrator")
    val sc = new SparkContext(conf)

    // Read
    val readJob = new Job()
    AvroJob.setInputKeySchema(readJob, schema)
    val records = sc.newAPIHadoopFile(inputPath,
      classOf[AvroKeyInputFormat[MyAvroClass]],
      classOf[AvroKey[MyAvroClass]],
      classOf[NullWritable],
      readJob.getConfiguration)
      .map { t => t._1.datum }

    // Write
    val rddAvroWritable = rdd.map { s => (new AvroKey(s), NullWritable.get) }
    val writeJob = new Job()
    AvroJob.setOutputKeySchema(writeJob, schema)
    writeJob.setOutputFormatClass(classOf[AvroKeyOutputFormat[MyAvroClass]])
    rddAvroWritable.saveAsNewAPIHadoopFile(outputPath,
      classOf[AvroKey[MyAvroClass]],
      classOf[NullWritable],
      classOf[AvroKeyOutputFormat[MyAvroClass]],
      writeJob.getConfiguration)
  }
}


class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    // Put a line like the following for each of your Avro classes if you use specific Avro.
    // If you use generic Avro, chill also has a function for that: GenericRecordSerializer
    kryo.register(classOf[MyAvroClass],
      AvroSerializer.SpecificRecordBinarySerializer[MyAvroClass])
  }
}

Simone Franzini, PhD

http://www.linkedin.com/in/simonefranzini

On Fri, Nov 21, 2014 at 7:04 AM, thomas j beanb...@googlemail.com wrote:

 I've been able to load a different avro file based on GenericRecord with:

 val person = sqlContext.avroFile("/tmp/person.avro")

 When I try to call `first()` on it, I get NotSerializableException
 exceptions again:

 person.first()

 ...
 14/11/21 12:59:17 ERROR Executor: Exception in task 0.0 in stage 14.0 (TID
 20)
 java.io.NotSerializableException:
 org.apache.avro.generic.GenericData$Record
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
 at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
 at
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 ...

 Apart from this I want to transform the records into pairs of (user_id,
 record). I can do this by specifying the offset of the user_id column with
 something like this:

 person.map(r => (r.getInt(2), r)).take(4).collect()

 Is there any way to be able to specify the column name (user_id) instead
 of needing to know/calculate the offset somehow?

 Thanks again


 On Fri, Nov 21, 2014 at 11:48 AM, thomas j beanb...@googlemail.com
 wrote:

 Thanks for the pointer Michael.

 I've downloaded spark 1.2.0 from
 https://people.apache.org/~pwendell/spark-1.2.0-snapshot1/ and clone and
 built the spark-avro repo you linked to.

 When I run it against the example avro file linked to in the
 documentation it works. However, when I try to load my avro file (linked to
 in my original question) I receive the following error:

 java.lang.RuntimeException: Unsupported type LONG
 at scala.sys.package$.error(package.scala:27)
 at com.databricks.spark.avro.AvroRelation.com
 $databricks$spark$avro$AvroRelation$$toSqlType(AvroRelation.scala:116)
 at
 com.databricks.spark.avro.AvroRelation$$anonfun$5.apply(AvroRelation.scala:97)
 at
 com.databricks.spark.avro.AvroRelation$$anonfun$5.apply(AvroRelation.scala:96)
 at
 

Re: Nightly releases

2014-11-21 Thread Arun Ahuja
Great - what can we do to make this happen?  So should I file a JIRA to
track?

Thanks,

Arun

On Tue, Nov 18, 2014 at 11:46 AM, Andrew Ash and...@andrewash.com wrote:

 I can see this being valuable for users wanting to live on the cutting
 edge without building CI infrastructure themselves, myself included.  I
 think Patrick's recent work on the build scripts for 1.2.0 will make
 delivering nightly builds to a public maven repo easier.

 On Tue, Nov 18, 2014 at 10:22 AM, Arun Ahuja aahuj...@gmail.com wrote:

 Of course we can run this as well to get the latest, but the build is
 fairly long and this seems like a resource many would need.

 On Tue, Nov 18, 2014 at 10:21 AM, Arun Ahuja aahuj...@gmail.com wrote:

 Are nightly releases posted anywhere?  There are quite a few vital
 bugfixes and performance improvements being committed to Spark, and using the
 latest commits is useful (or even necessary for some jobs).

 Is there a place to post them? It doesn't seem like it would be difficult to
 run make-dist nightly and place it somewhere.

 Is it possible to extract this from Jenkins builds?

 Thanks,
 Arun
  ​






Re: Another accumulator question

2014-11-21 Thread Nathan Kronenfeld
We've done this with reduce - that definitely works.

I've reworked the logic to use accumulators because, when it works, it's
5-10x faster

On Fri, Nov 21, 2014 at 4:44 AM, Sean Owen so...@cloudera.com wrote:

 This sounds more like a use case for reduce? or fold? it sounds like
 you're kind of cobbling together the same function on accumulators,
 when reduce/fold are simpler and have the behavior you suggest.

 On Fri, Nov 21, 2014 at 5:46 AM, Nathan Kronenfeld
 nkronenf...@oculusinfo.com wrote:
  I think I understand what is going on here, but I was hoping someone
 could
  confirm (or explain reality if I don't) what I'm seeing.
 
  We are collecting data using a rather sizable accumulator - essentially,
 an
  array of tens of thousands of entries.  All told, about 1.3m of data.
 
  If I understand things correctly, it looks to me like, when our job is
 done,
  a copy of this array is retrieved from each individual task, all at once,
  for combination on the client - which means, with 400 tasks to the job,
 each
  collection is using up half a gig of memory on the client.
 
  Is this true?  If so, does anyone know a way to get accumulators to
  accumulate as results collect, rather than all at once at the end, so we
  only have to hold a few in memory at a time, rather than all 400?
 
  Thanks,
-Nathan
 
 
  --
  Nathan Kronenfeld
  Senior Visualization Developer
  Oculus Info Inc
  2 Berkeley Street, Suite 600,
  Toronto, Ontario M5A 4J5
  Phone:  +1-416-203-3003 x 238
  Email:  nkronenf...@oculusinfo.com




-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com


Many retries for Python job

2014-11-21 Thread Brett Meyer
I'm running a Python script with spark-submit on top of YARN on an EMR
cluster with 30 nodes.  The script reads in approximately 3.9 TB of data
from S3, and then does some transformations and filtering, followed by some
aggregate counts.  During Stage 2 of the job, everything looks to complete
just fine with no executor failures or resubmissions, but when Stage 3
starts up, many Stage 2 tasks have to be rerun due to FetchFailure errors.
Actually, I usually see at least 3-4 retries on Stage 2 before Stage 3 can
successfully start.  The whole application eventually completes, but there
is an addition of about 1+ hour overhead for all of the retries.

I'm trying to determine why there were FetchFailure exceptions, since
anything computed in the job that could not fit in the available memory
cache should by default be spilled to disk for further retrieval.  I also
see some "java.net.ConnectException: Connection refused" and
"java.io.IOException: sendMessageReliably failed without being ACK'd" errors
in the logs after a CancelledKeyException followed by a
ClosedChannelException, but I have no idea why the nodes in the EMR cluster
would suddenly stop being able to communicate.

If anyone has ideas as to why the data needs to be rerun several times in
this job, please let me know as I am fairly bewildered about this behavior.






Re: Nightly releases

2014-11-21 Thread Arun Ahuja
Great - posted here https://issues.apache.org/jira/browse/SPARK-4542

On Fri, Nov 21, 2014 at 1:03 PM, Andrew Ash and...@andrewash.com wrote:

 Yes you should file a Jira and echo it out here so others can follow and
 comment on it.  Thanks Arun!

 On Fri, Nov 21, 2014 at 12:02 PM, Arun Ahuja aahuj...@gmail.com wrote:

 Great - what can we do to make this happen?  So should I file a JIRA to
 track?

 Thanks,

 Arun

 On Tue, Nov 18, 2014 at 11:46 AM, Andrew Ash and...@andrewash.com
 wrote:

 I can see this being valuable for users wanting to live on the cutting
 edge without building CI infrastructure themselves, myself included.  I
 think Patrick's recent work on the build scripts for 1.2.0 will make
 delivering nightly builds to a public maven repo easier.

 On Tue, Nov 18, 2014 at 10:22 AM, Arun Ahuja aahuj...@gmail.com wrote:

 Of course we can run this as well to get the latest, but the build is
 fairly long and this seems like a resource many would need.

 On Tue, Nov 18, 2014 at 10:21 AM, Arun Ahuja aahuj...@gmail.com
 wrote:

 Are nightly releases posted anywhere?  There are quite a few vital
 bugfixes and performance improvements being committed to Spark, and using
 the latest commits is useful (or even necessary for some jobs).

 Is there a place to post them? It doesn't seem like it would be difficult
 to run make-dist nightly and place it somewhere.

 Is it possible to extract this from Jenkins builds?

 Thanks,
 Arun
  ​








SparkSQL Timestamp query failure

2014-11-21 Thread whitebread
Hi all,

I put some log files into sql tables through Spark and my schema looks like
this:

 |-- timestamp: timestamp (nullable = true)
 |-- c_ip: string (nullable = true)
 |-- cs_username: string (nullable = true)
 |-- s_ip: string (nullable = true)
 |-- s_port: string (nullable = true)
 |-- cs_method: string (nullable = true)
 |-- cs_uri_stem: string (nullable = true)
 |-- cs_query: string (nullable = true)
 |-- sc_status: integer (nullable = false)
 |-- sc_bytes: integer (nullable = false)
 |-- cs_bytes: integer (nullable = false)
 |-- time_taken: integer (nullable = false)
 |-- User_Agent: string (nullable = true)
 |-- Referrer: string (nullable = true)

As you can notice, I created a timestamp field, which I read is supported by
Spark (Date wouldn't work as far as I understood). I would love to use it for
queries like WHERE timestamp = '2012-10-08 16:10:36.0', but when I run them I
keep getting errors.
I tried the 2 following syntax forms.
For the second one I parse a string, so I'm sure I'm actually passing it in a
timestamp format.
I use 2 functions: parse and date2timestamp.

Any hint on how I should handle timestamp values?

Thanks,

Alessandro

1)
scala> sqlContext.sql("SELECT * FROM Logs as l where l.timestamp=(2012-10-08 16:10:36.0)").collect
java.lang.RuntimeException: [1.55] failure: ``)'' expected but 16 found

SELECT * FROM Logs as l where l.timestamp=(2012-10-08 16:10:36.0)
  ^
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.catalyst.SqlParser.apply(SqlParser.scala:60)
at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:73)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:260)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:21)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:26)
at $iwC$$iwC$$iwC.<init>(<console>:28)
at $iwC$$iwC.<init>(<console>:30)
at $iwC.<init>(<console>:32)
at <init>(<console>:34)
at .<init>(<console>:38)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
at
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
at 
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
at 
org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814)
at
org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624)
at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629)
at
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954)
at
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
at
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
at
scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

2)
sqlContext.sql("SELECT * FROM Logs as l where
l.timestamp=" + date2timestamp(formatTime3.parse("2012-10-08
16:10:36.0"))).collect
java.lang.RuntimeException: [1.54] failure: ``UNION'' expected but 16 found

SELECT * FROM Logs as l where l.timestamp=2012-10-08 16:10:36.0
 ^
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.catalyst.SqlParser.apply(SqlParser.scala:60)
at 
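
For what it's worth, both failures above are parse errors: the timestamp literal is
not quoted, so the SQL parser stops at the space before "16". A small sketch of the
quoted form to try first (whether the string-to-timestamp comparison then coerces
cleanly depends on the Spark SQL version, so treat this as an illustration rather
than a confirmed fix):

sqlContext.sql("SELECT * FROM Logs AS l WHERE l.timestamp = '2012-10-08 16:10:36.0'").collect()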

Re: Many retries for Python job

2014-11-21 Thread Sandy Ryza
Hi Brett,

Are you noticing executors dying?  Are you able to check the YARN
NodeManager logs and see whether YARN is killing them for exceeding memory
limits?

-Sandy

On Fri, Nov 21, 2014 at 9:47 AM, Brett Meyer brett.me...@crowdstrike.com
wrote:

 I’m running a Python script with spark-submit on top of YARN on an EMR
 cluster with 30 nodes.  The script reads in approximately 3.9 TB of data
 from S3, and then does some transformations and filtering, followed by some
 aggregate counts.  During Stage 2 of the job, everything looks to complete
 just fine with no executor failures or resubmissions, but when Stage 3
 starts up, many Stage 2 tasks have to be rerun due to FetchFailure errors.
 Actually, I usually see at least 3-4 retries on Stage 2 before Stage 3 can
 successfully start.  The whole application eventually completes, but there
 is an addition of about 1+ hour overhead for all of the retries.

 I’m trying to determine why there were FetchFailure exceptions, since
 anything computed in the job that could not fit in the available memory
 cache should be by default spilled to disk for further retrieval.  I also
  see some “java.net.ConnectException: Connection refused” and
  “java.io.IOException: sendMessageReliably failed without being ACK’d”
 errors in the logs after a CancelledKeyException followed by
 a ClosedChannelException, but I have no idea why the nodes in the EMR
 cluster would suddenly stop being able to communicate.

 If anyone has ideas as to why the data needs to be rerun several times in
 this job, please let me know as I am fairly bewildered about this behavior.



Re: Parsing a large XML file using Spark

2014-11-21 Thread Paul Brown
Unfortunately, unless you impose restrictions on the XML file (e.g., where
namespaces are declared, whether entity replacement is used, etc.), you
really can't parse only a piece of it even if you have start/end elements
grouped together.  If you want to deal effectively (and scalably) with
large XML files consisting of many records, the right thing to do is to
write them as one XML document per line just like the one JSON document per
line, at which point the data can be split effectively.  Something like
Woodstox and a little custom code should make an effective pre-processor.

Once you have the line-delimited XML, you can shred it however you want:
 JAXB, Jackson XML, etc.

—
p...@mult.ifario.us | Multifarious, Inc. | http://mult.ifario.us/
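
To make the suggestion concrete, here is a minimal sketch of what consuming the
pre-processed, one-document-per-line output could look like (the element and field
names are made up for the example; only the shape matters):

import scala.xml.XML

// each input line is assumed to hold one complete, self-contained XML record
val records = sc.textFile("hdfs:///data/records-one-per-line.xml")
  .map(XML.loadString)
  .map(x => ((x \ "id").text, (x \ "title").text))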

On Fri, Nov 21, 2014 at 3:38 AM, Prannoy pran...@sigmoidanalytics.com
wrote:

 Hi,

 Parallel processing of xml files may be an issue due to the tags in the
 xml file. The xml file has to be intact as while parsing it matches the
 start and end entity and if its distributed in parts to workers possibly it
 may or may not find start and end tags within the same worker which will
 give an exception.

 Thanks.

  On Wed, Nov 19, 2014 at 6:26 AM, ssimanta [via Apache Spark User List] wrote:

 If there a one big XML file (e.g., Wikipedia dump 44GB or the larger dump
 that all revision information also) that is stored in HDFS, is it possible
 to parse it in parallel/faster using Spark? Or do we have to use something
 like a PullParser or Iteratee?

 My current solution is to read the single XML file in the first pass -
 write it to HDFS and then read the small files in parallel on the Spark
 workers.

 Thanks
 -Soumya








 --
 View this message in context: Re: Parsing a large XML file using Spark
 http://apache-spark-user-list.1001560.n3.nabble.com/Parsing-a-large-XML-file-using-Spark-tp19239p19477.html
 Sent from the Apache Spark User List mailing list archive
 http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.



Re: Determine number of running executors

2014-11-21 Thread Sandy Ryza
Hi Tobias,

One way to find out the number of executors is through
SparkContext#getExecutorMemoryStatus.  You can find out the number of cores per
executor by asking the SparkConf for the spark.executor.cores property, which,
if not set, means 1 for YARN.

-Sandy
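
A rough sketch of both lookups, assuming a live SparkContext called sc (note that
getExecutorMemoryStatus also counts the driver's block manager, hence the minus one):

// executors currently registered with the driver
val numExecutors = sc.getExecutorMemoryStatus.size - 1
// cores per executor; on YARN this defaults to 1 when unset
val coresPerExecutor = sc.getConf.getInt("spark.executor.cores", 1)
val totalCores = numExecutors * coresPerExecutor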


On Fri, Nov 21, 2014 at 1:30 AM, Yanbo Liang yanboha...@gmail.com wrote:

 You can get parameters such as spark.executor.memory, but you cannot get the
 executor or core counts, because executors and cores are parameters of the
 Spark deploy environment, not of the Spark context.

 val conf = new SparkConf().set("spark.executor.memory", "2G")
 val sc = new SparkContext(conf)

 sc.getConf.get("spark.executor.memory")
 conf.get("spark.executor.memory")

 2014-11-21 15:35 GMT+08:00 Tobias Pfeiffer t...@preferred.jp:

 Hi,

 when running on YARN, is there a way for the Spark driver to know how
 many executors, cores per executor etc. there are? I want to know this so I
 can repartition to a good number.

 Thanks
 Tobias





Re: Parsing a large XML file using Spark

2014-11-21 Thread andy petrella
Actually, it's a real

On Tue Nov 18 2014 at 2:52:00 AM Tobias Pfeiffer t...@preferred.jp wrote:

 Hi,

 see https://www.mail-archive.com/dev@spark.apache.org/msg03520.html for
 one solution.

 One issue with those XML files is that they cannot be processed line by
 line in parallel; plus you inherently need shared/global state to parse XML
 or check for well-formedness, I think. (Same issue with multi-line JSON, by
 the way.)

 Tobias




Re: Extracting values from a Collecion

2014-11-21 Thread Sanjay Subramanian
I am sorry, the last line in the code is

file1Rdd.join(file2RddGrp.mapValues(names => names.toSet)).collect().foreach(println)

so

My Code
=======
val file1Rdd = sc.textFile("/Users/sansub01/mycode/data/songs/names.txt").map(x => (x.split(",")(0), x.split(",")(1)))
val file2Rdd = sc.textFile("/Users/sansub01/mycode/data/songs/songs.txt").map(x => (x.split(",")(0), x.split(",")(1)))
val file2RddGrp = file2Rdd.groupByKey()
file1Rdd.join(file2RddGrp.mapValues(names => names.toSet)).collect().foreach(println)

Result
======
(4,(ringo,Set(With a Little Help From My Friends, Octopus's Garden)))
(2,(john,Set(Julia, Nowhere Man)))
(3,(george,Set(While My Guitar Gently Weeps, Norwegian Wood)))
(1,(paul,Set(Yesterday, Michelle)))

Again the question is how do I extract values from the Set?

thanks
sanjay

 From: Sanjay Subramanian sanjaysubraman...@yahoo.com.INVALID
 To: Arun Ahuja aahuj...@gmail.com; Andrew Ash and...@andrewash.com
 Cc: user user@spark.apache.org
 Sent: Friday, November 21, 2014 10:41 AM
 Subject: Extracting values from a Collecion

hey guys

names.txt
=========
1,paul
2,john
3,george
4,ringo

songs.txt
=========
1,Yesterday
2,Julia
3,While My Guitar Gently Weeps
4,With a Little Help From My Friends
1,Michelle
2,Nowhere Man
3,Norwegian Wood
4,Octopus's Garden

What I want to do is real simple

Desired Output
==============
(4,(With a Little Help From My Friends, Octopus's Garden))
(2,(Julia, Nowhere Man))
(3,(While My Guitar Gently Weeps, Norwegian Wood))
(1,(Yesterday, Michelle))

My Code
=======
val file1Rdd = sc.textFile("/Users/sansub01/mycode/data/songs/names.txt").map(x => (x.split(",")(0), x.split(",")(1)))
val file2Rdd = sc.textFile("/Users/sansub01/mycode/data/songs/songs.txt").map(x => (x.split(",")(0), x.split(",")(1)))
val file2RddGrp = file2Rdd.groupByKey()
file2Rdd.groupByKey().mapValues(names => names.toSet).collect().foreach(println)

Result
======
(4,Set(With a Little Help From My Friends, Octopus's Garden))
(2,Set(Julia, Nowhere Man))
(3,Set(While My Guitar Gently Weeps, Norwegian Wood))
(1,Set(Yesterday, Michelle))

How can I extract values from the Set?

Thanks
sanjay
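
One possible way to get at the individual values, sketched against the RDDs above
(this is an illustration, not something from the original exchange): the Set in each
record is an ordinary Scala collection, so it can be flattened into rows or folded
into a single string.

val joined = file1Rdd.join(file2RddGrp.mapValues(_.toSet))

// one (id, name, song) row per element of the Set
joined.flatMap { case (id, (name, songs)) => songs.map(song => (id, name, song)) }
      .collect().foreach(println)

// or keep one row per id, with the songs joined into a single string
joined.mapValues { case (name, songs) => (name, songs.mkString(", ")) }
      .collect().foreach(println)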


  

Re: Parsing a large XML file using Spark

2014-11-21 Thread andy petrella
(sorry about the previous spam... google inbox didn't allowed me to cancel
the miserable sent action :-/)

So what I was about to say: it's a real PAIN in the ass to parse the
Wikipedia articles in the dump because of these multiline articles...

However, there is a way to manage that quite easily, although I found it
rather slow.

*1/ use XML reader*
Use the "org.apache.hadoop" % "hadoop-streaming" % "1.0.4" dependency.

*2/ configure the hadoop job*
import org.apache.hadoop.streaming.StreamXmlRecordReader
import org.apache.hadoop.mapred.JobConf
val jobConf = new JobConf()
jobConf.set("stream.recordreader.class",
"org.apache.hadoop.streaming.StreamXmlRecordReader")
jobConf.set("stream.recordreader.begin", "<page>")
jobConf.set("stream.recordreader.end", "</page>")
org.apache.hadoop.mapred.FileInputFormat.addInputPaths(jobConf,
s"hdfs://$master:9000/data.xml")

// Load documents (one per line).
val documents = sparkContext.hadoopRDD(jobConf,
classOf[org.apache.hadoop.streaming.StreamInputFormat],
classOf[org.apache.hadoop.io.Text],
classOf[org.apache.hadoop.io.Text])


*3/ use the result as XML doc*
import scala.xml.XML
val texts = documents.map(_._1.toString)
                     .map { s =>
                       val xml = XML.loadString(s)
                       val id = (xml \ "id").text.toDouble
                       val title = (xml \ "title").text
                       val text = (xml \ "revision" \ "text").text.replaceAll("\\W", " ")
                       val tknzed = text.split("\\W").filter(_.size > 3).toList
                       (id, title, tknzed)
                     }

HTH
andy
On Tue Nov 18 2014 at 2:52:00 AM Tobias Pfeiffer t...@preferred.jp wrote:

 Hi,

 see https://www.mail-archive.com/dev@spark.apache.org/msg03520.html for
 one solution.

 One issue with those XML files is that they cannot be processed line by
 line in parallel; plus you inherently need shared/global state to parse XML
 or check for well-formedness, I think. (Same issue with multi-line JSON, by
 the way.)

 Tobias




Re: JVM Memory Woes

2014-11-21 Thread Peter Thai
Quick update: 
It is a filter job that creates the error above, not the reduceByKey

Why would a filter cause an out of memory? 

Here is my code

val inputgsup =
  "hdfs://" + sparkmasterip + "/user/sense/datasets/gsup/binary/30/2014/11/0[1-9]/part*"
val gsupfile =
  sc.newAPIHadoopFile[BytesWritable,BytesWritable,SequenceFileAsBinaryInputFormat](inputgsup)
val gsup = gsupfile.map(x => (GsupHandler.DeserializeKey( x._1.getBytes ),
  GsupHandler.DeserializeValue( x._2.getBytes ))).map(x =>
  (x._1._1, x._1._2, x._2._1, x._2._2))
val gsup_results_geod = gsup.flatMap(x => doQueryGSUP(has_expo_criteria,
  has_fence_criteria, timerange_start_expo, timerange_end_expo,
  timerange_start_fence, timerange_end_fence, expo_pois, fence_pois, x))
val gsup_results_reduced =
  gsup_results_geod.reduceByKey((a,b) => ((a._1.toShort | b._1.toShort).toByte,
  a._2 + b._2))

*val gsup_results = gsup_results_reduced.filter(x => (criteria_filter.value
contains x._2._1.toInt))*



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/JVM-Memory-Woes-tp19496p19510.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Many retries for Python job

2014-11-21 Thread Brett Meyer
According to the web UI I don't see any executors dying during Stage 2.  I
looked over the YARN logs and didn't see anything suspicious, but I may not
have been looking closely enough.  Stage 2 seems to complete just fine; it's
just when it enters Stage 3 that the results from the previous stage seem to
be missing in many cases and result in FetchFailure errors.  I should
probably also mention that I have the spark.storage.memoryFraction set to
0.2.

From:  Sandy Ryza sandy.r...@cloudera.com
Date:  Friday, November 21, 2014 at 1:41 PM
To:  Brett Meyer brett.me...@crowdstrike.com
Cc:  user@spark.apache.org user@spark.apache.org
Subject:  Re: Many retries for Python job

Hi Brett, 

Are you noticing executors dying?  Are you able to check the YARN
NodeManager logs and see whether YARN is killing them for exceeding memory
limits?

-Sandy

On Fri, Nov 21, 2014 at 9:47 AM, Brett Meyer brett.me...@crowdstrike.com
wrote:
 I'm running a Python script with spark-submit on top of YARN on an EMR cluster
 with 30 nodes.  The script reads in approximately 3.9 TB of data from S3, and
 then does some transformations and filtering, followed by some aggregate
 counts.  During Stage 2 of the job, everything looks to complete just fine
 with no executor failures or resubmissions, but when Stage 3 starts up, many
 Stage 2 tasks have to be rerun due to FetchFailure errors.  Actually, I
 usually see at least 3-4 retries on Stage 2 before Stage 3 can successfully
 start.  The whole application eventually completes, but there is an addition
 of about 1+ hour overhead for all of the retries.

 I'm trying to determine why there were FetchFailure exceptions, since anything
 computed in the job that could not fit in the available memory cache should be
 by default spilled to disk for further retrieval.  I also see some
 "java.net.ConnectException: Connection refused" and "java.io.IOException:
 sendMessageReliably failed without being ACK'd" errors in the logs after a
 CancelledKeyException followed by a ClosedChannelException, but I have no idea
 why the nodes in the EMR cluster would suddenly stop being able to
 communicate.
 
 If anyone has ideas as to why the data needs to be rerun several times in this
 job, please let me know as I am fairly bewildered about this behavior.







RE: tableau spark sql cassandra

2014-11-21 Thread Mohammed Guller
Thanks, Jerome.

BTW, have you tried the CalliopeServer2 from tuplejump? I was able to quickly 
connect from beeline/Squirrel to my Cassandra cluster using CalliopeServer2, 
which extends Spark SQL Thrift Server. It was very straight forward.

Next step is to connect from Tableau, but I can't find Tableau's Spark 
connector. Where did you download it from?

Mohammed

-Original Message-
From: jererc [mailto:jer...@gmail.com] 
Sent: Friday, November 21, 2014 5:27 AM
To: u...@spark.incubator.apache.org
Subject: RE: tableau spark sql cassandra

Hi!

Sure, I'll post the info I grabbed once the cassandra tables values appear in 
Tableau.

Best,
Jerome



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/tableau-spark-sql-cassandra-tp19282p19480.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: MLLib: LinearRegressionWithSGD performance

2014-11-21 Thread Jayant Shekhar
Hi Sameer,

You can try increasing the number of executor-cores.

-Jayant





On Fri, Nov 21, 2014 at 11:18 AM, Sameer Tilak ssti...@live.com wrote:

 Hi All,
 I have been using MLLib's linear regression and I have some question
 regarding the performance. We have a cluster of 10 nodes -- each node has
 24 cores and 148GB memory. I am running my app as follows:

 time spark-submit --class medslogistic.MedsLogistic --master yarn-client
 --executor-memory 6G --num-executors 10 /pathtomyapp/myapp.jar

 I am also going to play with number of executors (reduce it) may be that
 will give us different results.

  The input is an 800MB sparse file in LibSVM format. The total number of
  features is 150K. It takes approximately 70 minutes for the regression to
  finish. The job imposes very little load on CPU, memory, network, and disk.
  The total number of tasks is 104, and the total time gets divided fairly
  uniformly across these tasks. I was wondering, is it possible to reduce the
  execution time further?


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org



Re: MLLib: LinearRegressionWithSGD performance

2014-11-21 Thread Jayant Shekhar
Hi Sameer,

You can also use repartition to create a higher number of tasks.

-Jayant
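
For example, something along these lines (the path and partition count are
placeholders; pick a count that matches your executors times cores):

import org.apache.spark.mllib.util.MLUtils

val data = MLUtils.loadLibSVMFile(sc, "hdfs:///path/to/data.libsvm")
  .repartition(240)   // e.g. 10 executors * 24 cores
  .cache()
// then train as before, e.g. LinearRegressionWithSGD.train(data, numIterations)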


On Fri, Nov 21, 2014 at 12:02 PM, Jayant Shekhar jay...@cloudera.com
wrote:

 Hi Sameer,

 You can try increasing the number of executor-cores.

 -Jayant





 On Fri, Nov 21, 2014 at 11:18 AM, Sameer Tilak ssti...@live.com wrote:

 Hi All,
 I have been using MLLib's linear regression and I have some question
 regarding the performance. We have a cluster of 10 nodes -- each node has
 24 cores and 148GB memory. I am running my app as follows:

 time spark-submit --class medslogistic.MedsLogistic --master yarn-client
 --executor-memory 6G --num-executors 10 /pathtomyapp/myapp.jar

 I am also going to play with number of executors (reduce it) may be that
 will give us different results.

  The input is an 800MB sparse file in LibSVM format. The total number of
  features is 150K. It takes approximately 70 minutes for the regression to
  finish. The job imposes very little load on CPU, memory, network, and disk.
  The total number of tasks is 104, and the total time gets divided fairly
  uniformly across these tasks. I was wondering, is it possible to reduce the
  execution time further?


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Spark SQL with Apache Phoenix lower and upper Bound

2014-11-21 Thread Alaa Ali
I want to run queries on Apache Phoenix which has a JDBC driver. The query
that I want to run is:

select ts,ename from random_data_date limit 10

But I'm having issues with the JdbcRDD upper and lowerBound parameters
(that I don't actually understand).

Here's what I have so far:

import org.apache.spark.rdd.JdbcRDD
import java.sql.{Connection, DriverManager, ResultSet}

val url = "jdbc:phoenix:zookeeper"
val sql = "select ts,ename from random_data_date limit ?"
val myRDD = new JdbcRDD(sc, () => DriverManager.getConnection(url), sql, 5,
10, 2, r => r.getString("ts") + ", " + r.getString("ename"))

But this doesn't work because the sql expression that the JdbcRDD expects
has to have two ?s to represent the lower and upper bound.

How can I run my query through the JdbcRDD?

Regards,
Alaa Ali


Re: Spark SQL with Apache Phoenix lower and upper Bound

2014-11-21 Thread Josh Mahonin
Hi Alaa Ali,

In order for Spark to split the JDBC query in parallel, it expects an upper
and lower bound for your input data, as well as a number of partitions so
that it can split the query across multiple tasks.

For example, depending on your data distribution, you could set an upper
and lower bound on your timestamp range, and spark should be able to create
new sub-queries to split up the data.

Another option is to load up the whole table using the PhoenixInputFormat
as a NewHadoopRDD. It doesn't yet support many of Phoenix's aggregate
functions, but it does let you load up whole tables as RDDs.

I've previously posted example code here:
http://mail-archives.apache.org/mod_mbox/spark-user/201404.mbox/%3CCAJ6CGtA1DoTdadRtT5M0+75rXTyQgu5gexT+uLccw_8Ppzyt=q...@mail.gmail.com%3E

There's also an example library implementation here, although I haven't had
a chance to test it yet:
https://github.com/simplymeasured/phoenix-spark

Josh
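
A hedged sketch of the first option, reusing the url from the snippet below and
assuming a numeric id column exists to bound on (a later reply in this thread shows
a complete program):

val sql = "SELECT ts, ename FROM random_data_date WHERE id >= ? AND id <= ?"
val myRDD = new JdbcRDD(sc,
  () => DriverManager.getConnection(url),
  sql,
  1L, 1000000L, 10,   // lowerBound, upperBound, numPartitions
  r => r.getString("ts") + ", " + r.getString("ename"))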

On Fri, Nov 21, 2014 at 4:14 PM, Alaa Ali contact.a...@gmail.com wrote:

 I want to run queries on Apache Phoenix which has a JDBC driver. The query
 that I want to run is:

 select ts,ename from random_data_date limit 10

 But I'm having issues with the JdbcRDD upper and lowerBound parameters
 (that I don't actually understand).

 Here's what I have so far:

 import org.apache.spark.rdd.JdbcRDD
 import java.sql.{Connection, DriverManager, ResultSet}

  val url = "jdbc:phoenix:zookeeper"
  val sql = "select ts,ename from random_data_date limit ?"
  val myRDD = new JdbcRDD(sc, () => DriverManager.getConnection(url), sql,
  5, 10, 2, r => r.getString("ts") + ", " + r.getString("ename"))

 But this doesn't work because the sql expression that the JdbcRDD expects
 has to have two ?s to represent the lower and upper bound.

 How can I run my query through the JdbcRDD?

 Regards,
 Alaa Ali



Re: MongoDB Bulk Inserts

2014-11-21 Thread Benny Thompson
I tried using RDD#mapPartitions but my job completes prematurely and
without error as if nothing gets done.  What I have is fairly simple

sc
.textFile(inputFile)
.map(parser.parse)
.mapPartitions(bulkLoad)

But the Iterator[T] of mapPartitions is always empty, even though I know
map is generating records.
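
One thing to check, offered as a hedged sketch rather than a known fix: mapPartitions
must return an iterator that a later stage or an action actually consumes, so if
bulkLoad does its inserts eagerly and returns an empty iterator, there is nothing
downstream to materialize. For writes that are pure side effects, foreachPartition
(an action) is usually a better fit. Here insertBatch stands in for whatever Casbah
bulk-insert call is used; it is not a real API name:

sc.textFile(inputFile)
  .map(parser.parse)
  .foreachPartition { docs =>
    // one connection per partition, inserts done in batches
    docs.grouped(1000).foreach(batch => insertBatch(batch.toSeq))
  }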


On Thu Nov 20 2014 at 9:25:54 PM Soumya Simanta soumya.sima...@gmail.com
wrote:

 On Thu, Nov 20, 2014 at 10:18 PM, Benny Thompson ben.d.tho...@gmail.com
 wrote:

 I'm trying to use MongoDB as a destination for an ETL I'm writing in
 Spark.  It appears I'm gaining a lot of overhead in my system databases
 (and possibly in the primary documents themselves);  I can only assume it's
 because I'm left to using PairRDD.saveAsNewAPIHadoopFile.

 - Is there a way to batch some of the data together and use Casbah
 natively so I can use bulk inserts?


 Why cannot you write Mongo in a RDD#mapPartition ?



 - Is there maybe a less hacky way to load to MongoDB (instead of
 using saveAsNewAPIHadoopFile)?


 If the latency (time by which all data should be in Mongo) is not a
 concern you can try a separate process that uses Akka/Casbah to write from
 HDFS into Mongo.






Running Spark application from Tomcat

2014-11-21 Thread Andreas Koch
I have a Spark java application that I run in local-mode. As such it runs
without any issues.

Now, I would like to run it as a webservice from Tomcat. The first issue I
had with this was that the spark-assembly jar contains javax.servlet, which
Tomcat does not allow. Therefore I removed javax.servlet from the jar file.

Now, I get an Exception like this:

java.lang.RuntimeException: class
org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback not
org.apache.hadoop.security.GroupMappingServiceProvider
org.apache.hadoop.conf.Configuration.getClass(Configuration.java:1921)
org.apache.hadoop.security.Groups.init(Groups.java:64)

org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:240)

org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:255)

org.apache.hadoop.security.UserGroupInformation.setConfiguration(UserGroupInformation.java:283)
org.apache.spark.deploy.SparkHadoopUtil.init(SparkHadoopUtil.scala:36)

org.apache.spark.deploy.SparkHadoopUtil$.init(SparkHadoopUtil.scala:109)
org.apache.spark.deploy.SparkHadoopUtil$.clinit(SparkHadoopUtil.scala)
org.apache.spark.SparkContext.init(SparkContext.scala:228)

org.apache.spark.api.java.JavaSparkContext.init(JavaSparkContext.scala:53)


Any ideas?


Thanks,

Andreas


Re: Spark SQL with Apache Phoenix lower and upper Bound

2014-11-21 Thread Alaa Ali
Awesome, thanks Josh, I missed that previous post of yours! But your code
snippet shows a select statement, so what I can do is just run a simple
select with a where clause if I want to, and then run my data processing on
the RDD to mimic the aggregation I want to do with SQL, right? Also,
another question, I still haven't tried this out, but I'll actually be
using this with PySpark, so I'm guessing the PhoenixPigConfiguration and
newHadoopRDD can be defined in PySpark as well?

Regards,
Alaa Ali

On Fri, Nov 21, 2014 at 4:34 PM, Josh Mahonin jmaho...@interset.com wrote:

 Hi Alaa Ali,

 In order for Spark to split the JDBC query in parallel, it expects an
 upper and lower bound for your input data, as well as a number of
 partitions so that it can split the query across multiple tasks.

 For example, depending on your data distribution, you could set an upper
 and lower bound on your timestamp range, and spark should be able to create
 new sub-queries to split up the data.

 Another option is to load up the whole table using the PhoenixInputFormat
 as a NewHadoopRDD. It doesn't yet support many of Phoenix's aggregate
 functions, but it does let you load up whole tables as RDDs.

 I've previously posted example code here:

 http://mail-archives.apache.org/mod_mbox/spark-user/201404.mbox/%3CCAJ6CGtA1DoTdadRtT5M0+75rXTyQgu5gexT+uLccw_8Ppzyt=q...@mail.gmail.com%3E

 There's also an example library implementation here, although I haven't
 had a chance to test it yet:
 https://github.com/simplymeasured/phoenix-spark

 Josh

 On Fri, Nov 21, 2014 at 4:14 PM, Alaa Ali contact.a...@gmail.com wrote:

 I want to run queries on Apache Phoenix which has a JDBC driver. The
 query that I want to run is:

 select ts,ename from random_data_date limit 10

 But I'm having issues with the JdbcRDD upper and lowerBound parameters
 (that I don't actually understand).

 Here's what I have so far:

 import org.apache.spark.rdd.JdbcRDD
 import java.sql.{Connection, DriverManager, ResultSet}

  val url = "jdbc:phoenix:zookeeper"
  val sql = "select ts,ename from random_data_date limit ?"
  val myRDD = new JdbcRDD(sc, () => DriverManager.getConnection(url), sql,
  5, 10, 2, r => r.getString("ts") + ", " + r.getString("ename"))

 But this doesn't work because the sql expression that the JdbcRDD expects
 has to have two ?s to represent the lower and upper bound.

 How can I run my query through the JdbcRDD?

 Regards,
 Alaa Ali





Re: Spark SQL with Apache Phoenix lower and upper Bound

2014-11-21 Thread Alex Kamil
Ali,

just create a BIGINT column with numeric values in phoenix and use sequences
http://phoenix.apache.org/sequences.html to populate it automatically

I included the setup below in case someone starts from scratch

Prerequisites:
- export JAVA_HOME, SCALA_HOME and install sbt
- install hbase in standalone mode
http://hbase.apache.org/book/quickstart.html
- add phoenix jar http://phoenix.apache.org/download.html to hbase lib
directory
- start hbase and create a table in phoenix
http://phoenix.apache.org/Phoenix-in-15-minutes-or-less.html to verify
everything is working
- install spark in standalone mode, and verify that it works using spark
shell http://spark.apache.org/docs/latest/quick-start.html

1. create a sequence http://phoenix.apache.org/sequences.html in phoenix:
$PHOENIX_HOME/hadoop1/bin/sqlline.py localhost

  CREATE SEQUENCE IF NOT EXISTS my_schema.my_sequence;

2.add a BIGINT column called e.g. id to your table in phoenix

 CREATE TABLE test.orders ( id BIGINT not null primary key, name VARCHAR);

3. add some values
UPSERT INTO test.orders (id, name) VALUES( NEXT VALUE FOR
my_schema.my_sequence, 'foo');
UPSERT INTO test.orders (id, name) VALUES( NEXT VALUE FOR
my_schema.my_sequence, 'bar');

4. create jdbc adapter (following SimpleApp setup in
Spark-GettingStarted-StandAlone
applications
https://spark.apache.org/docs/latest/quick-start.html#Standalone_Applications
):

//SparkToJDBC.scala

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

import java.util.Date;

import org.apache.spark.SparkContext
import org.apache.spark.rdd.JdbcRDD

object SparkToJDBC {

  def main(args: Array[String]) {
    val sc = new SparkContext("local", "phoenix")
    try {
      val rdd = new JdbcRDD(sc, () => {
          Class.forName("org.apache.phoenix.jdbc.PhoenixDriver").newInstance()
          DriverManager.getConnection("jdbc:phoenix:localhost", "", "")
        },
        "SELECT id, name FROM test.orders WHERE id >= ? AND id <= ?",
        1, 100, 3,
        (r: ResultSet) => {
          processResultSet(r)
        }
      ).cache()

      println("#")
      println(rdd.count())
      println("#")
    } catch {
      case _: Throwable => println("Could not connect to database")
    }
    sc.stop()
  }

  def processResultSet(rs: ResultSet) {

    val rsmd = rs.getMetaData()
    val numberOfColumns = rsmd.getColumnCount()

    var i = 1
    while (i <= numberOfColumns) {
      val colName = rsmd.getColumnName(i)
      val tableName = rsmd.getTableName(i)
      val name = rsmd.getColumnTypeName(i)
      val caseSen = rsmd.isCaseSensitive(i)
      val writable = rsmd.isWritable(i)
      println("Information for column " + colName)
      println("Column is in table " + tableName)
      println("column type is " + name)
      println()
      i += 1
    }

    while (rs.next()) {
      var i = 1
      while (i <= numberOfColumns) {
        val s = rs.getString(i)
        System.out.print(s + "  ")
        i += 1
      }
      println()
    }
  }

}

5. build SparkToJDBC.scala
sbt package

6. execute spark job:
note: don't forget to add phoenix jar using --jars option like this:

../spark-1.1.0/bin/spark-submit --jars
../phoenix-3.1.0-bin/hadoop2/phoenix-3.1.0-client-hadoop2.jar
--class SparkToJDBC --master local[4] target/scala-2.10/simple-project_2.10-1.0.jar

regards
Alex


On Fri, Nov 21, 2014 at 4:34 PM, Josh Mahonin jmaho...@interset.com wrote:

 Hi Alaa Ali,

 In order for Spark to split the JDBC query in parallel, it expects an
 upper and lower bound for your input data, as well as a number of
 partitions so that it can split the query across multiple tasks.

 For example, depending on your data distribution, you could set an upper
 and lower bound on your timestamp range, and spark should be able to create
 new sub-queries to split up the data.

 Another option is to load up the whole table using the PhoenixInputFormat
 as a NewHadoopRDD. It doesn't yet support many of Phoenix's aggregate
 functions, but it does let you load up whole tables as RDDs.

 I've previously posted example code here:

 http://mail-archives.apache.org/mod_mbox/spark-user/201404.mbox/%3CCAJ6CGtA1DoTdadRtT5M0+75rXTyQgu5gexT+uLccw_8Ppzyt=q...@mail.gmail.com%3E

 There's also an example library implementation here, although I haven't
 had a chance to test it yet:
 https://github.com/simplymeasured/phoenix-spark

 Josh

 On Fri, Nov 21, 2014 at 4:14 PM, Alaa Ali contact.a...@gmail.com wrote:

 I want to run queries on Apache Phoenix which has a JDBC driver. The
 query that I want to run is:

 select ts,ename 

Persist kafka streams to text file

2014-11-21 Thread Joanne Contact
Hello, I am trying to read a Kafka stream into a text file by running Spark from
my IDE (IntelliJ IDEA). The code is similar to the one in a previous thread on
persisting streams to a text file.

I am new to Spark and Scala. I believe Spark is in local mode, as the
console shows
14/11/21 14:17:11 INFO spark.SparkContext: Spark configuration:
spark.app.name=local-mode

 I got the following errors. It is related to Tachyon. But I don't know if
I have tachyon or not.

14/11/21 14:17:54 WARN storage.TachyonBlockManager: Attempt 1 to create
tachyon dir null failed
java.io.IOException: Failed to connect to master localhost/127.0.0.1:19998
after 5 attempts
at tachyon.client.TachyonFS.connect(TachyonFS.java:293)
at tachyon.client.TachyonFS.getFileId(TachyonFS.java:1011)
at tachyon.client.TachyonFS.exist(TachyonFS.java:633)
at
org.apache.spark.storage.TachyonBlockManager$$anonfun$createTachyonDirs$2.apply(TachyonBlockManager.scala:117)
at
org.apache.spark.storage.TachyonBlockManager$$anonfun$createTachyonDirs$2.apply(TachyonBlockManager.scala:106)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at
org.apache.spark.storage.TachyonBlockManager.createTachyonDirs(TachyonBlockManager.scala:106)
at
org.apache.spark.storage.TachyonBlockManager.init(TachyonBlockManager.scala:57)
at
org.apache.spark.storage.BlockManager.tachyonStore$lzycompute(BlockManager.scala:88)
at org.apache.spark.storage.BlockManager.tachyonStore(BlockManager.scala:82)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:729)
at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:594)
at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:145)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
at org.apache.spark.scheduler.Task.run(Task.scala:54)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: tachyon.org.apache.thrift.TException: Failed to connect to
master localhost/127.0.0.1:19998 after 5 attempts
at tachyon.master.MasterClient.connect(MasterClient.java:178)
at tachyon.client.TachyonFS.connect(TachyonFS.java:290)
... 28 more
Caused by: tachyon.org.apache.thrift.transport.TTransportException:
java.net.ConnectException: Connection refused
at tachyon.org.apache.thrift.transport.TSocket.open(TSocket.java:185)
at
tachyon.org.apache.thrift.transport.TFramedTransport.open(TFramedTransport.java:81)
at tachyon.master.MasterClient.connect(MasterClient.java:156)
... 29 more
Caused by: java.net.ConnectException: Connection refused
at java.net.PlainSocketImpl.socketConnect(Native Method)
at
java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
at
java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
at
java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:579)
at tachyon.org.apache.thrift.transport.TSocket.open(TSocket.java:180)
... 31 more
14/11/21 14:17:54 ERROR storage.TachyonBlockManager: Failed 10 attempts to
create tachyon dir in
/tmp_spark_tachyon/spark-3dbec68b-f5b8-45e1-bb68-370439839d4a/driver

I looked at the code. It has the following part. Is that a problem?

.persist(StorageLevel.OFF_HEAP)

Any advice?

Thank you!

J


Persist kafka streams to text file, tachyon error?

2014-11-21 Thread Joanne Contact
use the right email list.
-- Forwarded message --
From: Joanne Contact joannenetw...@gmail.com
Date: Fri, Nov 21, 2014 at 2:32 PM
Subject: Persist kafka streams to text file
To: u...@spark.incubator.apache.org


Hello, I am trying to read a Kafka stream into a text file by running Spark from
my IDE (IntelliJ IDEA). The code is similar to the one in a previous thread on
persisting streams to a text file.

I am new to Spark and Scala. I believe Spark is in local mode, as the
console shows
14/11/21 14:17:11 INFO spark.SparkContext: Spark configuration:
spark.app.name=local-mode

 I got the following errors. It is related to Tachyon. But I don't know if
I have tachyon or not.

14/11/21 14:17:54 WARN storage.TachyonBlockManager: Attempt 1 to create
tachyon dir null failed
java.io.IOException: Failed to connect to master localhost/127.0.0.1:19998
after 5 attempts
at tachyon.client.TachyonFS.connect(TachyonFS.java:293)
at tachyon.client.TachyonFS.getFileId(TachyonFS.java:1011)
at tachyon.client.TachyonFS.exist(TachyonFS.java:633)
at
org.apache.spark.storage.TachyonBlockManager$$anonfun$createTachyonDirs$2.apply(TachyonBlockManager.scala:117)
at
org.apache.spark.storage.TachyonBlockManager$$anonfun$createTachyonDirs$2.apply(TachyonBlockManager.scala:106)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at
org.apache.spark.storage.TachyonBlockManager.createTachyonDirs(TachyonBlockManager.scala:106)
at
org.apache.spark.storage.TachyonBlockManager.init(TachyonBlockManager.scala:57)
at
org.apache.spark.storage.BlockManager.tachyonStore$lzycompute(BlockManager.scala:88)
at org.apache.spark.storage.BlockManager.tachyonStore(BlockManager.scala:82)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:729)
at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:594)
at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:145)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
at org.apache.spark.scheduler.Task.run(Task.scala:54)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: tachyon.org.apache.thrift.TException: Failed to connect to
master localhost/127.0.0.1:19998 after 5 attempts
at tachyon.master.MasterClient.connect(MasterClient.java:178)
at tachyon.client.TachyonFS.connect(TachyonFS.java:290)
... 28 more
Caused by: tachyon.org.apache.thrift.transport.TTransportException:
java.net.ConnectException: Connection refused
at tachyon.org.apache.thrift.transport.TSocket.open(TSocket.java:185)
at
tachyon.org.apache.thrift.transport.TFramedTransport.open(TFramedTransport.java:81)
at tachyon.master.MasterClient.connect(MasterClient.java:156)
... 29 more
Caused by: java.net.ConnectException: Connection refused
at java.net.PlainSocketImpl.socketConnect(Native Method)
at
java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
at
java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
at
java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:579)
at tachyon.org.apache.thrift.transport.TSocket.open(TSocket.java:180)
... 31 more
14/11/21 14:17:54 ERROR storage.TachyonBlockManager: Failed 10 attempts to
create tachyon dir in
/tmp_spark_tachyon/spark-3dbec68b-f5b8-45e1-bb68-370439839d4a/driver

I looked at the code. It has the following part. Is that a problem?

.persist(StorageLevel.OFF_HEAP)

Any advice?

Thank you!

J
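
One note on that last point: in this Spark version StorageLevel.OFF_HEAP stores
blocks in Tachyon, which is why the block manager is trying to reach a Tachyon
master at localhost:19998. If no Tachyon cluster is running, a hedged alternative
is an on-heap/disk level, for example:

.persist(StorageLevel.MEMORY_AND_DISK_SER)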


Re: SparkSQL - can we add new column(s) to parquet files

2014-11-21 Thread Evan Chan
I would expect an SQL query on c would fail because c would not be known in
the schema of the older Parquet file.

What I'd be very interested in is how to add a new column as an incremental
new parquet file, and be able to somehow join the existing and new files in
an efficient way -- i.e., somehow guarantee that for every row in the old
parquet file, the corresponding rows in the new file would be stored
on the same node, so that joins are local.

On Fri, Nov 21, 2014 at 10:03 AM, Sadhan Sood sadhan.s...@gmail.com wrote:

 We create the table definition by reading the parquet file for schema and
 store it in hive metastore. But if someone adds a new column to the schema,
 and if we rescan the schema from the new parquet files and update the table
 definition, would it still work if we run queries on the table ?

 So, old table has - Int a, Int b
 new table - Int a, Int b, String c

 but older parquet files don't have String c, so on querying the table
 would it return me null for column c  from older files and data from newer
 files or fail?



Re: Another accumulator question

2014-11-21 Thread Andrew Ash
Hi Nathan,

It sounds like what you're asking for has already been filed as
https://issues.apache.org/jira/browse/SPARK-664  Does that ticket match
what you're proposing?

Andrew

On Fri, Nov 21, 2014 at 12:29 PM, Nathan Kronenfeld 
nkronenf...@oculusinfo.com wrote:

 We've done this with reduce - that definitely works.

 I've reworked the logic to use accumulators because, when it works, it's
 5-10x faster

 On Fri, Nov 21, 2014 at 4:44 AM, Sean Owen so...@cloudera.com wrote:

 This sounds more like a use case for reduce? or fold? it sounds like
 you're kind of cobbling together the same function on accumulators,
 when reduce/fold are simpler and have the behavior you suggest.

 On Fri, Nov 21, 2014 at 5:46 AM, Nathan Kronenfeld
 nkronenf...@oculusinfo.com wrote:
  I think I understand what is going on here, but I was hoping someone
 could
  confirm (or explain reality if I don't) what I'm seeing.
 
  We are collecting data using a rather sizable accumulator -
 essentially, an
  array of tens of thousands of entries.  All told, about 1.3m of data.
 
  If I understand things correctly, it looks to me like, when our job is
 done,
  a copy of this array is retrieved from each individual task, all at
 once,
  for combination on the client - which means, with 400 tasks to the job,
 each
  collection is using up half a gig of memory on the client.
 
  Is this true?  If so, does anyone know a way to get accumulators to
  accumulate as results collect, rather than all at once at the end, so we
  only have to hold a few in memory at a time, rather than all 400?
 
  Thanks,
-Nathan
 
 
  --
  Nathan Kronenfeld
  Senior Visualization Developer
  Oculus Info Inc
  2 Berkeley Street, Suite 600,
  Toronto, Ontario M5A 4J5
  Phone:  +1-416-203-3003 x 238
  Email:  nkronenf...@oculusinfo.com




 --
 Nathan Kronenfeld
 Senior Visualization Developer
 Oculus Info Inc
 2 Berkeley Street, Suite 600,
 Toronto, Ontario M5A 4J5
 Phone:  +1-416-203-3003 x 238
 Email:  nkronenf...@oculusinfo.com
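
As a rough sketch of the reduce-style alternative discussed above (data, numBins and
binOf stand in for the real accumulator logic): each partition builds one partial
array and the arrays are merged pairwise, so the driver never has to hold one copy
per task at the same time.

val merged: Array[Long] =
  data.mapPartitions { it =>
    val bins = new Array[Long](numBins)
    it.foreach(x => bins(binOf(x)) += 1L)
    Iterator(bins)
  }.reduce { (a, b) =>
    var i = 0
    while (i < a.length) { a(i) += b(i); i += 1 }
    a
  }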



RE: Using TF-IDF from MLlib

2014-11-21 Thread Daniel, Ronald (ELS-SDG)
Thanks for the info Andy. A big help.

One thing - I think you can figure out which document is responsible for which 
vector without checking in more code.
Start with a PairRDD of [doc_id, doc_string] for each document and split that 
into one RDD for each column.
The values in the doc_string RDD get split and turned into a Seq and fed to 
TFIDF.
You can take the resulting RDD[Vector]s and zip them with the doc_id RDD. 
Presto!

Best regards,
Ron
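
Sketched out, that flow could look like the following (the file layout and the
tokenization are assumptions; note that the final zip relies on both sides keeping
the same partitioning and order, which is exactly the concern raised in the next
reply):

import org.apache.spark.mllib.feature.{HashingTF, IDF}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

// (doc_id, doc_string) pairs; a tab-separated file is assumed here
val docs: RDD[(String, String)] = sc.textFile("docs.tsv").map { line =>
  val Array(id, text) = line.split("\t", 2)
  (id, text)
}

val ids   = docs.map(_._1)
val terms = docs.map(_._2.split("\\s+").toSeq)

val tf: RDD[Vector] = new HashingTF().transform(terms)
tf.cache()
val tfidf: RDD[Vector] = new IDF().fit(tf).transform(tf)

// re-attach doc ids; assumes ordering is preserved end to end
val byDoc: RDD[(String, Vector)] = ids.zip(tfidf)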





Re: Using TF-IDF from MLlib

2014-11-21 Thread andy petrella
Yeah, I initially used zip, but I was wondering how reliable it is. I mean,
is the order guaranteed? What if some node fails and the data is pulled
from different nodes?
And even if it can work, I find this implicit semantics quite
uncomfortable, don't you?

My0.2c

Le ven 21 nov. 2014 15:26, Daniel, Ronald (ELS-SDG) r.dan...@elsevier.com
a écrit :

 Thanks for the info Andy. A big help.

 One thing - I think you can figure out which document is responsible for
 which vector without checking in more code.
 Start with a PairRDD of [doc_id, doc_string] for each document and split
 that into one RDD for each column.
 The values in the doc_string RDD get split and turned into a Seq and fed
 to TFIDF.
 You can take the resulting RDD[Vector]s and zip them with the doc_id RDD.
 Presto!

 Best regards,
 Ron






Book: Data Analysis with SparkR

2014-11-21 Thread Emaasit
Is there a book on SparkR for the absolutely terrified beginner?
I use R for my daily analysis and I am interested in a detailed guide to
using SparkR for data analytics, like a book or online tutorials. If there is
one, please point me to it.

Thanks,
Daniel



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Book-Data-Analysis-with-SparkR-tp19529.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to deal with BigInt in my case class for RDD = SchemaRDD convertion

2014-11-21 Thread Jianshi Huang
Ah yes. I found it too in the manual. Thanks for the help anyway!

Since BigDecimal is just a wrapper around BigInt, let's also convert
BigInt to Decimal.

I created a ticket. https://issues.apache.org/jira/browse/SPARK-4549

Jianshi
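
In the meantime, a hedged illustration of the Decimal route (the case class and
field names are invented for the example): convert the BigInt field to
scala.math.BigDecimal before registering the table.

// the original class had e.g. `amount: BigInt`, which SchemaRDD cannot map
case class Record(id: Long, amount: BigDecimal)

val fixed = rawRdd.map(r => Record(r.id, BigDecimal(r.amount)))  // rawRdd and its fields are assumed
import sqlContext.createSchemaRDD
fixed.registerTempTable("records")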

On Fri, Nov 21, 2014 at 11:30 PM, Yin Huai huaiyin@gmail.com wrote:

 Hello Jianshi,

 The reason of that error is that we do not have a Spark SQL data type for
 Scala BigInt. You can use Decimal for your case.

 Thanks,

 Yin

 On Fri, Nov 21, 2014 at 5:11 AM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Hi,

 I got an error during rdd.registerTempTable(...) saying scala.MatchError:
 scala.BigInt

 Looks like BigInt cannot be used in SchemaRDD, is that correct?

 So what would you recommend to deal with it?

 Thanks,
 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/





-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Missing parents for stage (Spark Streaming)

2014-11-21 Thread YaoPau
When I submit a Spark Streaming job, I see these INFO logs printing
frequently:

14/11/21 18:53:17 INFO DAGScheduler: waiting: Set(Stage 216)
14/11/21 18:53:17 INFO DAGScheduler: failed: Set()
14/11/21 18:53:17 INFO DAGScheduler: Missing parents for Stage 216: List()
14/11/21 18:53:17 INFO DAGScheduler: Submitting Stage 216 (MappedRDD[1733]
at map at MappedDStream.scala:35), which is now runnable

I have a feeling this means there is some error with a Map I created as a
broadcast variable, but I'm not sure.  How can I figure out what this is
referring to?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Missing-parents-for-stage-Spark-Streaming-tp19530.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Book: Data Analysis with SparkR

2014-11-21 Thread Zongheng Yang
Hi Daniel,

Thanks for your email! We don't have a book (yet?) specifically on SparkR,
but here's a list of helpful tutorials / links you can check out (I am
listing them in roughly basic - advanced order):

- AMPCamp5 SparkR exercises
http://ampcamp.berkeley.edu/5/exercises/sparkr.html. This covers the
basics of SparkR's API, performs basic analytics, and visualizes the
results.
- SparkR examples
https://github.com/amplab-extras/SparkR-pkg/tree/master/examples. We have
K-means, logistic regression, MNIST solver, \pi estimation, word count and
other examples available.
- Running SparkR on EC2
https://github.com/amplab-extras/SparkR-pkg/wiki/SparkR-Example:-Digit-Recognition-on-EC2.
This entry details the steps to run a SparkR program on an EC2 cluster.

Finally, we have a talk at the AMPCamp http://ampcamp.berkeley.edu/5/
today on SparkR, whose video & slides will be available soon on the website
-- it covers the basics of the interface & what you can do with it.
Additionally, you could direct any SparkR questions to our sparkr-dev
https://groups.google.com/forum/#!forum/sparkr-dev mailing list.

Let us know if you have further questions.

Zongheng

On Fri Nov 21 2014 at 3:48:53 PM Emaasit daniel.emaa...@gmail.com wrote:

  Is there a book on SparkR for the absolutely terrified beginner?
  I use R for my daily analysis and I am interested in a detailed guide to
  using SparkR for data analytics, like a book or online tutorials. If there is
  one, please point me to it.

 Thanks,
 Daniel



 --
 View this message in context: http://apache-spark-user-list.
 1001560.n3.nabble.com/Book-Data-Analysis-with-SparkR-tp19529.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: MongoDB Bulk Inserts

2014-11-21 Thread Soumya Simanta
bulkLoad has the connection to MongoDB ?

On Fri, Nov 21, 2014 at 4:34 PM, Benny Thompson ben.d.tho...@gmail.com
wrote:

 I tried using RDD#mapPartitions but my job completes prematurely and
 without error as if nothing gets done.  What I have is fairly simple

 sc
 .textFile(inputFile)
 .map(parser.parse)
 .mapPartitions(bulkLoad)

 But the Iterator[T] of mapPartitions is always empty, even though I know
 map is generating records.


 On Thu Nov 20 2014 at 9:25:54 PM Soumya Simanta soumya.sima...@gmail.com
 wrote:

 On Thu, Nov 20, 2014 at 10:18 PM, Benny Thompson ben.d.tho...@gmail.com
 wrote:

 I'm trying to use MongoDB as a destination for an ETL I'm writing in
 Spark.  It appears I'm gaining a lot of overhead in my system databases
 (and possibly in the primary documents themselves);  I can only assume it's
 because I'm left to using PairRDD.saveAsNewAPIHadoopFile.

 - Is there a way to batch some of the data together and use Casbah
 natively so I can use bulk inserts?


 Why cannot you write Mongo in a RDD#mapPartition ?



 - Is there maybe a less hacky way to load to MongoDB (instead of
 using saveAsNewAPIHadoopFile)?


 If the latency (time by which all data should be in Mongo) is not a
 concern you can try a separate process that uses Akka/Casbah to write from
 HDFS into Mongo.






allocating different memory to different executor for same application

2014-11-21 Thread tridib
Hello Experts,
I have 5 worker machines with different amounts of RAM. Is there a way to
configure them with different executor memory?

Currently I see that every worker spins up 1 executor with the same amount of
memory.

Thanks  Regards
Tridib



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/allocating-different-memory-to-different-executor-for-same-application-tp19534.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Another accumulator question

2014-11-21 Thread Nathan Kronenfeld
I'm not sure if it's an exact match, or just very close :-)

I don't think our problem is the workload on the driver; I think it's just
memory. So while the solution proposed there would work, it would also be
sufficient for our purposes, I believe, simply to clear each block as soon
as it's added into the canonical version, and to try to do so as soon as
possible - but I could be misunderstanding some of the timing; I'm still
investigating.

Though to combine on the worker before returning, as he suggests, would
probably be even better.

On Fri, Nov 21, 2014 at 6:08 PM, Andrew Ash and...@andrewash.com wrote:

 Hi Nathan,

 It sounds like what you're asking for has already been filed as
 https://issues.apache.org/jira/browse/SPARK-664  Does that ticket match
 what you're proposing?

 Andrew

 On Fri, Nov 21, 2014 at 12:29 PM, Nathan Kronenfeld 
 nkronenf...@oculusinfo.com wrote:

 We've done this with reduce - that definitely works.

 I've reworked the logic to use accumulators because, when it works, it's
 5-10x faster

 On Fri, Nov 21, 2014 at 4:44 AM, Sean Owen so...@cloudera.com wrote:

 This sounds more like a use case for reduce? or fold? it sounds like
 you're kind of cobbling together the same function on accumulators,
 when reduce/fold are simpler and have the behavior you suggest.

 On Fri, Nov 21, 2014 at 5:46 AM, Nathan Kronenfeld
 nkronenf...@oculusinfo.com wrote:
  I think I understand what is going on here, but I was hoping someone
 could
  confirm (or explain reality if I don't) what I'm seeing.
 
  We are collecting data using a rather sizable accumulator -
 essentially, an
  array of tens of thousands of entries.  All told, about 1.3m of data.
 
  If I understand things correctly, it looks to me like, when our job is
 done,
  a copy of this array is retrieved from each individual task, all at
 once,
  for combination on the client - which means, with 400 tasks to the
 job, each
  collection is using up half a gig of memory on the client.
 
  Is this true?  If so, does anyone know a way to get accumulators to
  accumulate as results collect, rather than all at once at the end, so
 we
  only have to hold a few in memory at a time, rather than all 400?
 
  Thanks,
-Nathan
 
 
  --
  Nathan Kronenfeld
  Senior Visualization Developer
  Oculus Info Inc
  2 Berkeley Street, Suite 600,
  Toronto, Ontario M5A 4J5
  Phone:  +1-416-203-3003 x 238
  Email:  nkronenf...@oculusinfo.com




 --
 Nathan Kronenfeld
 Senior Visualization Developer
 Oculus Info Inc
 2 Berkeley Street, Suite 600,
 Toronto, Ontario M5A 4J5
 Phone:  +1-416-203-3003 x 238
 Email:  nkronenf...@oculusinfo.com





-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com


spark-sql broken

2014-11-21 Thread tridib
After taking today's build from the master branch I started getting this error
when running spark-sql:

Class org.datanucleus.api.jdo.JDOPersistenceManagerFactory was not found.

I used following command for building:
 ./make-distribution.sh --tgz -Pyarn -Dyarn.version=2.4.0 -Phadoop-2.4
-Dhadoop.version=2.4.0 -Phive  -Phive-thriftserver -DskipTests

Is there anything I am missing?

Thanks
Tridib




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-broken-tp19536.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



latest Spark 1.2 thrift server fail with NoClassDefFoundError on Guava

2014-11-21 Thread Judy Nash
Hi,

Thrift server is failing to start for me on latest spark 1.2 branch.

I got the error below when I start thrift server.
Exception in thread "main" java.lang.NoClassDefFoundError: com/google/common/base/Preconditions
    at org.apache.hadoop.conf.Configuration$DeprecationDelta.<init>(Configuration.java:314)
Here is my setup:

1)  Latest spark 1.2 branch build

2)  Used build command:

mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package

3)  Added hive-site.xml to \conf

4)  Version on the box: Hive 0.13, Hadoop 2.4

Is this a real bug or am I doing something wrong?
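
(As a purely illustrative aside, not a fix: a tiny check of whether a given
classpath exposes the missing class at all. The class name below is the one
from the error; the wrapper object is made up. If Class.forName throws here,
that classpath simply does not contain Guava under its original package name.)

    // Hypothetical helper: succeeds only if com.google.common.base.Preconditions
    // is visible on the classpath this JVM was started with.
    object GuavaCheck {
      def main(args: Array[String]): Unit = {
        val cls = Class.forName("com.google.common.base.Preconditions")
        println("Found " + cls.getName + " via " + cls.getClassLoader)
      }
    }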

---
Full Stacktrace:
Exception in thread "main" java.lang.NoClassDefFoundError: com/google/common/base/Preconditions
    at org.apache.hadoop.conf.Configuration$DeprecationDelta.<init>(Configuration.java:314)
    at org.apache.hadoop.conf.Configuration$DeprecationDelta.<init>(Configuration.java:327)
    at org.apache.hadoop.conf.Configuration.<clinit>(Configuration.java:409)
    at org.apache.spark.deploy.SparkHadoopUtil.newConfiguration(SparkHadoopUtil.scala:82)
    at org.apache.spark.deploy.SparkHadoopUtil.<init>(SparkHadoopUtil.scala:42)
    at org.apache.spark.deploy.SparkHadoopUtil$.<init>(SparkHadoopUtil.scala:202)
    at org.apache.spark.deploy.SparkHadoopUtil$.<clinit>(SparkHadoopUtil.scala)
    at org.apache.spark.util.Utils$.getSparkOrYarnConfig(Utils.scala:1784)
    at org.apache.spark.storage.BlockManager.<init>(BlockManager.scala:105)
    at org.apache.spark.storage.BlockManager.<init>(BlockManager.scala:180)
    at org.apache.spark.SparkEnv$.create(SparkEnv.scala:292)
    at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:159)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:230)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLEnv$.init(SparkSQLEnv.scala:38)
    at org.apache.spark.sql.hive.thriftserver.HiveThriftServer2$.main(HiveThriftServer2.scala:56)
    at org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.main(HiveThriftServer2.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:353)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: com.google.common.base.Preconditions
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)


Spark streaming job failing after some time.

2014-11-21 Thread pankaj channe
I have seen similar posts on this issue but could not find a solution.
Apologies if this has been discussed here before.

I am running a Spark Streaming job with YARN on a 5-node cluster. I am
using the following command to submit my streaming job.

spark-submit --class class_name --master yarn-cluster --num-executors 1
--driver-memory 1g --executor-memory 1g --executor-cores 1 my_app.jar


After running for some time, the job stops. The application log shows the
following two errors:

14/11/21 22:05:04 WARN yarn.ApplicationMaster: Unable to retrieve SparkContext in spite of waiting for 10, maxNumTries = 10
Exception in thread "main" java.lang.NullPointerException
    at org.apache.spark.deploy.yarn.ApplicationMaster.waitForSparkContextInitialized(ApplicationMaster.scala:218)
    at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:107)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$main$1.apply$mcV$sp(ApplicationMaster.scala:410)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:53)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:52)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1594)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:52)
    at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:409)
    at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)


and later...

Failed to list files for dir: /data2/hadoop/yarn/local/usercache/user_name/appcache/application_1416332002106_0009/spark-local-20141121220325-b529/20
    at org.apache.spark.util.Utils$.listFilesSafely(Utils.scala:673)
    at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:685)
    at org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(Utils.scala:686)
    at org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(Utils.scala:685)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
    at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:685)
    at org.apache.spark.storage.DiskBlockManager$$anonfun$stop$1.apply(DiskBlockManager.scala:181)
    at org.apache.spark.storage.DiskBlockManager$$anonfun$stop$1.apply(DiskBlockManager.scala:178)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at org.apache.spark.storage.DiskBlockManager.stop(DiskBlockManager.scala:178)
    at org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply$mcV$sp(DiskBlockManager.scala:171)
    at org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply(DiskBlockManager.scala:169)
    at org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply(DiskBlockManager.scala:169)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311)
    at org.apache.spark.storage.DiskBlockManager$$anon$1.run(DiskBlockManager.scala:169)


Note: I am building my jar locally, with the Spark dependency added in
pom.xml, and running it on the cluster, which has Spark installed.


-Pankaj
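
(For reference: the first warning is logged by the YARN ApplicationMaster when
it gives up waiting for the user class to create a SparkContext, so one thing
worth checking is that the driver builds its StreamingContext right at the
start of main and then blocks on awaitTermination. Below is a minimal sketch of
that shape; the class name, host and port are made-up placeholders.)

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object MyStreamingApp {
      def main(args: Array[String]): Unit = {
        // Create the StreamingContext (and its SparkContext) immediately;
        // this is what the ApplicationMaster waits for in yarn-cluster mode.
        val conf = new SparkConf().setAppName("my-streaming-app")
        val ssc = new StreamingContext(conf, Seconds(10))

        // Placeholder source; swap in the real input DStream.
        val lines = ssc.socketTextStream("some-host", 9999)
        lines.count().print()

        ssc.start()
        ssc.awaitTermination()
      }
    }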


Re: latest Spark 1.2 thrift server fail with NoClassDefFoundError on Guava

2014-11-21 Thread Cheng Lian
Hi Judy, could you please provide the commit SHA1 of the version you're 
using? Thanks!


On 11/22/14 11:05 AM, Judy Nash wrote:


Hi,

The Thrift server is failing to start for me on the latest Spark 1.2 branch.

I get the error below when I start the Thrift server.

Exception in thread "main" java.lang.NoClassDefFoundError: com/google/common/base/Preconditions
    at org.apache.hadoop.conf.Configuration$DeprecationDelta.<init>(Configuration.java:314)…

Here is my setup:

1)  Latest spark 1.2 branch build

2)  Used build command:

mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package

3)  Added hive-site.xml to \conf

4)  Version on the box: Hive 0.13, Hadoop 2.4

Is this a real bug or am I doing something wrong?
