Re: Do jobs fail because of other users of a cluster?

2017-01-23 Thread Matthew Dailey
In general, Java processes fail with an OutOfMemoryError when your code and
data do not fit into the memory allocated to the runtime.  In Spark, that
memory is controlled through the --executor-memory flag (the
spark.executor.memory property).
If you are running Spark on YARN, then YARN configuration will dictate the
maximum memory that your Spark executors can request.  Here is a pretty
good article about setting memory in Spark on YARN:
http://www.cloudera.com/documentation/enterprise/5-5-x/topics/cdh_ig_running_spark_on_yarn.html
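
For example, here is a minimal sketch of setting those knobs
programmatically (the values are illustrative starting points, not
recommendations; --executor-memory on the spark-submit command line sets
the same thing as spark.executor.memory):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("memory-example")
  // Heap size of each executor JVM; an OutOfMemoryError usually means your
  // code and data outgrew this allocation.
  .set("spark.executor.memory", "4g")
  // Off-heap headroom YARN grants per executor, in MB; raising it helps
  // when YARN kills containers for exceeding their memory allocation.
  .set("spark.yarn.executor.memoryOverhead", "512")
val sc = new SparkContext(conf)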

If the system itself has run out of memory, the JVM cannot commit the
memory it has reserved, and you would see an error printed to standard
error that looks like this:

Java HotSpot(TM) 64-Bit Server VM warning: INFO:
os::commit_memory(0xe232, 37601280, 0) failed;
error='Cannot allocate memory' (errno=12)
# There is insufficient memory for the Java Runtime Environment to continue.
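
If instead YARN kills the container for exceeding its memory allocation,
the driver log typically contains a message along these lines (the numbers
here are made up, and the exact wording varies by version):

ExecutorLostFailure (executor 1 exited caused by one of the running tasks)
Reason: Container killed by YARN for exceeding memory limits.
4.5 GB of 4.5 GB physical memory used.
Consider boosting spark.yarn.executor.memoryOverhead.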



On Wed, Jan 18, 2017 at 10:25 AM, David Frese wrote:

> Hello everybody,
>
> being quite new to Spark, I am struggling a lot with OutOfMemoryError
> exceptions and "GC overhead limit exceeded" failures of my jobs, submitted
> from a spark-shell with "--master yarn".
>
> Playing with --num-executors, --executor-memory and --executor-cores, I
> occasionally get something done. But I'm not the only one using the
> cluster, and it seems to me that my jobs sometimes fail with the above
> errors because other people have something running, or have a spark-shell
> open at the time; at least it seems that with the same code, data and
> settings, the job sometimes completes and sometimes fails.
>
> Is that "expected behaviour"?
>
> What options/tools can be used to make the success or failure of a job
> deterministic? There are a lot of things out there, like 'dynamic
> allocation' and Hadoop's 'fair scheduler', but it is very hard for a
> newbie to evaluate them (or to make suggestions to the admins).
>
> If it cannot be made deterministic, how can I reliably distinguish the OOM
> failures that are caused by incorrect settings on my side (e.g. because my
> data does not fit into memory) from those that are caused by resource
> consumption/blocking from other jobs?
>
> Thanks for sharing your thoughts and experiences!
>
>
>
>
>


Re: tuning the spark.locality.wait

2017-01-23 Thread Matthew Dailey
This article recommends setting spark.locality.wait to 10 milliseconds for
Spark Streaming and explains why they chose that value.  If you are using
batch Spark, that value should still be a good starting place:
https://www.inovex.de/blog/247-spark-streaming-on-yarn-in-production/
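
As a concrete sketch (assuming Spark 1.6 and taking the article's 10 ms as
a starting point to tune, not a prescription):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("locality-wait-example")
  // Default is 3s. Lowering it makes the scheduler give up on data
  // locality sooner and launch tasks on non-local executors instead of
  // leaving cores idle.
  .set("spark.locality.wait", "10ms")
val sc = new SparkContext(conf)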



On Sat, Jan 21, 2017 at 5:06 AM, Cesar  wrote:

>
> I am working with datasets on the order of 200 GB, using 286 cores divided
> across 143 executors. Each executor has 32 GB (which gives every core
> about 16 GB). And I am using Spark 1.6.
>
>
> I would like to tune spark.locality.wait. Can anyone give me a range of
> values for spark.locality.wait to try?
>
>
>
> Thanks a lot !
> --
> Cesar Flores
>


Re: Spark Metrics: custom source/sink configurations not getting recognized

2016-11-28 Thread Matthew Dailey
I just stumbled upon this issue as well in Spark 1.6.2 when trying to write
my own custom Sink.  For anyone else who runs into this issue, there are
two relevant JIRAs that I found, but no solution as of yet:
- https://issues.apache.org/jira/browse/SPARK-14151 - Propose to refactor
and expose Metrics Sink and Source interface
- https://issues.apache.org/jira/browse/SPARK-18115 - Custom metrics
Sink/Source prevent Executor from starting
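
For reference, here is a minimal sketch of what a custom Sink looks like in
Spark 1.6 (ConsoleDumpSink is a hypothetical name).  Note that the Sink
trait is private[spark], so the class has to live under the
org.apache.spark package to compile at all - exactly the kind of thing
SPARK-14151 proposes to clean up:

package org.apache.spark.metrics.sink

import java.util.Properties

import com.codahale.metrics.MetricRegistry

import org.apache.spark.SecurityManager

// Spark instantiates sinks reflectively through this 3-argument constructor.
class ConsoleDumpSink(
    val property: Properties,     // this sink's entries from metrics.properties
    val registry: MetricRegistry, // Spark's shared Codahale metric registry
    securityMgr: SecurityManager)
  extends Sink {

  override def start(): Unit = {}  // start a scheduled reporter here
  override def stop(): Unit = {}   // and shut it down here

  override def report(): Unit = {
    // A real sink would push to an external system; this one just prints.
    import scala.collection.JavaConverters._
    registry.getGauges.asScala.foreach { case (name, gauge) =>
      println(s"$name = ${gauge.getValue}")
    }
  }
}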

On Thu, Sep 8, 2016 at 3:23 PM, map reduced  wrote:

> Can this be listed as an issue on JIRA?
>
> On Wed, Sep 7, 2016 at 10:19 AM, map reduced  wrote:
>
>> Thanks for the reply, I wish it did. We have an internal metrics system
>> that we need to submit metrics to. I am sure that the ways I've tried work
>> with yarn deployment, but not with standalone.
>>
>> Thanks,
>> KP
>>
>> On Tue, Sep 6, 2016 at 11:36 PM, Benjamin Kim  wrote:
>>
>>> We use Graphite/Grafana for custom metrics. We found Spark’s metrics not
>>> to be customizable. So, we write directly using Graphite’s API, which was
>>> very easy to do using Java’s socket library in Scala. It works great for
>>> us, and we are going one step further using Sensu to alert us if there is
>>> an anomaly in the metrics beyond the norm.
>>>
>>> Hope this helps.
>>>
>>> Cheers,
>>> Ben
>>>
>>>
>>> On Sep 6, 2016, at 9:52 PM, map reduced  wrote:
>>>
>>> Hi, anyone has any ideas please?
>>>
>>> On Mon, Sep 5, 2016 at 8:30 PM, map reduced  wrote:
>>>
 Hi,

 I've written a custom metrics source/sink for my Spark Streaming app and I
 am trying to initialize it from metrics.properties - but that doesn't work
 from executors. I don't have control over the machines in the Spark
 cluster, so I can't copy the properties file into $SPARK_HOME/conf/ on the
 cluster. I have it in the fat jar where my app lives, but by the time my
 fat jar is downloaded to the worker nodes in the cluster, the executors
 have already started and their metrics system is already initialized - so
 it never picks up my file with the custom source configuration in it.

 Following this post, I've specified 'spark.files=metrics.properties' and
 'spark.metrics.conf=metrics.properties', but by the time
 'metrics.properties' is shipped to the executors, their metrics system is
 already initialized.

 If I initialize my own metrics system, it picks up my file, but then I'm
 missing master/executor-level metrics/properties (e.g.
 executor.sink.mySink.propName=myProp - it can't read 'propName' from
 'mySink') since those are initialized by Spark's metrics system.

 Is there a (programmatic) way to have 'metrics.properties' shipped before
 the executors initialize?

 Here's my SO question.

 Thanks,

 KP

>>>
>>>
>>>
>>
>


Are Task Closures guaranteed to be accessed by only one Thread?

2016-10-05 Thread Matthew Dailey
Looking at the programming guide
<http://spark.apache.org/docs/1.6.1/programming-guide.html#local-vs-cluster-modes>
for Spark 1.6.1, it states
> Prior to execution, Spark computes the task’s closure. The closure is
those variables and methods which must be visible for the executor to
perform its computations on the RDD
> The variables within the closure sent to each executor are now copies

So my question is: will an executor access a single copy of the closure
from more than one thread?  I ask because I want to know whether I can
ignore thread-safety in a function I write.  Take a look at this gist as a
simplified example of a thread-unsafe operation being passed to map():
https://gist.github.com/matthew-dailey/4e1ab0aac580151dcfd7fbe6beab84dc
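
As an illustration of the kind of code in question (a hypothetical example,
not the gist itself): SimpleDateFormat is not thread-safe, so the worry is
whether two tasks running concurrently on one executor could ever touch the
same captured instance. If in doubt, mapPartitions gives each task its own
instance:

import java.text.SimpleDateFormat
import org.apache.spark.rdd.RDD

// Thread-unsafe object captured in the closure sent to map().
def parseWithSharedFormat(lines: RDD[String]): RDD[Long] = {
  val format = new SimpleDateFormat("yyyy-MM-dd")
  lines.map(s => format.parse(s).getTime)
}

// Defensive variant: build one instance per partition, so no instance
// is ever shared across tasks.
def parseWithPerPartitionFormat(lines: RDD[String]): RDD[Long] = {
  lines.mapPartitions { it =>
    val format = new SimpleDateFormat("yyyy-MM-dd")
    it.map(s => format.parse(s).getTime)
  }
}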

This is for Spark Streaming, but I suspect the answer is the same between
batch and streaming.

Thanks for any help,
Matt