Checkpointing calls the job twice?

2015-10-17 Thread jatinganhotra
Hi,

I noticed that when you checkpoint a given RDD, it results in performing the
action twice as I can see 2 jobs being executed in the Spark UI.

Example:
val logFile = "/data/pagecounts"
sc.setCheckpointDir("/checkpoints")
val logData = sc.textFile(logFile, 2)
val as = logData.filter(line => line.contains("a"))

Scenario #1:
as.count()  // Only 1 job.

But, if I change the above code to below:

Scenario #2:
as.cache()
as.checkpoint()
as.count()

Here, there are 2 jobs being executed as shown in the Spark UI, with
duration 0.9s and 0.4s

Why are there 2 jobs in scenario #2? In Spark source code, the comment for
RDD.checkpoint() says the following:
"This function must be called before any job has been executed on this RDD.
It is strongly recommended that this RDD is persisted in memory, otherwise
saving it on a file will require recomputation."

In my example above, I am calling cache() before checkpoint(), so RDD will
be persisted in memory. Also, both of the above calls are before the count()
action, so checkpoint() is called before any job execution.
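
For reference, here is a minimal spark-shell sketch (not part of the original message) that makes the two jobs visible; isCheckpointed and getCheckpointFile are public RDD methods:

val as = sc.textFile("/data/pagecounts", 2).filter(line => line.contains("a"))
as.cache()
as.checkpoint()
println(as.isCheckpointed)     // false - checkpoint() only marks the RDD, nothing has run yet
as.count()                     // first action: the count job runs, then a separate job writes the checkpoint data
println(as.isCheckpointed)     // true - the RDD has been materialised to the checkpoint dir
println(as.getCheckpointFile)  // Some(/checkpoints/...)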



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Checkpointing-calls-the-job-twice-tp25110.html



Re: HBase Spark Streaming giving error after restore

2015-10-17 Thread Amit Hora
Hi,

Regrets for the delayed response.
Please find the full stack trace below:

java.lang.ClassCastException: scala.runtime.BoxedUnit cannot be cast to
org.apache.hadoop.hbase.client.Mutation
at
org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:85)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:1000)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:979)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
15/10/16 18:50:03 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID
1, localhost, ANY, 1185 bytes)
15/10/16 18:50:03 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
15/10/16 18:50:03 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
localhost): java.lang.ClassCastException: scala.runtime.BoxedUnit cannot be
cast to org.apache.hadoop.hbase.client.Mutation
at
org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:85)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:1000)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:979)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

15/10/16 18:50:03 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times;
aborting job
15/10/16 18:50:03 INFO TaskSchedulerImpl: Cancelling stage 0
15/10/16 18:50:03 INFO Executor: Executor is trying to kill task 1.0 in
stage 0.0 (TID 1)
15/10/16 18:50:03 INFO TaskSchedulerImpl: Stage 0 was cancelled
15/10/16 18:50:03 INFO DAGScheduler: Job 0 failed: foreachRDD at
TwitterStream.scala:150, took 5.956054 s
15/10/16 18:50:03 INFO JobScheduler: Starting job streaming job
144500141 ms.0 from job set of time 144500141 ms
15/10/16 18:50:03 ERROR JobScheduler: Error running job streaming job
144500140 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage
0.0 (TID 0, localhost): java.lang.ClassCastException:
scala.runtime.BoxedUnit cannot be cast to
org.apache.hadoop.hbase.client.Mutation
at
org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:85)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:1000)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:979)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Exception in thread "main" org.apache.spark.SparkException: Job aborted due
to stage failure: Task 0 in stage 0.0 failed 1 times, most recent 

Re: HBase Spark Streaming giving error after restore

2015-10-17 Thread Aniket Bhatnagar
Can you try changing classOf[OutputFormat[String, BoxedUnit]] to
classOf[OutputFormat[String, Put]] while configuring hconf?
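
For context, a hedged sketch of what that wiring could look like (the table name, column family and the DStream element type are assumptions, not taken from the thread); the key point is that the value handed to TableOutputFormat must be a Put/Mutation, not Unit:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job

val hconf = HBaseConfiguration.create()
hconf.set(TableOutputFormat.OUTPUT_TABLE, "tweets")   // assumed table name
val job = Job.getInstance(hconf)
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

// dstream is assumed to be a DStream[(String, String)] of (rowKey, value)
dstream.foreachRDD { rdd =>
  rdd.map { case (rowKey, value) =>
    val put = new Put(Bytes.toBytes(rowKey))
    put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col"), Bytes.toBytes(value))  // HBase 1.x API
    (new ImmutableBytesWritable, put)   // the value written is a Put, not Unit
  }.saveAsNewAPIHadoopDataset(job.getConfiguration)
}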

On Sat, Oct 17, 2015, 11:44 AM Amit Hora  wrote:

> Hi,
>
> Regrets for the delayed response.
> Please find the full stack trace below:
>
> java.lang.ClassCastException: scala.runtime.BoxedUnit cannot be cast to
> org.apache.hadoop.hbase.client.Mutation
> at
> org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:85)
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:1000)
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:979)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> at org.apache.spark.scheduler.Task.run(Task.scala:64)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 15/10/16 18:50:03 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID
> 1, localhost, ANY, 1185 bytes)
> 15/10/16 18:50:03 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
> 15/10/16 18:50:03 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
> localhost): java.lang.ClassCastException: scala.runtime.BoxedUnit cannot be
> cast to org.apache.hadoop.hbase.client.Mutation
> at
> org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:85)
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:1000)
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:979)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> at org.apache.spark.scheduler.Task.run(Task.scala:64)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> 15/10/16 18:50:03 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1
> times; aborting job
> 15/10/16 18:50:03 INFO TaskSchedulerImpl: Cancelling stage 0
> 15/10/16 18:50:03 INFO Executor: Executor is trying to kill task 1.0 in
> stage 0.0 (TID 1)
> 15/10/16 18:50:03 INFO TaskSchedulerImpl: Stage 0 was cancelled
> 15/10/16 18:50:03 INFO DAGScheduler: Job 0 failed: foreachRDD at
> TwitterStream.scala:150, took 5.956054 s
> 15/10/16 18:50:03 INFO JobScheduler: Starting job streaming job
> 144500141 ms.0 from job set of time 144500141 ms
> 15/10/16 18:50:03 ERROR JobScheduler: Error running job streaming job
> 144500140 ms.0
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
> in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage
> 0.0 (TID 0, localhost): java.lang.ClassCastException:
> scala.runtime.BoxedUnit cannot be cast to
> org.apache.hadoop.hbase.client.Mutation
> at
> org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:85)
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:1000)
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:979)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> at org.apache.spark.scheduler.Task.run(Task.scala:64)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
> at scala.Option.foreach(Option.scala:236)
> at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
> at
> 

repartition vs partitionby

2015-10-17 Thread shahid qadri
Hi folks

I need to repartition a large set of data (around 300 GB), as I see some portions
have large data (data skew).

I have pairRDDs [({},{}),({},{}),({},{})]

What is the best way to solve this problem?



PySpark: breakdown application execution time and fine-tuning

2015-10-17 Thread saluc
Hello,

I am using PySpark to develop my big-data application. I have the impression
that most of the execution time of my application is spent on the infrastructure
(distributing the code and the data in the cluster, IPC between the Python
processes and the JVM) rather than on the computation itself. I would be
interested in particular in measuring the time spent in the IPC between the
Python processes and the JVM.

I would like to ask: is there a way to break down the execution time in
order to have more details on how much time is effectively spent on the
different phases of the execution, so as to have some kind of detailed
profiling of the execution time, and more information for fine-tuning
the application?

Thank you very much for your help and support,
Luca



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/PySpark-breakdown-application-execution-time-and-fine-tuning-tp25105.html



Re: Spark on Mesos / Executor Memory

2015-10-17 Thread Bharath Ravi Kumar
David, Tom,

Thanks for the explanation. This confirms my suspicion that the executor
was holding on to memory regardless of tasks in execution once it expands
to occupy memory in keeping with spark.executor.memory. There certainly is
scope for improvement here, though I realize there will be substantial
overheads in implementing memory release without compromising RDD caching
and similar aspects. I'll explore alternatives / workarounds meanwhile.

Thanks,
Bharath


On Sat, Oct 17, 2015 at 3:33 PM, Tom Arnfeld  wrote:

> Hi Bharath,
>
> When running jobs in fine grained mode, each Spark task is sent to mesos
> as a task, which allows the offers system to maintain fairness between
> different Spark applications (as you've described). Having said that, unless
> your memory per node is hugely undersubscribed, you'll run into this when
> running these jobs in parallel; this behaviour matches exactly what you've
> described.
>
> What you're seeing happens because even though there's a new mesos task
> for each Spark task (allowing CPU to be shared), the Spark executors don't
> get killed even when they aren't doing any work, which means the memory
> isn't released. The JVM doesn't allow for flexible memory re-allocation (as
> far as I'm aware), which makes it impossible for Spark to dynamically scale
> up the memory of the executor over time as tasks start and finish.
>
> As Dave pointed out, the simplest way to solve this is to use a higher
> level tool that can run your spark jobs through one mesos framework and
> then you can let spark distribute the resources more effectively.
>
> I hope that helps!
>
> Tom.
>
> On 17 Oct 2015, at 06:47, Bharath Ravi Kumar  wrote:
>
> Can someone respond if you're aware of the reason for such a memory
> footprint? It seems unintuitive and hard to reason about.
>
> Thanks,
> Bharath
>
> On Thu, Oct 15, 2015 at 12:29 PM, Bharath Ravi Kumar 
> wrote:
>
>> Resending since user@mesos bounced earlier. My apologies.
>>
>> On Thu, Oct 15, 2015 at 12:19 PM, Bharath Ravi Kumar > > wrote:
>>
>>> (Reviving this thread since I ran into similar issues...)
>>>
>>> I'm running two spark jobs (in mesos fine grained mode), each belonging
>>> to a different mesos role, say low and high. The low:high mesos weights are
>>> 1:10. On expected lines, I see that the low priority job occupies cluster
>>> resources to the maximum extent when running alone. However, when the high
>>> priority job is submitted, it does not start and continues to await cluster
>>> resources (as seen in the logs). Since the jobs run in fine grained mode
>>> and the low priority tasks begin to finish, the high priority job should
>>> ideally be able to start and gradually take over cluster resources as per
>>> the weights. However, I noticed that while the "low" job gives up CPU cores
>>> with each completing task (e.g. reduction from 72 -> 12 with default
>>> parallelism set to 72), the memory resources are held on (~500G out of
>>> 768G). The spark.executor.memory setting appears to directly impact the
>>> amount of memory that the job holds on to. In this case, it was set to 200G
>>> in the low priority task and 100G in the high priority task. The nature of
>>> these jobs is such that setting the numbers to smaller values (say 32g)
>>> resulted in job failures with outofmemoryerror.  It appears that the spark
>>> framework is retaining memory (across tasks)  proportional to
>>> spark.executor.memory for the duration of the job and not releasing memory
>>> as tasks complete. This defeats the purpose of fine grained mode execution
>>> as the memory occupancy is preventing the high priority job from accepting
>>> the prioritized cpu offers and beginning execution. Can this be explained /
>>> documented better please?
>>>
>>> Thanks,
>>> Bharath
>>>
>>> On Sat, Apr 11, 2015 at 10:59 PM, Tim Chen  wrote:
>>>
 (Adding spark user list)

 Hi Tom,

 If I understand correctly you're saying that you're running into memory
 problems because the scheduler is allocating too many CPUs and not enough
 memory to accommodate them, right?

 In the case of fine grain mode I don't think that's a problem since we
 have a fixed amount of CPU and memory per task.
 However, in coarse grain mode you can run into that problem if you're within
 the spark.cores.max limit, and memory is a fixed number.

 I have a patch out to configure the max cpus a coarse grain
 executor should use, and it also allows multiple executors in coarse grain mode.
 So you could try to launch multiples of max 4 cores with
 spark.executor.memory (+ overhead etc.) in a slave. (
 https://github.com/apache/spark/pull/4027)

 It also might be interesting to include a cores to memory multiplier so
 that with a larger amount of cores we try to scale the memory with some
 factor, but I'm not entirely sure that's 

can I use Spark as alternative for gem fire cache ?

2015-10-17 Thread kali.tumm...@gmail.com
Hi All, 

Can Spark be used as an alternative to GemFire cache? We use GemFire
cache to save (cache) dimension data in memory, which is later used by our
custom-made Java ETL tool. Can I do something like the below?

Can I cache an RDD in memory for a whole day? As far as I know, the RDD will be
gone once the Spark code finishes executing (correct me if I am wrong).

Spark:
create an RDD
rdd.persist()

Thanks






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/can-I-use-Spark-as-alternative-for-gem-fire-cache-tp25106.html



Re: How to have Single refernce of a class in Spark Streaming?

2015-10-17 Thread Deenar Toraskar
Swetha

Look at
http://spark.apache.org/docs/latest/programming-guide.html#shared-variables



Normally, when a function passed to a Spark operation (such as map or reduce)
is executed on a remote cluster node, it works on separate copies of all
the variables used in the function. These variables are copied to each
machine, and no updates to the variables on the remote machine are
propagated back to the driver program. Supporting general, read-write
shared variables across tasks would be inefficient. However, Spark does
provide two limited types of *shared variables* for two common usage
patterns: broadcast variables and accumulators.
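
A hedged sketch of how that applies to the original question (TrackerClass, pairs and the update function come from the question; everything else is illustrative):

val trackerBroadcast = sc.broadcast(new TrackerClass())  // read-only copy shipped once to every executor
val mergedSessions   = sc.accumulator(0)                 // executors only add to it; the driver reads it

val runningCounts = pairs.updateStateByKey[Int] { (newValues: Seq[Int], runningCount: Option[Int]) =>
  val tracker = trackerBroadcast.value   // same instance within an executor, but must be treated as read-only
  mergedSessions += 1                    // updates are aggregated back on the driver, not shared between executors
  Some(runningCount.getOrElse(0) + newValues.sum)
}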


Deenar

On 17 October 2015 at 02:05, swetha  wrote:

> Hi,
>
> How to have a single reference of a class across all the executors in Spark
> Streaming? The contents of the class will be updated at all the executors.
> Would using it as a variable inside updateStateByKey guarantee that
> reference is updated across all the  executors and no
> concurrentModificationException? Following is how I am trying to use a
> Tracker Class across all the JVMs.
>
> val trackerClass = new TrackerClass();
>
>
> val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
>
> def updateFunction(newValues: Seq[Int], runningCount: Option[Int]):
> Option[Int] = {
> getMergedSession(this.trackerClass)
> Some(newCount)
> }
>
>
>
> Thanks,
> Swetha
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-have-Single-refernce-of-a-class-in-Spark-Streaming-tp25103.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark on Mesos / Executor Memory

2015-10-17 Thread Bharath Ravi Kumar
To be precise, the MesosExecutorBackend's Xms & Xmx equal
spark.executor.memory. So there's no question of expanding or contracting
the memory held by the executor.

On Sat, Oct 17, 2015 at 5:38 PM, Bharath Ravi Kumar 
wrote:

> David, Tom,
>
> Thanks for the explanation. This confirms my suspicion that the executor
> was holding on to memory  regardless of tasks in execution once it expands
> to occupy memory in keeping with spark.executor.memory. There certainly is
> scope for improvement here, though I realize there will substantial
> overheads in implementing memory release without compromising RDD caching
> and similar aspects. I'll explore alternatives / workarounds meanwhile.
>
> Thanks,
> Bharath
>
>
>
> On Sat, Oct 17, 2015 at 3:33 PM, Tom Arnfeld  wrote:
>
>> Hi Bharath,
>>
>> When running jobs in fine grained mode, each Spark task is sent to mesos
>> as a task which allows the offers system to maintain fairness between
>> different spark application (as you've described). Having said that, unless
>> your memory per-node is hugely undersubscribed when running these jobs in
>> parallel. This behaviour matches exactly what you've described.
>>
>> What you're seeing happens because even though there's a new mesos task
>> for each Spark task (allowing CPU to be shared) the Spark executors don't
>> get killed even when they aren't doing any work, which means the memory
>> isn't released. The JVM doesn't allow for flexible memory re-allocation (as
>> far as i'm aware) which make it impossible for spark to dynamically scale
>> up the memory of the executor over time as tasks start and finish.
>>
>> As Dave pointed out, the simplest way to solve this is to use a higher
>> level tool that can run your spark jobs through one mesos framework and
>> then you can let spark distribute the resources more effectively.
>>
>> I hope that helps!
>>
>> Tom.
>>
>> On 17 Oct 2015, at 06:47, Bharath Ravi Kumar  wrote:
>>
>> Can someone respond if you're aware of the reason for such a memory
>> footprint? It seems unintuitive and hard to reason about.
>>
>> Thanks,
>> Bharath
>>
>> On Thu, Oct 15, 2015 at 12:29 PM, Bharath Ravi Kumar > > wrote:
>>
>>> Resending since user@mesos bounced earlier. My apologies.
>>>
>>> On Thu, Oct 15, 2015 at 12:19 PM, Bharath Ravi Kumar <
>>> reachb...@gmail.com> wrote:
>>>
 (Reviving this thread since I ran into similar issues...)

 I'm running two spark jobs (in mesos fine grained mode), each belonging
 to a different mesos role, say low and high. The low:high mesos weights are
 1:10. On expected lines, I see that the low priority job occupies cluster
 resources to the maximum extent when running alone. However, when the high
 priority job is submitted, it does not start and continues to await cluster
 resources (as seen in the logs). Since the jobs run in fine grained mode
 and the low priority tasks begin to finish, the high priority job should
 ideally be able to start and gradually take over cluster resources as per
 the weights. However, I noticed that while the "low" job gives up CPU cores
 with each completing task (e.g. reduction from 72 -> 12 with default
 parallelism set to 72), the memory resources are held on (~500G out of
 768G). The spark.executor.memory setting appears to directly impact the
 amount of memory that the job holds on to. In this case, it was set to 200G
 in the low priority task and 100G in the high priority task. The nature of
 these jobs is such that setting the numbers to smaller values (say 32g)
 resulted in job failures with outofmemoryerror.  It appears that the spark
 framework is retaining memory (across tasks)  proportional to
 spark.executor.memory for the duration of the job and not releasing memory
 as tasks complete. This defeats the purpose of fine grained mode execution
 as the memory occupancy is preventing the high priority job from accepting
 the prioritized cpu offers and beginning execution. Can this be explained /
 documented better please?

 Thanks,
 Bharath

 On Sat, Apr 11, 2015 at 10:59 PM, Tim Chen  wrote:

> (Adding spark user list)
>
> Hi Tom,
>
> If I understand correctly you're saying that you're running into
> memory problems because the scheduler is allocating too much CPUs and not
> enough memory to acoomodate them right?
>
> In the case of fine grain mode I don't think that's a problem since we
> have a fixed amount of CPU and memory per task.
> However, in coarse grain you can run into that problem if you're with
> in the spark.cores.max limit, and memory is a fixed number.
>
> I have a patch out to configure how much max cpus should coarse grain
> executor use, and it also allows multiple executors in coarse grain mode.
> So you could say try 

Re: repartition vs partitionby

2015-10-17 Thread Raghavendra Pandey
You can use coalesce function, if you want to reduce the number of
partitions. This one minimizes the data shuffle.

-Raghav

On Sat, Oct 17, 2015 at 1:02 PM, shahid qadri 
wrote:

> Hi folks
>
> I need to reparation large set of data around(300G) as i see some portions
> have large data(data skew)
>
> i have pairRDDs [({},{}),({},{}),({},{})]
>
> what is the best way to solve the the problem
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: s3a file system and spark deployment mode

2015-10-17 Thread Raghavendra Pandey
You can add classpath info in hadoop env file...

Add the following line to your $HADOOP_HOME/etc/hadoop/hadoop-env.sh
export
HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$HADOOP_HOME/share/hadoop/tools/lib/*

Add the following line to $SPARK_HOME/conf/spark-env.sh
export SPARK_DIST_CLASSPATH=$($HADOOP_HOME/bin/hadoop --config
$HADOOP_HOME/etc/hadoop classpath)


This is how you set up Hadoop 2.7.1 and the "Spark 1.5.1 without Hadoop" build. This
will also put the necessary jars on your classpath to access s3a.

Also, please note that you need to set the fs.s3a.access.key
and fs.s3a.secret.key properties in your core-site.xml, rather
than fs.s3a.awsSecretAccessKey and fs.s3a.awsAccessKeyId as mentioned in
the docs.
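
For reference, a sketch of the corresponding core-site.xml entries (the values are placeholders):

<property>
  <name>fs.s3a.access.key</name>
  <value>YOUR_AWS_ACCESS_KEY_ID</value>
</property>
<property>
  <name>fs.s3a.secret.key</name>
  <value>YOUR_AWS_SECRET_ACCESS_KEY</value>
</property>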

Good luck
-Raghav

On Fri, Oct 16, 2015 at 9:07 PM, Scott Reynolds 
wrote:

> hmm I tried using --jars and that got passed to MasterArguments and that
> doesn't work :-(
>
>
> https://github.com/apache/spark/blob/branch-1.5/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
>
> Same with Worker:
> https://github.com/apache/spark/blob/branch-1.5/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
>
> Both Master and Worker have to start with these two jars because
> a.) the Master has to serve the event log in s3
> b.) the Worker runs the Driver and has to download the jar from s3
>
> And yes I am using these deps:
>
> <dependency>
>   <groupId>org.apache.hadoop</groupId>
>   <artifactId>hadoop-aws</artifactId>
>   <version>2.7.1</version>
> </dependency>
>
> <dependency>
>   <groupId>com.amazonaws</groupId>
>   <artifactId>aws-java-sdk</artifactId>
>   <version>1.7.4</version>
> </dependency>
>
> I think I have settled on just modifying the java command line that starts
> up the worker and master. Just seems easier. Currently launching them with
> spark-class bash script
>
> /mnt/services/spark/bin/spark-class org.apache.spark.deploy.master.Master \
> --ip `hostname -i` --port 7077 --webui-port 8080
>
> If all else fails I will update the spark pom and and include it in the
> shaded spark jar.
>
> On Fri, Oct 16, 2015 at 2:25 AM, Steve Loughran 
> wrote:
>
>>
>> > On 15 Oct 2015, at 19:04, Scott Reynolds  wrote:
>> >
>> > List,
>> >
>> > Right now we build our spark jobs with the s3a hadoop client. We do
>> this because our machines are only allowed to use IAM access to the s3
>> store. We can build our jars with the s3a filesystem and the aws sdk just
>> fine and this jars run great in *client mode*.
>> >
>> > We would like to move from client mode to cluster mode as that will
>> allow us to be more resilient to driver failure. In order to do this either:
>> > 1. the jar file has to be on worker's local disk
>> > 2. the jar file is in shared storage (s3a)
>> >
>> > We would like to put the jar file in s3 storage, but when we give the
>> jar path as s3a://.., the worker node doesn't have the hadoop s3a and
>> aws sdk in its classpath / uber jar.
>> >
>> > Other then building spark with those two dependencies, what other
>> options do I have ? We are using 1.5.1 so SPARK_CLASSPATH is no longer a
>> thing.
>> >
>> > Need to get s3a access to both the master (so that we can log spark
>> event log to s3) and to the worker processes (driver, executor).
>> >
>> > Looking for ideas before just adding the dependencies to our spark
>> build and calling it a day.
>>
>>
>> you can use --jars to add these, e.g
>>
>> --jars hadoop-aws.jar,aws-java-sdk-s3
>>
>>
>> as others have warned, you need Hadoop 2.7.1 for s3a to work properly
>>
>
>


Re: Complex transformation on a dataframe column

2015-10-17 Thread Raghavendra Pandey
Here is a quick code sample I can come up with :

import org.apache.spark.sql.functions.udf
import sqlContext.implicits._   // needed for toDF() outside the spark-shell defaults

case class Input(ID: String, Name: String, PhoneNumber: String, Address: String)
val df = sc.parallelize(Seq(Input("1", "raghav", "0123456789",
  "houseNo:StreetNo:City:State:Zip"))).toDF()
val formatAddress = udf { (s: String) => s.split(":").mkString("-") }
val outputDF = df.withColumn("FormattedAddress", formatAddress(df("Address")))
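
If a formatAddress(address: String): String function already exists, as in the original question, a hedged variation is to wrap it in a UDF directly instead of inlining the logic:

val formatAddressUdf = udf(formatAddress _)
val withFormatted = df.withColumn("FormattedAddress", formatAddressUdf(df("Address")))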


-Raghav

On Thu, Oct 15, 2015 at 10:34 PM, Hao Wang  wrote:

> Hi,
>
> I have searched around but could not find a satisfying answer to this
> question: what is the best way to do a complex transformation on a
> dataframe column?
>
> For example, I have a dataframe with the following schema and a function
> that has pretty complex logic to format addresses. I would like to use the
> function to format each address and store the output as an additional
> column in the dataframe. What is the best way to do it? Use Dataframe.map?
> Define a UDF? Some code example would be appreciated.
>
> Input dataframe:
> root
>  |-- ID: string (nullable = true)
>  |-- Name: string (nullable = true)
>  |-- PhoneNumber: string (nullable = true)
>  |-- Address: string (nullable = true)
>
> Output dataframe:
> root
>  |-- ID: string (nullable = true)
>  |-- Name: string (nullable = true)
>  |-- PhoneNumber: string (nullable = true)
>  |-- Address: string (nullable = true)
>  |-- FormattedAddress: string (nullable = true)
>
> The function for format addresses:
> def formatAddress(address: String): String
>
>
> Best regards,
> Hao Wang
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: repartition vs partitionby

2015-10-17 Thread shahid ashraf
Yes, I know about that; it's for the case of reducing partitions. The point here is
that the data is skewed to a few partitions.


On Sat, Oct 17, 2015 at 6:27 PM, Raghavendra Pandey <
raghavendra.pan...@gmail.com> wrote:

> You can use coalesce function, if you want to reduce the number of
> partitions. This one minimizes the data shuffle.
>
> -Raghav
>
> On Sat, Oct 17, 2015 at 1:02 PM, shahid qadri 
> wrote:
>
>> Hi folks
>>
>> I need to reparation large set of data around(300G) as i see some
>> portions have large data(data skew)
>>
>> i have pairRDDs [({},{}),({},{}),({},{})]
>>
>> what is the best way to solve the the problem
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


-- 
with Regards
Shahid Ashraf


Re: can I use Spark as alternative for gem fire cache ?

2015-10-17 Thread Ndjido Ardo Bar
Hi Kali,

If I understand you well, Tachyon (http://tachyon-project.org) can be a good
alternative. You can use the Spark API to load and persist data into Tachyon.
Hope that will help.

Ardo 
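
A minimal sketch of that idea (host, port and paths are assumptions): Tachyon exposes a Hadoop-compatible filesystem, so data written there outlives the Spark application and can be read back by later jobs, e.g. the next day:

val dimensions = sc.textFile("hdfs:///data/dimensions")   // assumed source of the dimension data
dimensions.saveAsTextFile("tachyon://localhost:19998/cache/dimensions")

// a later job (or a different application) reads the cached copy back
val cached = sc.textFile("tachyon://localhost:19998/cache/dimensions")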

> On 17 Oct 2015, at 15:28, "kali.tumm...@gmail.com"  
> wrote:
> 
> Hi All, 
> 
> Can spark be used as an alternative to gem fire cache ? we use gem fire
> cache to save (cache) dimension data in memory which is later used by our
> Java custom made ETL tool can I do something like below ?
> 
> can I cache a RDD in memory for a whole day ? as of I know RDD will get
> empty once the spark code finish executing (correct me if I am wrong).
> 
> Spark:- 
> create a RDD 
> rdd.persistance 
> 
> Thanks
> 
> 
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/can-I-use-Spark-as-alternative-for-gem-fire-cache-tp25106.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 




Output println info in LogMessage Info ?

2015-10-17 Thread kali.tumm...@gmail.com
Hi All, 

In Unix I can print warning or info messages using LogMessage WARN "Hi All" or
LogMessage INFO "Hello World". Is there a similar thing in Spark?

Imagine I want to print the count of an RDD in the logs instead of using println.

Thanks
Sri 
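
A hedged sketch of one way to do this with the log4j that ships with Spark (the logger name and the RDD are placeholders):

import org.apache.log4j.{Level, Logger}

val log = Logger.getLogger("MyEtlJob")      // arbitrary logger name
log.setLevel(Level.INFO)

log.warn("Hi All")                          // rough equivalent of LogMessage WARN
log.info("RDD count = " + myRdd.count())    // myRdd is whichever RDD you want to report on

Where the messages end up (console, driver log file) is controlled by the usual conf/log4j.properties.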




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Output-println-info-in-LogMessage-Info-tp25107.html



Spark Streaming scheduler delay VS driver.cores

2015-10-17 Thread Adrian Tanase
Hi,

I’ve recently bumped up the resources for a spark streaming job – and the 
performance started to degrade over time.
It was running fine on 7 nodes with 14 executor cores each (via Yarn) until I
bumped executor.cores to 22 cores/node (out of 32 on AWS c3.xlarge, 24 for yarn).

The driver has 2 cores and 2 GB ram (usage is at zero).

For really low data volume it goes from 1-2 seconds per batch to 4-5 s/batch
after about 6 hours, doing almost nothing. I've noticed that the scheduler
delay is 3-4 s, even 5-6 seconds for some tasks; it should be in the low tens of
milliseconds. What's weirder is that under moderate load (thousands of events
per second) the delay is not as obvious anymore.

After this I reduced the executor.cores to 20 and bumped driver.cores to 4 and 
it seems to be ok now.
However, this is totally empirical, I have not found any documentation, code 
samples or email discussion on how to properly set driver.cores.

Does anyone know:

  *   If I assign more cores to the driver/application manager, will it use them?
      *   I was looking at the process list with htop and only one of the JVMs
          on the driver was really taking up CPU time.
  *   What is a decent parallelism factor for a streaming app with a 10-20 sec
      batch time? I found it odd that at 7 x 22 = 154 the driver is becoming a
      bottleneck.
      *   I've seen people recommend 3-4 tasks/core or ~1000 parallelism for
          clusters in the tens of nodes.

Thanks in advance,
-adrian


Re: repartition vs partitionby

2015-10-17 Thread Adrian Tanase
If the dataset allows it you can try to write a custom partitioner to help 
spark distribute the data more uniformly.
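
As a hedged sketch of that suggestion (the partition count and the hot keys are illustrative, not from the thread):

import org.apache.spark.Partitioner

class SkewAwarePartitioner(totalParts: Int, heavyKeys: Seq[Any]) extends Partitioner {
  private val heavy = heavyKeys.zipWithIndex.toMap   // each known hot key gets its own reserved partition
  override def numPartitions: Int = totalParts
  override def getPartition(key: Any): Int = heavy.get(key) match {
    case Some(reserved) => reserved                  // dedicated partition for a hot key
    case None =>                                     // hash everything else over the remaining partitions
      heavy.size + (key.hashCode & Integer.MAX_VALUE) % (totalParts - heavy.size)
  }
}

// usage on a pair RDD: rdd.partitionBy(new SkewAwarePartitioner(200, Seq("hotKey1", "hotKey2")))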

Sent from my iPhone

On 17 Oct 2015, at 16:14, shahid ashraf 
> wrote:

yes i know about that,its in case to reduce partitions. the point here is the 
data is skewed to few partitions..


On Sat, Oct 17, 2015 at 6:27 PM, Raghavendra Pandey 
> wrote:
You can use coalesce function, if you want to reduce the number of partitions. 
This one minimizes the data shuffle.

-Raghav

On Sat, Oct 17, 2015 at 1:02 PM, shahid qadri 
> wrote:
Hi folks

I need to reparation large set of data around(300G) as i see some portions have 
large data(data skew)

i have pairRDDs [({},{}),({},{}),({},{})]

what is the best way to solve the the problem
-
To unsubscribe, e-mail: 
user-unsubscr...@spark.apache.org
For additional commands, e-mail: 
user-h...@spark.apache.org





--
with Regards
Shahid Ashraf


Should I convert json into parquet?

2015-10-17 Thread Gavin Yue
I have JSON files which contain timestamped events. Each event is associated
with a user id.

Now I want to group by user id, so I convert from

Event1 -> UserIDA;
Event2 -> UserIDA;
Event3 -> UserIDB;

To intermediate storage.
UserIDA -> (Event1, Event2...)
UserIDB-> (Event3...)

Then I will label positives and featurize the Events Vector in many
different ways, fit each of them into the Logistic Regression.

I want to save the intermediate storage permanently since it will be used many
times, and there will be new events coming every day. So I need to update
this intermediate storage every day.

Right now I store the intermediate data as JSON files. Should I use
Parquet instead? Or are there better solutions for this use case?

Thanks a lot !
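
A hedged sketch of the Parquet route (paths and the userId column name are assumptions): partitioning the output by user id lets later jobs read back only the users they need, with column pruning, which plain JSON files cannot offer:

import sqlContext.implicits._   // for the $"..." column syntax below

val events = sqlContext.read.json("hdfs:///events/2015-10-17/")   // assumed input location
events.write.mode("append").partitionBy("userId").parquet("hdfs:///events_by_user")

// a later job (e.g. the featurization / logistic regression step) reads it back cheaply
val userA = sqlContext.read.parquet("hdfs:///events_by_user").where($"userId" === "UserIDA")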


Re: Problem installing Sparck on Windows 8

2015-10-17 Thread Marco Mistroni
Hi,
I am still having issues installing Spark on Windows 8.
The Spark web console runs successfully and I can run the Spark Pi example;
however, when I run spark-shell I am getting the following exception:

java.lang.RuntimeException: java.lang.RuntimeException: The root scratch
dir: /t
mp/hive on HDFS should be writable. Current permissions are: -
at
org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.jav
a:522)
at
org.apache.spark.sql.hive.client.ClientWrapper.(ClientWrapper.s
cala:171)
at
org.apache.spark.sql.hive.HiveContext.executionHive$lzycompute(HiveCo
ntext.scala:162)
at
org.apache.spark.sql.hive.HiveContext.executionHive(HiveContext.scala
:160)
at
org.apache.spark.sql.hive.HiveContext.(HiveContext.scala:167)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)

I have amended the permissions to full access for my account on Windows
8... I am trying to understand if this exception can be a blocker if I
decide to submit tasks to Spark.
Given the fact that I can run the Spark example, this does not look like a
blocker, but seeing exceptions when I launch the spark shell does not make
me feel comfortable, especially if I don't understand why I am getting the
exception.
Doesn't Spark like Windows 8?

any suggestions appreciated

kind regards
 marco



On Thu, Oct 15, 2015 at 11:40 PM, Marco Mistroni 
wrote:

> Hi
>  I tried to set this variable in my Windows env variables but got the same
> result.
> This is the result of calling set in my command prompt.
> Have I amended it in the wrong place?
>
> kr
>  marco
> ..
> USERDOMAIN=MarcoLaptop
> USERDOMAIN_ROAMINGPROFILE=MarcoLaptop
> USERNAME=marco
> USERPROFILE=C:\Users\marco
> windir=C:\Windows
> _JAVA_OPTIONS=-Djava.net.preferIPv4Stack=true
>
>
> On Thu, Oct 15, 2015 at 1:25 AM, Raghavendra Pandey <
> raghavendra.pan...@gmail.com> wrote:
>
>> Looks like you are facing an IPv6 issue. Can you try turning the preferIPv4
>> property on?
>> On Oct 15, 2015 2:10 AM, "Steve Loughran"  wrote:
>>
>>>
>>> On 14 Oct 2015, at 20:56, Marco Mistroni  wrote:
>>>
>>>
>>> 15/10/14 20:52:35 WARN : Your hostname, MarcoLaptop resolves to a
>>> loopback/non-r
>>> eachable address: fe80:0:0:0:c5ed:a66d:9d95:5caa%wlan2, but we couldn't
>>> find any
>>>  external IP address!
>>> java.lang.RuntimeException: java.lang.RuntimeException: The root scratch
>>> dir: /t
>>> mp/hive on HDFS should be writable. Current permissions are: -
>>> at
>>> org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.jav
>>> a:522)
>>> at
>>> org.apache.spark.sql.hive.client.ClientWrapper.(ClientWrapper.s
>>> cala:171)
>>> at
>>> org.apache.spark.sql.hive.HiveContext.executionHive$lzycompute(HiveCo
>>>
>>>
>>> now, that I haven't seen. Looks like it thinks the permissions are
>>> wrong, doesn't it?
>>>
>>
>