Re: Time is ugly in Spark Streaming....

2015-06-27 Thread Tathagata Das
Could you print the "time" on the driver (that is, in foreachRDD but before
RDD.foreachPartition) and see if it is behaving weird?

TD

On Fri, Jun 26, 2015 at 3:57 PM, Emrehan Tüzün 
wrote:

>
>
>
>
> On Fri, Jun 26, 2015 at 12:30 PM, Sea <261810...@qq.com> wrote:
>
>> Hi, all
>>
>> I've found a problem in Spark Streaming: when I use the time passed to
>> foreachRDD, the formatted timestamps come out wrong.
>>
>> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, 
>> StringDecoder](ssc, kafkaParams, topicsSet)
>>
>> dataStream.map(x => createGroup(x._2, dimensions))
>>   .groupByKey()
>>   .foreachRDD((rdd, time) => {
>>     try {
>>       if (!rdd.partitions.isEmpty) {
>>         rdd.foreachPartition(partition => {
>>           handlePartition(partition, timeType, time, dimensions,
>>             outputTopic, brokerList)
>>         })
>>       }
>>     } catch {
>>       case e: Exception => e.printStackTrace()
>>     }
>>   })
>>
>>
>> val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")
>>
>>  var date = dateFormat.format(new Date(time.milliseconds))
>>
>>
>>  Then I insert the 'date' into Kafka, but I see corrupted timestamps:
>>
>>
>> {"timestamp":"2015-06-00T16:50:02","status":"3","type":"1","waittime":"0","count":17}
>>
>> {"timestamp":"2015-06-26T16:51:13","status":"1","type":"1","waittime":"0","count":34}
>>
>> {"timestamp":"2015-06-00T16:50:02","status":"4","type":"0","waittime":"0","count":279}
>>
>> {"timestamp":"2015-06-26T16:52:00","status":"11","type":"1","waittime":"0","count":9}
>> {"timestamp":"0020-06-26T16:50:36
>> ","status":"7","type":"0","waittime":"0","count":1722}
>>
>> {"timestamp":"2015-06-10T16:51:17","status":"0","type":"0","waittime":"0","count":2958}
>>
>> {"timestamp":"2015-06-26T16:52:00","status":"0","type":"1","waittime":"0","count":114}
>>
>> {"timestamp":"2015-06-10T16:51:17","status":"11","type":"0","waittime":"0","count":2066}
>>
>> {"timestamp":"2015-06-26T16:52:00","status":"1","type":"0","waittime":"0","count":1539}
>>
>>
>
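The corrupted values above ("2015-06-00", a year of "0020") are the classic symptom of sharing a single SimpleDateFormat instance across threads: SimpleDateFormat is not thread-safe, so concurrent tasks can scribble over each other's internal state. A minimal hedged sketch of a safe alternative, assuming Java 8+ is available (the class and method names below are illustrative, not from the thread):

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class SafeTimestamp {
    // DateTimeFormatter is immutable and thread-safe, so a single shared
    // instance cannot produce interleaved garbage the way SimpleDateFormat can.
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss")
                             .withZone(ZoneOffset.UTC);

    public static String format(long epochMillis) {
        return FMT.format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        // The epoch start formats deterministically regardless of caller thread.
        System.out.println(format(0L)); // 1970-01-01T00:00:00
    }
}
```

On Java 7, an equivalent workaround is to construct a fresh SimpleDateFormat inside each foreachPartition call rather than sharing one instance across tasks.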


Uncaught exception in thread delete Spark local dirs

2015-06-27 Thread Guillermo Ortiz
Hi,

I'm executing Spark Streaming code with Kafka. The code was working, but
today I tried to execute it again and got an exception; I don't know
what's happening. Right now there are no jobs executing on YARN.
How can I fix it?

Exception in thread "main" org.apache.spark.SparkException: Yarn
application has already ended! It might have been killed or unable to
launch application master.
at
org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:113)
at
org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:59)
at
org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:141)
at org.apache.spark.SparkContext.(SparkContext.scala:379)
at
org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:642)
at
org.apache.spark.streaming.StreamingContext.(StreamingContext.scala:75)
at
com.produban.metrics.MetricsTransfInternationalSpark$.main(MetricsTransfInternationalSpark.scala:66)
at
com.produban.metrics.MetricsTransfInternationalSpark.main(MetricsTransfInternationalSpark.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
at
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
at
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
*15/06/27 09:27:09 ERROR Utils: Uncaught exception in thread delete Spark
local dirs*
java.lang.NullPointerException
at org.apache.spark.storage.DiskBlockManager.org
$apache$spark$storage$DiskBlockManager$$doStop(DiskBlockManager.scala:161)
at
org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply$mcV$sp(DiskBlockManager.scala:141)
at
org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply(DiskBlockManager.scala:139)
at
org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply(DiskBlockManager.scala:139)
at
org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
at
org.apache.spark.storage.DiskBlockManager$$anon$1.run(DiskBlockManager.scala:139)
Exception in thread "delete Spark local dirs" java.lang.NullPointerException
at org.apache.spark.storage.DiskBlockManager.org
$apache$spark$storage$DiskBlockManager$$doStop(DiskBlockManager.scala:161)
at
org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply$mcV$sp(DiskBlockManager.scala:141)
at
org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply(DiskBlockManager.scala:139)
at
org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply(DiskBlockManager.scala:139)
at
org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
at
org.apache.spark.storage.DiskBlockManager$$anon$1.run(DiskBlockManager.scala:139)


Re: spark streaming - checkpoint

2015-06-27 Thread Tathagata Das
Do you have SPARK_CLASSPATH set in both cases, before and after checkpointing?
If yes, then you should not be using SPARK_CLASSPATH; it has been
deprecated since Spark 1.0 because of its ambiguity.
Also, where do you have spark.executor.extraClassPath set? I don't see it in
the spark-submit command.
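As a rough sketch of what is being suggested here, the deprecated environment variable can be replaced with explicit flags; the paths and the main class below are illustrative placeholders, not taken from the thread:

```shell
# Instead of SPARK_CLASSPATH (deprecated since Spark 1.0), pass extra
# classpath entries explicitly on the spark-submit command line.
spark-submit \
  --master yarn-client \
  --driver-class-path '/path/to/extra-libs/*' \
  --conf spark.executor.extraClassPath='/path/to/extra-libs/*' \
  --jars /path/to/dep1.jar,/path/to/dep2.jar \
  --class com.example.Main \
  app.jar
```

Note that --jars takes a comma-separated list, while the classpath settings use the JVM's classpath syntax.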

On Fri, Jun 26, 2015 at 6:05 AM, ram kumar  wrote:

> Hi,
>
> -
>
> JavaStreamingContext ssc = new JavaStreamingContext(conf, new
> Duration(1));
> ssc.checkpoint(checkPointDir);
>
> JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
>     public JavaStreamingContext create() {
>         return createContext(checkPointDir, outputDirectory);
>     }
> };
> JavaStreamingContext ssc =
> JavaStreamingContext.getOrCreate(checkPointDir, factory);
>
> 
>
> *The first time I run this, it works fine.*
>
> *But the second time, it shows the following error.*
> *If I delete the checkpoint path, it works again.*
>
> ---
> [user@h7 ~]$ spark-submit --jars /home/user/examples-spark-jar.jar
> --conf spark.driver.allowMultipleContexts=true --class com.spark.Pick
> --master yarn-client --num-executors 10 --executor-cores 1 SNAPSHOT.jar
> Spark assembly has been built with Hive, including Datanucleus jars on
> classpath
> 2015-06-26 12:43:42,981 WARN  [main] util.NativeCodeLoader
> (NativeCodeLoader.java:(62)) - Unable to load native-hadoop library
> for your platform... using builtin-java classes where applicable
> 2015-06-26 12:43:44,246 WARN  [main] shortcircuit.DomainSocketFactory
> (DomainSocketFactory.java:(116)) - The short-circuit local reads
> feature cannot be used because libhadoop cannot be loaded.
>
> This is deprecated in Spark 1.0+.
>
> Please instead use:
>  - ./spark-submit with --driver-class-path to augment the driver classpath
>  - spark.executor.extraClassPath to augment the executor classpath
>
> Exception in thread "main" org.apache.spark.SparkException: Found both
> spark.executor.extraClassPath and SPARK_CLASSPATH. Use only the former.
> at
> org.apache.spark.SparkConf$$anonfun$validateSettings$6$$anonfun$apply$7.apply(SparkConf.scala:334)
> at
> org.apache.spark.SparkConf$$anonfun$validateSettings$6$$anonfun$apply$7.apply(SparkConf.scala:332)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at
> org.apache.spark.SparkConf$$anonfun$validateSettings$6.apply(SparkConf.scala:332)
> at
> org.apache.spark.SparkConf$$anonfun$validateSettings$6.apply(SparkConf.scala:320)
> at scala.Option.foreach(Option.scala:236)
> at org.apache.spark.SparkConf.validateSettings(SparkConf.scala:320)
> at org.apache.spark.SparkContext.(SparkContext.scala:178)
> at
> org.apache.spark.streaming.StreamingContext.(StreamingContext.scala:118)
> at
> org.apache.spark.streaming.StreamingContext$$anonfun$getOrCreate$1.apply(StreamingContext.scala:561)
> at
> org.apache.spark.streaming.StreamingContext$$anonfun$getOrCreate$1.apply(StreamingContext.scala:561)
> at scala.Option.map(Option.scala:145)
> at
> org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:561)
> at
> org.apache.spark.streaming.api.java.JavaStreamingContext$.getOrCreate(JavaStreamingContext.scala:566)
> at
> org.apache.spark.streaming.api.java.JavaStreamingContext.getOrCreate(JavaStreamingContext.scala)
> at
> com.orzota.kafka.kafka.TotalPicsWithScore.main(TotalPicsWithScore.java:159)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:360)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:76)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> [user@h7 ~]
>
> --
>
> *Can anyone help me with this?*
>
>
> *thanks*
>


Re: Uncaught exception in thread delete Spark local dirs

2015-06-27 Thread Tathagata Das
How are you trying to execute the code again? From checkpoints, or
otherwise?
Also cc'ed Hari who may have a better idea of YARN related issues.

On Sat, Jun 27, 2015 at 12:35 AM, Guillermo Ortiz 
wrote:

> Hi,
>
> I'm executing Spark Streaming code with Kafka. The code was working, but
> today I tried to execute it again and got an exception; I don't know
> what's happening. Right now there are no jobs executing on YARN.
> How can I fix it?
>
> Exception in thread "main" org.apache.spark.SparkException: Yarn
> application has already ended! It might have been killed or unable to
> launch application master.
> at
> org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:113)
> at
> org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:59)
> at
> org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:141)
> at org.apache.spark.SparkContext.(SparkContext.scala:379)
> at
> org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:642)
> at
> org.apache.spark.streaming.StreamingContext.(StreamingContext.scala:75)
> at
> com.produban.metrics.MetricsTransfInternationalSpark$.main(MetricsTransfInternationalSpark.scala:66)
> at
> com.produban.metrics.MetricsTransfInternationalSpark.main(MetricsTransfInternationalSpark.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
> at
> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
> at
> org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> *15/06/27 09:27:09 ERROR Utils: Uncaught exception in thread delete Spark
> local dirs*
> java.lang.NullPointerException
> at org.apache.spark.storage.DiskBlockManager.org
> $apache$spark$storage$DiskBlockManager$$doStop(DiskBlockManager.scala:161)
> at
> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply$mcV$sp(DiskBlockManager.scala:141)
> at
> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply(DiskBlockManager.scala:139)
> at
> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply(DiskBlockManager.scala:139)
> at
> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
> at
> org.apache.spark.storage.DiskBlockManager$$anon$1.run(DiskBlockManager.scala:139)
> Exception in thread "delete Spark local dirs"
> java.lang.NullPointerException
> at org.apache.spark.storage.DiskBlockManager.org
> $apache$spark$storage$DiskBlockManager$$doStop(DiskBlockManager.scala:161)
> at
> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply$mcV$sp(DiskBlockManager.scala:141)
> at
> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply(DiskBlockManager.scala:139)
> at
> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply(DiskBlockManager.scala:139)
> at
> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
> at
> org.apache.spark.storage.DiskBlockManager$$anon$1.run(DiskBlockManager.scala:139)
>
>
>
>


Re: spark streaming job fails to restart after checkpointing due to DStream initialization errors

2015-06-27 Thread Tathagata Das
Could you also provide the code where you set up the Kafka DStream? I don't
see it in the snippet.
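The reason this matters: for checkpoint recovery, all DStream setup (including the Kafka stream) must happen inside the function passed to getOrCreate, because that factory is only invoked on a fresh start; on restore, the context comes from the checkpoint and code outside the factory never contributes streams. The control flow can be sketched Spark-free (all names here are illustrative):

```java
import java.util.Optional;
import java.util.function.Supplier;

public class GetOrCreateSketch {
    // Mimics StreamingContext.getOrCreate: restore from a checkpoint if one
    // exists, otherwise call the factory. On restore, the factory is NOT run,
    // so any stream setup living outside it would silently be skipped.
    static String getOrCreate(Optional<String> checkpoint, Supplier<String> factory) {
        return checkpoint.orElseGet(factory);
    }

    public static void main(String[] args) {
        // Fresh start: no checkpoint, so the factory runs.
        System.out.println(getOrCreate(Optional.empty(), () -> "fresh context"));
        // Restart: checkpoint found, factory is skipped entirely.
        System.out.println(getOrCreate(Optional.of("restored context"), () -> "fresh context"));
    }
}
```

This is a simplification, but it shows why a Kafka DStream created outside createStreamingContext can appear uninitialized after a restart.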

On Fri, Jun 26, 2015 at 2:45 PM, Ashish Nigam 
wrote:

> Here's code -
>
> def createStreamingContext(checkpointDirectory: String): StreamingContext = {
>   val conf = new SparkConf().setAppName("KafkaConsumer")
>   conf.set("spark.eventLog.enabled", "false")
>   logger.info("Going to init spark context")
>   conf.getOption("spark.master") match {
>     case Some(master) => println(master)
>     case None => conf.setMaster("local[*]")
>   }
>   val sc = new SparkContext(conf)
>   // Create a StreamingContext with a 5 second batch size
>   val ssc = new StreamingContext(sc, Seconds(5))
>   ssc.checkpoint(checkpointDirectory)
>   ssc
> }
>
>
> And here's how it is being called -
>
>
> val ssc = StreamingContext.getOrCreate(checkpointDir, () => {
>   createStreamingContext(checkpointDir)
> })
>
>
> On Fri, Jun 26, 2015 at 2:05 PM, Cody Koeninger 
> wrote:
>
>> Make sure you're following the docs regarding setting up a streaming
>> checkpoint.
>>
>> Post your code if you can't get it figured out.
>>
>> On Fri, Jun 26, 2015 at 3:45 PM, Ashish Nigam 
>> wrote:
>>
>>> I bring up a Spark Streaming job that uses Kafka as the input source.
>>> With no data to process, I shut it down and then bring it back up again.
>>> This time the job does not start because it complains that the DStream is
>>> not initialized.
>>>
>>> 15/06/26 01:10:44 ERROR yarn.ApplicationMaster: User class threw
>>> exception: org.apache.spark.streaming.dstream.UnionDStream@6135e5d8 has
>>> not been initialized
>>>
>>> org.apache.spark.SparkException:
>>> org.apache.spark.streaming.dstream.UnionDStream@6135e5d8 has not been
>>> initialized
>>>
>>> at
>>> org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:266)
>>>
>>> at
>>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
>>>
>>> at
>>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
>>>
>>> at scala.Option.orElse(Option.scala:257)
>>>
>>> at
>>> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
>>>
>>> at
>>> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
>>>
>>> at
>>> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
>>>
>>> at
>>> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
>>>
>>> at
>>> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>>>
>>> at
>>> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>>>
>>> at
>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>
>>> at
>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>
>>> at
>>> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>>>
>>> at
>>> scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>>>
>>> at
>>> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
>>>
>>> at
>>> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:227)
>>>
>>> at
>>> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:222)
>>>
>>> at
>>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>>
>>> at
>>> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>>
>>> at
>>> org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:222)
>>>
>>> at
>>> org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:90)
>>>
>>> at
>>> org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:67)
>>>
>>> at
>>> org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:512)
>>>..
>>>
>>> I am using spark 1.3.1 and spark-streaming-kafka 1.3.1 versions.
>>>
>>> Any idea how to resolve this issue?
>>>
>>> Thanks
>>> Ashish
>>>
>>>
>>
>


Re: Uncaught exception in thread delete Spark local dirs

2015-06-27 Thread Guillermo Ortiz
I don't have any checkpoint in my code. Really, I don't have to save any
state; it's just log processing for a PoC.
I had been testing the code in a VM from Cloudera and I never got that
error. Now it's on a real cluster.

The command to execute Spark
spark-submit --name "PoC Logs" --master yarn-client --class
com.metrics.MetricsSpark --jars $SPARK_CLASSPATH --executor-memory 1g
/usr/metrics/ex/metrics-spark.jar $1 $2 $3

val sparkConf = new SparkConf()
val ssc = new StreamingContext(sparkConf, Seconds(5))
val kafkaParams = Map[String, String]("metadata.broker.list" -> args(0))
val topics = args(1).split("\\,")
val directKafkaStream = KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](ssc, kafkaParams, topics.toSet)

directKafkaStream.foreachRDD { rdd =>
  val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  val documents = rdd.mapPartitionsWithIndex { (i, kafkaEvent) =>
  .
   }

I understand that I just need a checkpoint if I need to recover the task if
something goes wrong, right?


2015-06-27 9:39 GMT+02:00 Tathagata Das :

> How are you trying to execute the code again? From checkpoints, or
> otherwise?
> Also cc'ed Hari who may have a better idea of YARN related issues.
>
> On Sat, Jun 27, 2015 at 12:35 AM, Guillermo Ortiz 
> wrote:
>
>> Hi,
>>
>> I'm executing Spark Streaming code with Kafka. The code was working, but
>> today I tried to execute it again and got an exception; I don't know
>> what's happening. Right now there are no jobs executing on YARN.
>> How can I fix it?
>>
>> Exception in thread "main" org.apache.spark.SparkException: Yarn
>> application has already ended! It might have been killed or unable to
>> launch application master.
>> at
>> org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:113)
>> at
>> org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:59)
>> at
>> org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:141)
>> at org.apache.spark.SparkContext.(SparkContext.scala:379)
>> at
>> org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:642)
>> at
>> org.apache.spark.streaming.StreamingContext.(StreamingContext.scala:75)
>> at
>> com.produban.metrics.MetricsTransfInternationalSpark$.main(MetricsTransfInternationalSpark.scala:66)
>> at
>> com.produban.metrics.MetricsTransfInternationalSpark.main(MetricsTransfInternationalSpark.scala)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at
>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
>> at
>> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
>> at
>> org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
>> at
>> org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>> *15/06/27 09:27:09 ERROR Utils: Uncaught exception in thread delete Spark
>> local dirs*
>> java.lang.NullPointerException
>> at org.apache.spark.storage.DiskBlockManager.org
>> $apache$spark$storage$DiskBlockManager$$doStop(DiskBlockManager.scala:161)
>> at
>> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply$mcV$sp(DiskBlockManager.scala:141)
>> at
>> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply(DiskBlockManager.scala:139)
>> at
>> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply(DiskBlockManager.scala:139)
>> at
>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
>> at
>> org.apache.spark.storage.DiskBlockManager$$anon$1.run(DiskBlockManager.scala:139)
>> Exception in thread "delete Spark local dirs"
>> java.lang.NullPointerException
>> at org.apache.spark.storage.DiskBlockManager.org
>> $apache$spark$storage$DiskBlockManager$$doStop(DiskBlockManager.scala:161)
>> at
>> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply$mcV$sp(DiskBlockManager.scala:141)
>> at
>> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply(DiskBlockManager.scala:139)
>> at
>> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply(DiskBlockManager.scala:139)
>> at
>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
>> at
>> org.apache.spark.storage.DiskBlockManager$$anon$1.run(DiskBlockManager.scala:139)
>>
>>
>>
>>
>


Re: How to recover in case user errors in streaming

2015-06-27 Thread Tathagata Das
I looked at the code and found that batch exceptions are indeed ignored.
This is worth fixing: batch exceptions should not be silently ignored.

Also, you can catch failed batch jobs (irrespective of the number of
retries) by catching the exception in foreachRDD. Here is an example.

dstream.foreachRDD { rdd =>
  try {
    // actions on the RDD go here
  } catch {
    case e: Exception => // handle failure of the whole batch job
  }
}


This will catch failures at the granularity of the job, after all the max
retries of a task have been exhausted. But it will be hard to filter out and
push the failed record(s) somewhere. To do that, I would use rdd.foreach
or rdd.foreachPartition, inside which I would catch the exception, push
that record out to another Kafka topic, and continue normal processing of
other records. This prevents the task processing the partition from
failing (as you are catching the bad records).

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { iterator =>
    // Create Kafka producer for bad records
    iterator.foreach { record =>
      try {
        // process record
      } catch {
        case e: ExpectedException =>
          // publish bad record to error topic in Kafka using above producer
      }
    }
  }
}


TD

PS: Apologies for the Scala examples, hope you get the idea :)
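For the Java side, here is a hedged, Spark-free sketch of the same per-record pattern: iterate, catch per record, and route failures to a dead-letter sink. A plain list stands in for the Kafka producer, and all class and method names are illustrative:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PerRecordErrorRouting {
    // Stand-ins for "process record" output and "publish to error topic".
    static final List<Integer> processed = new ArrayList<>();
    static final List<String> deadLetters = new ArrayList<>();

    // Mirrors the body sketched above for rdd.foreachPartition: each record
    // is processed inside its own try/catch, so one bad record does not fail
    // the whole partition's task.
    static void handlePartition(Iterable<String> partition) {
        for (String record : partition) {
            try {
                processed.add(Integer.parseInt(record)); // "process record"
            } catch (NumberFormatException e) {
                deadLetters.add(record); // "publish bad record" to error sink
            }
        }
    }

    public static void main(String[] args) {
        handlePartition(Arrays.asList("1", "oops", "3"));
        System.out.println(processed);   // [1, 3]
        System.out.println(deadLetters); // [oops]
    }
}
```

In a real job the dead-letter sink would be a Kafka producer created once per partition, not a shared list.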

On Fri, Jun 26, 2015 at 9:56 AM, Amit Assudani 
wrote:

>  Also, I get TaskContext.get() returning null when used in the foreach
> function below ( it works when I use it in map, but the whole point here is
> to handle something that is breaking in an action ). Please help. :(
>
>   From: amit assudani 
> Date: Friday, June 26, 2015 at 11:41 AM
>
> To: Cody Koeninger 
> Cc: "user@spark.apache.org" , Tathagata Das <
> t...@databricks.com>
> Subject: Re: How to recover in case user errors in streaming
>
>   Hmm, not sure why, but when I run this code, it always keeps on
> consuming from Kafka and proceeds, ignoring the previously failed batches.
>
>  Also, now that I get the attempt number from TaskContext and I have the
> information on max retries, I am supposed to handle it in the try/catch
> block. But does that mean I have to handle these kinds of exceptions/errors
> in every transformation step ( map, reduce, transform, etc. )? Isn't there
> a callback that says a batch has been retried the maximum number of times,
> so that before it is ignored you have a handle to do whatever you want to
> do with the batch / message in hand?
>
>  Regards,
> Amit
>
>   From: Cody Koeninger 
> Date: Friday, June 26, 2015 at 11:32 AM
> To: amit assudani 
> Cc: "user@spark.apache.org" , Tathagata Das <
> t...@databricks.com>
> Subject: Re: How to recover in case user errors in streaming
>
>   No, if you have a bad message that you are continually throwing
> exceptions on, your stream will not progress to future batches.
>
> On Fri, Jun 26, 2015 at 10:28 AM, Amit Assudani 
> wrote:
>
>>  Also, what I understand is that max failures doesn't stop the entire
>> stream; it fails the job created for the specific batch, but subsequent
>> batches still proceed. Isn't that right? And the question still remains:
>> how do I keep track of those failed batches?
>>
>>   From: amit assudani 
>> Date: Friday, June 26, 2015 at 11:21 AM
>> To: Cody Koeninger 
>>
>> Cc: "user@spark.apache.org" , Tathagata Das <
>> t...@databricks.com>
>> Subject: Re: How to recover in case user errors in streaming
>>
>>   Thanks for quick response,
>>
>>  My question here is: how do I know that the max retries are done
>> ( because in my code I never know whether a failure is from the first try
>> or the last try ) and that I need to handle this message? Is there any
>> callback?
>>
>>  Also, I know the limitation of checkpoints when upgrading the code, but
>> my main focus here is to mitigate connectivity issues to the persistent
>> store, which get resolved after a while. How do I know which messages
>> failed and need rework?
>>
>>  Regards,
>> Amit
>>
>>   From: Cody Koeninger 
>> Date: Friday, June 26, 2015 at 11:16 AM
>> To: amit assudani 
>> Cc: "user@spark.apache.org" , Tathagata Das <
>> t...@databricks.com>
>> Subject: Re: How to recover in case user errors in streaming
>>
>>   If you're consistently throwing exceptions and thus failing tasks,
>> once you reach max failures the whole stream will stop.
>>
>>  It's up to you to either catch those exceptions, or restart your stream
>> appropriately once it stops.
>>
>>  Keep in mind that if you're relying on checkpoints, and fixing the
>> error requires changing your code, you may not be able to recover the
>> checkpoint.
>>
>> On Fri, Jun 26, 2015 at 9:05 AM, Amit Assudani 
>> wrote:
>>
>>>   *Problem: *how do we recover from user errors (connectivity issues /
>>> storage service down / etc.)?
>>>
>>> *Environment:* Spark streaming using Kafka Direct Streams
>>>
>>> *Code Snippet: *
>>>
>>>
>>>
>>> HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(
>>> "kafkaTopic1"));
>>>
>>> HashMap<String, String> kafkaParams = new HashMap<String, String>();
>>>
>>> kafkaParams.put("metadata.broke

Re: Uncaught exception in thread delete Spark local dirs

2015-06-27 Thread Tathagata Das
1. you need checkpointing mostly for recovering from driver failures, and
in some cases also for some stateful operations.

2. Could you try not using the SPARK_CLASSPATH environment variable.

TD

On Sat, Jun 27, 2015 at 1:00 AM, Guillermo Ortiz 
wrote:

> I don't have any checkpoint in my code. Really, I don't have to save any
> state; it's just log processing for a PoC.
> I had been testing the code in a VM from Cloudera and I never got that
> error. Now it's on a real cluster.
>
> The command to execute Spark
> spark-submit --name "PoC Logs" --master yarn-client --class
> com.metrics.MetricsSpark --jars $SPARK_CLASSPATH --executor-memory 1g
> /usr/metrics/ex/metrics-spark.jar $1 $2 $3
>
> val sparkConf = new SparkConf()
> val ssc = new StreamingContext(sparkConf, Seconds(5))
> val kafkaParams = Map[String, String]("metadata.broker.list" ->
> args(0))
> val topics = args(1).split("\\,")
> val directKafkaStream = KafkaUtils.createDirectStream[String, String,
> StringDecoder, StringDecoder](ssc, kafkaParams, topics.toSet)
>
> directKafkaStream.foreachRDD { rdd =>
>   val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>   val documents = rdd.mapPartitionsWithIndex { (i, kafkaEvent) =>
>   .
>}
>
> I understand that I just need a checkpoint if I need to recover the task
> if something goes wrong, right?
>
>
> 2015-06-27 9:39 GMT+02:00 Tathagata Das :
>
>> How are you trying to execute the code again? From checkpoints, or
>> otherwise?
>> Also cc'ed Hari who may have a better idea of YARN related issues.
>>
>> On Sat, Jun 27, 2015 at 12:35 AM, Guillermo Ortiz 
>> wrote:
>>
>>> Hi,
>>>
>>> I'm executing Spark Streaming code with Kafka. The code was working,
>>> but today I tried to execute it again and got an exception; I don't
>>> know what's happening. Right now there are no jobs executing on YARN.
>>> How can I fix it?
>>>
>>> Exception in thread "main" org.apache.spark.SparkException: Yarn
>>> application has already ended! It might have been killed or unable to
>>> launch application master.
>>> at
>>> org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:113)
>>> at
>>> org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:59)
>>> at
>>> org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:141)
>>> at org.apache.spark.SparkContext.(SparkContext.scala:379)
>>> at
>>> org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:642)
>>> at
>>> org.apache.spark.streaming.StreamingContext.(StreamingContext.scala:75)
>>> at
>>> com.produban.metrics.MetricsTransfInternationalSpark$.main(MetricsTransfInternationalSpark.scala:66)
>>> at
>>> com.produban.metrics.MetricsTransfInternationalSpark.main(MetricsTransfInternationalSpark.scala)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>> at
>>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
>>> at
>>> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
>>> at
>>> org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
>>> at
>>> org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
>>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>> *15/06/27 09:27:09 ERROR Utils: Uncaught exception in thread delete
>>> Spark local dirs*
>>> java.lang.NullPointerException
>>> at org.apache.spark.storage.DiskBlockManager.org
>>> $apache$spark$storage$DiskBlockManager$$doStop(DiskBlockManager.scala:161)
>>> at
>>> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply$mcV$sp(DiskBlockManager.scala:141)
>>> at
>>> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply(DiskBlockManager.scala:139)
>>> at
>>> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply(DiskBlockManager.scala:139)
>>> at
>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
>>> at
>>> org.apache.spark.storage.DiskBlockManager$$anon$1.run(DiskBlockManager.scala:139)
>>> Exception in thread "delete Spark local dirs"
>>> java.lang.NullPointerException
>>> at org.apache.spark.storage.DiskBlockManager.org
>>> $apache$spark$storage$DiskBlockManager$$doStop(DiskBlockManager.scala:161)
>>> at
>>> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply$mcV$sp(DiskBlockManager.scala:141)
>>> at
>>> org.apache.spark.storage.DiskBlockMana

Re: Uncaught exception in thread delete Spark local dirs

2015-06-27 Thread Guillermo Ortiz
Well, SPARK_CLASSPATH is just an arbitrary name I chose; the complete script is this:

export HADOOP_CONF_DIR=/etc/hadoop/conf
SPARK_CLASSPATH="file:/usr/metrics/conf/elasticSearch.properties,file:/usr/metrics/conf/redis.properties,/etc/spark/conf.cloudera.spark_on_yarn/yarn-conf/"
for lib in `ls /usr/metrics/lib/*.jar`
do
  if [ -z "$SPARK_CLASSPATH" ]; then
    SPARK_CLASSPATH=$lib
  else
    SPARK_CLASSPATH=$SPARK_CLASSPATH,$lib
  fi
done
spark-submit --name "Metrics"

I need to add all the jars, as you know; maybe SPARK_CLASSPATH was a bad
choice of name.

The code doesn't have any stateful operation, so I guess it's okay that it
doesn't have a checkpoint. I have executed this code hundreds of times in a
VM from Cloudera and never got this error.
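For what it's worth, here is a sketch of the same jar-list construction under a non-reserved variable name; the directory /tmp/metrics-lib and the jar names are hypothetical stand-ins for /usr/metrics/lib:

```shell
# Sketch: build the comma-separated jar list that spark-submit --jars expects,
# using an arbitrary variable name (JARS) instead of the deprecated
# SPARK_CLASSPATH env variable that spark-submit itself recognizes.
LIBDIR=/tmp/metrics-lib          # hypothetical stand-in for /usr/metrics/lib
mkdir -p "$LIBDIR"
touch "$LIBDIR/a.jar" "$LIBDIR/b.jar"

JARS=""
for lib in "$LIBDIR"/*.jar; do
  if [ -z "$JARS" ]; then
    JARS=$lib
  else
    JARS=$JARS,$lib
  fi
done
echo "$JARS"
# would then be passed as: spark-submit --name "Metrics" --jars "$JARS" ...
```

Note that the original script sets SPARK_CLASSPATH to the properties files before the loop, so its `-z` branch is never taken; starting from an empty variable makes the loop's intent clearer.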

2015-06-27 11:21 GMT+02:00 Tathagata Das :

> 1. you need checkpointing mostly for recovering from driver failures, and
> in some cases also for some stateful operations.
>
> 2. Could you try not using the SPARK_CLASSPATH environment variable.
>
> TD
>
> On Sat, Jun 27, 2015 at 1:00 AM, Guillermo Ortiz 
> wrote:
>
>> I don't have any checkpoint in my code. Really, I don't have any
>> state to save. It's just log processing for a PoC.
>> I have been testing the code in a VM from Cloudera and I never got that
>> error. Now it's on a real cluster.
>>
>> The command to execute Spark
>> spark-submit --name "PoC Logs" --master yarn-client --class
>> com.metrics.MetricsSpark --jars $SPARK_CLASSPATH --executor-memory 1g
>> /usr/metrics/ex/metrics-spark.jar $1 $2 $3
>>
>> val sparkConf = new SparkConf()
>> val ssc = new StreamingContext(sparkConf, Seconds(5))
>> val kafkaParams = Map[String, String]("metadata.broker.list" ->
>> args(0))
>> val topics = args(1).split("\\,")
>> val directKafkaStream = KafkaUtils.createDirectStream[String, String,
>> StringDecoder, StringDecoder](ssc, kafkaParams, topics.toSet)
>>
>> directKafkaStream.foreachRDD { rdd =>
>>   val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>   val documents = rdd.mapPartitionsWithIndex { (i, kafkaEvent) =>
>>   .
>>}
>>
>> I understand that I just need a checkpoint if I need to recover the task
>> when something goes wrong, right?
>>
>>
>> 2015-06-27 9:39 GMT+02:00 Tathagata Das :
>>
>>> How are you trying to execute the code again? From checkpoints, or
>>> otherwise?
>>> Also cc'ed Hari who may have a better idea of YARN related issues.
>>>
>>> On Sat, Jun 27, 2015 at 12:35 AM, Guillermo Ortiz 
>>> wrote:
>>>
 Hi,

 I'm executing Spark Streaming code with Kafka. The code was working,
 but today I tried to execute it again and I got an exception; I don't
 know what is happening. Right now there are no job executions on YARN.
 How can I fix it?

 Exception in thread "main" org.apache.spark.SparkException: Yarn
 application has already ended! It might have been killed or unable to
 launch application master.
 at
 org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:113)
 at
 org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:59)
 at
 org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:141)
 at org.apache.spark.SparkContext.(SparkContext.scala:379)
 at
 org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:642)
 at
 org.apache.spark.streaming.StreamingContext.(StreamingContext.scala:75)
 at
 com.produban.metrics.MetricsTransfInternationalSpark$.main(MetricsTransfInternationalSpark.scala:66)
 at
 com.produban.metrics.MetricsTransfInternationalSpark.main(MetricsTransfInternationalSpark.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
 at
 org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
 at
 org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
 at
 org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
 *15/06/27 09:27:09 ERROR Utils: Uncaught exception in thread delete
 Spark local dirs*
 java.lang.NullPointerException
 at org.apache.spark.storage.DiskBlockManager.org
 $apache$spark$storage$DiskBlockManager$$doStop(DiskBlockManager.scala:161)
 at
 org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run

Re: Uncaught exception in thread delete Spark local dirs

2015-06-27 Thread Guillermo Ortiz
I'm checking the logs in YARN and I found this error as well

Application application_1434976209271_15614 failed 2 times due to AM
Container for appattempt_1434976209271_15614_02 exited with exitCode:
255


Diagnostics: Exception from container-launch.
Container id: container_1434976209271_15614_02_01
Exit code: 255
Stack trace: ExitCodeException exitCode=255:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
at org.apache.hadoop.util.Shell.run(Shell.java:455)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
at
org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor.launchContainer(LinuxContainerExecutor.java:293)
at
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Shell output: Requested user hdfs is not whitelisted and has id 496,which
is below the minimum allowed 1000
Container exited with a non-zero exit code 255
Failing this attempt. Failing the application.
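The last two Diagnostics lines match the LinuxContainerExecutor's user checks: container-executor.cfg enforces a min.user.id (commonly 1000) and an allowed.system.users whitelist. A minimal sketch of the check that is failing here, with the uid 496 taken from the log above:

```shell
# Sketch of the LinuxContainerExecutor user check that produces
# "Requested user hdfs is not whitelisted and has id 496,which is below
# the minimum allowed 1000": the submitting user's uid must be >= min.user.id
# unless the user is listed in allowed.system.users (container-executor.cfg).
MIN_USER_ID=1000        # default min.user.id in container-executor.cfg
uid=496                 # the uid reported for user hdfs in the log above
if [ "$uid" -lt "$MIN_USER_ID" ]; then
  echo "uid $uid is below $MIN_USER_ID: container launch would be refused"
else
  echo "uid $uid is allowed"
fi
# On a real node, `id -u hdfs` shows the actual uid of the hdfs user.
```

So the fix is on the cluster side: either recreate the hdfs user with a uid >= 1000, lower min.user.id, or add hdfs to allowed.system.users.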

2015-06-27 11:25 GMT+02:00 Guillermo Ortiz :

> Well, SPARK_CLASSPATH is just a random name; the complete script is this:
>
> export HADOOP_CONF_DIR=/etc/hadoop/conf
>
> SPARK_CLASSPATH="file:/usr/metrics/conf/elasticSearch.properties,file:/usr/metrics/conf/redis.properties,/etc/spark/conf.cloudera.spark_on_yarn/yarn-conf/"
> for lib in `ls /usr/metrics/lib/*.jar`
> do
> if [ -z "$SPARK_CLASSPATH" ]; then
> SPARK_CLASSPATH=$lib
> else
> SPARK_CLASSPATH=$SPARK_CLASSPATH,$lib
> fi
> done
> spark-submit --name "Metrics"
>
> I need to add all the jars, as you know; maybe SPARK_CLASSPATH was a bad
> choice of name.
>
> The code doesn't have any stateful operation, so I guess it's okay that it
> doesn't have a checkpoint. I have executed this code hundreds of times in a
> VM from Cloudera and never got this error.
>
> 2015-06-27 11:21 GMT+02:00 Tathagata Das :
>
>> 1. you need checkpointing mostly for recovering from driver failures, and
>> in some cases also for some stateful operations.
>>
>> 2. Could you try not using the SPARK_CLASSPATH environment variable.
>>
>> TD
>>
>> On Sat, Jun 27, 2015 at 1:00 AM, Guillermo Ortiz 
>> wrote:
>>
>>> I don't have any checkpoint in my code. Really, I don't have any
>>> state to save. It's just log processing for a PoC.
>>> I have been testing the code in a VM from Cloudera and I never got that
>>> error. Now it's on a real cluster.
>>>
>>> The command to execute Spark
>>> spark-submit --name "PoC Logs" --master yarn-client --class
>>> com.metrics.MetricsSpark --jars $SPARK_CLASSPATH --executor-memory 1g
>>> /usr/metrics/ex/metrics-spark.jar $1 $2 $3
>>>
>>> val sparkConf = new SparkConf()
>>> val ssc = new StreamingContext(sparkConf, Seconds(5))
>>> val kafkaParams = Map[String, String]("metadata.broker.list" ->
>>> args(0))
>>> val topics = args(1).split("\\,")
>>> val directKafkaStream = KafkaUtils.createDirectStream[String,
>>> String, StringDecoder, StringDecoder](ssc, kafkaParams, topics.toSet)
>>>
>>> directKafkaStream.foreachRDD { rdd =>
>>>   val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>>   val documents = rdd.mapPartitionsWithIndex { (i, kafkaEvent) =>
>>>   .
>>>}
>>>
>>> I understand that I just need a checkpoint if I need to recover the task
>>> when something goes wrong, right?
>>>
>>>
>>> 2015-06-27 9:39 GMT+02:00 Tathagata Das :
>>>
 How are you trying to execute the code again? From checkpoints, or
 otherwise?
 Also cc'ed Hari who may have a better idea of YARN related issues.

 On Sat, Jun 27, 2015 at 12:35 AM, Guillermo Ortiz >>> > wrote:

> Hi,
>
> I'm executing Spark Streaming code with Kafka. The code was working,
> but today I tried to execute it again and I got an exception; I don't
> know what is happening. Right now there are no job executions on
> YARN.
> How can I fix it?
>
> Exception in thread "main" org.apache.spark.SparkException: Yarn
> application has already ended! It might have been killed or unable to
> launch application master.
> at
> org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:113)
> at
> org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:59)
> at
> org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:141)
> at org.apache.spark.SparkContext.(SparkContext.scala:379)
> at
> org.apache.spark.streaming.StreamingContext$.createNewSpar

Re: spark streaming with kafka reset offset

2015-06-27 Thread Tathagata Das
In the receiver based approach, If the receiver crashes for any reason
(receiver crashed or executor crashed) the receiver should get restarted on
another executor and should start reading data from the offset present in
the zookeeper. There is some chance of data loss which can alleviated using
Write Ahead Logs (see streaming programming guide for more details, or see
my talk [Slides PDF, Video] from the last Spark Summit
2015). But that approach can give duplicate
records. The direct approach gives exactly-once guarantees, so you should
try it out.

TD

On Fri, Jun 26, 2015 at 5:46 PM, Cody Koeninger  wrote:

> Read the Spark Streaming guide and the Kafka integration guide for a better
> understanding of how the receiver-based stream works.
>
> Capacity planning is specific to your environment and what the job is
> actually doing; you'll need to determine it empirically.
>
>
> On Friday, June 26, 2015, Shushant Arora 
> wrote:
>
>> In 1.2, how do I handle offset management in each job after the stream
>> application starts? Should I commit the offset manually after job completion?
>>
>> And what is the recommended number of consumer threads? Say I have 300
>> partitions in the Kafka cluster. Load is ~1 million events per second; each
>> event is ~500 bytes. Are 5 receivers with 60 partitions each sufficient for
>> Spark Streaming to consume?
>>
>> On Fri, Jun 26, 2015 at 8:40 PM, Cody Koeninger 
>> wrote:
>>
>>> The receiver-based kafka createStream in spark 1.2 uses zookeeper to
>>> store offsets.  If you want finer-grained control over offsets, you can
>>> update the values in zookeeper yourself before starting the job.
>>>
>>> createDirectStream in spark 1.3 is still marked as experimental, and
>>> subject to change.  That being said, it works better for me in production
>>> than the receiver based api.
>>>
>>> On Fri, Jun 26, 2015 at 6:43 AM, Shushant Arora <
>>> shushantaror...@gmail.com> wrote:
>>>
 I am using spark streaming 1.2.

 If processing executors crash, will the receiver reset the offset back
 to the last processed offset?

 If the receiver itself crashes, is there a way to reset the offset
 without restarting the streaming application, other than to smallest or
 largest?

 Is Spark Streaming 1.3, which uses the low-level consumer API, stable? And
 which is recommended for handling data loss, 1.2 or 1.3?







>>>
>>


Re: Uncaught exception in thread delete Spark local dirs

2015-06-27 Thread Tathagata Das
Well, though randomly chosen, SPARK_CLASSPATH is a recognized env variable
that is picked up by spark-submit. That is what was used pre-Spark-1.0, but
got deprecated after that. Mind renaming that variable and trying it out
again? At least it will reduce one possible source of the problem.

TD

On Sat, Jun 27, 2015 at 2:32 AM, Guillermo Ortiz 
wrote:

> I'm checking the logs in YARN and I found this error as well
>
> Application application_1434976209271_15614 failed 2 times due to AM
> Container for appattempt_1434976209271_15614_02 exited with exitCode:
> 255
>
>
> Diagnostics: Exception from container-launch.
> Container id: container_1434976209271_15614_02_01
> Exit code: 255
> Stack trace: ExitCodeException exitCode=255:
> at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
> at org.apache.hadoop.util.Shell.run(Shell.java:455)
> at
> org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
> at
> org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor.launchContainer(LinuxContainerExecutor.java:293)
> at
> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
> at
> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> Shell output: Requested user hdfs is not whitelisted and has id 496,which
> is below the minimum allowed 1000
> Container exited with a non-zero exit code 255
> Failing this attempt. Failing the application.
>
> 2015-06-27 11:25 GMT+02:00 Guillermo Ortiz :
>
>> Well, SPARK_CLASSPATH is just a random name; the complete script is this:
>>
>> export HADOOP_CONF_DIR=/etc/hadoop/conf
>>
>> SPARK_CLASSPATH="file:/usr/metrics/conf/elasticSearch.properties,file:/usr/metrics/conf/redis.properties,/etc/spark/conf.cloudera.spark_on_yarn/yarn-conf/"
>> for lib in `ls /usr/metrics/lib/*.jar`
>> do
>> if [ -z "$SPARK_CLASSPATH" ]; then
>> SPARK_CLASSPATH=$lib
>> else
>> SPARK_CLASSPATH=$SPARK_CLASSPATH,$lib
>> fi
>> done
>> spark-submit --name "Metrics"
>>
>> I need to add all the jars, as you know; maybe SPARK_CLASSPATH was a bad
>> choice of name.
>>
>> The code doesn't have any stateful operation, so I guess it's okay that it
>> doesn't have a checkpoint. I have executed this code hundreds of times in a
>> VM from Cloudera and never got this error.
>>
>> 2015-06-27 11:21 GMT+02:00 Tathagata Das :
>>
>>> 1. you need checkpointing mostly for recovering from driver failures,
>>> and in some cases also for some stateful operations.
>>>
>>> 2. Could you try not using the SPARK_CLASSPATH environment variable.
>>>
>>> TD
>>>
>>> On Sat, Jun 27, 2015 at 1:00 AM, Guillermo Ortiz 
>>> wrote:
>>>
 I don't have any checkpoint in my code. Really, I don't have any
 state to save. It's just log processing for a PoC.
 I have been testing the code in a VM from Cloudera and I never got
 that error. Now it's on a real cluster.

 The command to execute Spark
 spark-submit --name "PoC Logs" --master yarn-client --class
 com.metrics.MetricsSpark --jars $SPARK_CLASSPATH --executor-memory 1g
 /usr/metrics/ex/metrics-spark.jar $1 $2 $3

 val sparkConf = new SparkConf()
 val ssc = new StreamingContext(sparkConf, Seconds(5))
 val kafkaParams = Map[String, String]("metadata.broker.list" ->
 args(0))
 val topics = args(1).split("\\,")
 val directKafkaStream = KafkaUtils.createDirectStream[String,
 String, StringDecoder, StringDecoder](ssc, kafkaParams, topics.toSet)

 directKafkaStream.foreachRDD { rdd =>
   val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
   val documents = rdd.mapPartitionsWithIndex { (i, kafkaEvent) =>
   .
}

 I understand that I just need a checkpoint if I need to recover the
 task when something goes wrong, right?


 2015-06-27 9:39 GMT+02:00 Tathagata Das :

> How are you trying to execute the code again? From checkpoints, or
> otherwise?
> Also cc'ed Hari who may have a better idea of YARN related issues.
>
> On Sat, Jun 27, 2015 at 12:35 AM, Guillermo Ortiz <
> konstt2...@gmail.com> wrote:
>
>> Hi,
>>
>> I'm executing Spark Streaming code with Kafka. The code was working,
>> but today I tried to execute it again and I got an
>> exception;
>> I don't know what is happening. Right now there are no job executions on
>> YARN.
>> How can I fix it?
>>
>> Exception in thread "main" org.apache.spark.SparkException: Yarn
>> application has already ended! It might have been killed or unable to
>> launch application master.

Re: Uncaught exception in thread delete Spark local dirs

2015-06-27 Thread Guillermo Ortiz
I changed the variable name and I got the same error.

2015-06-27 11:36 GMT+02:00 Tathagata Das :

> Well, though randomly chosen, SPARK_CLASSPATH is a recognized env variable
> that is picked up by spark-submit. That is what was used pre-Spark-1.0, but
> got deprecated after that. Mind renaming that variable and trying it out
> again? At least it will reduce one possible source of the problem.
>
> TD
>
> On Sat, Jun 27, 2015 at 2:32 AM, Guillermo Ortiz 
> wrote:
>
>> I'm checking the logs in YARN and I found this error as well
>>
>> Application application_1434976209271_15614 failed 2 times due to AM
>> Container for appattempt_1434976209271_15614_02 exited with exitCode:
>> 255
>>
>>
>> Diagnostics: Exception from container-launch.
>> Container id: container_1434976209271_15614_02_01
>> Exit code: 255
>> Stack trace: ExitCodeException exitCode=255:
>> at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
>> at org.apache.hadoop.util.Shell.run(Shell.java:455)
>> at
>> org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
>> at
>> org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor.launchContainer(LinuxContainerExecutor.java:293)
>> at
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
>> at
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:745)
>> Shell output: Requested user hdfs is not whitelisted and has id 496,which
>> is below the minimum allowed 1000
>> Container exited with a non-zero exit code 255
>> Failing this attempt. Failing the application.
>>
>> 2015-06-27 11:25 GMT+02:00 Guillermo Ortiz :
>>
>>> Well, SPARK_CLASSPATH is just a random name; the complete script is
>>> this:
>>>
>>> export HADOOP_CONF_DIR=/etc/hadoop/conf
>>>
>>> SPARK_CLASSPATH="file:/usr/metrics/conf/elasticSearch.properties,file:/usr/metrics/conf/redis.properties,/etc/spark/conf.cloudera.spark_on_yarn/yarn-conf/"
>>> for lib in `ls /usr/metrics/lib/*.jar`
>>> do
>>> if [ -z "$SPARK_CLASSPATH" ]; then
>>> SPARK_CLASSPATH=$lib
>>> else
>>> SPARK_CLASSPATH=$SPARK_CLASSPATH,$lib
>>> fi
>>> done
>>> spark-submit --name "Metrics"
>>>
>>> I need to add all the jars, as you know; maybe SPARK_CLASSPATH was a bad
>>> choice of name.
>>>
>>> The code doesn't have any stateful operation, so I guess it's okay that it
>>> doesn't have a checkpoint. I have executed this code hundreds of times in
>>> a VM from Cloudera and never got this error.
>>>
>>> 2015-06-27 11:21 GMT+02:00 Tathagata Das :
>>>
 1. you need checkpointing mostly for recovering from driver failures,
 and in some cases also for some stateful operations.

 2. Could you try not using the SPARK_CLASSPATH environment variable.

 TD

 On Sat, Jun 27, 2015 at 1:00 AM, Guillermo Ortiz 
 wrote:

> I don't have any checkpoint in my code. Really, I don't have any
> state to save. It's just log processing for a PoC.
> I have been testing the code in a VM from Cloudera and I never got
> that error. Now it's on a real cluster.
>
> The command to execute Spark
> spark-submit --name "PoC Logs" --master yarn-client --class
> com.metrics.MetricsSpark --jars $SPARK_CLASSPATH --executor-memory 1g
> /usr/metrics/ex/metrics-spark.jar $1 $2 $3
>
> val sparkConf = new SparkConf()
> val ssc = new StreamingContext(sparkConf, Seconds(5))
> val kafkaParams = Map[String, String]("metadata.broker.list" ->
> args(0))
> val topics = args(1).split("\\,")
> val directKafkaStream = KafkaUtils.createDirectStream[String,
> String, StringDecoder, StringDecoder](ssc, kafkaParams, topics.toSet)
>
> directKafkaStream.foreachRDD { rdd =>
>   val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>   val documents = rdd.mapPartitionsWithIndex { (i, kafkaEvent) =>
>   .
>}
>
> I understand that I just need a checkpoint if I need to recover the
> task when something goes wrong, right?
>
>
> 2015-06-27 9:39 GMT+02:00 Tathagata Das :
>
>> How are you trying to execute the code again? From checkpoints, or
>> otherwise?
>> Also cc'ed Hari who may have a better idea of YARN related issues.
>>
>> On Sat, Jun 27, 2015 at 12:35 AM, Guillermo Ortiz <
>> konstt2...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I'm executing Spark Streaming code with Kafka. The code was
>>> working, but today I tried to execute it again and I got an
>>> exception;
>>> I don't know what is happening. Right now there are no jobs
>>> execut

JavaRDD and saveAsNewAPIHadoopFile()

2015-06-27 Thread Bahubali Jain
Hi,
Why doesn't JavaRDD have saveAsNewAPIHadoopFile() associated with it?


Thanks,
Baahu

-- 
Twitter:http://twitter.com/Baahu


R "on spark"

2015-06-27 Thread Evo Eftimov
I had a look at the new R "on Spark" API / Feature in Spark 1.4.0

For those "skilled in the art" (of R and distributed computing) it will be
immediately clear that "ON" is a marketing ploy and what it actually is is
"TO", i.e. Spark 1.4.0 offers an INTERFACE from R TO DATA stored in Spark in
distributed fashion, and some distributed queries which can be initiated FROM
R and run on that data within Spark - these are essentially certain types of
SQL-style queries.

In order to deserve the "ON" label, RSpark has to be able to run ON Spark
most of the Statistical Analysis and Machine Learning Algos as found in the
R engine. This is absolutely not the case at the moment.

As an example of what type of Solution/Architecture I am referring to you
can review Revolution Analytics (recently acquired by Microsoft) and some
other open source frameworks for running R ON distributed clusters 

 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/R-on-spark-tp23512.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Uncaught exception in thread delete Spark local dirs

2015-06-27 Thread Ted Yu
Guillermo :

bq. Shell output: Requested user hdfs is not whitelisted and has id
496, which is below the minimum allowed 1000

Are you using a secure cluster ?

Can user hdfs be re-created with uid > 1000?

Cheers

On Sat, Jun 27, 2015 at 2:32 AM, Guillermo Ortiz 
wrote:

> I'm checking the logs in YARN and I found this error as well
>
> Application application_1434976209271_15614 failed 2 times due to AM
> Container for appattempt_1434976209271_15614_02 exited with exitCode:
> 255
>
>
> Diagnostics: Exception from container-launch.
> Container id: container_1434976209271_15614_02_01
> Exit code: 255
> Stack trace: ExitCodeException exitCode=255:
> at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
> at org.apache.hadoop.util.Shell.run(Shell.java:455)
> at
> org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
> at
> org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor.launchContainer(LinuxContainerExecutor.java:293)
> at
> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
> at
> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> Shell output: Requested user hdfs is not whitelisted and has id 496,which
> is below the minimum allowed 1000
> Container exited with a non-zero exit code 255
> Failing this attempt. Failing the application.
>
> 2015-06-27 11:25 GMT+02:00 Guillermo Ortiz :
>
>> Well, SPARK_CLASSPATH is just a random name; the complete script is this:
>>
>> export HADOOP_CONF_DIR=/etc/hadoop/conf
>>
>> SPARK_CLASSPATH="file:/usr/metrics/conf/elasticSearch.properties,file:/usr/metrics/conf/redis.properties,/etc/spark/conf.cloudera.spark_on_yarn/yarn-conf/"
>> for lib in `ls /usr/metrics/lib/*.jar`
>> do
>> if [ -z "$SPARK_CLASSPATH" ]; then
>> SPARK_CLASSPATH=$lib
>> else
>> SPARK_CLASSPATH=$SPARK_CLASSPATH,$lib
>> fi
>> done
>> spark-submit --name "Metrics"
>>
>> I need to add all the jars, as you know; maybe SPARK_CLASSPATH was a bad
>> choice of name.
>>
>> The code doesn't have any stateful operation, so I guess it's okay that it
>> doesn't have a checkpoint. I have executed this code hundreds of times in a
>> VM from Cloudera and never got this error.
>>
>> 2015-06-27 11:21 GMT+02:00 Tathagata Das :
>>
>>> 1. you need checkpointing mostly for recovering from driver failures,
>>> and in some cases also for some stateful operations.
>>>
>>> 2. Could you try not using the SPARK_CLASSPATH environment variable.
>>>
>>> TD
>>>
>>> On Sat, Jun 27, 2015 at 1:00 AM, Guillermo Ortiz 
>>> wrote:
>>>
 I don't have any checkpoint in my code. Really, I don't have any
 state to save. It's just log processing for a PoC.
 I have been testing the code in a VM from Cloudera and I never got that
 error. Now it's on a real cluster.

 The command to execute Spark
 spark-submit --name "PoC Logs" --master yarn-client --class
 com.metrics.MetricsSpark --jars $SPARK_CLASSPATH --executor-memory 1g
 /usr/metrics/ex/metrics-spark.jar $1 $2 $3

 val sparkConf = new SparkConf()
 val ssc = new StreamingContext(sparkConf, Seconds(5))
 val kafkaParams = Map[String, String]("metadata.broker.list" ->
 args(0))
 val topics = args(1).split("\\,")
 val directKafkaStream = KafkaUtils.createDirectStream[String,
 String, StringDecoder, StringDecoder](ssc, kafkaParams, topics.toSet)

 directKafkaStream.foreachRDD { rdd =>
   val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
   val documents = rdd.mapPartitionsWithIndex { (i, kafkaEvent) =>
   .
}

 I understand that I just need a checkpoint if I need to recover the
 task when something goes wrong, right?


 2015-06-27 9:39 GMT+02:00 Tathagata Das :

> How are you trying to execute the code again? From checkpoints, or
> otherwise?
> Also cc'ed Hari who may have a better idea of YARN related issues.
>
> On Sat, Jun 27, 2015 at 12:35 AM, Guillermo Ortiz <
> konstt2...@gmail.com> wrote:
>
>> Hi,
>>
>> I'm executing Spark Streaming code with Kafka. The code was working,
>> but today I tried to execute it again and I got an
>> exception;
>> I don't know what is happening. Right now there are no job executions on
>> YARN.
>> How can I fix it?
>>
>> Exception in thread "main" org.apache.spark.SparkException: Yarn
>> application has already ended! It might have been killed or unable to
>> launch application master.
>> at
>> org.apache.spark.scheduler.cluster.YarnClientSchedu

Re: JavaRDD and saveAsNewAPIHadoopFile()

2015-06-27 Thread Ted Yu
Please take a look at JavaPairRDD.scala

Cheers

On Sat, Jun 27, 2015 at 3:42 AM, Bahubali Jain  wrote:

> Hi,
> Why doesn't JavaRDD have saveAsNewAPIHadoopFile() associated with it?
>
>
> Thanks,
> Baahu
>
> --
> Twitter:http://twitter.com/Baahu
>
>


Re: spark streaming with kafka reset offset

2015-06-27 Thread Dibyendu Bhattacharya
Hi,

There is another option to try for Receiver Based Low Level Kafka Consumer
which is part of Spark-Packages (
http://spark-packages.org/package/dibbhatt/kafka-spark-consumer) . This can
be used with WAL as well for end to end zero data loss.

This is also a reliable receiver and commits offsets to ZK. Given the number
of Kafka partitions you have (> 100), using the high-level Kafka API for the
receiver-based approach may lead to issues related to consumer re-balancing,
which is a major issue of the Kafka high-level API.

Regards,
Dibyendu



On Sat, Jun 27, 2015 at 3:04 PM, Tathagata Das  wrote:

> In the receiver based approach, If the receiver crashes for any reason
> (receiver crashed or executor crashed) the receiver should get restarted on
> another executor and should start reading data from the offset present in
> the zookeeper. There is some chance of data loss which can alleviated using
> Write Ahead Logs (see streaming programming guide for more details, or see
> my talk [Slides PDF, Video] from the last Spark Summit 2015). But that
> approach can give duplicate
> records. The direct approach gives exactly-once guarantees, so you should
> try it out.
>
> TD
>
> On Fri, Jun 26, 2015 at 5:46 PM, Cody Koeninger 
> wrote:
>
>> Read the Spark Streaming guide and the Kafka integration guide for a
>> better understanding of how the receiver-based stream works.
>>
>> Capacity planning is specific to your environment and what the job is
>> actually doing; you'll need to determine it empirically.
>>
>>
>> On Friday, June 26, 2015, Shushant Arora 
>> wrote:
>>
>>> In 1.2, how do I handle offset management in each job after the stream
>>> application starts? Should I commit the offset manually after job
>>> completion?
>>>
>>> And what is the recommended number of consumer threads? Say I have 300
>>> partitions in the Kafka cluster. Load is ~1 million events per second;
>>> each event is ~500 bytes. Are 5 receivers with 60 partitions each
>>> sufficient for Spark Streaming to consume?
>>>
>>> On Fri, Jun 26, 2015 at 8:40 PM, Cody Koeninger 
>>> wrote:
>>>
 The receiver-based kafka createStream in spark 1.2 uses zookeeper to
 store offsets.  If you want finer-grained control over offsets, you can
 update the values in zookeeper yourself before starting the job.

 createDirectStream in spark 1.3 is still marked as experimental, and
 subject to change.  That being said, it works better for me in production
 than the receiver based api.

 On Fri, Jun 26, 2015 at 6:43 AM, Shushant Arora <
 shushantaror...@gmail.com> wrote:

> I am using spark streaming 1.2.
>
> If processing executors crash, will the receiver reset the offset back
> to the last processed offset?
>
> If the receiver itself crashes, is there a way to reset the offset
> without restarting the streaming application, other than to smallest or
> largest?
>
> Is Spark Streaming 1.3, which uses the low-level consumer API, stable? And
> which is recommended for handling data loss, 1.2 or 1.3?
>
>
>
>
>
>
>

>>>
>


Re: Uncaught exception in thread delete Spark local dirs

2015-06-27 Thread Sea
SPARK_CLASSPATH is nice; spark.jars needs to list all the jars one by one when 
submitting to YARN, because spark.driver.classpath and spark.executor.classpath 
are not available in yarn mode. Can someone remove the warning from the code, 
or upload the jars in spark.driver.classpath and spark.executor.classpath?
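As a hedged illustration of the alternative being discussed (the jar paths below are made up), jars can be passed explicitly on the spark-submit command line instead of through SPARK_CLASSPATH: --jars ships them to the executors, and --driver-class-path covers the driver.

```shell
# Hypothetical example: listing jars one by one for spark-submit.
# --jars and --driver-class-path are standard spark-submit flags; the
# paths below are invented for illustration only.
JARS=/usr/metrics/lib/first.jar,/usr/metrics/lib/second.jar
CMD="spark-submit --master yarn-client --jars $JARS --driver-class-path /etc/spark/conf.cloudera.spark_on_yarn/yarn-conf"
echo "$CMD"
```

The command is only constructed and echoed here, not executed, since spark-submit requires a cluster.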




-- Original Message --
From: "Tathagata Das";
Date: Saturday, June 27, 2015, 5:36 PM
To: "Guillermo Ortiz"; 
Cc: "Hari Shreedharan"; 
"user"; 
Subject: Re: Uncaught exception in thread delete Spark local dirs



Well, though randomly chosen, SPARK_CLASSPATH is a recognized env variable that 
is picked up by spark-submit. That is what was used pre-Spark-1.0, but got 
deprecated after that. Mind renaming that variable and trying it out again? At 
least it will reduce one possible source of the problem.

TD


On Sat, Jun 27, 2015 at 2:32 AM, Guillermo Ortiz  wrote:
I'm checking the logs in YARN and I found this error as well
Application application_1434976209271_15614 failed 2 times due to AM Container 
for appattempt_1434976209271_15614_02 exited with exitCode: 255




Diagnostics: Exception from container-launch.
Container id: container_1434976209271_15614_02_01
Exit code: 255
Stack trace: ExitCodeException exitCode=255:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
at org.apache.hadoop.util.Shell.run(Shell.java:455)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
at 
org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor.launchContainer(LinuxContainerExecutor.java:293)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Shell output: Requested user hdfs is not whitelisted and has id 496,which is 
below the minimum allowed 1000


Container exited with a non-zero exit code 255
Failing this attempt. Failing the application.
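The "Shell output" line is the real failure: YARN's LinuxContainerExecutor refuses to launch containers for users whose UID is below its configured minimum (1000 by default), which catches system accounts such as hdfs (UID 496 here). A hedged sketch of the relevant settings in container-executor.cfg on each NodeManager (stock Hadoop property names; paths and management tooling differ per distribution, and running the job as a regular non-system user avoids the change entirely):

```ini
# container-executor.cfg (on every NodeManager)
# Either lower the UID floor...
min.user.id=495
# ...or, preferably, whitelist the specific system account:
allowed.system.users=hdfs
```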




2015-06-27 11:25 GMT+02:00 Guillermo Ortiz :
Well, SPARK_CLASSPATH is just a random name; the complete script is this:


export HADOOP_CONF_DIR=/etc/hadoop/conf
SPARK_CLASSPATH="file:/usr/metrics/conf/elasticSearch.properties,file:/usr/metrics/conf/redis.properties,/etc/spark/conf.cloudera.spark_on_yarn/yarn-conf/"
for lib in `ls /usr/metrics/lib/*.jar`
do
if [ -z "$SPARK_CLASSPATH" ]; then
SPARK_CLASSPATH=$lib
else
SPARK_CLASSPATH=$SPARK_CLASSPATH,$lib
fi
done 
spark-submit --name "Metrics"


I need to add all the jars, as you know; maybe SPARK_CLASSPATH was a bad name.


The code doesn't have any stateful operation, so I guess it's okay that it 
doesn't have a checkpoint. I have executed this code hundreds of times in a VM 
from Cloudera and never got this error.


2015-06-27 11:21 GMT+02:00 Tathagata Das :
1. you need checkpointing mostly for recovering from driver failures, and in 
some cases also for some stateful operations.

2. Could you try not using the SPARK_CLASSPATH environment variable. 


TD


On Sat, Jun 27, 2015 at 1:00 AM, Guillermo Ortiz  wrote:
I don't have any checkpoint in my code. Really, I don't have to save any state; 
it's just log processing for a PoC. I have been testing the code in a VM from 
Cloudera and I never got that error. Now it's a real cluster.


The command to execute Spark 
spark-submit --name "PoC Logs" --master yarn-client --class 
com.metrics.MetricsSpark --jars $SPARK_CLASSPATH --executor-memory 1g 
/usr/metrics/ex/metrics-spark.jar $1 $2 $3


val sparkConf = new SparkConf()
val ssc = new StreamingContext(sparkConf, Seconds(5))  
val kafkaParams = Map[String, String]("metadata.broker.list" -> args(0))
val topics = args(1).split("\\,")
val directKafkaStream = KafkaUtils.createDirectStream[String, String, 
StringDecoder, StringDecoder](ssc, kafkaParams, topics.toSet)


directKafkaStream.foreachRDD { rdd =>
  val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  val documents = rdd.mapPartitionsWithIndex { (i, kafkaEvent) =>
  .
   }


I understand that I only need a checkpoint to recover the task if 
something goes wrong, right?





2015-06-27 9:39 GMT+02:00 Tathagata Das :
How are you trying to execute the code again? From checkpoints, or otherwise?
Also cc'ed Hari, who may have a better idea about YARN-related issues.


On Sat, Jun 27, 2015 at 12:35 AM, Guillermo Ortiz  wrote:
Hi,

I'm executing a Spark Streaming job with Kafka. The code was working, but today 
I tried to execute it again and I got an exception; I don't know what is 
happening. Right now there are no job executions on YARN.
How could it f

Re: Time is ugly in Spark Streaming....

2015-06-27 Thread Sea
Yes , things go well now.  It is a problem of SimpleDateFormat. Thank you all.




-- Original Message --
From: "Dumas Hwang";
Sent: Saturday, June 27, 2015, 8:16
To: "Tathagata Das"; 
Cc: "Emrehan Tüzün"; "Sea"<261810...@qq.com>; 
"dev"; "user"; 
Subject: Re: Time is ugly in Spark Streaming



Java's SimpleDateFormat is not thread safe.  You can consider using 
DateTimeFormatter if you are using Java 8 or Joda-time

On Sat, Jun 27, 2015 at 3:32 AM, Tathagata Das  wrote:
Could you print the "time" on the driver (that is, in foreachRDD but before 
RDD.foreachPartition) and see if it is behaving weird?

TD


On Fri, Jun 26, 2015 at 3:57 PM, Emrehan Tüzün wrote:
 





On Fri, Jun 26, 2015 at 12:30 PM, Sea <261810...@qq.com> wrote:

 Hi, all
 

 I find a problem in spark streaming, when I use the time in function 
foreachRDD... I find the time is very interesting. 
 val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, 
StringDecoder](ssc, kafkaParams, topicsSet)
 dataStream.map(x => createGroup(x._2, 
dimensions)).groupByKey().foreachRDD((rdd, time) => {
try {
if (!rdd.partitions.isEmpty) {
  rdd.foreachPartition(partition => {
handlePartition(partition, timeType, time, dimensions, outputTopic, brokerList)
  })
}
  } catch {
case e: Exception => e.printStackTrace()
  }
})
 

 val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")
  var date = dateFormat.format(new Date(time.milliseconds)) 
 
 Then I insert the 'date' into Kafka , but I found .
  

 
{"timestamp":"2015-06-00T16:50:02","status":"3","type":"1","waittime":"0","count":17}
 
{"timestamp":"2015-06-26T16:51:13","status":"1","type":"1","waittime":"0","count":34}
 
{"timestamp":"2015-06-00T16:50:02","status":"4","type":"0","waittime":"0","count":279}
 
{"timestamp":"2015-06-26T16:52:00","status":"11","type":"1","waittime":"0","count":9}
 
{"timestamp":"0020-06-26T16:50:36","status":"7","type":"0","waittime":"0","count":1722}
 
{"timestamp":"2015-06-10T16:51:17","status":"0","type":"0","waittime":"0","count":2958}
 
{"timestamp":"2015-06-26T16:52:00","status":"0","type":"1","waittime":"0","count":114}
 
{"timestamp":"2015-06-10T16:51:17","status":"11","type":"0","waittime":"0","count":2066}
 
{"timestamp":"2015-06-26T16:52:00","status":"1","type":"0","waittime":"0","count":1539}
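Sea's malformed output ("2015-06-00", "0020-06-26") is the classic signature of the thread-safety problem Dumas describes: several tasks mutating one SimpleDateFormat's internal calendar at once. Whatever formatter is used, the safe pattern is one instance per thread or per partition, constructed where it is used rather than shared. A pure-Python sketch of the per-thread-instance idea (illustrative only; Spark itself is not involved):

```python
import threading
import time

# One formatter "instance" per thread, mirroring the fix of constructing
# SimpleDateFormat inside the per-partition handler instead of sharing one.
_local = threading.local()

def format_ts(epoch_seconds):
    if not hasattr(_local, "pattern"):
        _local.pattern = "%Y-%m-%dT%H:%M:%S"   # per-thread state
    return time.strftime(_local.pattern, time.gmtime(epoch_seconds))

def worker(results, idx):
    # Format the same instant many times; with per-thread state every
    # result is identical and well-formed.
    results[idx] = {format_ts(1435423802) for _ in range(1000)}

results = {}
threads = [threading.Thread(target=worker, args=(results, i)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
# every thread saw exactly one well-formed timestamp: 2015-06-27T16:50:02
```

In the Scala job above the equivalent fix is simply moving `new SimpleDateFormat(...)` inside `handlePartition`, or, on Java 8+, switching to the immutable, thread-safe `DateTimeFormatter` as Dumas suggests.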

How to timeout a task?

2015-06-27 Thread wasauce
Hello!

We use pyspark to run a set of data extractors (think regex). The extractors
(regexes) generally run quite quickly and find a few matches which are
returned and stored into a database. 

My question is -- is it possible to make the function that runs the
extractors have a timeout? I.E. if for a given file the extractor runs for
more than X seconds it terminates and returns a default value?

Here is a code snippet of what we are doing with some comments as to which
function I am looking to timeout.

code: https://gist.github.com/wasauce/42a956a1371a2b564918 

Thank you

- Bill



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-timeout-a-task-tp23513.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: dataframe left joins are not working as expected in pyspark

2015-06-27 Thread Nicholas Chammas
Yeah, you shouldn't have to rename the columns before joining them.

Do you see the same behavior on 1.3 vs 1.4?

Nick
On Sat, Jun 27, 2015 at 2:51 AM, Axel Dahl wrote:

> still feels like a bug to have to create unique names before a join.
>
> On Fri, Jun 26, 2015 at 9:51 PM, ayan guha  wrote:
>
>> You can declare the schema with unique names before creation of df.
>> On 27 Jun 2015 13:01, "Axel Dahl"  wrote:
>>
>>>
>>> I have the following code:
>>>
>>> from pyspark import SQLContext
>>>
>>> d1 = [{'name':'bob', 'country': 'usa', 'age': 1}, {'name':'alice',
>>> 'country': 'jpn', 'age': 2}, {'name':'carol', 'country': 'ire', 'age': 3}]
>>> d2 = [{'name':'bob', 'country': 'usa', 'colour':'red'}, {'name':'alice',
>>> 'country': 'ire', 'colour':'green'}]
>>>
>>> r1 = sc.parallelize(d1)
>>> r2 = sc.parallelize(d2)
>>>
>>> sqlContext = SQLContext(sc)
>>> df1 = sqlContext.createDataFrame(d1)
>>> df2 = sqlContext.createDataFrame(d2)
>>> df1.join(df2, df1.name == df2.name and df1.country == df2.country,
>>> 'left_outer').collect()
>>>
>>>
>>> When I run it I get the following, (notice in the first row, all join
>>> keys are taken from the right side and so are blanked out):
>>>
>>> [Row(age=2, country=None, name=None, colour=None, country=None,
>>> name=None),
>>> Row(age=1, country=u'usa', name=u'bob', colour=u'red', country=u'usa',
>>> name=u'bob'),
>>> Row(age=3, country=u'ire', name=u'alice', colour=u'green',
>>> country=u'ire', name=u'alice')]
>>>
>>> I would expect to get (though ideally without duplicate columns):
>>> [Row(age=2, country=u'ire', name=u'Alice', colour=None, country=None,
>>> name=None),
>>> Row(age=1, country=u'usa', name=u'bob', colour=u'red', country=u'usa',
>>> name=u'bob'),
>>> Row(age=3, country=u'ire', name=u'alice', colour=u'green',
>>> country=u'ire', name=u'alice')]
>>>
>>> The workaround for now is this rather clunky piece of code:
>>> df2 = sqlContext.createDataFrame(d2).withColumnRenamed('name',
>>> 'name2').withColumnRenamed('country', 'country2')
>>> df1.join(df2, df1.name == df2.name2 and df1.country == df2.country2,
>>> 'left_outer').collect()
>>>
>>> So to me it looks like a bug, but am I doing something wrong?
>>>
>>> Thanks,
>>>
>>> -Axel
>>>
>>>
>>>
>>>
>>>
>
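A plausible root cause, hedged since only the JIRA can confirm it for Spark 1.4: Python's `and` cannot be overloaded, so `df1.name == df2.name and df1.country == df2.country` never builds a combined Column. `and` merely tests the truthiness of the left operand and yields one of the two operands, so the join condition silently collapses to the country equality alone. PySpark's Column does overload the bitwise `&` operator, which is what joins expect (parenthesized, since `&` binds tighter than `==`). A toy stand-in class shows the Python-level behavior:

```python
class Col:
    """Toy stand-in for a PySpark Column: == and & build expression trees."""
    def __init__(self, expr):
        self.expr = expr
    def __eq__(self, other):
        return Col("(%s = %s)" % (self.expr, other.expr))
    def __and__(self, other):
        return Col("(%s AND %s)" % (self.expr, other.expr))

name_eq = Col("df1.name") == Col("df2.name")
country_eq = Col("df1.country") == Col("df2.country")

# `and` cannot be overloaded: the left Col is truthy (default object
# truthiness), so the whole expression is just the right-hand operand.
cond_and = name_eq and country_eq          # -> country_eq only

# `&` dispatches to Col.__and__ and keeps both predicates.
cond_amp = name_eq & country_eq
```

With real DataFrames the join would then read `df1.join(df2, (df1.name == df2.name) & (df1.country == df2.country), 'left_outer')`.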


Re: dataframe left joins are not working as expected in pyspark

2015-06-27 Thread Axel Dahl
I've only tested on 1.4, but imagine 1.3 is the same or a lot of people's
code would be failing right now.

On Saturday, June 27, 2015, Nicholas Chammas 
wrote:

> Yeah, you shouldn't have to rename the columns before joining them.
>
> Do you see the same behavior on 1.3 vs 1.4?
>
> Nick
> On Sat, Jun 27, 2015 at 2:51 AM, Axel Dahl wrote:
>
>> still feels like a bug to have to create unique names before a join.
>>
>> On Fri, Jun 26, 2015 at 9:51 PM, ayan guha > > wrote:
>>
>>> You can declare the schema with unique names before creation of df.
>>> On 27 Jun 2015 13:01, "Axel Dahl" >> > wrote:
>>>

 I have the following code:

 from pyspark import SQLContext

 d1 = [{'name':'bob', 'country': 'usa', 'age': 1}, {'name':'alice',
 'country': 'jpn', 'age': 2}, {'name':'carol', 'country': 'ire', 'age': 3}]
 d2 = [{'name':'bob', 'country': 'usa', 'colour':'red'},
 {'name':'alice', 'country': 'ire', 'colour':'green'}]

 r1 = sc.parallelize(d1)
 r2 = sc.parallelize(d2)

 sqlContext = SQLContext(sc)
 df1 = sqlContext.createDataFrame(d1)
 df2 = sqlContext.createDataFrame(d2)
 df1.join(df2, df1.name == df2.name and df1.country == df2.country,
 'left_outer').collect()


 When I run it I get the following, (notice in the first row, all join
 keys are taken from the right side and so are blanked out):

 [Row(age=2, country=None, name=None, colour=None, country=None,
 name=None),
 Row(age=1, country=u'usa', name=u'bob', colour=u'red', country=u'usa',
 name=u'bob'),
 Row(age=3, country=u'ire', name=u'alice', colour=u'green',
 country=u'ire', name=u'alice')]

 I would expect to get (though ideally without duplicate columns):
 [Row(age=2, country=u'ire', name=u'Alice', colour=None, country=None,
 name=None),
 Row(age=1, country=u'usa', name=u'bob', colour=u'red', country=u'usa',
 name=u'bob'),
 Row(age=3, country=u'ire', name=u'alice', colour=u'green',
 country=u'ire', name=u'alice')]

 The workaround for now is this rather clunky piece of code:
 df2 = sqlContext.createDataFrame(d2).withColumnRenamed('name',
 'name2').withColumnRenamed('country', 'country2')
 df1.join(df2, df1.name == df2.name2 and df1.country == df2.country2,
 'left_outer').collect()

 So to me it looks like a bug, but am I doing something wrong?

 Thanks,

 -Axel





>>


Re: How to timeout a task?

2015-06-27 Thread Ted Yu
Have you looked at:
http://stackoverflow.com/questions/2281850/timeout-function-if-it-takes-too-long-to-finish

FYI

On Sat, Jun 27, 2015 at 8:33 AM, wasauce  wrote:

> Hello!
>
> We use pyspark to run a set of data extractors (think regex). The
> extractors
> (regexes) generally run quite quickly and find a few matches which are
> returned and stored into a database.
>
> My question is -- is it possible to make the function that runs the
> extractors have a timeout? I.E. if for a given file the extractor runs for
> more than X seconds it terminates and returns a default value?
>
> Here is a code snippet of what we are doing with some comments as to which
> function I am looking to timeout.
>
> code: https://gist.github.com/wasauce/42a956a1371a2b564918
>
> Thank you
>
> - Bill
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-timeout-a-task-tp23513.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
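The signal-based recipe from that Stack Overflow thread can be packaged as a small helper. This is a sketch under two assumptions: the extractor runs in the main thread of the Python worker process, and the platform is Unix (SIGALRM does not exist on Windows and cannot be delivered to secondary threads):

```python
import signal
import time

class ExtractorTimeout(Exception):
    pass

def run_with_timeout(func, args=(), seconds=5, default=None):
    """Run func(*args), returning `default` if it exceeds `seconds`."""
    def handler(signum, frame):
        raise ExtractorTimeout()
    previous = signal.signal(signal.SIGALRM, handler)
    signal.alarm(seconds)                  # schedule SIGALRM
    try:
        return func(*args)
    except ExtractorTimeout:
        return default
    finally:
        signal.alarm(0)                    # cancel any pending alarm
        signal.signal(signal.SIGALRM, previous)

def fast_extract(text):                    # a well-behaved extractor
    return ["m1", "m2"]

def slow_extract(text):                    # stands in for a pathological regex
    time.sleep(10)
    return ["never reached"]

fast_result = run_with_timeout(fast_extract, ("some file",), seconds=3, default=[])
slow_result = run_with_timeout(slow_extract, ("some file",), seconds=1, default=[])
```

One caveat: Python only notices a pending signal between bytecode operations, so a single regex match stuck inside the C engine is not interrupted until it returns control to Python; a truly pathological pattern may also need rewriting.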


Re: Matrix Multiplication and mllib.recommendation

2015-06-27 Thread Ayman Farahat
How do you partition by product in Python?
The only API is partitionBy(50).
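partitionBy does take more than the partition count: its second argument is a hash function applied to the key, so partitioning by product reduces to keying the RDD by product id and calling something like `rdd.partitionBy(50, partitionFunc=lambda product_id: product_id)` (hedged against the 1.x PySpark RDD API). The placement rule itself is just a modulus, as this dependency-free sketch shows:

```python
def assign_partition(key, num_partitions, partition_func=hash):
    # PySpark places a (key, value) pair into partition_func(key) % numPartitions
    return partition_func(key) % num_partitions

# (product_id, factor_vector) pairs routed by product id
pairs = [(30001, [0.1]), (30002, [0.2]), (30051, [0.3])]
buckets = {}
for pid, vec in pairs:
    p = assign_partition(pid, 50, partition_func=lambda k: k)
    buckets.setdefault(p, []).append(pid)
# 30001 and 30051 share a partition; 30002 lands in another
```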

On Jun 18, 2015, at 8:42 AM, Debasish Das  wrote:

> Also in my experiments, it's much faster to do blocked BLAS through cartesian 
> rather than doing sc.union. Here are the details on the experiments:
> 
> https://issues.apache.org/jira/browse/SPARK-4823
> 
> On Thu, Jun 18, 2015 at 8:40 AM, Debasish Das  
> wrote:
> Also not sure how threading helps here because Spark puts a partition to each 
> core. On each core may be there are multiple threads if you are using intel 
> hyperthreading but I will let Spark handle the threading.  
> 
> On Thu, Jun 18, 2015 at 8:38 AM, Debasish Das  
> wrote:
> We added SPARK-3066 for this. In 1.4 you should get the code to do BLAS dgemm 
> based calculation.
> 
> On Thu, Jun 18, 2015 at 8:20 AM, Ayman Farahat 
>  wrote:
> Thanks Sabarish and Nick
> Would you happen to have some code snippets that you can share. 
> Best
> Ayman
> 
> On Jun 17, 2015, at 10:35 PM, Sabarish Sasidharan 
>  wrote:
> 
>> Nick is right. I too have implemented this way and it works just fine. In my 
>> case, there can be even more products. You simply broadcast blocks of 
>> products to userFeatures.mapPartitions() and BLAS multiply in there to get 
>> recommendations. In my case 10K products form one block. Note that you would 
>> then have to union your recommendations. And if there lots of product 
>> blocks, you might also want to checkpoint once every few times.
>> 
>> Regards
>> Sab
>> 
>> On Thu, Jun 18, 2015 at 10:43 AM, Nick Pentreath  
>> wrote:
>> One issue is that you broadcast the product vectors and then do a dot 
>> product one-by-one with the user vector.
>> 
>> You should try forming a matrix of the item vectors and doing the dot 
>> product as a matrix-vector multiply which will make things a lot faster.
>> 
>> Another optimisation that is available in 1.4 is a recommendProducts 
>> method that blockifies the factors to make use of level 3 BLAS (ie 
>> matrix-matrix multiply). I am not sure if this is available in the Python 
>> API yet. 
>> 
>> But you can do a version yourself by using mapPartitions over user factors, 
>> blocking the factors into sub-matrices and doing matrix multiply with item 
>> factor matrix to get scores on a block-by-block basis.
>> 
>> Also as Ilya says more parallelism can help. I don't think it's so necessary 
>> to do LSH with 30,000 items.
>> 
>> —
>> Sent from Mailbox
>> 
>> 
>> On Thu, Jun 18, 2015 at 6:01 AM, Ganelin, Ilya  
>> wrote:
>> 
>> I actually talk about this exact thing in a blog post here 
>> http://blog.cloudera.com/blog/2015/05/working-with-apache-spark-or-how-i-learned-to-stop-worrying-and-love-the-shuffle/.
>>  Keep in mind, you're actually doing a ton of math. Even with proper caching 
>> and use of broadcast variables this will take a while depending on the size 
>> of your cluster. To get real results you may want to look into locality 
>> sensitive hashing to limit your search space and definitely look into 
>> spinning up multiple threads to process your product features in parallel to 
>> increase resource utilization on the cluster.
>> 
>> 
>> 
>> Thank you,
>> Ilya Ganelin
>> 
>> 
>> 
>> -Original Message-
>> From: afarahat [ayman.fara...@yahoo.com]
>> Sent: Wednesday, June 17, 2015 11:16 PM Eastern Standard Time
>> To: user@spark.apache.org
>> Subject: Matrix Multiplication and mllib.recommendation
>> 
>> Hello;
>> I am trying to get predictions after running the ALS model.
>> The model works fine. In the prediction/recommendation , I have about 30
>> ,000 products and 90 Millions users.
>> When i try the predict all it fails.
>> I have been trying to formulate the problem as a Matrix multiplication where
>> I first get the product features, broadcast them and then do a dot product.
>> Its still very slow. Any reason why
>> here is a sample code
>> 
>> def doMultiply(x):
>> a = []
>> #multiply by
>> mylen = len(pf.value)
>> for i in range(mylen) :
>>   myprod = numpy.dot(x,pf.value[i][1])
>>   a.append(myprod)
>> return a
>> 
>> 
>> myModel = MatrixFactorizationModel.load(sc, "FlurryModelPath")
>> #I need to select which products to broadcast but lets try all
>> m1 = myModel.productFeatures().sample(False, 0.001)
>> pf = sc.broadcast(m1.collect())
>> uf = myModel.userFeatures()
>> f1 = uf.map(lambda x : (x[0], doMultiply(x[1])))
>> 
>> 
>> 
>> --
>> View this message in context: 
>> http://apache-spark-user-list.1001560.n3.nabble.com/Matrix-Multiplication-and-mllib-recommendation-tp23384.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> 
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>> 
>> 
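Nick's block-by-block recipe above (gather a sub-matrix of user factors, multiply it against the item-factor matrix once, emit scores) can be outlined without Spark at all. A sketch in plain Python lists for clarity; in a real job the inner loop would be a single NumPy matrix multiply against broadcast item factors, run inside userFeatures().mapPartitions:

```python
def block_scores(user_block, item_factors):
    """Multiply a block of user-factor rows against all item-factor rows."""
    return [[sum(u * i for u, i in zip(urow, irow)) for irow in item_factors]
            for urow in user_block]

def score_partition(user_rows, item_factors, block_size=2):
    """Yield (user_id, scores), multiplying block_size users at a time."""
    block = []
    for uid, vec in user_rows:
        block.append((uid, vec))
        if len(block) == block_size:
            ids, mat = zip(*block)
            for out in zip(ids, block_scores(mat, item_factors)):
                yield out
            block = []
    if block:                               # flush the final partial block
        ids, mat = zip(*block)
        for out in zip(ids, block_scores(mat, item_factors)):
            yield out

# toy data: 3 users with rank-2 factors, 2 items
users = [(101, [1.0, 0.0]), (102, [0.0, 1.0]), (103, [1.0, 1.0])]
items = [[2.0, 0.0], [0.0, 3.0]]
recs = dict(score_partition(users, items))
```

The speedup over the per-user `numpy.dot` loop in the original post comes from replacing many small dot products with one level-3 BLAS call per block (`block_matrix @ item_matrix.T` with NumPy arrays).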

Re: dataframe left joins are not working as expected in pyspark

2015-06-27 Thread Nicholas Chammas
I would test it against 1.3 to be sure, because it could -- though unlikely
-- be a regression. For example, I recently stumbled upon this issue
 which was specific to
1.4.

On Sat, Jun 27, 2015 at 12:28 PM Axel Dahl  wrote:

> I've only tested on 1.4, but imagine 1.3 is the same or a lot of people's
> code would be failing right now.
>
> On Saturday, June 27, 2015, Nicholas Chammas 
> wrote:
>
>> Yeah, you shouldn't have to rename the columns before joining them.
>>
>> Do you see the same behavior on 1.3 vs 1.4?
>>
>> Nick
>> On Sat, Jun 27, 2015 at 2:51 AM, Axel Dahl wrote:
>>
>>> still feels like a bug to have to create unique names before a join.
>>>
>>> On Fri, Jun 26, 2015 at 9:51 PM, ayan guha  wrote:
>>>
 You can declare the schema with unique names before creation of df.
 On 27 Jun 2015 13:01, "Axel Dahl"  wrote:

>
> I have the following code:
>
> from pyspark import SQLContext
>
> d1 = [{'name':'bob', 'country': 'usa', 'age': 1}, {'name':'alice',
> 'country': 'jpn', 'age': 2}, {'name':'carol', 'country': 'ire', 'age': 3}]
> d2 = [{'name':'bob', 'country': 'usa', 'colour':'red'},
> {'name':'alice', 'country': 'ire', 'colour':'green'}]
>
> r1 = sc.parallelize(d1)
> r2 = sc.parallelize(d2)
>
> sqlContext = SQLContext(sc)
> df1 = sqlContext.createDataFrame(d1)
> df2 = sqlContext.createDataFrame(d2)
> df1.join(df2, df1.name == df2.name and df1.country == df2.country,
> 'left_outer').collect()
>
>
> When I run it I get the following, (notice in the first row, all join
> keys are taken from the right side and so are blanked out):
>
> [Row(age=2, country=None, name=None, colour=None, country=None,
> name=None),
> Row(age=1, country=u'usa', name=u'bob', colour=u'red', country=u'usa',
> name=u'bob'),
> Row(age=3, country=u'ire', name=u'alice', colour=u'green',
> country=u'ire', name=u'alice')]
>
> I would expect to get (though ideally without duplicate columns):
> [Row(age=2, country=u'ire', name=u'Alice', colour=None, country=None,
> name=None),
> Row(age=1, country=u'usa', name=u'bob', colour=u'red', country=u'usa',
> name=u'bob'),
> Row(age=3, country=u'ire', name=u'alice', colour=u'green',
> country=u'ire', name=u'alice')]
>
> The workaround for now is this rather clunky piece of code:
> df2 = sqlContext.createDataFrame(d2).withColumnRenamed('name',
> 'name2').withColumnRenamed('country', 'country2')
> df1.join(df2, df1.name == df2.name2 and df1.country == df2.country2,
> 'left_outer').collect()
>
> So to me it looks like a bug, but am I doing something wrong?
>
> Thanks,
>
> -Axel
>
>
>
>
>
>>>


Re: Spark 1.4.0 - Using SparkR on EC2 Instance

2015-06-27 Thread RedOakMark
For anyone monitoring the thread, I was able to successfully install and run
a small Spark cluster and model using this method:

First, make sure that the username being used to login to RStudio Server is
the one that was used to install Spark on the EC2 instance.  Thanks to
Shivaram for his help here.  

Login to RStudio and ensure that these references are used - set the library
location to the folder where spark is installed.  In my case,
~/home/rstudio/spark.

# # This line loads SparkR (the R package) from the installed directory
library("SparkR", lib.loc="./spark/R/lib")

The edits to this line were important, so that Spark knew where the install
folder was located when initializing the cluster.

# Initialize the Spark local cluster in R, as ‘sc’
sc <- sparkR.init("local[2]", "SparkR", "./spark")

From here, we ran a basic model using Spark, from RStudio, which ran
successfully. 




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-4-0-Using-SparkR-on-EC2-Instance-tp23506p23514.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: dataframe left joins are not working as expected in pyspark

2015-06-27 Thread Yin Huai
Axel,

Can you file a jira and attach your code in the description of the jira?
This looks like a bug.

For the third row of df1, the name is "alice" instead of "carol", right?
Otherwise, "carol" should appear in the expected output.

Btw, to get rid of those columns with the same name after the join, you can
use select to pick columns you want to include in the results.

Thanks,

Yin

On Sat, Jun 27, 2015 at 11:29 AM, Nicholas Chammas <
nicholas.cham...@gmail.com> wrote:

> I would test it against 1.3 to be sure, because it could -- though
> unlikely -- be a regression. For example, I recently stumbled upon this
> issue  which was
> specific to 1.4.
>
> On Sat, Jun 27, 2015 at 12:28 PM Axel Dahl  wrote:
>
>> I've only tested on 1.4, but imagine 1.3 is the same or a lot of people's
>> code would be failing right now.
>>
>> On Saturday, June 27, 2015, Nicholas Chammas 
>> wrote:
>>
>>> Yeah, you shouldn't have to rename the columns before joining them.
>>>
>>> Do you see the same behavior on 1.3 vs 1.4?
>>>
>>> Nick
>>> On Sat, Jun 27, 2015 at 2:51 AM, Axel Dahl wrote:
>>>
 still feels like a bug to have to create unique names before a join.

 On Fri, Jun 26, 2015 at 9:51 PM, ayan guha  wrote:

> You can declare the schema with unique names before creation of df.
> On 27 Jun 2015 13:01, "Axel Dahl"  wrote:
>
>>
>> I have the following code:
>>
>> from pyspark import SQLContext
>>
>> d1 = [{'name':'bob', 'country': 'usa', 'age': 1}, {'name':'alice',
>> 'country': 'jpn', 'age': 2}, {'name':'carol', 'country': 'ire', 'age': 
>> 3}]
>> d2 = [{'name':'bob', 'country': 'usa', 'colour':'red'},
>> {'name':'alice', 'country': 'ire', 'colour':'green'}]
>>
>> r1 = sc.parallelize(d1)
>> r2 = sc.parallelize(d2)
>>
>> sqlContext = SQLContext(sc)
>> df1 = sqlContext.createDataFrame(d1)
>> df2 = sqlContext.createDataFrame(d2)
>> df1.join(df2, df1.name == df2.name and df1.country == df2.country,
>> 'left_outer').collect()
>>
>>
>> When I run it I get the following, (notice in the first row, all join
>> keys are taken from the right side and so are blanked out):
>>
>> [Row(age=2, country=None, name=None, colour=None, country=None,
>> name=None),
>> Row(age=1, country=u'usa', name=u'bob', colour=u'red',
>> country=u'usa', name=u'bob'),
>> Row(age=3, country=u'ire', name=u'alice', colour=u'green',
>> country=u'ire', name=u'alice')]
>>
>> I would expect to get (though ideally without duplicate columns):
>> [Row(age=2, country=u'ire', name=u'Alice', colour=None, country=None,
>> name=None),
>> Row(age=1, country=u'usa', name=u'bob', colour=u'red',
>> country=u'usa', name=u'bob'),
>> Row(age=3, country=u'ire', name=u'alice', colour=u'green',
>> country=u'ire', name=u'alice')]
>>
>> The workaround for now is this rather clunky piece of code:
>> df2 = sqlContext.createDataFrame(d2).withColumnRenamed('name',
>> 'name2').withColumnRenamed('country', 'country2')
>> df1.join(df2, df1.name == df2.name2 and df1.country == df2.country2,
>> 'left_outer').collect()
>>
>> So to me it looks like a bug, but am I doing something wrong?
>>
>> Thanks,
>>
>> -Axel
>>
>>
>>
>>
>>



rdd.saveAsSequenceFile(path)

2015-06-27 Thread Pat Ferrel
Our project is having a hard time following what we are supposed to do to 
migrate this function from Spark 1.2 to 1.3.

  /**
   * Dump matrix as computed Mahout's DRM into specified (HD)FS path
   * @param path
   */
  def dfsWrite(path: String) = {
val ktag = implicitly[ClassTag[K]]
//val vtag = implicitly[ClassTag[Vector]]

implicit val k2wFunc: (K) => Writable =
  if (ktag.runtimeClass == classOf[Int]) (x: K) => new 
IntWritable(x.asInstanceOf[Int])
  else if (ktag.runtimeClass == classOf[String]) (x: K) => new 
Text(x.asInstanceOf[String])
  else if (ktag.runtimeClass == classOf[Long]) (x: K) => new 
LongWritable(x.asInstanceOf[Long])
  else if (classOf[Writable].isAssignableFrom(ktag.runtimeClass)) (x: K) => 
x.asInstanceOf[Writable]
  else throw new IllegalArgumentException("Do not know how to convert class 
tag %s to Writable.".format(ktag))

// the problem is here =
// this worked in Spark 1.2 and as we understand things should in 1.3 if we 
have the right implicits
//  rdd.saveAsSequenceFile(path)

// this works in Spark 1.3 but uses a deprecated method
SparkContext.rddToSequenceFileRDDFunctions(rdd.asInstanceOf[RDD[(K, 
Vector)]]).saveAsSequenceFile(path)
  }

As we understand it, we need to supply implicit Writable factories now instead 
of Writables? The RDD is a sequence of key = one of the classes above, value = 
a Mahout "Vector". These are usually serialized through Kryo (not 
JavaSerializer) for closures, so we have compatible classes for that. 

Any pointers would be helpful.
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark 1.4.0 - Using SparkR on EC2 Instance

2015-06-27 Thread Shivaram Venkataraman
Thanks Mark for the update. For those interested Vincent Warmerdam also has
some details on making the /root/spark installation work at
https://issues.apache.org/jira/browse/SPARK-8596?focusedCommentId=14604328&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14604328

Shivaram

On Sat, Jun 27, 2015 at 12:23 PM, RedOakMark 
wrote:

> For anyone monitoring the thread, I was able to successfully install and
> run
> a small Spark cluster and model using this method:
>
> First, make sure that the username being used to login to RStudio Server is
> the one that was used to install Spark on the EC2 instance.  Thanks to
> Shivaram for his help here.
>
> Login to RStudio and ensure that these references are used - set the
> library
> location to the folder where spark is installed.  In my case,
> ~/home/rstudio/spark.
>
> # # This line loads SparkR (the R package) from the installed directory
> library("SparkR", lib.loc="./spark/R/lib")
>
> The edits to this line were important, so that Spark knew where the install
> folder was located when initializing the cluster.
>
> # Initialize the Spark local cluster in R, as ‘sc’
> sc <- sparkR.init("local[2]", "SparkR", "./spark")
>
> From here, we ran a basic model using Spark, from RStudio, which ran
> successfully.
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-4-0-Using-SparkR-on-EC2-Instance-tp23506p23514.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Spark-Submit / Spark-Shell Error Standalone cluster

2015-06-27 Thread Ashish Soni
Not sure what the issue is, but when I run spark-submit or spark-shell I am
getting the below error:

/usr/bin/spark-class: line 24: /usr/bin/load-spark-env.sh: No such file or
directory

Can someone please help?

Thanks,


Re: dataframe left joins are not working as expected in pyspark

2015-06-27 Thread Axel Dahl
created as SPARK-8685

https://issues.apache.org/jira/browse/SPARK-8685

@Yin, thx, have fixed sample code with the correct names.

On Sat, Jun 27, 2015 at 1:56 PM, Yin Huai  wrote:

> Axel,
>
> Can you file a jira and attach your code in the description of the jira?
> This looks like a bug.
>
> For the third row of df1, the name is "alice" instead of "carol", right?
> Otherwise, "carol" should appear in the expected output.
>
> Btw, to get rid of those columns with the same name after the join, you
> can use select to pick columns you want to include in the results.
>
> Thanks,
>
> Yin
>
> On Sat, Jun 27, 2015 at 11:29 AM, Nicholas Chammas <
> nicholas.cham...@gmail.com> wrote:
>
>> I would test it against 1.3 to be sure, because it could -- though
>> unlikely -- be a regression. For example, I recently stumbled upon this
>> issue  which was
>> specific to 1.4.
>>
>> On Sat, Jun 27, 2015 at 12:28 PM Axel Dahl 
>> wrote:
>>
>>> I've only tested on 1.4, but imagine 1.3 is the same or a lot of
>>> people's code would be failing right now.
>>>
>>> On Saturday, June 27, 2015, Nicholas Chammas 
>>> wrote:
>>>
 Yeah, you shouldn't have to rename the columns before joining them.

 Do you see the same behavior on 1.3 vs 1.4?

 Nick
 On Sat, Jun 27, 2015 at 2:51 AM, Axel Dahl wrote:

> still feels like a bug to have to create unique names before a join.
>
> On Fri, Jun 26, 2015 at 9:51 PM, ayan guha 
> wrote:
>
>> You can declare the schema with unique names before creation of df.
>> On 27 Jun 2015 13:01, "Axel Dahl"  wrote:
>>
>>>
>>> I have the following code:
>>>
>>> from pyspark import SQLContext
>>>
>>> d1 = [{'name':'bob', 'country': 'usa', 'age': 1}, {'name':'alice',
>>> 'country': 'jpn', 'age': 2}, {'name':'carol', 'country': 'ire', 'age': 
>>> 3}]
>>> d2 = [{'name':'bob', 'country': 'usa', 'colour':'red'},
>>> {'name':'alice', 'country': 'ire', 'colour':'green'}]
>>>
>>> r1 = sc.parallelize(d1)
>>> r2 = sc.parallelize(d2)
>>>
>>> sqlContext = SQLContext(sc)
>>> df1 = sqlContext.createDataFrame(d1)
>>> df2 = sqlContext.createDataFrame(d2)
>>> df1.join(df2, df1.name == df2.name and df1.country == df2.country,
>>> 'left_outer').collect()
>>>
>>>
>>> When I run it I get the following, (notice in the first row, all
>>> join keys are taken from the right side and so are blanked out):
>>>
>>> [Row(age=2, country=None, name=None, colour=None, country=None,
>>> name=None),
>>> Row(age=1, country=u'usa', name=u'bob', colour=u'red',
>>> country=u'usa', name=u'bob'),
>>> Row(age=3, country=u'ire', name=u'alice', colour=u'green',
>>> country=u'ire', name=u'alice')]
>>>
>>> I would expect to get (though ideally without duplicate columns):
>>> [Row(age=2, country=u'ire', name=u'Alice', colour=None,
>>> country=None, name=None),
>>> Row(age=1, country=u'usa', name=u'bob', colour=u'red',
>>> country=u'usa', name=u'bob'),
>>> Row(age=3, country=u'ire', name=u'alice', colour=u'green',
>>> country=u'ire', name=u'alice')]
>>>
>>> The workaround for now is this rather clunky piece of code:
>>> df2 = sqlContext.createDataFrame(d2).withColumnRenamed('name',
>>> 'name2').withColumnRenamed('country', 'country2')
>>> df1.join(df2, df1.name == df2.name2 and df1.country ==
>>> df2.country2, 'left_outer').collect()
>>>
>>> So to me it looks like a bug, but am I doing something wrong?
>>>
>>> Thanks,
>>>
>>> -Axel
>>>
>>>
>>>
>>>
>>>
>
>
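A likely culprit in the snippet above is Python's `and`: it cannot be overloaded, so `df1.name == df2.name and df1.country == df2.country` evaluates to just the second Column, and the join effectively runs on country alone (later Spark versions raise a ValueError here instead of silently misbehaving). Column conditions must be combined with `&`, with parentheses around each comparison. A minimal stand-in class (illustration only, not Spark's actual Column) shows the difference without needing a cluster:

```python
class Cond:
    """Minimal stand-in for a Spark Column expression (illustration only,
    not Spark's actual Column class)."""
    def __init__(self, expr):
        self.expr = expr

    def __and__(self, other):
        # Spark overloads `&` so the combined predicate is kept.
        return Cond("(%s AND %s)" % (self.expr, other.expr))

    # `and` cannot be overloaded: Python truth-tests the left operand
    # and, since a plain object is truthy, returns the right operand.

name_eq = Cond("df1.name = df2.name")
country_eq = Cond("df1.country = df2.country")

both = name_eq & country_eq        # both conditions survive
dropped = name_eq and country_eq   # left condition silently discarded

print(both.expr)     # (df1.name = df2.name AND df1.country = df2.country)
print(dropped.expr)  # df1.country = df2.country
```

Applied to the code in the question, the fix would be `df1.join(df2, (df1.name == df2.name) & (df1.country == df2.country), 'left_outer')`.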


Failed stages and dropped executors when running implicit matrix factorization/ALS : Same error after the re-partition

2015-06-27 Thread Ayman Farahat
Hello;
I tried to adjust the number of blocks by repartitioning the input.
Here is how I do it (I am partitioning by users):

tot = newrdd.map(lambda l: 
(l[1],Rating(int(l[1]),int(l[2]),l[4]))).partitionBy(50).cache()
ratings = tot.values()
numIterations =8
rank = 80
model = ALS.trainImplicit(ratings, rank, numIterations)


I have 20 executors
with 5GB memory per executor.
When I use 80 factors I keep getting the following problem:

Traceback (most recent call last):
  File "/homes/afarahat/myspark/mm/df4test.py", line 85, in 
model = ALS.trainImplicit(ratings, rank, numIterations)
  File 
"/homes/afarahat/aofspark/share/spark/python/lib/pyspark.zip/pyspark/mllib/recommendation.py",
 line 201, in trainImplicit
  File 
"/homes/afarahat/aofspark/share/spark/python/lib/pyspark.zip/pyspark/mllib/common.py",
 line 128, in callMLlibFunc
  File 
"/homes/afarahat/aofspark/share/spark/python/lib/pyspark.zip/pyspark/mllib/common.py",
 line 121, in callJavaFunc
  File 
"/homes/afarahat/aofspark/share/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
 line 538, in __call__
  File 
"/homes/afarahat/aofspark/share/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py",
 line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling 
o113.trainImplicitALSModel.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in 
stage 36.1 failed 4 times, most recent failure: Lost task 7.3 in stage 36.1 
(TID 1841, gsbl52746.blue.ygrid.yahoo.com): java.io.FileNotFoundException: 
/grid/3/tmp/yarn-local/usercache/afarahat/appcache/application_1433921068880_1027774/blockmgr-0e518470-57d8-472f-8fba-3b593e4dda42/27/rdd_56_24
 (No such file or directory)
at java.io.RandomAccessFile.open(Native Method)
at java.io.RandomAccessFile.(RandomAccessFile.java:233)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:110)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
at 
org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:511)
at 
org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:429)
at org.apache.spark.storage.BlockManager.get(BlockManager.scala:617)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)

Driver stacktrace:
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at scala.Option.foreach(Option.scala:236)
at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

Jun 28, 2015 2:10:37 AM INFO: parquet.hadoop.ParquetFileReader: Initiating 
action with parallelism: 5

On Jun 26, 2015, at 12:33 PM, Xiangrui Meng  wrote:

> So you have 100 partitions (blocks). This might be too many for your dataset. 
> Try setting a smaller number of blocks, e.g., 32 or 64. When ALS starts 
> iterations, you can see the shuffle read/write size from the "stages" tab of 
> Spark WebUI. Vary number of blocks and check the numbers there. Kryo 
> serializer doesn't help much here. You can try disabling it (though I don't 
> think it caused the failure). -Xiangrui
> 
> On Fri, Jun 26, 2015 at 11:00 AM, Ayman Farahat 
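
Xiangrui's suggestion above is to vary the ALS block count rather than rely on `partitionBy(50)`. One way to parameterize that is a small sizing helper (a hypothetical heuristic for illustration, not an MLlib API) whose result would be passed through `ALS.trainImplicit`'s `blocks` argument:

```python
import math

def suggest_blocks(num_ratings, target_per_block=2_000_000, lo=8, hi=64):
    """Pick an ALS block count so each block holds roughly
    target_per_block ratings, clamped to [lo, hi].
    (Hypothetical heuristic for illustration, not an MLlib API.)"""
    return max(lo, min(hi, math.ceil(num_ratings / target_per_block)))

# e.g. 100M ratings -> 50 blocks
print(suggest_blocks(100_000_000))  # 50

# The result would then be used as, e.g.:
# model = ALS.trainImplicit(ratings, rank, numIterations,
#                           blocks=suggest_blocks(n))
```

Varying `target_per_block` (and re-checking shuffle read/write sizes in the "stages" tab, as suggested) is the tuning loop being described.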

Re: Failed stages and dropped executors when running implicit matrix factorization/ALS : Same error after the re-partition

2015-06-27 Thread Sabarish Sasidharan
Are you running on top of YARN? Plus pls provide your infrastructure
details.

Regards
Sab

Re: Failed stages and dropped executors when running implicit matrix factorization/ALS : Same error after the re-partition

2015-06-27 Thread Sabarish Sasidharan
Are you running on top of YARN? Plus pls provide your infrastructure
details.

Regards
Sab

Re: Failed stages and dropped executors when running implicit matrix factorization/ALS : Same error after the re-partition

2015-06-27 Thread Ayman Farahat
That's correct, this is YARN
and Spark 1.4.
Also using the Anaconda tar for NumPy and other libs.


Sent from my iPhone

> On Jun 27, 2015, at 8:50 PM, Sabarish Sasidharan 
>  wrote:
> 
> Are you running on top of YARN? Plus pls provide your infrastructure details.
> 
> Regards
> Sab
> 

Re: Failed stages and dropped executors when running implicit matrix factorization/ALS : Same error after the re-partition

2015-06-27 Thread Sabarish Sasidharan
Try setting the yarn executor memory overhead to a higher value like 1g or
1.5g or more.

Regards
Sab
On 28-Jun-2015 9:22 am, "Ayman Farahat"  wrote:

> That's correct this is Yarn
> And spark 1.4
> Also using the Anaconda tar for Numpy and other Libs
>
>
> Sent from my iPhone
>

Re: Failed stages and dropped executors when running implicit matrix factorization/ALS : Same error after the re-partition

2015-06-27 Thread Ayman Farahat
Where do I do that ? 
Thanks 

Sent from my iPhone

> On Jun 27, 2015, at 8:59 PM, Sabarish Sasidharan 
>  wrote:
> 
> Try setting the yarn executor memory overhead to a higher value like 1g or 
> 1.5g or more.
> 
> Regards
> Sab
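
The overhead suggested above is a Spark/YARN configuration property, `spark.yarn.executor.memoryOverhead` (in MB, under the Spark 1.x property name), not something set in the ALS code itself. A sketch of setting it programmatically (the 1536 value is illustrative, following the 1g-1.5g suggestion):

```python
from pyspark import SparkConf, SparkContext

# spark.yarn.executor.memoryOverhead is in MB; 1536 is illustrative,
# matching the "1g or 1.5g" suggestion in this thread.
conf = (SparkConf()
        .set("spark.executor.memory", "5g")
        .set("spark.yarn.executor.memoryOverhead", "1536"))
sc = SparkContext(conf=conf)
```

Equivalently at submit time: `spark-submit --conf spark.yarn.executor.memoryOverhead=1536 ...`.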
> 