Re: Open sourcing Sparklens: Qubole's Spark Tuning Tool

2018-03-27 Thread Rohit Karlupia
Let me be more specific:

With GC/CPU aware task scheduling, the user doesn't have to worry about
specifying cores carefully. Even if the user always specifies cores = 100 or
1024 for every executor, the job will still not hit OOM in the vast majority
of cases. Internally, the scheduler varies the number of tasks assigned to
each executor, ensuring that the executor doesn't run into long GC cycles or
waste time on useless context switches.  In short, as long as users configure
cores per executor on the higher side, it is generally harmless and can
actually increase the throughput of the system by utilising memory and CPU
capacity that would otherwise go unused.

*For example:* let's say we are using a 64 GB machine with 8 cores, running
one big 54 GB executor with 8 cores. This gives on average about 7 GB of
memory per task. Some tasks will take more than 7 GB and some less. Now
consider a case where one task takes 34 GB of memory. Such a stage will very
likely fail, depending on whether the other 7 tasks scheduled at the same
time need more than the remaining 20 GB (54 - 34). The usual approach to
solving this problem without changing the application is to sacrifice cores
and increase memory per core. The stable configuration in this case could be
2 cores for the 54 GB executor, which wastes 6 cores "throughout" the
application (see the sketch below).
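
For illustration, here is roughly what the two configurations above look like
as standard Spark settings (the numbers just mirror the example; the exact
heap available to tasks also depends on spark.memory.fraction):

    import org.apache.spark.SparkConf

    // Original layout: 8 task slots share one 54 GB heap, so only ~7 GB is
    // available per task on average and the 34 GB task will very likely fail.
    val eightCores = new SparkConf()
      .set("spark.executor.memory", "54g")
      .set("spark.executor.cores", "8")

    // "Stable but wasteful" layout: the same 54 GB heap with only 2 task slots,
    // so a skewed task can use far more memory, at the cost of 6 idle cores.
    val twoCores = new SparkConf()
      .set("spark.executor.memory", "54g")
      .set("spark.executor.cores", "2")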

With GC/CPU aware task scheduling, one can configure the same executor with,
say, 64 cores and the application is still very likely to succeed. Being
aware of GC, the scheduler stops scheduling tasks on the executor under
memory pressure, making it possible for the running task to consume all
54 GB of memory. This ensures that we only "sacrifice" cores when necessary,
not in general and not for the whole duration of the application.  On the
other hand, if the scheduler finds that in spite of running 8 concurrent
tasks we still have memory and CPU to spare, it will schedule more tasks, up
to the 64 configured. So we not only get stability against skew, we also get
higher throughput when possible.
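
To make the idea concrete, here is a purely illustrative sketch of this kind
of decision rule; it is not the actual implementation, and the metric names
(gcFraction, cpuUtilization) are assumptions:

    // Illustrative only: cap concurrency by observed GC/CPU pressure instead of
    // a fixed cores setting. gcFraction and cpuUtilization are assumed to come
    // from per-executor metrics sampled over a recent window.
    def extraTasksToSchedule(configuredSlots: Int,
                             runningTasks: Int,
                             gcFraction: Double,
                             cpuUtilization: Double): Int = {
      if (gcFraction > 0.3 || cpuUtilization > 0.9) {
        0 // memory or CPU pressure: let the running tasks finish first
      } else {
        // spare capacity: ramp up gradually towards the configured upper bound
        math.max(0, math.min(1, configuredSlots - runningTasks))
      }
    }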

Hope that helps.

thanks,
rohitk

On Tue, Mar 27, 2018 at 9:20 AM, Fawze Abujaber <fawz...@gmail.com> wrote:

> Thanks for the update.
>
> What about cores per executor?
>
> On Tue, 27 Mar 2018 at 6:45 Rohit Karlupia <roh...@qubole.com> wrote:
>
>> Thanks Fawze!
>>
>> On the memory front, I am currently working on GC and CPU aware task
>> scheduling. I see wonderful results based on my tests so far.  Once the
>> feature is complete and available, spark will work with whatever memory is
>> provided (at least enough for the largest possible task). It will also
>> allow you to run say 64 concurrent tasks on 8 core machine, if the nature
>> of tasks doesn't leads to memory or CPU contention. Essentially why worry
>> about tuning memory when you can let spark take care of it automatically
>> based on memory pressure. Will post details when we are ready.  So yes we
>> are working on memory, but it will not be a tool but a transparent feature.
>>
>> thanks,
>> rohitk
>>
>>
>>
>>
>> On Tue, Mar 27, 2018 at 7:53 AM, Fawze Abujaber <fawz...@gmail.com>
>> wrote:
>>
>>> Hi Rohit,
>>>
>>> I would like to thank you for the unlimited patience and support that
>>> you are providing here and behind the scenes for all of us.
>>>
>>> The tool is amazing and easy to use, and most of the metrics are easy to
>>> understand...
>>>
>>> As for whether we need to run it in cluster mode and all the time, I
>>> think we can skip that, since one or a few runs can give you the big
>>> picture of how the job runs with different configurations, and it's not
>>> too complicated to run it using spark-submit.
>>>
>>> I think it would be very helpful if Sparklens could also show how the job
>>> would run with different configurations of cores and memory: a Spark job
>>> with 1 executor and 1 core runs differently from one with 1 executor and
>>> 3 cores, and the same comparison applies to different executor memory.
>>>
>>> Overall, it is a very good starting point, but getting these metrics in
>>> the tool would be a GAME CHANGER.
>>>
>>> @Rohit, a huge THANK YOU
>>>
>>> On Mon, Mar 26, 2018 at 1:35 PM, Rohit Karlupia <roh...@qubole.com>
>>> wrote:
>>>
>>>> Hi Shmuel,
>>>>
>>>> In general it is hard to pinpoint the exact code that is responsible
>>>> for a specific stage. For example, when using Spark SQL, depending on the
>>>> kind of joins and aggregations used in a single line of query, we will
>>>> have multiple stages in 

Re: Open sourcing Sparklens: Qubole's Spark Tuning Tool

2018-03-26 Thread Rohit Karlupia
Thanks Fawze!

On the memory front, I am currently working on GC and CPU aware task
scheduling. I have seen wonderful results in my tests so far.  Once the
feature is complete and available, Spark will work with whatever memory is
provided (at least enough for the largest possible task). It will also
allow you to run, say, 64 concurrent tasks on an 8-core machine, if the
nature of the tasks doesn't lead to memory or CPU contention. Essentially,
why worry about tuning memory when you can let Spark take care of it
automatically based on memory pressure? I will post details when we are
ready.  So yes, we are working on memory, but it will not be a separate
tool; it will be a transparent feature.

thanks,
rohitk




On Tue, Mar 27, 2018 at 7:53 AM, Fawze Abujaber <fawz...@gmail.com> wrote:

> Hi Rohit,
>
> I would like to thank you for the unlimited patience and support that you
> are providing here and behind the scenes for all of us.
>
> The tool is amazing and easy to use, and most of the metrics are easy to
> understand...
>
> As for whether we need to run it in cluster mode and all the time, I think
> we can skip that, since one or a few runs can give you the big picture of
> how the job runs with different configurations, and it's not too
> complicated to run it using spark-submit.
>
> I think it would be very helpful if Sparklens could also show how the job
> would run with different configurations of cores and memory: a Spark job
> with 1 executor and 1 core runs differently from one with 1 executor and 3
> cores, and the same comparison applies to different executor memory.
>
> Overall, it is a very good starting point, but getting these metrics in the
> tool would be a GAME CHANGER.
>
> @Rohit, a huge THANK YOU
>
> On Mon, Mar 26, 2018 at 1:35 PM, Rohit Karlupia <roh...@qubole.com> wrote:
>
>> Hi Shmuel,
>>
>> In general it is hard to pinpoint the exact code that is responsible for
>> a specific stage. For example, when using Spark SQL, depending on the kind
>> of joins and aggregations used in a single line of query, we will have
>> multiple stages in the Spark application. I usually try to split the code
>> into smaller chunks and also use the Spark UI, which has a special section
>> for SQL. It can also show specific backtraces, but as I explained earlier
>> they might not be very helpful. Sparklens does help you ask the right
>> questions, but it is not mature enough to answer all of them.
>>
>> Understanding the report:
>>
>> *1) The first part of total aggregate metrics for the application.*
>>
>> Printing application meterics.
>>
>>  AggregateMetrics (Application Metrics) total measurements 1869
>>  NAME                           SUM            MIN        MAX         MEAN
>>  diskBytesSpilled               0.0 KB         0.0 KB     0.0 KB      0.0 KB
>>  executorRuntime                15.1 hh        3.0 ms     4.0 mm      29.1 ss
>>  inputBytesRead                 26.1 GB        0.0 KB     43.8 MB     14.3 MB
>>  jvmGCTime                      11.0 mm        0.0 ms     2.1 ss      354.0 ms
>>  memoryBytesSpilled             314.2 GB       0.0 KB     1.1 GB      172.1 MB
>>  outputBytesWritten             0.0 KB         0.0 KB     0.0 KB      0.0 KB
>>  peakExecutionMemory            0.0 KB         0.0 KB     0.0 KB      0.0 KB
>>  resultSize                     12.9 MB        2.0 KB     40.9 KB     7.1 KB
>>  shuffleReadBytesRead           107.7 GB       0.0 KB     276.0 MB    59.0 MB
>>  shuffleReadFetchWaitTime       2.0 ms         0.0 ms     0.0 ms      0.0 ms
>>  shuffleReadLocalBlocks         2,318          0          68          1
>>  shuffleReadRecordsRead         3,413,511,099  0          8,251,926   1,826,383
>>  shuffleReadRemoteBlocks        291,126        0          824         155
>>  shuffleWriteBytesWritten       107.6 GB       0.0 KB     257.6 MB    58.9 MB
>>  shuffleWriteRecordsWritten     3,408,133,175  0          7,959,055   1,823,506
>>  shuffleWriteTime               8.7 mm         0.0 ms     1.8 ss      278.2 ms
>>  taskDuration                   15.4 hh        12.0 ms    4.1 mm      29.7 ss
>>
>>

Re: Open sourcing Sparklens: Qubole's Spark Tuning Tool

2018-03-26 Thread Rohit Karlupia
GC: Amount of time spent in GC across all tasks in the given stage, as a
percentage.

If the stage contributes a large percentage of the overall application time,
we can look into these metrics to check which part (shuffle write, read
fetch, or GC) is responsible.

thanks,

rohitk



On Mon, Mar 26, 2018 at 1:38 AM, Shmuel Blitz <shmuel.bl...@similarweb.com>
wrote:

> Hi Rohit,
>
> Thanks for the analysis.
>
> I can use repartition on the slow task. But how can I tell what part of
> the code is in charge of the slow tasks?
>
> It would be great if you could further explain the rest of the output.
>
> Thanks in advance,
> Shmuel
>
> On Sun, Mar 25, 2018 at 12:46 PM, Rohit Karlupia <roh...@qubole.com>
> wrote:
>
>> Thanks Shamuel for trying out sparklens!
>>
>> Couple of things that I noticed:
>> 1) 250 executors is probably overkill for this job. It would run in same
>> time with around 100.
>> 2) Many of stages that take long time have only 200 tasks where as we
>> have 750 cores available for the job. 200 is the default value for
>> spark.sql.shuffle.partitions.  Alternatively you could try increasing
>> the value of spark.sql.shuffle.partitions to latest 750.
>>
>> thanks,
>> rohitk
>>
>> On Sun, Mar 25, 2018 at 1:25 PM, Shmuel Blitz <
>> shmuel.bl...@similarweb.com> wrote:
>>
>>> I ran it on a single job.
>>> SparkLens has an overhead on the job duration. I'm not ready to enable
>>> it by default on all our jobs.
>>>
>>> Attached is the output.
>>>
>>> Still trying to understand what exactly it means.
>>>
>>> On Sun, Mar 25, 2018 at 10:40 AM, Fawze Abujaber <fawz...@gmail.com>
>>> wrote:
>>>
>>>> Nice!
>>>>
>>>> Shmuel, Were you able to run on a cluster level or for a specific job?
>>>>
>>>> Did you configure it on the spark-default.conf?
>>>>
>>>> On Sun, 25 Mar 2018 at 10:34 Shmuel Blitz <shmuel.bl...@similarweb.com>
>>>> wrote:
>>>>
>>>>> Just to let you know, I have managed to run SparkLens on our cluster.
>>>>>
>>>>> I switched to the spark_1.6 branch, and also compiled against the
>>>>> specific image of Spark we are using (cdh5.7.6).
>>>>>
>>>>> Now I need to figure out what the output means... :P
>>>>>
>>>>> Shmuel
>>>>>
>>>>> On Fri, Mar 23, 2018 at 7:24 PM, Fawze Abujaber <fawz...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Quick question:
>>>>>>
>>>>>> how to add the  --jars /path/to/sparklens_2.11-0.1.0.jar to the
>>>>>> spark-default conf, should it be using:
>>>>>>
>>>>>> spark.driver.extraClassPath /path/to/sparklens_2.11-0.1.0.jar or i
>>>>>> should use spark.jars option? anyone who could give an example how it
>>>>>> should be, and if i the path for the jar should be an hdfs path as i'm
>>>>>> using it in cluster mode.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Mar 23, 2018 at 6:33 AM, Fawze Abujaber <fawz...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Shmuel,
>>>>>>>
>>>>>>> Did you compile the code against the right branch for Spark 1.6.
>>>>>>>
>>>>>>> I tested it and it looks working and now i'm testing the branch for
>>>>>>> a wide tests, Please use the branch for Spark 1.6
>>>>>>>
>>>>>>> On Fri, Mar 23, 2018 at 12:43 AM, Shmuel Blitz <
>>>>>>> shmuel.bl...@similarweb.com> wrote:
>>>>>>>
>>>>>>>> Hi Rohit,
>>>>>>>>
>>>>>>>> Thanks for sharing this great tool.
>>>>>>>> I tried running a spark job with the tool, but it failed with an 
>>>>>>>> *IncompatibleClassChangeError
>>>>>>>> *Exception.
>>>>>>>>
>>>>>>>> I have opened an issue on Github.(https://github.com/qub
>>>>>>>> ole/sparklens/issues/1)
>>>>>>>>
>>>>>>>> Shmuel
>>>>>>>>
>>>>>>>> On Thu, Mar 22, 2018 at 5:05 PM, Shmuel Blitz <
>>>>>

Re: Open sourcing Sparklens: Qubole's Spark Tuning Tool

2018-03-25 Thread Rohit Karlupia
Thanks Shmuel for trying out Sparklens!

A couple of things that I noticed:
1) 250 executors are probably overkill for this job. It would run in about
the same time with around 100.
2) Many of the stages that take a long time have only 200 tasks, whereas we
have 750 cores available for the job. 200 is the default value for
spark.sql.shuffle.partitions.  Alternatively, you could try increasing the
value of spark.sql.shuffle.partitions to at least 750 (see the sketch below).
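
For example, roughly (the value just matches the 750 cores mentioned above;
the same property can equally be passed with --conf on spark-submit):

    import org.apache.spark.SparkConf

    // Raise the default shuffle parallelism (200) so reduce-side stages can
    // use all of the available cores instead of leaving most of them idle.
    val conf = new SparkConf()
      .set("spark.sql.shuffle.partitions", "750")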

thanks,
rohitk

On Sun, Mar 25, 2018 at 1:25 PM, Shmuel Blitz <shmuel.bl...@similarweb.com>
wrote:

> I ran it on a single job.
> SparkLens has an overhead on the job duration. I'm not ready to enable it
> by default on all our jobs.
>
> Attached is the output.
>
> Still trying to understand what exactly it means.
>
> On Sun, Mar 25, 2018 at 10:40 AM, Fawze Abujaber <fawz...@gmail.com>
> wrote:
>
>> Nice!
>>
>> Shmuel, Were you able to run on a cluster level or for a specific job?
>>
>> Did you configure it on the spark-default.conf?
>>
>> On Sun, 25 Mar 2018 at 10:34 Shmuel Blitz <shmuel.bl...@similarweb.com>
>> wrote:
>>
>>> Just to let you know, I have managed to run SparkLens on our cluster.
>>>
>>> I switched to the spark_1.6 branch, and also compiled against the
>>> specific image of Spark we are using (cdh5.7.6).
>>>
>>> Now I need to figure out what the output means... :P
>>>
>>> Shmuel
>>>
>>> On Fri, Mar 23, 2018 at 7:24 PM, Fawze Abujaber <fawz...@gmail.com>
>>> wrote:
>>>
>>>> Quick question:
>>>>
>>>> how to add the  --jars /path/to/sparklens_2.11-0.1.0.jar to the
>>>> spark-default conf, should it be using:
>>>>
>>>> spark.driver.extraClassPath /path/to/sparklens_2.11-0.1.0.jar or i
>>>> should use spark.jars option? anyone who could give an example how it
>>>> should be, and if i the path for the jar should be an hdfs path as i'm
>>>> using it in cluster mode.
>>>>
>>>>
>>>>
>>>>
>>>> On Fri, Mar 23, 2018 at 6:33 AM, Fawze Abujaber <fawz...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Shmuel,
>>>>>
>>>>> Did you compile the code against the right branch for Spark 1.6.
>>>>>
>>>>> I tested it and it looks working and now i'm testing the branch for a
>>>>> wide tests, Please use the branch for Spark 1.6
>>>>>
>>>>> On Fri, Mar 23, 2018 at 12:43 AM, Shmuel Blitz <
>>>>> shmuel.bl...@similarweb.com> wrote:
>>>>>
>>>>>> Hi Rohit,
>>>>>>
>>>>>> Thanks for sharing this great tool.
>>>>>> I tried running a spark job with the tool, but it failed with an 
>>>>>> *IncompatibleClassChangeError
>>>>>> *Exception.
>>>>>>
>>>>>> I have opened an issue on Github.(https://github.com/qub
>>>>>> ole/sparklens/issues/1)
>>>>>>
>>>>>> Shmuel
>>>>>>
>>>>>> On Thu, Mar 22, 2018 at 5:05 PM, Shmuel Blitz <
>>>>>> shmuel.bl...@similarweb.com> wrote:
>>>>>>
>>>>>>> Thanks.
>>>>>>>
>>>>>>> We will give this a try and report back.
>>>>>>>
>>>>>>> Shmuel
>>>>>>>
>>>>>>> On Thu, Mar 22, 2018 at 4:22 PM, Rohit Karlupia <roh...@qubole.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks everyone!
>>>>>>>> Please share how it works and how it doesn't. Both help.
>>>>>>>>
>>>>>>>> Fawaze, just made few changes to make this work with spark 1.6. Can
>>>>>>>> you please try building from branch *spark_1.6*
>>>>>>>>
>>>>>>>> thanks,
>>>>>>>> rohitk
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Mar 22, 2018 at 10:18 AM, Fawze Abujaber <fawz...@gmail.com
>>>>>>>> > wrote:
>>>>>>>>
>>>>>>>>> It's super amazing  i see it was tested on spark 2.0.0 and
>>>>>>>>> above, what about Spark 1.6 which is still part of Cloudera's main 

Re: Open sourcing Sparklens: Qubole's Spark Tuning Tool

2018-03-22 Thread Rohit Karlupia
Thanks everyone!
Please share how it works and how it doesn't. Both help.

Fawze, I just made a few changes to make this work with Spark 1.6. Can you
please try building from the branch *spark_1.6*?

thanks,
rohitk



On Thu, Mar 22, 2018 at 10:18 AM, Fawze Abujaber <fawz...@gmail.com> wrote:

> It's super amazing. I see it was tested on Spark 2.0.0 and above; what
> about Spark 1.6, which is still part of Cloudera's main versions?
>
> We have a vast number of Spark applications on version 1.6.0.
>
> On Thu, Mar 22, 2018 at 6:38 AM, Holden Karau <hol...@pigscanfly.ca>
> wrote:
>
>> Super exciting! I look forward to digging through it this weekend.
>>
>> On Wed, Mar 21, 2018 at 9:33 PM ☼ R Nair (रविशंकर नायर) <
>> ravishankar.n...@gmail.com> wrote:
>>
>>> Excellent. You filled a missing link.
>>>
>>> Best,
>>> Passion
>>>
>>> On Wed, Mar 21, 2018 at 11:36 PM, Rohit Karlupia <roh...@qubole.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Happy to announce the availability of Sparklens as open source project.
>>>> It helps in understanding the  scalability limits of spark applications and
>>>> can be a useful guide on the path towards tuning applications for lower
>>>> runtime or cost.
>>>>
>>>> Please clone from here: https://github.com/qubole/sparklens
>>>> Old blogpost: https://www.qubole.com/blog/introducing-quboles-sp
>>>> ark-tuning-tool/
>>>>
>>>> thanks,
>>>> rohitk
>>>>
>>>> PS: Thanks for the patience. It took couple of months to get back on
>>>> this.
>>>>
>>>>
>>>>
>>>>
>>>>
>>> --
>> Twitter: https://twitter.com/holdenkarau
>>
>
>


Open sourcing Sparklens: Qubole's Spark Tuning Tool

2018-03-21 Thread Rohit Karlupia
Hi,

Happy to announce the availability of Sparklens as an open source project.
It helps in understanding the scalability limits of Spark applications and
can be a useful guide on the path towards tuning applications for lower
runtime or cost.

Please clone from here: https://github.com/qubole/sparklens
Old blogpost: https://www.qubole.com/blog/introducing-quboles-spark-
tuning-tool/

thanks,
rohitk

PS: Thanks for the patience. It took a couple of months to get back to this.


Spark Tuning Tool

2018-01-22 Thread Rohit Karlupia
Hi,

I have been working on making the performance tuning of Spark applications
a bit easier.  We have just released the beta version of the tool on Qubole.

https://www.qubole.com/blog/introducing-quboles-spark-tuning-tool/

This is not OSS yet, but we would like to contribute it as OSS.  I am
fishing for interest in the community, to see whether people find this work
interesting and would like to try it out.

thanks,
Rohit Karlupia


Re: Spark on EMR suddenly stalling

2018-01-01 Thread Rohit Karlupia
Here is the list that I would probably try to work through:

   1. Check GC on the offending executor while the task is running. Maybe
   you need even more memory.
   2. Go back to a previous successful run of the job, check the Spark UI
   for the offending stage, and look at max task time / max input / max
   shuffle in/out for the largest task. This will help you understand the
   degree of skew in this stage.
   3. Take a thread dump of the executor from the Spark UI and verify
   whether the task is really doing any work or is stuck in some deadlock.
   Some of the Hive SerDes are not really usable from
   multi-threaded/multi-use Spark executors.
   4. Take a thread dump of the executor from the Spark UI and verify
   whether the task is spilling to disk. Playing with the storage and memory
   fractions or generally increasing the memory will help (see the config
   sketch after this list).
   5. Check the disk utilisation on the machine running the executor.
   6. Look for event-loss messages in the logs caused by the event queue
   being full. Loss of events can send some of the Spark components into
   really bad states.
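
For items 1, 4 and 6, the knobs involved look roughly like this (the property
names are the standard Spark settings of that era; the values are placeholders
to tune for your workload, and the event-queue setting was renamed to
spark.scheduler.listenerbus.eventqueue.capacity in later Spark releases):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      // Item 1: give the executor more heap if GC is the bottleneck.
      .set("spark.executor.memory", "8g")
      // Item 4: shift the unified-memory split if tasks spill to disk too early.
      .set("spark.memory.fraction", "0.7")
      .set("spark.memory.storageFraction", "0.3")
      // Item 6: enlarge the listener-bus event queue to reduce dropped events.
      .set("spark.scheduler.listenerbus.eventqueue.size", "100000")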


thanks,
rohitk



On Sun, Dec 31, 2017 at 12:50 AM, Gourav Sengupta  wrote:

> Hi,
>
> Please try to use the SPARK UI from the way that AWS EMR recommends, it
> should be available from the resource manager. I never ever had any problem
> working with it. THAT HAS ALWAYS BEEN MY PRIMARY AND SOLE SOURCE OF
> DEBUGGING.
>
> Sadly, I cannot be of much help unless we go for a screen share session
> over google chat or skype.
>
> Also, I ALWAYS prefer the maximize Resource Allocation setting in EMR to
> be set to true.
>
> Besides that, there is a metrics in the EMR console which shows the number
> of containers getting generated by your job on graphs.
>
>
>
> Regards,
> Gourav Sengupta
>
> On Fri, Dec 29, 2017 at 6:23 PM, Jeroen Miller 
> wrote:
>
>> Hello,
>>
>> Just a quick update as I did not made much progress yet.
>>
>> On 28 Dec 2017, at 21:09, Gourav Sengupta 
>> wrote:
>> > can you try to then use the EMR version 5.10 instead or EMR version
>> 5.11 instead?
>>
>> Same issue with EMR 5.11.0. Task 0 in one stage never finishes.
>>
>> > can you please try selecting a subnet which is in a different
>> availability zone?
>>
>> I did not try this yet. But why should that make a difference?
>>
>> > if possible just try to increase the number of task instances and see
>> the difference?
>>
>> I tried with 512 partitions -- no difference.
>>
>> > also in case you are using caching,
>>
>> No caching used.
>>
>> > Also can you please report the number of containers that your job is
>> creating by looking at the metrics in the EMR console?
>>
>> 8 containers if I trust the directories in j-xxx/containers/application_x
>> xx/.
>>
>> > Also if you see the spark UI then you can easily see which particular
>> step is taking the longest period of time - you just have to drill in a bit
>> in order to see that. Generally in case shuffling is an issue then it
>> definitely appears in the SPARK UI as I drill into the steps and see which
>> particular one is taking the longest.
>>
>> I always have issues with the Spark UI on EC2 -- it never seems to be up
>> to date.
>>
>> JM
>>
>>
>


Re: org.apache.hadoop.fs.FileSystem: Provider tachyon.hadoop.TFS could not be instantiated

2017-05-07 Thread Rohit Karlupia
Last time I checked, this happens only with Spark < 2.0.0. The reason is
the ServiceLoader mechanism used to load all FileSystem implementations from
the classpath: in Spark versions before 2.0.0, tachyon.hadoop.TFS was
packaged with the Spark distribution and gets loaded whether or not it is
actually used.  Moving to Spark 2.0.0+ is the simplest way out.

The NoClassDefFoundError is something of a distraction here, because the
real problem is the failure to instantiate this class. That happens because
of calls made to find the local IP, so check whether DNS is working
properly. This error is usually not easily reproducible.
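
A small sketch that illustrates both points (it assumes hadoop-common on the
classpath; consuming the ServiceLoader iterator is what instantiates every
registered FileSystem provider, which is exactly where TFS fails):

    import java.net.InetAddress
    import java.util.ServiceLoader
    import org.apache.hadoop.fs.FileSystem
    import scala.collection.JavaConverters._

    // Every FileSystem provider found on the classpath -- including
    // tachyon.hadoop.TFS if it is bundled -- is instantiated here, whether or
    // not the job ever touches Tachyon. A broken provider throws
    // ServiceConfigurationError at this point.
    ServiceLoader.load(classOf[FileSystem]).asScala
      .foreach(fs => println(s"loaded provider: ${fs.getClass.getName}"))

    // Quick check that the local hostname resolves, since lookup of the local
    // IP is the usual reason the TFS constructor fails.
    val local = InetAddress.getLocalHost
    println(s"${local.getHostName} -> ${local.getHostAddress}")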

thanks,
rohitk


On Fri, May 5, 2017 at 2:23 PM, Jone Zhang  wrote:

> *When i use sparksql, the error as follows*
>
> 17/05/05 15:58:44 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 20.0 
> (TID 4080, 10.196.143.233): java.util.ServiceConfigurationError: 
> org.apache.hadoop.fs.FileSystem: Provider tachyon.hadoop.TFS could not be 
> instantiated
>   at java.util.ServiceLoader.fail(ServiceLoader.java:224)
>   at java.util.ServiceLoader.access$100(ServiceLoader.java:181)
>   at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:377)
>   at java.util.ServiceLoader$1.next(ServiceLoader.java:445)
>   at org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:2558)
>   at 
> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2569)
>   at 
> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2586)
>   at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:365)
>   at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:167)
>   at 
> org.apache.hadoop.mapred.JobConf.getWorkingDirectory(JobConf.java:654)
>   at 
> org.apache.hadoop.mapred.FileInputFormat.setInputPaths(FileInputFormat.java:436)
>   at 
> org.apache.spark.sql.hive.HadoopTableReader$.initializeLocalJobConfFunc(TableReader.scala:321)
>   at 
> org.apache.spark.sql.hive.HadoopTableReader$$anonfun$12.apply(TableReader.scala:276)
>   at 
> org.apache.spark.sql.hive.HadoopTableReader$$anonfun$12.apply(TableReader.scala:276)
>   at 
> org.apache.spark.rdd.HadoopRDD$$anonfun$getJobConf$6.apply(HadoopRDD.scala:176)
>   at 
> org.apache.spark.rdd.HadoopRDD$$anonfun$getJobConf$6.apply(HadoopRDD.scala:176)
>   at scala.Option.map(Option.scala:145)
>   at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:176)
>   at org.apache.spark.rdd.HadoopRDD$$anon$1.(HadoopRDD.scala:212)
>   at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:208)
>   at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:101)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>   at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>   at org.apache.spark.scheduler.Task.run(Task.scala:89)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NoClassDefFoundError: Could not initialize class 
> tachyon.hadoop.TFS
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>   at 
> 

Re: Setting Optimal Number of Spark Executor Instances

2017-03-15 Thread Rohit Karlupia
The number of tasks is very likely not the reason for the timeouts. A few
things to look for:

What is actually timing out, and what kind of operation is it?
Reading from or writing to HDFS (NameNode or DataNode),
fetching shuffle data (with or without the External Shuffle Service),
or the driver not being able to talk to an executor?

The simplest thing to do is to increase the network timeouts in the Spark
conf. The other thing to check is whether GC is kicking in; if so, try
increasing the heap size.
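
For reference, the umbrella timeout setting looks roughly like this
(spark.network.timeout defaults to 120s; the value here is a placeholder):

    import org.apache.spark.SparkConf

    // Raise the network timeout if shuffle fetches or driver/executor
    // heartbeats are timing out under load.
    val conf = new SparkConf()
      .set("spark.network.timeout", "600s")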

thanks & all the best,
rohitk



On Thu, Mar 16, 2017 at 7:23 AM, Yong Zhang  wrote:

> Not really sure what is the root problem you try to address.
>
>
> The number of tasks need to be run in Spark depends on the number of
> partitions in your job.
>
>
> Let's use a simple word count example, if your spark job read 128G data
> from HDFS (assume the default block size is 128M), then the mapper stage of
> your spark job will spawn 1000 tasks (128G / 128M).
>
>
> In the reducer stage, by default, spark will spawn 200 tasks (controlled
> by spark.default.parallelism if you are using RDD api or
> spark.sql.shuffle.partitions if you are using DataFrame, and you didn't
> specify the partition number in any of your API call).
>
>
> In either case, you can change the tasks number spawned (Even in the
> mapper case, but I didn't see any reason under normal case). For huge
> datasets running in Spark, people often to increase the tasks count spawned
> in the reducing stage, to make each task processing much less volume of
> data, to reduce the memory pressure and increase performance.
>
>
> Still in the word count example, if you have 2000 unique words in your
> dataset, then your reducer count could be from 1 to 2000. 1 is the worst,
> as only one task will process all 2000 unique words, meaning all the data
> will be sent to this one task, and it will be the slowest. But on the other
> hand, 2000 maybe is neither the best.
>
>
> Let's say we set 200 is the best number, so you will have 200 reduce tasks
> to process 2000 unique words. Setting the number of executors and cores is
> just to allocation how many these tasks can be run concurrently. So if your
> cluster has enough cores and memory available, obviously grant as many as
> cores up to 200 to your spark job for this reducing stage is the best.
>
>
> You need to be more clear about what problem you are facing when running
> your spark job here, so we can provide help. Reducing the number of tasks
> spawned normally is a very strange way.
>
>
> Yong
>
>
> --
> *From:* Kevin Peng 
> *Sent:* Wednesday, March 15, 2017 1:35 PM
> *To:* mohini kalamkar
> *Cc:* user@spark.apache.org
> *Subject:* Re: Setting Optimal Number of Spark Executor Instances
>
> Mohini,
>
> We set that parameter before we went and played with the number of
> executors and that didn't seem to help at all.
>
> Thanks,
>
> KP
>
> On Tue, Mar 14, 2017 at 3:37 PM, mohini kalamkar <
> mohini.kalam...@gmail.com> wrote:
>
>> Hi,
>>
>> try using this parameter --conf spark.sql.shuffle.partitions=1000
>>
>> Thanks,
>> Mohini
>>
>> On Tue, Mar 14, 2017 at 3:30 PM, kpeng1  wrote:
>>
>>> Hi All,
>>>
>>> I am currently on Spark 1.6 and I was doing a sql join on two tables that
>>> are over 100 million rows each and I noticed that it was spawn 3+
>>> tasks
>>> (this is the progress meter that we are seeing show up).  We tried to
>>> coalesece, repartition and shuffle partitions to drop the number of tasks
>>> down because we were getting time outs due to the number of task being
>>> spawned, but those operations did not seem to reduce the number of tasks.
>>> The solution we came up with was actually to set the num executors to 50
>>> (--num-executors=50) and it looks like it spawned 200 active tasks, but
>>> the
>>> total number of tasks remained the same.  Was wondering if anyone knows
>>> what
>>> is going on?  Is there an optimal number of executors, I was under the
>>> impression that the default dynamic allocation would pick the optimal
>>> number
>>> of executors for us and that this situation wouldn't happen.  Is there
>>> something I am missing?
>>>
>>>
>>>
>>> --
>>> View this message in context: http://apache-spark-user-list.
>>> 1001560.n3.nabble.com/Setting-Optimal-Number-of-Spark-Execut
>>> or-Instances-tp28493.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>>
>>
>> --
>> Thanks & Regards,
>> Mohini Kalamkar
>> M: +1 310 567 9329 <(310)%20567-9329>
>>
>
>


Re: spark sql jobs heap memory

2016-11-24 Thread Rohit Karlupia
Datasets/DataFrames use direct/raw/off-heap memory in an efficient columnar
fashion. Trying to fit the same amount of data in heap memory would likely
increase your memory requirement and decrease speed.

So, in short, don't worry about it and increase the overhead. You can also
set a bound on off-heap memory via a configuration option (see the sketch
below).
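
For reference, the settings involved look roughly like this
(spark.yarn.executor.memoryOverhead is the YARN-era name of the overhead
setting, in MB; the spark.memory.offHeap.* pair bounds Tungsten's explicit
off-heap allocation; the values are placeholders):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      // Extra room for off-heap/native allocations on top of the executor heap.
      .set("spark.yarn.executor.memoryOverhead", "4096")
      // Optional explicit bound on Tungsten off-heap memory.
      .set("spark.memory.offHeap.enabled", "true")
      .set("spark.memory.offHeap.size", "4g")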

thanks,
rohitk

On Thu, Nov 24, 2016 at 12:23 AM, Koert Kuipers  wrote:

> we are testing Dataset/Dataframe jobs instead of RDD jobs. one thing we
> keep running into is containers getting killed by yarn. i realize this has
> to do with off-heap memory, and the suggestion is to increase
> spark.yarn.executor.memoryOverhead.
>
> at times our memoryOverhead is as large as the executor memory (say 4G and
> 4G).
>
> why is Dataset/Dataframe using so much off heap memory?
>
> we havent changed spark.memory.offHeap.enabled which defaults to false.
> should we enable that to get a better handle on this?
>