Re: A proposal for Spark 2.0

2015-11-23 Thread Reynold Xin
I actually think the next one (after 1.6) should be Spark 2.0. The reason
is that I already know we have to break some part of the DataFrame/Dataset
API as part of the Dataset design (e.g. DataFrame.map should return a
Dataset rather than an RDD). In that case, I'd rather break this sooner (in
one release) than later (in two releases), so the damage is smaller.
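To make the change concrete, a rough sketch of the difference in the 1.x spark-shell (toy example; the 2.0 form reflects the proposal being discussed, not a released API):

import org.apache.spark.rdd.RDD

val df = sqlContext.range(0, 10)   // DataFrame with a single "id" column

// Today (1.x): map on a DataFrame leaves the SQL world and returns an RDD
val plusOne: RDD[Long] = df.map(row => row.getLong(0) + 1)

// Proposed for 2.0 (sketch only): the same call would return a Dataset[Long],
// staying inside the optimized Dataset world
// val plusOne2: Dataset[Long] = df.map(row => row.getLong(0) + 1)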

I don't think whether we call Dataset/DataFrame experimental or not matters
too much for 2.0. We can still call Dataset experimental in 2.0 and then
mark them as stable in 2.1. Despite being "experimental", there have been no
breaking changes to DataFrame from 1.3 to 1.6.



On Wed, Nov 18, 2015 at 3:43 PM, Mark Hamstra 
wrote:

> Ah, got it; by "stabilize" you meant changing the API, not just bug
> fixing.  We're on the same page now.
>
> On Wed, Nov 18, 2015 at 3:39 PM, Kostas Sakellis 
> wrote:
>
>> A 1.6.x release will only fix bugs - we typically don't change APIs in z
>> releases. The Dataset API is experimental and so we might be changing the
>> APIs before we declare it stable. This is why I think it is important to
>> first stabilize the Dataset API with a Spark 1.7 release before moving to
>> Spark 2.0. This will benefit users that would like to use the new Dataset
>> APIs but can't move to Spark 2.0 because of the backwards incompatible
>> changes, like removal of deprecated APIs, Scala 2.11 etc.
>>
>> Kostas
>>
>>
>> On Fri, Nov 13, 2015 at 12:26 PM, Mark Hamstra 
>> wrote:
>>
>>> Why does stabilization of those two features require a 1.7 release
>>> instead of 1.6.1?
>>>
>>> On Fri, Nov 13, 2015 at 11:40 AM, Kostas Sakellis 
>>> wrote:
>>>
 We have veered off the topic of Spark 2.0 a little bit here - yes we
 can talk about RDD vs. DS/DF more, but let's refocus on Spark 2.0. I'd like
 to propose we have one more 1.x release after Spark 1.6. This will allow us
 to stabilize a few of the new features that were added in 1.6:

 1) the experimental Datasets API
 2) the new unified memory manager.

 I understand our goal for Spark 2.0 is to offer an easy transition but
 there will be users that won't be able to seamlessly upgrade given what we
 have discussed as in scope for 2.0. For these users, having a 1.x release
 with these new features/APIs stabilized will be very beneficial. This might
 make Spark 1.7 a lighter release but that is not necessarily a bad thing.

 Any thoughts on this timeline?

 Kostas Sakellis



 On Thu, Nov 12, 2015 at 8:39 PM, Cheng, Hao 
 wrote:

> Agreed, more features/APIs/optimizations need to be added to DF/DS.
>
>
>
> I mean, we need to think about what kind of RDD APIs we have to
> provide to developers; maybe the fundamental APIs are enough, like
> ShuffledRDD etc. But PairRDDFunctions is probably not in this category, as
> we can do the same thing easily with DF/DS, with even better performance.
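As an illustration of that point, a minimal sketch (the input path and the "word" column on df are assumptions, not taken from the thread):

// RDD style, via PairRDDFunctions:
val wordCountsRdd = sc.textFile("words.txt")          // assumed input path
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

// DataFrame style, assuming df has a "word" column; the same aggregation
// is expressed declaratively and goes through the Catalyst optimizer:
val wordCountsDf = df.groupBy("word").count()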
>
>
>
> *From:* Mark Hamstra [mailto:m...@clearstorydata.com]
> *Sent:* Friday, November 13, 2015 11:23 AM
> *To:* Stephen Boesch
>
> *Cc:* dev@spark.apache.org
> *Subject:* Re: A proposal for Spark 2.0
>
>
>
> Hmmm... to me, that seems like precisely the kind of thing that argues
> for retaining the RDD API but not as the first thing presented to new Spark
> developers: "Here's how to use groupBy with DataFrames. Until the
> optimizer is more fully developed, that won't always get you the best
> performance that can be obtained. In these particular circumstances, ...,
> you may want to use the low-level RDD API while setting
> preservesPartitioning to true. Like this:"
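A minimal sketch of the kind of snippet that could follow (toy key/value data; the essential piece is the preservesPartitioning flag on the RDD API):

import org.apache.spark.HashPartitioner

// Pay the shuffle cost once, up front, by hash-partitioning on the key.
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 4)))
  .partitionBy(new HashPartitioner(8))

// Aggregate within each partition; preservesPartitioning = true tells Spark
// the keys have not moved, so a later key-based operation needs no new shuffle.
val summed = pairs.mapPartitions(iter => {
  val acc = scala.collection.mutable.Map.empty[String, Int]
  iter.foreach { case (k, v) => acc(k) = acc.getOrElse(k, 0) + v }
  acc.iterator
}, preservesPartitioning = true)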
>
>
>
> On Thu, Nov 12, 2015 at 7:05 PM, Stephen Boesch 
> wrote:
>
> My understanding is that the RDDs presently have more support for
> complete control of partitioning, which is a key consideration at scale.
> While partitioning control is still piecemeal in DF/DS, it would seem
> premature to make RDDs a second-tier approach to Spark development.
>
>
>
> An example is the use of groupBy when we know that the source relation
> (/RDD) is already partitioned on the grouping expressions. AFAIK Spark SQL
> still does not allow that knowledge to be applied to the optimizer, so
> a full shuffle will be performed. However, in the native RDD API we can use
> preservesPartitioning=true.
>
>
>
> 2015-11-12 17:42 GMT-08:00 Mark Hamstra :
>
> The place of the RDD API in 2.0 is also something I've been wondering
> about.  I think it may be going too far to deprecate it, but changing
> emphasis is something that we might consider.  The RDD API came well 
> before
> DataFrames and DataSets, so programming guides, introductory how-to
> articles and the like have, to this point, also tended to emphasize RDDs 
> --
> or at least to dea

Re: load multiple directory using dataframe load

2015-11-23 Thread Fengdong Yu
hiveContext.read.format("orc").load("bypath/*")
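If the directories do not fall under a single glob pattern, another approach (a sketch against the Spark 1.x API, reusing the paths from the question) is to load each directory separately and union the results:

val paths = Seq("mypath/3660", "myPath/3661")
val df = paths
  .map(p => hiveContext.read.format("orc").load(p))
  .reduce(_ unionAll _)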



> On Nov 24, 2015, at 1:07 PM, Renu Yadav  wrote:
> 
> Hi ,
> 
> I am using DataFrames and want to load ORC files from multiple directories,
> like this:
> hiveContext.read.format("orc").load("mypath/3660,myPath/3661")
> 
> but it is not working.
> 
> Please suggest how to achieve this
> 
> Thanks & Regards,
> Renu Yadav





Fastest way to build Spark from scratch

2015-11-23 Thread Nicholas Chammas
Say I want to build a complete Spark distribution against Hadoop 2.6+ as
fast as possible from scratch.

This is what I’m doing at the moment:

./make-distribution.sh -T 1C -Phadoop-2.6

-T 1C instructs Maven to spin up 1 thread per available core. This takes
around 20 minutes on an m3.large instance.

I see that spark-ec2, on the other hand, builds Spark as follows when you
deploy Spark at a specific git commit:

sbt/sbt clean assembly
sbt/sbt publish-local

This seems slower than using make-distribution.sh, actually.

Is there a faster way to do this?

Nick


Re: why does shuffle in spark write shuffle data to disk by default?

2015-11-23 Thread Reynold Xin
I think for most jobs the bottleneck isn't in writing shuffle data to disk,
since shuffle data needs to be "shuffled" and sent across the network.

You can always use a ramdisk yourself. Requiring ramdisk by default would
significantly complicate configuration and platform portability.
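For example, a minimal sketch of the "use a ramdisk yourself" option (the /mnt/ramdisk mount point is an assumption and has to exist, with enough capacity, on every node; on YARN the NodeManager's local directories take precedence over this setting):

import org.apache.spark.{SparkConf, SparkContext}

// Point spark.local.dir (where shuffle files and other scratch data land)
// at a tmpfs/ramdisk mount instead of the default disk-backed directory.
val conf = new SparkConf()
  .setAppName("shuffle-on-ramdisk")
  .set("spark.local.dir", "/mnt/ramdisk")
val sc = new SparkContext(conf)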


On Mon, Nov 23, 2015 at 5:36 PM, huan zhang  wrote:

> Hi All,
> I'm wondering why shuffle in Spark writes shuffle data to disk by
> default?
> On Stack Overflow, someone said it's used for FTS, but a node going down is
> the most common kind of fault, and writing to disk cannot provide FTS in
> that case either.
> So why not use a ramdisk as the default instead of only SSD or HDD?
>
> Thanks
> Hubert Zhang
>


Re: Datasets on experimental dataframes?

2015-11-23 Thread Reynold Xin
The experimental tag is intended for user facing APIs. It has nothing to do
with internal dependencies.

On Monday, November 23, 2015, Jakob Odersky  wrote:

> Hi,
>
> Datasets are being built upon the experimental DataFrame API; does this
> mean DataFrames won't be experimental in the near future?
>
> thanks,
> --Jakob
>


Datasets on experimental dataframes?

2015-11-23 Thread Jakob Odersky
Hi,

Datasets are being built upon the experimental DataFrame API; does this
mean DataFrames won't be experimental in the near future?

thanks,
--Jakob


why does shuffle in spark write shuffle data to disk by default?

2015-11-23 Thread huan zhang
Hi All,
I'm wondering why shuffle in Spark writes shuffle data to disk by
default?
On Stack Overflow, someone said it's used for FTS, but a node going down is
the most common kind of fault, and writing to disk cannot provide FTS in
that case either.
So why not use a ramdisk as the default instead of only SSD or HDD?

Thanks
Hubert Zhang


Re: Spark-1.6.0-preview2 trackStateByKey exception restoring state

2015-11-23 Thread Tathagata Das
My intention is to make it compatible! Filed this bug -
https://issues.apache.org/jira/browse/SPARK-11932
Looking into it right now. Thanks for testing it out and reporting this!


On Mon, Nov 23, 2015 at 7:22 AM, jan  wrote:

> Hi guys,
>
> I'm trying out the new trackStateByKey API of the Spark-1.6.0-preview2
> release and I'm encountering an exception when trying to restore previously
> checkpointed state in spark streaming.
>
> Use case:
> - execute a stateful Spark streaming job using trackStateByKey
> - interrupt / kill the job
> - start the job again (without any code changes or cleaning out the
> checkpoint directory)
>
> Upon this restart, I encounter the exception below. The nature of the
> exception makes me think either I am doing something wrong, or there's a
> problem with this use case for the new trackStateByKey API.
>
> I uploaded my job code (
> https://gist.github.com/juyttenh/be7973b0c5c2eddd8a81), but it's
> basically just a modified version of the spark streaming example
> StatefulNetworkWordCount (that had already been updated to use
> trackStateByKey). My version however includes the usage of
> StreamingContext.getOrCreate to actually read the checkpointed state when
> the job is started, leading to the exception below.
>
> Just to make sure: using StreamingContext.getOrCreate should still be
> compatible with using the trackStateByKey API?
>
> Thanx,
> Jan
>
> 15/11/23 10:55:07 ERROR StreamingContext: Error starting the context,
> marking it as stopped
>
> java.lang.IllegalArgumentException: requirement failed
>
> at scala.Predef$.require(Predef.scala:221)
>
> at
> org.apache.spark.streaming.rdd.TrackStateRDD.<init>(TrackStateRDD.scala:133)
>
> at
> org.apache.spark.streaming.dstream.InternalTrackStateDStream$$anonfun$compute$2.apply(TrackStateDStream.scala:148)
>
> at
> org.apache.spark.streaming.dstream.InternalTrackStateDStream$$anonfun$compute$2.apply(TrackStateDStream.scala:143)
>
> at scala.Option.map(Option.scala:145)
>
> at
> org.apache.spark.streaming.dstream.InternalTrackStateDStream.compute(TrackStateDStream.scala:143)
>
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
>
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
>
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
>
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
>
> at
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:424)
>
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
>
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
>
> at scala.Option.orElse(Option.scala:257)
>
> at
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
>
> at
> org.apache.spark.streaming.dstream.TrackStateDStreamImpl.compute(TrackStateDStream.scala:66)
>
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
>
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
>
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
>
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
>
> at
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:424)
>
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
>
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
>
> at scala.Option.orElse(Option.scala:257)
>
> at
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
>
> at
> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:47)
>
> at
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
>
> at
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:114)
>
> at
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>
> at
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>
> at
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>
> at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>
> at
> org.apache.spark.streaming.DStreamGraph.generateJobs(

Re: [ANNOUNCE] Spark 1.6.0 Release Preview

2015-11-23 Thread Dean Wampler
I'm seeing an RPC timeout with the 2.11 build, but not the Hadoop1, 2.10
build. The following session with two uses of sc.parallelize causes it almost
every time. Occasionally I don't see the stack trace, and I don't see it
with just a single sc.parallelize, even the bigger, second one. When the
error occurs, it pauses for about two minutes with no output before the
stack trace. I elided some output; why all the non-log4j warnings occur at
startup is another question:


$ pwd
/Users/deanwampler/projects/spark/spark-1.6.0-bin-hadoop1-scala2.11
$ ./bin/spark-shell
log4j:WARN No appenders could be found for logger
(org.apache.hadoop.security.Groups).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
more info.
Using Spark's repl log4j profile:
org/apache/spark/log4j-defaults-repl.properties
To adjust logging level use sc.setLogLevel("INFO")
Spark context available as sc.
15/11/23 13:01:45 WARN Connection: BoneCP specified but not present in
CLASSPATH (or one of dependencies)
15/11/23 13:01:45 WARN Connection: BoneCP specified but not present in
CLASSPATH (or one of dependencies)
15/11/23 13:01:49 WARN ObjectStore: Version information not found in
metastore. hive.metastore.schema.verification is not enabled so recording
the schema version 1.2.0
15/11/23 13:01:49 WARN ObjectStore: Failed to get database default,
returning NoSuchObjectException
15/11/23 13:01:49 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
15/11/23 13:01:50 WARN Connection: BoneCP specified but not present in
CLASSPATH (or one of dependencies)
15/11/23 13:01:50 WARN Connection: BoneCP specified but not present in
CLASSPATH (or one of dependencies)
SQL context available as sqlContext.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.0-SNAPSHOT
      /_/

Using Scala version 2.11.7 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0)
Type in expressions to have them evaluated.
Type :help for more information.

scala> sc.parallelize((1 to 10), 1).count()

[Stage 0:>  (0 + 0) /
1]
[Stage 0:==>  (420 + 4) /
1]
[Stage 0:===> (683 + 4) /
1]
... elided ...
[Stage 0:==> (8264 + 4) /
1]
[Stage 0:==> (8902 + 6) /
1]
[Stage 0:=>  (9477 + 4) /
1]

res0: Long = 10

scala> sc.parallelize((1 to 100), 10).count()

[Stage 1:> (0 + 0) /
10]
[Stage 1:> (0 + 0) /
10]
[Stage 1:> (0 + 0) /
10]15/11/23 13:04:09 WARN NettyRpcEndpointRef: Error sending message
[message = Heartbeat(driver,[Lscala.Tuple2;@7f9d659c,BlockManagerId(driver,
localhost, 55188))] in 1 attempts
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120
seconds]. This timeout is controlled by spark.rpc.askTimeout
at org.apache.spark.rpc.RpcTimeout.org
$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
  at
org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
  at
org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
  at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
  at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
  at
org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:101)
  at
org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:77)
  at org.apache.spark.executor.Executor.org
$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:452)
  at
org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:472)
  at
org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:472)
  at
org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:472)
  at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1708)
  at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:472)
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
  at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
  at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
  at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at
java.util.con

question about combining small input splits

2015-11-23 Thread Nezih
Hi Spark Devs,
I tried getting an answer to my question in the user mailing list, but so
far couldn't. That's why I wanted to try the dev mailing list too in case
someone can help me.

I have a Hive table that has a lot of small parquet files and I am creating
a data frame out of it to do some processing, but since I have a large
number of splits/files my job creates a lot of tasks, which I don't want.
Basically what I want is the same functionality that Hive provides, that is,
to combine these small input splits into larger ones by specifying a max
split size setting. Is this currently possible with Spark?

I looked at coalesce(), but with coalesce I can only control the number of
output files, not their sizes. And since the total input dataset size can
vary significantly in my case, I cannot just use a fixed partition count, as
the size of each output file can get very large. I then looked for getting
the total input size from an RDD to come up with some heuristic to set the
partition count, but I couldn't find any way to do it.
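One possible way to build that heuristic with the Hadoop FileSystem API instead of RDD-level information (a sketch; the table path, table name, and target partition size below are placeholders):

import org.apache.hadoop.fs.{FileSystem, Path}

val inputPath = "hdfs:///warehouse/my_table"   // placeholder: the table's location
val targetBytesPerPartition = 256L * 1024 * 1024

// Estimate the total input size directly from the file system...
val fs = FileSystem.get(sc.hadoopConfiguration)
val totalBytes = fs.getContentSummary(new Path(inputPath)).getLength
val numPartitions = math.max(1, (totalBytes / targetBytesPerPartition).toInt)

// ...and coalesce the DataFrame down to roughly that many partitions,
// which also reduces the number of tasks in the narrow stage that scans it.
val df = hiveContext.table("my_table")         // placeholder table name
val compacted = df.coalesce(numPartitions)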

Any help is appreciated.

Thanks,

Nezih






Re: [ANNOUNCE] Spark 1.6.0 Release Preview

2015-11-23 Thread mkhaitman
Nice! Built and testing on CentOS 7 on a Hadoop 2.7.1 cluster.

One thing I've noticed is that KeyboardInterrupts are now ignored? Is that
intended? I started typing a line out and then changed my mind and wanted
to issue the good old ctrl+c to interrupt, but that didn't work.

Otherwise haven't seen any major issues yet!

Mark.






Re: Removing the Mesos fine-grained mode

2015-11-23 Thread Jerry Lam
@Andrew Or

I assume you are referring to this ticket [SPARK-5095]: 
https://issues.apache.org/jira/browse/SPARK-5095 
 
Thank you!

Best Regards,

Jerry

> On Nov 23, 2015, at 2:41 PM, Andrew Or  wrote:
> 
> @Jerry Lam
> 
> Can someone confirm if it is true that dynamic allocation on mesos "is 
> designed to run one executor per slave with the configured amount of 
> resources." I copied this sentence from the documentation. Does this mean 
> there is at most 1 executor per node? Therefore,  if you have a big machine, 
> you need to allocate a fat executor on this machine in order to fully utilize 
> it?
> 
> Mesos inherently does not support multiple executors per slave currently. 
> This is actually not related to dynamic allocation. There is, however, an 
> outstanding patch to add support for multiple executors per slave. When that 
> feature is merged, it will work well with dynamic allocation.
>  
> 
> 2015-11-23 9:27 GMT-08:00 Adam McElwee  >:
> 
> 
> On Mon, Nov 23, 2015 at 7:36 AM, Iulian Dragoș  > wrote:
> 
> 
> On Sat, Nov 21, 2015 at 3:37 AM, Adam McElwee  > wrote:
> I've used fine-grained mode on our mesos spark clusters until this week, 
> mostly because it was the default. I started trying coarse-grained because of 
> the recent chatter on the mailing list about wanting to move the mesos 
> execution path to coarse-grained only. The odd thing is, coarse-grained vs 
> fine-grained seems to yield drastically different cluster utilization metrics for any of 
> our jobs that I've tried out this week.
> 
> If this is best as a new thread, please let me know, and I'll try not to 
> derail this conversation. Otherwise, details below:
> 
> I think it's ok to discuss it here.
>  
> We monitor our spark clusters with ganglia, and historically, we maintain at 
> least 90% cpu utilization across the cluster. Making a single configuration 
> change to use coarse-grained execution instead of fine-grained consistently 
> yields a cpu utilization pattern that starts around 90% at the beginning of 
> the job, and then it slowly decreases over the next 1-1.5 hours to level out 
> around 65% cpu utilization on the cluster. Does anyone have a clue why I'd be 
> seeing such a negative effect of switching to coarse-grained mode? GC 
> activity is comparable in both cases. I've tried 1.5.2, as well as the 1.6.0 
> preview tag that's on github.
> 
> I'm not very familiar with Ganglia, and how it computes utilization. But one 
> thing comes to mind: did you enable dynamic allocation 
> 
>  on coarse-grained mode?
> 
> Dynamic allocation is definitely not enabled. The only delta between runs is 
> adding --conf "spark.mesos.coarse=true" the job submission. Ganglia is just 
> pulling stats from the procfs, and I've never seen it report bad results. If 
> I sample any of the 100-200 nodes in the cluster, dstat reflects the same 
> average cpu that I'm seeing reflected in ganglia.
> 
> iulian
> 
> 



Re: Removing the Mesos fine-grained mode

2015-11-23 Thread Jerry Lam
Hi Andrew,

Thank you for confirming this. I’m referring to this because I used 
fine-grained mode before and it was a headache because of the memory issue. 
Therefore, I switched to Yarn with dynamic allocation. I was thinking if I can 
switch back to Mesos with coarse-grained mode + dynamic allocation but from 
what you explain to me, I still cannot have more than 1 executor per slave. 
This sounds like a deal breaker for me because if I have a slave of 100GB of 
RAM and a slave of 30GB, I cannot utilize the instance of 100GB of RAM fully if 
I specify spark.executor.memory = 20GB. The two slaves will each consume 20GB 
in this case even though there is 80GB left for the bigger machine. If I 
specify 90GB for spark.executor.memory, the only active slave is the one with 
100GB. Therefore the slave with 30GB will be idled. 

Do you know the link to the JIRA where I can receive updates for the feature you 
mention? We intend to use Mesos, but it has proven difficult with our 
tight budget constraints. 

Best Regards,

Jerry


> On Nov 23, 2015, at 2:41 PM, Andrew Or  wrote:
> 
> @Jerry Lam
> 
> Can someone confirm if it is true that dynamic allocation on mesos "is 
> designed to run one executor per slave with the configured amount of 
> resources." I copied this sentence from the documentation. Does this mean 
> there is at most 1 executor per node? Therefore,  if you have a big machine, 
> you need to allocate a fat executor on this machine in order to fully utilize 
> it?
> 
> Mesos inherently does not support multiple executors per slave currently. 
> This is actually not related to dynamic allocation. There is, however, an 
> outstanding patch to add support for multiple executors per slave. When that 
> feature is merged, it will work well with dynamic allocation.
>  
> 
> 2015-11-23 9:27 GMT-08:00 Adam McElwee  >:
> 
> 
> On Mon, Nov 23, 2015 at 7:36 AM, Iulian Dragoș  > wrote:
> 
> 
> On Sat, Nov 21, 2015 at 3:37 AM, Adam McElwee  > wrote:
> I've used fine-grained mode on our mesos spark clusters until this week, 
> mostly because it was the default. I started trying coarse-grained because of 
> the recent chatter on the mailing list about wanting to move the mesos 
> execution path to coarse-grained only. The odd thing is, coarse-grained vs 
> fine-grained seems to yield drastically different cluster utilization metrics for any of 
> our jobs that I've tried out this week.
> 
> If this is best as a new thread, please let me know, and I'll try not to 
> derail this conversation. Otherwise, details below:
> 
> I think it's ok to discuss it here.
>  
> We monitor our spark clusters with ganglia, and historically, we maintain at 
> least 90% cpu utilization across the cluster. Making a single configuration 
> change to use coarse-grained execution instead of fine-grained consistently 
> yields a cpu utilization pattern that starts around 90% at the beginning of 
> the job, and then it slowly decreases over the next 1-1.5 hours to level out 
> around 65% cpu utilization on the cluster. Does anyone have a clue why I'd be 
> seeing such a negative effect of switching to coarse-grained mode? GC 
> activity is comparable in both cases. I've tried 1.5.2, as well as the 1.6.0 
> preview tag that's on github.
> 
> I'm not very familiar with Ganglia, and how it computes utilization. But one 
> thing comes to mind: did you enable dynamic allocation 
> 
>  on coarse-grained mode?
> 
> Dynamic allocation is definitely not enabled. The only delta between runs is 
> adding --conf "spark.mesos.coarse=true" the job submission. Ganglia is just 
> pulling stats from the procfs, and I've never seen it report bad results. If 
> I sample any of the 100-200 nodes in the cluster, dstat reflects the same 
> average cpu that I'm seeing reflected in ganglia.
> 
> iulian
> 
> 



Re: Removing the Mesos fine-grained mode

2015-11-23 Thread Andrew Or
@Jerry Lam

Can someone confirm if it is true that dynamic allocation on mesos "is
> designed to run one executor per slave with the configured amount of
> resources." I copied this sentence from the documentation. Does this mean
> there is at most 1 executor per node? Therefore,  if you have a big
> machine, you need to allocate a fat executor on this machine in order to
> fully utilize it?


Mesos inherently does not support multiple executors per slave currently.
This is actually not related to dynamic allocation. There is, however, an
outstanding patch to add support for multiple executors per slave. When
that feature is merged, it will work well with dynamic allocation.
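For context, the settings involved look roughly like this (values are illustrative only; with coarse-grained Mesos today each slave runs at most one executor sized by these settings, and dynamic allocation additionally needs the external shuffle service running on every node):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.mesos.coarse", "true")             // coarse-grained mode
  .set("spark.executor.memory", "20g")           // size of the single executor per slave
  .set("spark.cores.max", "100")                 // cap on total cores acquired
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")  // required for dynamic allocation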


2015-11-23 9:27 GMT-08:00 Adam McElwee :

>
>
> On Mon, Nov 23, 2015 at 7:36 AM, Iulian Dragoș  > wrote:
>
>>
>>
>> On Sat, Nov 21, 2015 at 3:37 AM, Adam McElwee  wrote:
>>
>>> I've used fine-grained mode on our mesos spark clusters until this week,
>>> mostly because it was the default. I started trying coarse-grained because
>>> of the recent chatter on the mailing list about wanting to move the mesos
>>> execution path to coarse-grained only. The odd thing is, coarse-grained vs
>>> fine-grained seems to yield drastically different cluster utilization metrics for any of
>>> our jobs that I've tried out this week.
>>>
>>> If this is best as a new thread, please let me know, and I'll try not to
>>> derail this conversation. Otherwise, details below:
>>>
>>
>> I think it's ok to discuss it here.
>>
>>
>>> We monitor our spark clusters with ganglia, and historically, we
>>> maintain at least 90% cpu utilization across the cluster. Making a single
>>> configuration change to use coarse-grained execution instead of
>>> fine-grained consistently yields a cpu utilization pattern that starts
>>> around 90% at the beginning of the job, and then it slowly decreases over
>>> the next 1-1.5 hours to level out around 65% cpu utilization on the
>>> cluster. Does anyone have a clue why I'd be seeing such a negative effect
>>> of switching to coarse-grained mode? GC activity is comparable in both
>>> cases. I've tried 1.5.2, as well as the 1.6.0 preview tag that's on github.
>>>
>>
>> I'm not very familiar with Ganglia, and how it computes utilization. But
>> one thing comes to mind: did you enable dynamic allocation
>> 
>> on coarse-grained mode?
>>
>
> Dynamic allocation is definitely not enabled. The only delta between runs
> is adding --conf "spark.mesos.coarse=true" the job submission. Ganglia is
> just pulling stats from the procfs, and I've never seen it report bad
> results. If I sample any of the 100-200 nodes in the cluster, dstat
> reflects the same average cpu that I'm seeing reflected in ganglia.
>
>>
>> iulian
>>
>
>


Re: Removing the Mesos fine-grained mode

2015-11-23 Thread Adam McElwee
On Mon, Nov 23, 2015 at 7:36 AM, Iulian Dragoș 
wrote:

>
>
> On Sat, Nov 21, 2015 at 3:37 AM, Adam McElwee  wrote:
>
>> I've used fine-grained mode on our mesos spark clusters until this week,
>> mostly because it was the default. I started trying coarse-grained because
>> of the recent chatter on the mailing list about wanting to move the mesos
>> execution path to coarse-grained only. The odd thing is, coarse-grained vs
>> fine-grained seems to yield drastically different cluster utilization metrics for any of
>> our jobs that I've tried out this week.
>>
>> If this is best as a new thread, please let me know, and I'll try not to
>> derail this conversation. Otherwise, details below:
>>
>
> I think it's ok to discuss it here.
>
>
>> We monitor our spark clusters with ganglia, and historically, we maintain
>> at least 90% cpu utilization across the cluster. Making a single
>> configuration change to use coarse-grained execution instead of
>> fine-grained consistently yields a cpu utilization pattern that starts
>> around 90% at the beginning of the job, and then it slowly decreases over
>> the next 1-1.5 hours to level out around 65% cpu utilization on the
>> cluster. Does anyone have a clue why I'd be seeing such a negative effect
>> of switching to coarse-grained mode? GC activity is comparable in both
>> cases. I've tried 1.5.2, as well as the 1.6.0 preview tag that's on github.
>>
>
> I'm not very familiar with Ganglia, and how it computes utilization. But
> one thing comes to mind: did you enable dynamic allocation
> 
> on coarse-grained mode?
>

Dynamic allocation is definitely not enabled. The only delta between runs
is adding --conf "spark.mesos.coarse=true" the job submission. Ganglia is
just pulling stats from the procfs, and I've never seen it report bad
results. If I sample any of the 100-200 nodes in the cluster, dstat
reflects the same average cpu that I'm seeing reflected in ganglia.

>
> iulian
>


Spark-1.6.0-preview2 trackStateByKey exception restoring state

2015-11-23 Thread jan
Hi guys,

I'm trying out the new trackStateByKey API of the Spark-1.6.0-preview2
release and I'm encountering an exception when trying to restore previously
checkpointed state in spark streaming.

Use case:
- execute a stateful Spark streaming job using trackStateByKey
- interrupt / kill the job
- start the job again (without any code changes or cleaning out the
checkpoint directory)

Upon this restart, I encounter the exception below. The nature of the
exception makes me think either I am doing something wrong, or there's a
problem with this use case for the new trackStateByKey API.

I uploaded my job code (
https://gist.github.com/juyttenh/be7973b0c5c2eddd8a81), but it's basically
just a modified version of the spark streaming example
StatefulNetworkWordCount (that had already been updated to use
trackStateByKey). My version however includes the usage of
StreamingContext.getOrCreate to actually read the checkpointed state when
the job is started, leading to the exception below.

Just to make sure: using StreamingContext.getOrCreate should still be
compatible with using the trackStateByKey API?
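For reference, the getOrCreate pattern in question, reduced to a minimal sketch (checkpointDir is a placeholder; the actual trackStateByKey wiring lives in the gist linked above):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///checkpoints/stateful-wordcount"   // placeholder

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("StatefulNetworkWordCount")
  val ssc = new StreamingContext(conf, Seconds(1))
  ssc.checkpoint(checkpointDir)
  // ... build the DStream pipeline here, including the trackStateByKey call ...
  ssc
}

// Cold start: createContext() is invoked. Restart: the context (and tracked
// state) is rebuilt from the checkpoint directory instead.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()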

Thanx,
Jan

15/11/23 10:55:07 ERROR StreamingContext: Error starting the context,
marking it as stopped

java.lang.IllegalArgumentException: requirement failed

at scala.Predef$.require(Predef.scala:221)

at
org.apache.spark.streaming.rdd.TrackStateRDD.<init>(TrackStateRDD.scala:133)

at
org.apache.spark.streaming.dstream.InternalTrackStateDStream$$anonfun$compute$2.apply(TrackStateDStream.scala:148)

at
org.apache.spark.streaming.dstream.InternalTrackStateDStream$$anonfun$compute$2.apply(TrackStateDStream.scala:143)

at scala.Option.map(Option.scala:145)

at
org.apache.spark.streaming.dstream.InternalTrackStateDStream.compute(TrackStateDStream.scala:143)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)

at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)

at
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:424)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)

at scala.Option.orElse(Option.scala:257)

at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)

at
org.apache.spark.streaming.dstream.TrackStateDStreamImpl.compute(TrackStateDStream.scala:66)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)

at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)

at
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:424)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)

at scala.Option.orElse(Option.scala:257)

at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)

at
org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:47)

at
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)

at
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:114)

at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)

at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)

at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)

at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)

at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)

at
org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:114)

at
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:231)

at
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:226)

at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)

at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)

at
org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGene

Re: Removing the Mesos fine-grained mode

2015-11-23 Thread Jerry Lam
Hi guys,

Can someone confirm if it is true that dynamic allocation on mesos "is designed 
to run one executor per slave with the configured amount of resources." I 
copied this sentence from the documentation. Does this mean there is at most 1 
executor per node? Therefore,  if you have a big machine, you need to allocate 
a fat executor on this machine in order to fully utilize it?

Best Regards,

Sent from my iPhone

> On 23 Nov, 2015, at 8:36 am, Iulian Dragoș  wrote:
> 
> 
> 
>> On Sat, Nov 21, 2015 at 3:37 AM, Adam McElwee  wrote:
>> I've used fine-grained mode on our mesos spark clusters until this week, 
>> mostly because it was the default. I started trying coarse-grained because 
>> of the recent chatter on the mailing list about wanting to move the mesos 
>> execution path to coarse-grained only. The odd thing is, coarse-grained vs 
>> fine-grained seems to yield drastically different cluster utilization metrics for any of 
>> our jobs that I've tried out this week.
>> 
>> If this is best as a new thread, please let me know, and I'll try not to 
>> derail this conversation. Otherwise, details below:
> 
> I think it's ok to discuss it here.
>  
>> We monitor our spark clusters with ganglia, and historically, we maintain at 
>> least 90% cpu utilization across the cluster. Making a single configuration 
>> change to use coarse-grained execution instead of fine-grained consistently 
>> yields a cpu utilization pattern that starts around 90% at the beginning of 
>> the job, and then it slowly decreases over the next 1-1.5 hours to level out 
>> around 65% cpu utilization on the cluster. Does anyone have a clue why I'd 
>> be seeing such a negative effect of switching to coarse-grained mode? GC 
>> activity is comparable in both cases. I've tried 1.5.2, as well as the 1.6.0 
>> preview tag that's on github.
> 
> I'm not very familiar with Ganglia, and how it computes utilization. But one 
> thing comes to mind: did you enable dynamic allocation on coarse-grained mode?
> 
> iulian


Re: Removing the Mesos fine-grained mode

2015-11-23 Thread Iulian Dragoș
On Sat, Nov 21, 2015 at 3:37 AM, Adam McElwee  wrote:

> I've used fine-grained mode on our mesos spark clusters until this week,
> mostly because it was the default. I started trying coarse-grained because
> of the recent chatter on the mailing list about wanting to move the mesos
> execution path to coarse-grained only. The odd thing is, coarse-grained vs
> fine-grained seems to yield drastically different cluster utilization metrics for any of
> our jobs that I've tried out this week.
>
> If this is best as a new thread, please let me know, and I'll try not to
> derail this conversation. Otherwise, details below:
>

I think it's ok to discuss it here.


> We monitor our spark clusters with ganglia, and historically, we maintain
> at least 90% cpu utilization across the cluster. Making a single
> configuration change to use coarse-grained execution instead of
> fine-grained consistently yields a cpu utilization pattern that starts
> around 90% at the beginning of the job, and then it slowly decreases over
> the next 1-1.5 hours to level out around 65% cpu utilization on the
> cluster. Does anyone have a clue why I'd be seeing such a negative effect
> of switching to coarse-grained mode? GC activity is comparable in both
> cases. I've tried 1.5.2, as well as the 1.6.0 preview tag that's on github.
>

I'm not very familiar with Ganglia, and how it computes utilization. But
one thing comes to mind: did you enable dynamic allocation

on coarse-grained mode?

iulian