Israel Spark Meetup

2016-09-20 Thread Romi Kuntsman
Hello,
Please add a link on the Spark Community page (
https://spark.apache.org/community.html)
to the Israel Spark Meetup (https://www.meetup.com/israel-spark-users/).
We're an active meetup group that unifies the local Spark user community and
holds regular meetups.
Thanks!
Romi K.


diff between apps and waitingApps?

2015-12-01 Thread Romi Kuntsman
Hello,

I'm collecting metrics of master.apps and master.waitingApps, and I see the
values always match.

ref:
https://github.com/apache/spark/blob/branch-1.5/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
https://github.com/apache/spark/blob/branch-1.5/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala

What is the meaning of "waitingApps"?
And if the only place it's used is in "startExecutorsOnWorkers", where apps are
filtered by "app.coresLeft > 0", shouldn't the reported metric apply the same
filter?
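
For illustration, a sketch (not the actual Spark code) of what the gauge in
MasterSource could look like if it applied the same filter:

  // import com.codahale.metrics.{Gauge, MetricRegistry} -- as MasterSource already does
  metricRegistry.register(MetricRegistry.name("waitingApps"), new Gauge[Int] {
    override def getValue: Int = master.waitingApps.count(_.coresLeft > 0)
  })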

*Romi Kuntsman*, *Big Data Engineer*
http://www.totango.com


Re: Shuffle FileNotFound Exception

2015-11-18 Thread Romi Kuntsman
Take the executor memory times spark.shuffle.memoryFraction,
and divide the data so that each partition is smaller than that.
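
A rough numeric sketch of that calculation (all sizes below are assumptions;
inputRdd stands for the RDD being shuffled):

  val executorMemoryBytes = 8L * 1024 * 1024 * 1024   // e.g. 8 GB executors (assumed)
  val shuffleFraction = 0.2                            // spark.shuffle.memoryFraction default
  val perPartitionBudget = (executorMemoryBytes * shuffleFraction).toLong
  val totalDataBytes = 200L * 1024 * 1024 * 1024       // estimated total data size (assumed)
  val numPartitions = math.ceil(totalDataBytes.toDouble / perPartitionBudget).toInt
  val repartitioned = inputRdd.repartition(numPartitions)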

*Romi Kuntsman*, *Big Data Engineer*
http://www.totango.com

On Wed, Nov 18, 2015 at 2:09 PM, Tom Arnfeld  wrote:

> Hi Romi,
>
> Thanks! Could you give me an indication of how much to increase the
> partitions by? We’ll take a stab in the dark; the input data is around 5M
> records (though each record is fairly small). We’ve had trouble both with
> DataFrames and RDDs.
>
> Tom.
>
> On 18 Nov 2015, at 12:04, Romi Kuntsman  wrote:
>
> I had many issues with shuffles (but not this one exactly), and what
> eventually solved it was to repartition the input into more parts. Have you
> tried that?
>
> P.S. not sure if related, but there's a memory leak in the shuffle
> mechanism
> https://issues.apache.org/jira/browse/SPARK-11293
>
> *Romi Kuntsman*, *Big Data Engineer*
> http://www.totango.com
>
> On Wed, Nov 18, 2015 at 2:00 PM, Tom Arnfeld  wrote:
>
>> Hey,
>>
>> I’m wondering if anyone has run into issues with Spark 1.5 and a
>> FileNotFound exception with shuffle.index files? It’s been cropping up with
>> very large joins and aggregations, and causing all of our jobs to fail
>> towards the end. The memory limit for the executors (we’re running on
>> mesos) is touching 60GB+ with ~10 cores per executor, which is way
>> oversubscribed.
>>
>> We’re running spark inside containers, and have configured
>> “spark.executorEnv.SPARK_LOCAL_DIRS” to a special directory path in the
>> container for performance/disk reasons, and since then the issue started to
>> arise. I’m wondering if there’s a bug with the way spark looks for shuffle
>> files, and one of the implementations isn’t obeying the path properly?
>>
>> I don’t want to set "spark.local.dir” because that requires the driver
>> also have this directory set up, which is not the case.
>>
>> Has anyone seen this issue before?
>>
>> 
>>
>> 15/11/18 11:32:27 ERROR storage.ShuffleBlockFetcherIterator: Failed to
>> get block(s) from XXX:50777
>> java.lang.RuntimeException: java.io.FileNotFoundException:
>> /mnt/mesos/sandbox/spark-tmp/blockmgr-7a5b85c8-7c5f-43c2-b9e2-2bc0ffdb902d/23/shuffle_2_94_0.index
>> (No such file or directory)
>>at java.io.FileInputStream.open(Native Method)
>>at java.io.FileInputStream.(FileInputStream.java:146)
>>at
>> org.apache.spark.shuffle.IndexShuffleBlockResolver.getBlockData(IndexShuffleBlockResolver.scala:98)
>>at
>> org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:300)
>>at
>> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>>at
>> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>>at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>at
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>at
>> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>>at
>> org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
>>at
>> org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
>>at
>> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87)
>>at
>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101)
>>at
>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
>>at
>> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>>at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>>at
>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>>at
>> io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
>>at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>>at
>> io.netty.channel.AbstractChannelHandlerContext.fire

Re: Shuffle FileNotFound Exception

2015-11-18 Thread Romi Kuntsman
I had many issues with shuffles (but not this one exactly), and what
eventually solved it was to repartition the input into more parts. Have you
tried that?

P.S. not sure if related, but there's a memory leak in the shuffle mechanism
https://issues.apache.org/jira/browse/SPARK-11293

*Romi Kuntsman*, *Big Data Engineer*
http://www.totango.com

On Wed, Nov 18, 2015 at 2:00 PM, Tom Arnfeld  wrote:

> Hey,
>
> I’m wondering if anyone has run into issues with Spark 1.5 and a
> FileNotFound exception with shuffle.index files? It’s been cropping up with
> very large joins and aggregations, and causing all of our jobs to fail
> towards the end. The memory limit for the executors (we’re running on
> mesos) is touching 60GB+ with ~10 cores per executor, which is way
> oversubscribed.
>
> We’re running spark inside containers, and have configured
> “spark.executorEnv.SPARK_LOCAL_DIRS” to a special directory path in the
> container for performance/disk reasons, and since then the issue started to
> arise. I’m wondering if there’s a bug with the way spark looks for shuffle
> files, and one of the implementations isn’t obeying the path properly?
>
> I don’t want to set "spark.local.dir” because that requires the driver
> also have this directory set up, which is not the case.
>
> Has anyone seen this issue before?
>
> 
>
> 15/11/18 11:32:27 ERROR storage.ShuffleBlockFetcherIterator: Failed to get
> block(s) from XXX:50777
> java.lang.RuntimeException: java.io.FileNotFoundException:
> /mnt/mesos/sandbox/spark-tmp/blockmgr-7a5b85c8-7c5f-43c2-b9e2-2bc0ffdb902d/23/shuffle_2_94_0.index
> (No such file or directory)
>at java.io.FileInputStream.open(Native Method)
>at java.io.FileInputStream.(FileInputStream.java:146)
>at
> org.apache.spark.shuffle.IndexShuffleBlockResolver.getBlockData(IndexShuffleBlockResolver.scala:98)
>at
> org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:300)
>at
> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>at
> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>at
> org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
>at
> org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
>at
> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87)
>at
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101)
>at
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
>at
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>at
> io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
>at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>at
> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>at
> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
>at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>at
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
>at
> io.netty.channel.nio.Ab

Re: Some spark apps fail with "All masters are unresponsive", while others pass normally

2015-11-09 Thread Romi Kuntsman
I didn't see anything about an OOM.
This sometimes happens before anything in the application has run, and it
happens to a few applications at the same time - so I guess it's a
communication failure, but the problem is that the error shown doesn't
represent the actual problem (which may be a network timeout, etc.)

*Romi Kuntsman*, *Big Data Engineer*
http://www.totango.com

On Mon, Nov 9, 2015 at 6:00 PM, Akhil Das 
wrote:

> Did you find anything regarding the OOM in the executor logs?
>
> Thanks
> Best Regards
>
> On Mon, Nov 9, 2015 at 8:44 PM, Romi Kuntsman  wrote:
>
>> If they have a problem managing memory, wouldn't there be an OOM?
>> Why does AppClient throw an NPE?
>>
>> *Romi Kuntsman*, *Big Data Engineer*
>> http://www.totango.com
>>
>> On Mon, Nov 9, 2015 at 4:59 PM, Akhil Das 
>> wrote:
>>
>>> Is that all you have in the executor logs? I suspect some of those jobs
>>> are having a hard time managing  the memory.
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Sun, Nov 1, 2015 at 9:38 PM, Romi Kuntsman  wrote:
>>>
>>>> [adding dev list since it's probably a bug, but I'm not sure how to
>>>> reproduce so I can open a bug about it]
>>>>
>>>> Hi,
>>>>
>>>> I have a standalone Spark 1.4.0 cluster with 100s of applications
>>>> running every day.
>>>>
>>>> From time to time, the applications crash with the following error (see
>>>> below)
>>>> But at the same time (and also after that), other applications are
>>>> running, so I can safely assume the master and workers are working.
>>>>
>>>> 1. why is there a NullPointerException? (I can't track the Scala stack
>>>> trace to the code, but anyway an NPE is usually an obvious bug even if there's
>>>> actually a network error...)
>>>> 2. why can't it connect to the master? (if it's a network timeout, how
>>>> to increase it? I see the values are hardcoded inside AppClient)
>>>> 3. how to recover from this error?
>>>>
>>>>
>>>>   ERROR 01-11 15:32:54,991SparkDeploySchedulerBackend - Application
>>>> has been killed. Reason: All masters are unresponsive! Giving up. ERROR
>>>>   ERROR 01-11 15:32:55,087  OneForOneStrategy - ERROR
>>>> logs/error.log
>>>>   java.lang.NullPointerException NullPointerException
>>>>   at
>>>> org.apache.spark.deploy.client.AppClient$ClientActor$$anonfun$receiveWithLogging$1.applyOrElse(AppClient.scala:160)
>>>>   at
>>>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>>>>   at
>>>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>>>>   at
>>>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>>>>   at
>>>> org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
>>>>   at
>>>> org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
>>>>   at
>>>> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>>>>   at
>>>> org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
>>>>   at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>>>   at
>>>> org.apache.spark.deploy.client.AppClient$ClientActor.aroundReceive(AppClient.scala:61)
>>>>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>>>   at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>>>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>>>>   at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>>>   at
>>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
>>>>   at
>>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>   at
>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>   at
>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>   at
>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>   ERROR 01-11 15:32:55,603   SparkContext - Error
>>>> initializing SparkContext. ERROR
>>>>   java.lang.IllegalStateException: Cannot call methods on a stopped
>>>> SparkContext
>>>>   at org.apache.spark.SparkContext.org
>>>> $apache$spark$SparkContext$$assertNotStopped(SparkContext.scala:103)
>>>>   at
>>>> org.apache.spark.SparkContext.getSchedulingMode(SparkContext.scala:1501)
>>>>   at
>>>> org.apache.spark.SparkContext.postEnvironmentUpdate(SparkContext.scala:2005)
>>>>   at org.apache.spark.SparkContext.(SparkContext.scala:543)
>>>>   at
>>>> org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:61)
>>>>
>>>>
>>>> Thanks!
>>>>
>>>> *Romi Kuntsman*, *Big Data Engineer*
>>>> http://www.totango.com
>>>>
>>>
>>>
>>
>


Re: Some spark apps fail with "All masters are unresponsive", while others pass normally

2015-11-09 Thread Romi Kuntsman
If they have a problem managing memory, wouldn't there be an OOM?
Why does AppClient throw an NPE?

*Romi Kuntsman*, *Big Data Engineer*
http://www.totango.com

On Mon, Nov 9, 2015 at 4:59 PM, Akhil Das 
wrote:

> Is that all you have in the executor logs? I suspect some of those jobs
> are having a hard time managing  the memory.
>
> Thanks
> Best Regards
>
> On Sun, Nov 1, 2015 at 9:38 PM, Romi Kuntsman  wrote:
>
>> [adding dev list since it's probably a bug, but I'm not sure how to
>> reproduce so I can open a bug about it]
>>
>> Hi,
>>
>> I have a standalone Spark 1.4.0 cluster with 100s of applications running
>> every day.
>>
>> From time to time, the applications crash with the following error (see
>> below)
>> But at the same time (and also after that), other applications are
>> running, so I can safely assume the master and workers are working.
>>
>> 1. why is there a NullPointerException? (I can't track the Scala stack
>> trace to the code, but anyway an NPE is usually an obvious bug even if there's
>> actually a network error...)
>> 2. why can't it connect to the master? (if it's a network timeout, how to
>> increase it? I see the values are hardcoded inside AppClient)
>> 3. how to recover from this error?
>>
>>
>>   ERROR 01-11 15:32:54,991SparkDeploySchedulerBackend - Application
>> has been killed. Reason: All masters are unresponsive! Giving up. ERROR
>>   ERROR 01-11 15:32:55,087  OneForOneStrategy - ERROR
>> logs/error.log
>>   java.lang.NullPointerException NullPointerException
>>   at
>> org.apache.spark.deploy.client.AppClient$ClientActor$$anonfun$receiveWithLogging$1.applyOrElse(AppClient.scala:160)
>>   at
>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>>   at
>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>>   at
>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>>   at
>> org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
>>   at
>> org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
>>   at
>> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>>   at
>> org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
>>   at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>   at
>> org.apache.spark.deploy.client.AppClient$ClientActor.aroundReceive(AppClient.scala:61)
>>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>   at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>>   at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>   at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
>>   at
>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>   at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>   at
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>   at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>   ERROR 01-11 15:32:55,603   SparkContext - Error
>> initializing SparkContext. ERROR
>>   java.lang.IllegalStateException: Cannot call methods on a stopped
>> SparkContext
>>   at org.apache.spark.SparkContext.org
>> $apache$spark$SparkContext$$assertNotStopped(SparkContext.scala:103)
>>   at
>> org.apache.spark.SparkContext.getSchedulingMode(SparkContext.scala:1501)
>>   at
>> org.apache.spark.SparkContext.postEnvironmentUpdate(SparkContext.scala:2005)
>>   at org.apache.spark.SparkContext.(SparkContext.scala:543)
>>   at
>> org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:61)
>>
>>
>> Thanks!
>>
>> *Romi Kuntsman*, *Big Data Engineer*
>> http://www.totango.com
>>
>
>


Re: JMX with Spark

2015-11-05 Thread Romi Kuntsman
Have you read this?
https://spark.apache.org/docs/latest/monitoring.html

*Romi Kuntsman*, *Big Data Engineer*
http://www.totango.com

On Thu, Nov 5, 2015 at 2:08 PM, Yogesh Vyas  wrote:

> Hi,
> How we can use JMX and JConsole to monitor our Spark applications?
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: DataFrame.toJavaRDD cause fetching data to driver, is it expected ?

2015-11-04 Thread Romi Kuntsman
In my program I move between RDD and DataFrame several times.
I know that the entire data of the DF doesn't go into the driver because it
wouldn't fit there.
But calling toJavaRDD does cause computation.

Check the number of partitions you have on the DF and RDD...
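
For reference, a quick Scala sketch of checking that (using the DataFrame name
from the snippet below):

  println(etpvRecords.rdd.partitions.length)       // partitions behind the DataFrame
  println(etpvRecords.toJavaRDD.partitions.size)   // partitions of the converted JavaRDD
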
On Nov 4, 2015 7:54 PM, "Aliaksei Tsyvunchyk" 
wrote:

> Hello Romi,
>
> Do you mean that in my particular case I’m causing computation on
> dataFrame or it is regular behavior of DataFrame.toJavaRDD ?
> If it’s regular behavior, do you know which approach could be used to
> perform make/reduce on dataFrame without causing it to load all data to
> driver program ?
>
> On Nov 4, 2015, at 12:34 PM, Romi Kuntsman  wrote:
>
> I noticed that toJavaRDD causes a computation on the DataFrame, so is it
> considered an action, even though logically it's a transformation?
> On Nov 4, 2015 6:51 PM, "Aliaksei Tsyvunchyk" 
> wrote:
>
>> Hello folks,
>>
>> Recently I have noticed unexpectedly big network traffic between Driver
>> Program and Worker node.
>> During debugging I have figured out that it is caused by following block
>> of code
>>
>> —— Java ——— —
>> DataFrame etpvRecords = context.sql(" SOME SQL query here");
>> Mapper m = new Mapper(localValue, ProgramId::toProgId);
>> return etpvRecords
>> .toJavaRDD()
>> .map(m::mapHutPutViewingRow)
>> .reduce(Reducer::reduce);
>> —— Java 
>>
>> I’m using debug breakpoint and OS X nettop to monitor traffic between
>> processes. So before approaching line toJavaRDD() I have 500Kb of traffic
>> and after executing this line I have 2.2 Mb of traffic. But when I check
>> size of result of reduce function it is 10 Kb.
>> So .toJavaRDD() seems causing worker process return dataset to driver
>> process and seems further map/reduce occurs on Driver.
>>
>> This is definitely not expected by me, so I have 2 questions.
>> 1.  Is it really expected behavior that DataFrame.toJavaRDD cause whole
>> dataset return to driver or I’m doing something wrong?
>> 2.  What is expected way to perform transformation with DataFrame using
>> custom Java map\reduce functions in case if standard SQL features are not
>> fit all my needs?
>>
>> Env: Spark 1.5.1 in standalone mode, (1 master, 1 worker sharing same
>> machine). Java 1.8.0_60.
>>
>> CONFIDENTIALITY NOTICE: This email and files attached to it are
>> confidential. If you are not the intended recipient you are hereby notified
>> that using, copying, distributing or taking any action in reliance on the
>> contents of this information is strictly prohibited. If you have received
>> this email in error please notify the sender and delete this email.
>>
>
>
> CONFIDENTIALITY NOTICE: This email and files attached to it are
> confidential. If you are not the intended recipient you are hereby notified
> that using, copying, distributing or taking any action in reliance on the
> contents of this information is strictly prohibited. If you have received
> this email in error please notify the sender and delete this email.
>


Re: DataFrame.toJavaRDD cause fetching data to driver, is it expected ?

2015-11-04 Thread Romi Kuntsman
I noticed that toJavaRDD causes a computation on the DataFrame, so is it
considered an action, even though logically it's a transformation?
On Nov 4, 2015 6:51 PM, "Aliaksei Tsyvunchyk" 
wrote:

> Hello folks,
>
> Recently I have noticed unexpectedly big network traffic between Driver
> Program and Worker node.
> During debugging I have figured out that it is caused by following block
> of code
>
> —— Java ——— —
> DataFrame etpvRecords = context.sql(" SOME SQL query here");
> Mapper m = new Mapper(localValue, ProgramId::toProgId);
> return etpvRecords
> .toJavaRDD()
> .map(m::mapHutPutViewingRow)
> .reduce(Reducer::reduce);
> —— Java 
>
> I’m using debug breakpoint and OS X nettop to monitor traffic between
> processes. So before approaching line toJavaRDD() I have 500Kb of traffic
> and after executing this line I have 2.2 Mb of traffic. But when I check
> size of result of reduce function it is 10 Kb.
> So .toJavaRDD() seems causing worker process return dataset to driver
> process and seems further map/reduce occurs on Driver.
>
> This is definitely not expected by me, so I have 2 questions.
> 1.  Is it really expected behavior that DataFrame.toJavaRDD cause whole
> dataset return to driver or I’m doing something wrong?
> 2.  What is expected way to perform transformation with DataFrame using
> custom Java map\reduce functions in case if standard SQL features are not
> fit all my needs?
>
> Env: Spark 1.5.1 in standalone mode, (1 master, 1 worker sharing same
> machine). Java 1.8.0_60.
>
> CONFIDENTIALITY NOTICE: This email and files attached to it are
> confidential. If you are not the intended recipient you are hereby notified
> that using, copying, distributing or taking any action in reliance on the
> contents of this information is strictly prohibited. If you have received
> this email in error please notify the sender and delete this email.
>


Re: Error : - No filesystem for scheme: spark

2015-11-02 Thread Romi Kuntsman
except "spark.master", do you have "spark://" anywhere in your code or
config files?

*Romi Kuntsman*, *Big Data Engineer*
http://www.totango.com

On Mon, Nov 2, 2015 at 11:27 AM, Balachandar R.A. 
wrote:

>
> -- Forwarded message --
> From: "Balachandar R.A." 
> Date: 02-Nov-2015 12:53 pm
> Subject: Re: Error : - No filesystem for scheme: spark
> To: "Jean-Baptiste Onofré" 
> Cc:
>
> > HI JB,
> > Thanks for the response,
> > Here is the content of my spark-defaults.conf
> >
> >
> > # Default system properties included when running spark-submit.
> > # This is useful for setting default environmental settings.
> >
> > # Example:
> >  spark.master spark://fdoat:7077
> > # spark.eventLog.enabled   true
> >  spark.eventLog.dir/home/bala/spark-logs
> > # spark.eventLog.dir   hdfs://namenode:8021/directory
> > # spark.serializer
> org.apache.spark.serializer.KryoSerializer
> > # spark.driver.memory  5g
> > # spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value
> -Dnumbers="one two three"
> >
> >
> > regards
> > Bala
>
> >
> > On 2 November 2015 at 12:21, Jean-Baptiste Onofré 
> wrote:
> >>
> >> Hi,
> >>
> >> do you have something special in conf/spark-defaults.conf (especially
> on the eventLog directory) ?
> >>
> >> Regards
> >> JB
> >>
> >>
> >> On 11/02/2015 07:48 AM, Balachandar R.A. wrote:
> >>>
> >>> Can someone tell me at what point this error could come?
> >>>
> >>> In one of my use cases, I am trying to use hadoop custom input format.
> >>> Here is my code.
> >>>
> >>> val hConf: Configuration = sc.hadoopConfiguration
> >>> hConf.set("fs.hdfs.impl", classOf[org.apache.hadoop.hdfs.DistributedFileSystem].getName)
> >>> hConf.set("fs.file.impl", classOf[org.apache.hadoop.fs.LocalFileSystem].getName)
> >>> var job = new Job(hConf)
> >>> FileInputFormat.setInputPaths(job, new Path("hdfs:///user/bala/MyBinaryFile"));
> >>> var hRDD = new NewHadoopRDD(sc, classOf[RandomAccessInputFormat],
> >>>   classOf[IntWritable], classOf[BytesWritable], job.getConfiguration())
> >>> val count = hRDD.mapPartitionsWithInputSplit { (split, iter) => myfuncPart(split, iter) }
> >>>
> >>> The moment I invoke the mapPartitionsWithInputSplit() method, I get the
> >>> below error in my spark-submit launch:
> >>>
> >>> 15/10/30 11:11:39 WARN scheduler.TaskSetManager: Lost task 0.0 in stage
> >>> 0.0 (TID 0, 40.221.94.235): java.io.IOException: No FileSystem for scheme: spark
> >>> at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2584)
> >>> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
> >>> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
> >>>
> >>> Any help here to move towards fixing this will be of great help
> >>>
> >>>
> >>>
> >>> Thanks
> >>>
> >>> Bala
> >>>
> >>
> >> --
> >> Jean-Baptiste Onofré
> >> jbono...@apache.org
> >> http://blog.nanthrax.net
> >> Talend - http://www.talend.com
> >>
> >> -
> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >> For additional commands, e-mail: user-h...@spark.apache.org
> >>
> >
>
>


Some spark apps fail with "All masters are unresponsive", while others pass normally

2015-11-01 Thread Romi Kuntsman
[adding dev list since it's probably a bug, but I'm not sure how to
reproduce so I can open a bug about it]

Hi,

I have a standalone Spark 1.4.0 cluster with 100s of applications running
every day.

From time to time, the applications crash with the following error (see
below)
But at the same time (and also after that), other applications are running,
so I can safely assume the master and workers are working.

1. why is there a NullPointerException? (I can't track the Scala stack
trace to the code, but anyway an NPE is usually an obvious bug even if there's
actually a network error...)
2. why can't it connect to the master? (if it's a network timeout, how to
increase it? I see the values are hardcoded inside AppClient)
3. how to recover from this error?


  ERROR 01-11 15:32:54,991SparkDeploySchedulerBackend - Application has
been killed. Reason: All masters are unresponsive! Giving up. ERROR
  ERROR 01-11 15:32:55,087  OneForOneStrategy - ERROR
logs/error.log
  java.lang.NullPointerException NullPointerException
  at
org.apache.spark.deploy.client.AppClient$ClientActor$$anonfun$receiveWithLogging$1.applyOrElse(AppClient.scala:160)
  at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
  at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
  at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
  at
org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
  at
org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
  at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
  at
org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
  at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
  at
org.apache.spark.deploy.client.AppClient$ClientActor.aroundReceive(AppClient.scala:61)
  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
  at akka.actor.ActorCell.invoke(ActorCell.scala:487)
  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
  at akka.dispatch.Mailbox.run(Mailbox.scala:220)
  at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
  at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
  ERROR 01-11 15:32:55,603   SparkContext - Error
initializing SparkContext. ERROR
  java.lang.IllegalStateException: Cannot call methods on a stopped
SparkContext
  at org.apache.spark.SparkContext.org
$apache$spark$SparkContext$$assertNotStopped(SparkContext.scala:103)
  at
org.apache.spark.SparkContext.getSchedulingMode(SparkContext.scala:1501)
  at
org.apache.spark.SparkContext.postEnvironmentUpdate(SparkContext.scala:2005)
  at org.apache.spark.SparkContext.(SparkContext.scala:543)
  at
org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:61)


Thanks!

*Romi Kuntsman*, *Big Data Engineer*
http://www.totango.com


Re: NullPointerException when cache DataFrame in Java (Spark1.5.1)

2015-10-29 Thread Romi Kuntsman
>
> BUT, after changing limit(500) to limit(1000), the code reports a
> NullPointerException.
>
I had a similar situation, and the problem was with a certain record.
Try to find which records are returned when you limit to 1000 but not
returned when you limit to 500.
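
A debugging sketch of one way to isolate them (not from the original thread;
note that limit() gives no ordering guarantee, so treat this only as a
heuristic for spotting a suspect record):

  val first500  = schemaPixel.limit(500)
  val first1000 = schemaPixel.limit(1000)
  val suspects  = first1000.except(first500)
  suspects.show(600)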

Could it be an NPE thrown from PixelObject?
Are you running spark with master=local, so it's running inside your IDE
and you can see the errors from the driver and worker?


*Romi Kuntsman*, *Big Data Engineer*
http://www.totango.com

On Thu, Oct 29, 2015 at 10:04 AM, Zhang, Jingyu 
wrote:

> Thanks Romi,
>
> I resize the dataset to 7MB, however, the code show NullPointerException
>  exception as well.
>
> Did you try to cache a DataFrame with just a single row?
>
> Yes, I tried. But, Same problem.
> .
> Do your rows have any columns with null values?
>
> No, I had filter out null values before cache the dataframe.
>
> Can you post a code snippet here on how you load/generate the dataframe?
>
> Sure, Here is the working code 1:
>
> JavaRDD pixels = pixelsStr.map(new PixelGenerator()).cache();
>
> System.out.println(pixels.count()); // 3000-4000 rows
>
> Working code 2:
>
> JavaRDD pixels = pixelsStr.map(new PixelGenerator());
>
> DataFrame schemaPixel = sqlContext.createDataFrame(pixels, PixelObject.
> class);
>
> DataFrame totalDF1 = 
> schemaPixel.select(schemaPixel.col("domain")).filter("'domain'
> is not null").limit(500);
>
> System.out.println(totalDF1.count());
>
>
> BUT, after changing limit(500) to limit(1000), the code reports a
> NullPointerException.
>
>
> JavaRDD pixels = pixelsStr.map(new PixelGenerator());
>
> DataFrame schemaPixel = sqlContext.createDataFrame(pixels, PixelObject.
> class);
>
> DataFrame totalDF = 
> schemaPixel.select(schemaPixel.col("domain")).filter("'domain'
> is not null").limit(*1000*);
>
> System.out.println(totalDF.count()); // problem at this line
>
> 15/10/29 18:56:28 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks
> have all completed, from pool
>
> 15/10/29 18:56:28 INFO TaskSchedulerImpl: Cancelling stage 0
>
> 15/10/29 18:56:28 INFO DAGScheduler: ShuffleMapStage 0 (count at
> X.java:113) failed in 3.764 s
>
> 15/10/29 18:56:28 INFO DAGScheduler: Job 0 failed: count at XXX.java:113,
> took 3.862207 s
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
> in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage
> 0.0 (TID 0, localhost): java.lang.NullPointerException
>
> at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
> Does dataframe.rdd.cache work?
>
> No, I tried but same exception.
>
> Thanks,
>
> Jingyu
>
> On 29 October 2015 at 17:38, Romi Kuntsman  wrote:
>
>> Did you try to cache a DataFrame with just a single row?
>> Do your rows have any columns with null values?
>> Can you post a code snippet here on how you load/generate the dataframe?
>> Does dataframe.rdd.cache work?
>>
>> *Romi Kuntsman*, *Big Data Engineer*
>> http://www.totango.com
>>
>> On Thu, Oct 29, 2015 at 4:33 AM, Zhang, Jingyu 
>> wrote:
>>
>>> It is not a problem to use JavaRDD.cache() for 200M data (all Objects
>>> read form Json Format). But when I try to use DataFrame.cache(), It shown
>>> exception in below.
>>>
>>> My machine can cache 1 G data in Avro format without any problem.
>>>
>>> 15/10/29 13:26:23 INFO GeneratePredicate: Code generated in 154.531827 ms
>>>
>>> 15/10/29 13:26:23 INFO GenerateUnsafeProjection: Code generated in
>>> 27.832369 ms
>>>
>>> 15/10/29 13:26:23 ERROR Executor: Exception in task 0.0 in stage 1.0
>>> (TID 1)
>>>
>>> java.lang.NullPointerException
>>>
>>> at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
>>>
>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
>>> DelegatingMethodAccessorImpl.java:43)
>>>
>>> at java.lang.reflect.Method.invoke(Method.java:497)
>>>
>>> at
>>> org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1$$anonfun$apply$2.apply(
>>> SQLContext.scala:500)
>>>
>>> at
>>> org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1$$anonfun$apply$2.apply(
>>> SQLContext.scala:500)
>>>
>>> at scala.collection.TraversableLike$$anonfun$map$1.apply(
>>> TraversableLike.scala:244)
>>>
>>> at scala.collection.TraversableLike$$anonfun$map$1.apply(
>>> TraversableLike.scala:244)
>>>
>>> at scala.

Re: NullPointerException when cache DataFrame in Java (Spark1.5.1)

2015-10-28 Thread Romi Kuntsman
Did you try to cache a DataFrame with just a single row?
Do your rows have any columns with null values?
Can you post a code snippet here on how you load/generate the dataframe?
Does dataframe.rdd.cache work?

*Romi Kuntsman*, *Big Data Engineer*
http://www.totango.com

On Thu, Oct 29, 2015 at 4:33 AM, Zhang, Jingyu 
wrote:

> It is not a problem to use JavaRDD.cache() for 200M data (all Objects read
> form Json Format). But when I try to use DataFrame.cache(), It shown
> exception in below.
>
> My machine can cache 1 G data in Avro format without any problem.
>
> 15/10/29 13:26:23 INFO GeneratePredicate: Code generated in 154.531827 ms
>
> 15/10/29 13:26:23 INFO GenerateUnsafeProjection: Code generated in
> 27.832369 ms
>
> 15/10/29 13:26:23 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID
> 1)
>
> java.lang.NullPointerException
>
> at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
>
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:497)
>
> at
> org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1$$anonfun$apply$2.apply(
> SQLContext.scala:500)
>
> at
> org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1$$anonfun$apply$2.apply(
> SQLContext.scala:500)
>
> at scala.collection.TraversableLike$$anonfun$map$1.apply(
> TraversableLike.scala:244)
>
> at scala.collection.TraversableLike$$anonfun$map$1.apply(
> TraversableLike.scala:244)
>
> at scala.collection.IndexedSeqOptimized$class.foreach(
> IndexedSeqOptimized.scala:33)
>
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>
> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>
> at org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1.apply(
> SQLContext.scala:500)
>
> at org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1.apply(
> SQLContext.scala:498)
>
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>
> at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)
>
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>
> at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(
> InMemoryColumnarTableScan.scala:127)
>
> at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(
> InMemoryColumnarTableScan.scala:120)
>
> at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:278
> )
>
> at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
>
> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
>
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38
> )
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38
> )
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>
> at org.apache.spark.scheduler.Task.run(Task.scala:88)
>
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
>
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
>
> at java.lang.Thread.run(Thread.java:745)
>
> 15/10/29 13:26:23 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1,
> localhost): java.lang.NullPointerException
>
> at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
>
>
> Thanks,
>
>
> Jingyu
>
> This message and its attachments may contain legally privileged or
> confidential information. It is intended solely for the named addressee. If
> you are not the addressee indicated in this message or responsible for
> delivery of the message to the addressee, you may not copy or deliver this
> message or its attachments to anyone. Rather, you should permanently delete
> this message and its attachments and kindly notify the sender by reply
> e-mail. Any content of this message and its attachments which does not
> relate to the official business of the sending company must be taken not to
> have been sent or endorsed by that company or any of its related entities.
> No warranty is made that the e-mail or attachments are free from computer
> virus or other defect.


Re: how to get RDD from two different RDDs with cross column

2015-09-21 Thread Romi Kuntsman
Hi,
If I understand correctly:
rdd1 contains keys (of type StringDate)
rdd2 contains keys and values
and rdd3 contains all the keys, and the values from rdd2?

I think you should make rdd1 and rdd2 PairRDDs, and then use an outer join.
Does that make sense?
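
A minimal Scala sketch of that approach (names assumed; rdd1 holds the
StringDate keys, rdd2 the (StringDate, float) pairs):

  import org.apache.spark.rdd.RDD
  val datesRdd: RDD[String] = rdd1
  val valuesRdd: RDD[(String, Float)] = rdd2
  // keep every date from rdd1; dates missing from rdd2 become None (i.e. NULL)
  val rdd3: RDD[(String, Option[Float])] =
    datesRdd.map(d => (d, ())).leftOuterJoin(valuesRdd).mapValues(_._2)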

On Mon, Sep 21, 2015 at 8:37 PM Zhiliang Zhu  wrote:

> Dear Romi, Priya, Sujt and Shivaram and all,
>
> I have took lots of days to think into this issue, however, without  any
> enough good solution...
> I shall appreciate your all kind help.
>
> There is an RDD rdd1, and another RDD rdd2,
> (rdd2 can be PairRDD, or DataFrame with two columns as ).
> StringDate column values from rdd1 and rdd2 are cross but not the same.
>
> I would like to get a new RDD rdd3, StringDate in rdd3
> would be all from (same) as rdd1, and float in rdd3 would be from rdd2 if
> its
> StringDate is in rdd2, or else NULL would be assigned.
> each row in rdd3[ i ] = ,
> rdd2[i].StringDate would be same as rdd1[ i ].StringDate,
> then rdd2[ i ].float is assigned rdd3[ i ] StringDate part.
> What kinds of API or function would I use...
>
> Thanks very much!
> Zhiliang
>
>
>


Re: passing SparkContext as parameter

2015-09-21 Thread Romi Kuntsman
Cody, that's a great reference!
As shown there - the best way to connect to an external database from the
workers is to create a connection pool on (each) worker.
The driver can pass the connection string via broadcast, but not the
connection object itself and not the SparkContext.
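
A minimal sketch of that pattern (the JDBC URL and the write logic are made-up
placeholders; a real per-worker connection pool is even better):

  val connStr = sc.broadcast("jdbc:postgresql://db-host:5432/mydb")
  dstream.foreachRDD { rdd =>
    rdd.foreachPartition { records =>
      // the connection is created on the worker, never serialized from the driver
      val conn = java.sql.DriverManager.getConnection(connStr.value)
      try {
        records.foreach { record =>
          println(record) // placeholder: replace with the actual write using conn
        }
      } finally {
        conn.close()
      }
    }
  }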

On Mon, Sep 21, 2015 at 5:31 PM Cody Koeninger  wrote:

> That isn't accurate, I think you're confused about foreach.
>
> Look at
>
>
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
>
>
> On Mon, Sep 21, 2015 at 7:36 AM, Romi Kuntsman  wrote:
>
>> foreach is something that runs on the driver, not the workers.
>>
>> if you want to perform some function on each record from cassandra, you
>> need to do cassandraRdd.map(func), which will run distributed on the spark
>> workers
>>
>> *Romi Kuntsman*, *Big Data Engineer*
>> http://www.totango.com
>>
>> On Mon, Sep 21, 2015 at 3:29 PM, Priya Ch 
>> wrote:
>>
>>> Yes, but i need to read from cassandra db within a spark
>>> transformation..something like..
>>>
>>> dstream.forachRDD{
>>>
>>> rdd=> rdd.foreach {
>>>  message =>
>>>  sc.cassandraTable()
>>>   .
>>>   .
>>>   .
>>> }
>>> }
>>>
>>> Since rdd.foreach gets executed on workers, how can i make sparkContext
>>> available on workers ???
>>>
>>> Regards,
>>> Padma Ch
>>>
>>> On Mon, Sep 21, 2015 at 5:10 PM, Ted Yu  wrote:
>>>
>>>> You can use broadcast variable for passing connection information.
>>>>
>>>> Cheers
>>>>
>>>> On Sep 21, 2015, at 4:27 AM, Priya Ch 
>>>> wrote:
>>>>
>>>> can i use this sparkContext on executors ??
>>>> In my application, i have scenario of reading from db for certain
>>>> records in rdd. Hence I need sparkContext to read from DB (cassandra in our
>>>> case),
>>>>
>>>> If sparkContext couldn't be sent to executors , what is the workaround
>>>> for this ??
>>>>
>>>> On Mon, Sep 21, 2015 at 3:06 PM, Petr Novak 
>>>> wrote:
>>>>
>>>>> add @transient?
>>>>>
>>>>> On Mon, Sep 21, 2015 at 11:27 AM, Priya Ch <
>>>>> learnings.chitt...@gmail.com> wrote:
>>>>>
>>>>>> Hello All,
>>>>>>
>>>>>> How can i pass sparkContext as a parameter to a method in an
>>>>>> object. Because passing sparkContext is giving me TaskNotSerializable
>>>>>> Exception.
>>>>>>
>>>>>> How can i achieve this ?
>>>>>>
>>>>>> Thanks,
>>>>>> Padma Ch
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>


Re: passing SparkContext as parameter

2015-09-21 Thread Romi Kuntsman
foreach is something that runs on the driver, not the workers.

if you want to perform some function on each record from cassandra, you
need to do cassandraRdd.map(func), which will run distributed on the spark
workers

*Romi Kuntsman*, *Big Data Engineer*
http://www.totango.com

On Mon, Sep 21, 2015 at 3:29 PM, Priya Ch 
wrote:

> Yes, but i need to read from cassandra db within a spark
> transformation..something like..
>
> dstream.forachRDD{
>
> rdd=> rdd.foreach {
>  message =>
>  sc.cassandraTable()
>   .
>   .
>   .
> }
> }
>
> Since rdd.foreach gets executed on workers, how can i make sparkContext
> available on workers ???
>
> Regards,
> Padma Ch
>
> On Mon, Sep 21, 2015 at 5:10 PM, Ted Yu  wrote:
>
>> You can use broadcast variable for passing connection information.
>>
>> Cheers
>>
>> On Sep 21, 2015, at 4:27 AM, Priya Ch 
>> wrote:
>>
>> can i use this sparkContext on executors ??
>> In my application, i have scenario of reading from db for certain records
>> in rdd. Hence I need sparkContext to read from DB (cassandra in our case),
>>
>> If sparkContext couldn't be sent to executors , what is the workaround
>> for this ??
>>
>> On Mon, Sep 21, 2015 at 3:06 PM, Petr Novak  wrote:
>>
>>> add @transient?
>>>
>>> On Mon, Sep 21, 2015 at 11:27 AM, Priya Ch >> > wrote:
>>>
>>>> Hello All,
>>>>
>>>> How can i pass sparkContext as a parameter to a method in an
>>>> object. Because passing sparkContext is giving me TaskNotSerializable
>>>> Exception.
>>>>
>>>> How can i achieve this ?
>>>>
>>>> Thanks,
>>>> Padma Ch
>>>>
>>>
>>>
>>
>


Re: passing SparkContext as parameter

2015-09-21 Thread Romi Kuntsman
sparkContext is available on the driver, not on executors.

To read from Cassandra, you can use something like this:
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.md
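
A minimal sketch of reading a table with it (keyspace, table and column names
are made-up; the resulting RDD is computed on the workers, not the driver):

  import com.datastax.spark.connector._
  val users = sc.cassandraTable("my_keyspace", "users")
  val emails = users.map(row => row.getString("email"))
  println(emails.count())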

*Romi Kuntsman*, *Big Data Engineer*
http://www.totango.com

On Mon, Sep 21, 2015 at 2:27 PM, Priya Ch 
wrote:

> can i use this sparkContext on executors ??
> In my application, i have scenario of reading from db for certain records
> in rdd. Hence I need sparkContext to read from DB (cassandra in our case),
>
> If sparkContext couldn't be sent to executors , what is the workaround for
> this ??
>
> On Mon, Sep 21, 2015 at 3:06 PM, Petr Novak  wrote:
>
>> add @transient?
>>
>> On Mon, Sep 21, 2015 at 11:27 AM, Priya Ch 
>> wrote:
>>
>>> Hello All,
>>>
>>> How can i pass sparkContext as a parameter to a method in an object.
>>> Because passing sparkContext is giving me TaskNotSerializable Exception.
>>>
>>> How can i achieve this ?
>>>
>>> Thanks,
>>> Padma Ch
>>>
>>
>>
>


Re: How to get a new RDD by ordinarily subtract its adjacent rows

2015-09-21 Thread Romi Kuntsman
An RDD is a set of data rows (in your case numbers); there is no inherent
meaning to the order of the items.
What exactly are you trying to accomplish?
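
(If you can first impose an explicit order on the rows, one way - sketched
below with made-up numbers - is to index them with zipWithIndex and join each
row to its predecessor.)

  val rdd1 = sc.parallelize(Seq(1.0, 4.0, 9.0, 16.0))
  val indexed = rdd1.zipWithIndex().map { case (v, i) => (i, v) }   // (index, value)
  val shifted = indexed.map { case (i, v) => (i + 1, v) }           // predecessor value, keyed by index
  val rdd2 = indexed.join(shifted).map { case (_, (cur, prev)) => cur - prev }
  // rdd2 contains 3.0, 5.0, 7.0; the first row has no predecessor and is dropped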

*Romi Kuntsman*, *Big Data Engineer*
http://www.totango.com

On Mon, Sep 21, 2015 at 2:29 PM, Zhiliang Zhu 
wrote:

> Dear ,
>
> I have took lots of days to think into this issue, however, without any
> success...
> I shall appreciate your all kind help.
>
> There is an RDD rdd1, I would like get a new RDD rdd2, each row
> in rdd2[ i ] = rdd1[ i ] - rdd[i - 1] .
> What kinds of API or function would I use...
>
>
> Thanks very much!
> John
>
>


How to determine the value for spark.sql.shuffle.partitions?

2015-09-01 Thread Romi Kuntsman
Hi all,

The number of partitions greatly affects the speed and efficiency of
the calculation, in my case in DataFrames/SparkSQL on Spark 1.4.0.

Too few partitions with large data cause OOM exceptions.
Too many partitions on small data cause a delay due to overhead.

How do you programmatically determine the optimal number of partitions and
cores in Spark, as a function of:

   1. available memory per core
   2. number of records in input data
   3. average/maximum record size
   4. cache configuration
   5. shuffle configuration
   6. serialization
   7. etc?

Any general best practices?
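
For illustration, a naive sketch of the kind of calculation I mean (every
number and name below is a made-up assumption):

  val recordCount = 50000000L                    // number of records in the input (assumed)
  val avgRecordBytes = 512L                      // average record size (assumed)
  val targetPartitionBytes = 128L * 1024 * 1024  // target size per partition (assumed)
  val totalCores = 32                            // total executor cores (assumed)
  val bySize = math.ceil(recordCount * avgRecordBytes / targetPartitionBytes.toDouble).toInt
  val numShufflePartitions = math.max(bySize, totalCores)
  sqlContext.setConf("spark.sql.shuffle.partitions", numShufflePartitions.toString)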

Thanks!

Romi K.


Re: How to remove worker node but let it finish first?

2015-08-29 Thread Romi Kuntsman
Is it only available in Mesos?
I'm using a Spark standalone cluster - is there anything like that there?

On Fri, Aug 28, 2015 at 8:51 AM Akhil Das 
wrote:

> You can create a custom mesos framework for your requirement, to get you
> started you can check this out
> http://mesos.apache.org/documentation/latest/app-framework-development-guide/
>
> Thanks
> Best Regards
>
> On Mon, Aug 24, 2015 at 12:11 PM, Romi Kuntsman  wrote:
>
>> Hi,
>> I have a spark standalone cluster with 100s of applications per day, and
>> it changes size (more or less workers) at various hours. The driver runs on
>> a separate machine outside the spark cluster.
>>
>> When a job is running and its worker is killed (because at that hour the
>> number of workers is reduced), it sometimes fails, instead of
>> redistributing the work to other workers.
>>
>> How is it possible to decommission a worker, so that it doesn't receive
>> any new work, but does finish all existing work before shutting down?
>>
>> Thanks!
>>
>
>


Re: Exception when S3 path contains colons

2015-08-25 Thread Romi Kuntsman
Hello,

We had the same problem. I've written a blog post with the detailed
explanation and workaround:

http://labs.totango.com/spark-read-file-with-colon/

Greetings,
Romi K.

On Tue, Aug 25, 2015 at 2:47 PM Gourav Sengupta 
wrote:

> I am not quite sure about this but should the notation not be 
> s3n://redactedbucketname/*
> instead of
> s3a://redactedbucketname/*
>
> The best way is to use s3://<>/<>/*
>
>
> Regards,
> Gourav
>
> On Tue, Aug 25, 2015 at 10:35 AM, Akhil Das 
> wrote:
>
>> You can change the names, whatever program that is pushing the record
>> must follow the naming conventions. Try to replace : with _ or something.
>>
>> Thanks
>> Best Regards
>>
>> On Tue, Aug 18, 2015 at 10:20 AM, Brian Stempin 
>> wrote:
>>
>>> Hi,
>>> I'm running Spark on Amazon EMR (Spark 1.4.1, Hadoop 2.6.0).  I'm seeing
>>> the exception below when encountering file names that contain colons.  Any
>>> idea on how to get around this?
>>>
>>> scala> val files = sc.textFile("s3a://redactedbucketname/*")
>>>
>>> 2015-08-18 04:38:34,567 INFO  [main] storage.MemoryStore
>>> (Logging.scala:logInfo(59)) - ensureFreeSpace(242224) called with
>>> curMem=669367, maxMem=285203496
>>>
>>> 2015-08-18 04:38:34,568 INFO  [main] storage.MemoryStore
>>> (Logging.scala:logInfo(59)) - Block broadcast_3 stored as values in memory
>>> (estimated size 236.5 KB, free 271.1 MB)
>>>
>>> 2015-08-18 04:38:34,663 INFO  [main] storage.MemoryStore
>>> (Logging.scala:logInfo(59)) - ensureFreeSpace(21533) called with
>>> curMem=911591, maxMem=285203496
>>>
>>> 2015-08-18 04:38:34,664 INFO  [main] storage.MemoryStore
>>> (Logging.scala:logInfo(59)) - Block broadcast_3_piece0 stored as bytes in
>>> memory (estimated size 21.0 KB, free 271.1 MB)
>>>
>>> 2015-08-18 04:38:34,665 INFO
>>>  [sparkDriver-akka.actor.default-dispatcher-19] storage.BlockManagerInfo
>>> (Logging.scala:logInfo(59)) - Added broadcast_3_piece0 in memory on
>>> 10.182.184.26:60338 (size: 21.0 KB, free: 271.9 MB)
>>>
>>> 2015-08-18 04:38:34,667 INFO  [main] spark.SparkContext
>>> (Logging.scala:logInfo(59)) - Created broadcast 3 from textFile at
>>> :21
>>>
>>> files: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at
>>> textFile at :21
>>>
>>>
>>> scala> files.count
>>>
>>> 2015-08-18 04:38:37,262 INFO  [main] s3a.S3AFileSystem
>>> (S3AFileSystem.java:listStatus(533)) - List status for path:
>>> s3a://redactedbucketname/
>>>
>>> 2015-08-18 04:38:37,262 INFO  [main] s3a.S3AFileSystem
>>> (S3AFileSystem.java:getFileStatus(684)) - Getting path status for
>>> s3a://redactedbucketname/ ()
>>>
>>> java.lang.IllegalArgumentException: java.net.URISyntaxException:
>>> Relative path in absolute URI:
>>> [922-212-4438]-[119]-[1]-[2015-08-13T15:43:12.346193%5D-%5B2015-01-01T00:00:00%5D-redacted.csv
>>>
>>> at org.apache.hadoop.fs.Path.initialize(Path.java:206)
>>>
>>> at org.apache.hadoop.fs.Path.(Path.java:172)
>>>
>>> at org.apache.hadoop.fs.Path.(Path.java:94)
>>>
>>> at org.apache.hadoop.fs.Globber.glob(Globber.java:240)
>>>
>>> at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1700)
>>>
>>> at
>>> org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:229)
>>>
>>> at
>>> org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:200)
>>>
>>> at
>>> org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:279)
>>>
>>> at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207)
>>>
>>> at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:219)
>>>
>>> at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:217)
>>>
>>> at scala.Option.getOrElse(Option.scala:120)
>>>
>>> at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
>>>
>>> at
>>> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
>>>
>>> at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:219)
>>>
>>> at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:217)
>>>
>>> at scala.Option.getOrElse(Option.scala:120)
>>>
>>> at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
>>>
>>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1781)
>>>
>>> at org.apache.spark.rdd.RDD.count(RDD.scala:1099)
>>>
>>> at $iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC.(:24)
>>>
>>> at $iwC$iwC$iwC$iwC$iwC$iwC$iwC.(:29)
>>>
>>> at $iwC$iwC$iwC$iwC$iwC$iwC.(:31)
>>>
>>> at $iwC$iwC$iwC$iwC$iwC.(:33)
>>>
>>> at $iwC$iwC$iwC$iwC.(:35)
>>>
>>> at $iwC$iwC$iwC.(:37)
>>>
>>> at $iwC$iwC.(:39)
>>>
>>> at $iwC.(:41)
>>>
>>> at (:43)
>>>
>>> at .(:47)
>>>
>>> at .()
>>>
>>> at .(:7)
>>>
>>> at .()
>>>
>>> at $print()
>>>
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>
>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>>
>>> at
>>> org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(S

How to remove worker node but let it finish first?

2015-08-23 Thread Romi Kuntsman
Hi,
I have a spark standalone cluster with 100s of applications per day, and it
changes size (more or less workers) at various hours. The driver runs on a
separate machine outside the spark cluster.

When a job is running and its worker is killed (because at that hour the
number of workers is reduced), it sometimes fails, instead of
redistributing the work to other workers.

How is it possible to decommission a worker, so that it doesn't receive any
new work, but does finish all existing work before shutting down?

Thanks!


Re: How to overwrite partition when writing Parquet?

2015-08-20 Thread Romi Kuntsman
Cheng - what if I want to overwrite a specific partition?

I'll try to remove the folder, as Hemant suggested...
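
A sketch of that approach (paths and column names assumed):

  import org.apache.hadoop.fs.{FileSystem, Path}
  import org.apache.spark.sql.SaveMode
  val date = "2015-08-20"
  val fs = FileSystem.get(sc.hadoopConfiguration)
  fs.delete(new Path(s"$parquetDir/date=$date"), true)   // recursively remove that partition's folder
  myDataframe.filter(myDataframe("date") === date)
    .write.partitionBy("date").mode(SaveMode.Append).parquet(parquetDir)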

On Thu, Aug 20, 2015 at 1:17 PM Cheng Lian  wrote:

> You can apply a filter first to filter out data of needed dates and then
> append them.
>
>
> Cheng
>
>
> On 8/20/15 4:59 PM, Hemant Bhanawat wrote:
>
> How can I overwrite only a given partition or manually remove a partition
> before writing?
>
> I don't know if (and I don't think)  there is a way to do that using a
> mode. But doesn't manually deleting the directory of a particular partition
> help? For directory structure, check this out...
>
>
> http://spark.apache.org/docs/latest/sql-programming-guide.html#partition-discovery
>
>
> On Wed, Aug 19, 2015 at 8:18 PM, Romi Kuntsman  wrote:
>
>> Hello,
>>
>> I have a DataFrame, with a date column which I want to use as a partition.
>> Each day I want to write the data for the same date in Parquet, and then
>> read a dataframe for a date range.
>>
>> I'm using:
>>
>> myDataframe.write().partitionBy("date").mode(SaveMode.Overwrite).parquet(parquetDir);
>>
>> If I use SaveMode.Append, then writing data for the same partition adds
>> the same data there again.
>> If I use SaveMode.Overwrite, then writing data for a single partition
>> removes all the data for all partitions.
>>
>> How can I overwrite only a given partition or manually remove a partition
>> before writing?
>>
>> Many thanks!
>> Romi K.
>>
>
>
>


Re: How to minimize shuffling on Spark dataframe Join?

2015-08-19 Thread Romi Kuntsman
If you create a PairRDD from the DataFrame, using
dataFrame.toJavaRDD().mapToPair(), then you can call
partitionBy(someCustomPartitioner), which will partition the RDD by the key
(of the pair).
Then the operations on it (like joining with another RDD) will consider
this partitioning.
I'm not sure that DataFrames already support this.
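
A sketch of that in Scala (DataFrame and column names assumed; the partitionBy
itself shuffles once, but the subsequent join then avoids another shuffle):

  import org.apache.spark.HashPartitioner
  val part = new HashPartitioner(200)
  val students = studentDf.rdd.map(r => (r.getAs[String]("studentid"), r)).partitionBy(part)
  val results  = resultDf.rdd.map(r => (r.getAs[String]("studentid"), r)).partitionBy(part)
  val joined   = students.join(results)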

On Wed, Aug 12, 2015 at 11:16 AM Abdullah Anwar <
abdullah.ibn.an...@gmail.com> wrote:

> Hi Hemant,
>
> Thank you for your replay.
>
> I think source of my dataframe is not partitioned on key, its an avro
> file where 'id' is a field .. but I don't know how to read a file and at
> the same time configure partition key. I couldn't find  anything on
> SQLContext.read.load where you can set partition key. or in dataframe where
> you can set partition key. If it could partition the on the specified key
> .. will spark put the same partition range on same machine for two
> different dataframe??
>
>What are the overall tips to join faster?
>
> Best Regards,
> Abdullah
>
>
>
>
> On Wed, Aug 12, 2015 at 11:02 AM, Hemant Bhanawat 
> wrote:
>
>> Is the source of your dataframe partitioned on the key? As per your mail, it
>> looks like it is not. If that is the case, you will have to shuffle the data
>> anyway in order to partition it.
>>
>> The other part of your question is how to co-group data from two
>> dataframes based on a key. For RDDs, cogroup in PairRDDFunctions is
>> one way. I am not sure if something similar is available for DataFrames.
>>
>> Hemant
>>
>>
>>
>>
>>
>> On Tue, Aug 11, 2015 at 2:14 PM, Abdullah Anwar <
>> abdullah.ibn.an...@gmail.com> wrote:
>>
>>>
>>>
>>> I have two dataframes like this
>>>
>>>   student_rdf = (studentid, name, ...)
>>>   student_result_rdf = (studentid, gpa, ...)
>>>
>>> we need to join this two dataframes. we are now doing like this,
>>>
>>> student_rdf.join(student_result_rdf, student_result_rdf["studentid"] == 
>>> student_rdf["studentid"])
>>>
>>> So it is simple, but it creates a lot of data shuffling across worker
>>> nodes. Since the join key is the same on both sides, if the dataframes could
>>> be partitioned on that key (studentid), there should be no shuffling at all,
>>> because matching data (based on the partition key) would reside on the same
>>> node. Is it possible to hint spark to do this?
>>>
>>> So, I am looking for a way to partition the data on a column while I
>>> read a dataframe from the input. And if Spark can recognize that the
>>> partition keys of two dataframes match, how do I make it do that?
>>>
>>>
>>>
>>>
>>> --
>>> Abdullah
>>>
>>
>>
>
>
> --
> Abdullah
>


Re: Issues with S3 paths that contain colons

2015-08-19 Thread Romi Kuntsman
I had the exact same issue, and overcame it by overriding
NativeS3FileSystem with my own class, where I replaced the implementation
of globStatus. It's a hack but it works.
Then I set the hadoop config fs.myschema.impl to my class name, and
accessed the files through myschema:// instead of s3n://

// Replace globbing (which chokes on keys containing colons) with a plain
// directory listing, filtered manually.
@Override
public FileStatus[] globStatus(final Path pathPattern, final PathFilter filter)
    throws IOException {
  final FileStatus[] statusList = super.listStatus(pathPattern);
  final List<FileStatus> result = Lists.newLinkedList();
  for (FileStatus fileStatus : statusList) {
    if (filter.accept(fileStatus.getPath())) {
      result.add(fileStatus);
    }
  }
  return result.toArray(new FileStatus[] {});
}
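Wiring it up is then just a matter of pointing the scheme at the class (the
class and scheme names below are illustrative, and sparkContext is assumed to
be a JavaSparkContext):

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.JavaRDD;

Configuration hadoopConf = sparkContext.hadoopConfiguration();
hadoopConf.set("fs.myschema.impl", "com.example.ColonTolerantS3FileSystem");

// read through the custom scheme instead of s3n://
JavaRDD<String> lines = sparkContext.textFile("myschema://mybucket/some/path/*");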



On Wed, Aug 19, 2015 at 9:14 PM Steve Loughran 
wrote:

> you might want to think about filing a JIRA on issues.apache.org against
> HADOOP here, component being fs/s3. That doesn't mean it is fixable, only
> known.
>
> Every FS has its own set of forbidden characters & filenames; unix doesn't
> allow files named "."; windows doesn't allow files called COM1, ..., so hitting
> some filesystem rule is sometimes a problem. Here, though, you've got the
> file in S3, the listing finds it, but other bits of the codepath are
> failing - which implies that it is something in the Hadoop libs.
>
>
> > On 18 Aug 2015, at 08:20, Brian Stempin  wrote:
> >
> > Hi,
> > I'm running Spark on Amazon EMR (Spark 1.4.1, Hadoop 2.6.0).  I'm seeing
> the
> > exception below when encountering file names that contain colons.  Any
> idea
> > on how to get around this?
> >
> > scala> val files = sc.textFile("s3a://redactedbucketname/*")
> >
> > 2015-08-18 04:38:34,567 INFO  [main] storage.MemoryStore
> > (Logging.scala:logInfo(59)) - ensureFreeSpace(242224) called with
> > curMem=669367, maxMem=285203496
> >
> > 2015-08-18 04:38:34,568 INFO  [main] storage.MemoryStore
> > (Logging.scala:logInfo(59)) - Block broadcast_3 stored as values in
> memory
> > (estimated size 236.5 KB, free 271.1 MB)
> >
> > 2015-08-18 04:38:34,663 INFO  [main] storage.MemoryStore
> > (Logging.scala:logInfo(59)) - ensureFreeSpace(21533) called with
> > curMem=911591, maxMem=285203496
> >
> > 2015-08-18 04:38:34,664 INFO  [main] storage.MemoryStore
> > (Logging.scala:logInfo(59)) - Block broadcast_3_piece0 stored as bytes in
> > memory (estimated size 21.0 KB, free 271.1 MB)
> >
> > 2015-08-18 04:38:34,665 INFO
> [sparkDriver-akka.actor.default-dispatcher-19]
> > storage.BlockManagerInfo (Logging.scala:logInfo(59)) - Added
> > broadcast_3_piece0 in memory on 10.182.184.26:60338 (size: 21.0 KB,
> free:
> > 271.9 MB)
> >
> > 2015-08-18 04:38:34,667 INFO  [main] spark.SparkContext
> > (Logging.scala:logInfo(59)) - Created broadcast 3 from textFile at
> > <console>:21
> >
> > files: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at
> textFile at
> > <console>:21
> >
> >
> > scala> files.count
> >
> > 2015-08-18 04:38:37,262 INFO  [main] s3a.S3AFileSystem
> > (S3AFileSystem.java:listStatus(533)) - List status for path:
> > s3a://redactedbucketname/
> >
> > 2015-08-18 04:38:37,262 INFO  [main] s3a.S3AFileSystem
> > (S3AFileSystem.java:getFileStatus(684)) - Getting path status for
> > s3a://redactedbucketname/ ()
> >
> > java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative
> > path in absolute URI:
> >
> [922-212-4438]-[119]-[1]-[2015-08-13T15:43:12.346193%5D-%5B2015-01-01T00:00:00%5D-redacted.csv
> >
> > at org.apache.hadoop.fs.Path.initialize(Path.java:206)
> >
> > at org.apache.hadoop.fs.Path.<init>(Path.java:172)
> >
> > at org.apache.hadoop.fs.Path.<init>(Path.java:94)
> >
> > at org.apache.hadoop.fs.Globber.glob(Globber.java:240)
> >
> > at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1700)
> >
> > at
> >
> org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:229)
> >
> > at
> >
> org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:200)
> >
> > at
> >
> org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:279)
> >
> > at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207)
> >
> > at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:219)
> >
> > at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:217)
> >
> > at scala.Option.getOrElse(Option.scala:120)
> >
> > at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
> >
> > at
> >
> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
> >
> > at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:219)
> >
> > at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:217)
> >
> > at scala.Option.getOrElse(Option.scala:120)
> >
> > at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
> >
> > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1781)
> >
> > at org.apache.spark.rdd.RDD.count(RDD.scala:1099)
> >
> > at $iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC.<init>(<console>:24)
> >
> > at $iwC$iwC$iwC$iwC$iwC$iwC$iwC.<init>(<console>:29)
> >
> > at $iwC$iwC$iwC$iwC$iwC$iwC.<init>(<console>:31)
> >
> > at $iwC

How to overwrite partition when writing Parquet?

2015-08-19 Thread Romi Kuntsman
Hello,

I have a DataFrame, with a date column which I want to use as a partition.
Each day I want to write the data for the same date in Parquet, and then
read a dataframe for a date range.

I'm using:
myDataframe.write().partitionBy("date").mode(SaveMode.Overwrite).parquet(parquetDir);

If I use SaveMode.Append, then writing data for the same partition adds the
same data there again.
If I use SaveMode.Overwrite, then writing data for a single partition
removes all the data for all partitions.

How can I overwrite only a given partition or manually remove a partition
before writing?

Many thanks!
Romi K.


Re: spark as a lookup engine for dedup

2015-07-27 Thread Romi Kuntsman
An RDD is immutable; it cannot be changed, you can only create a new one from
data or from a transformation. It sounds inefficient to create one every 15
seconds covering the last 24 hours.
I think a key-value store will be much more fitted for this purpose.

On Mon, Jul 27, 2015 at 11:21 AM Shushant Arora 
wrote:

> It's for one day of events, on the order of a billion, and processing is in a
> streaming application with a ~10-15 sec interval, so lookups should be fast. The
> RDD needs to be updated with new events, and events older than 24 hours
> should be removed at each processing step.
>
> So is spark RDD not fit for this requirement?
>
> On Mon, Jul 27, 2015 at 1:08 PM, Romi Kuntsman  wrote:
>
>> What is the processing throughput, and for how long do you need to
>> remember duplicates?
>>
>> You can take all the events, put them in an RDD, group by the key, and
>> then process each key only once.
>> But if you have a long-running application where you want to check, for
>> every value, whether you have already seen it before, you probably need a
>> key-value store, not an RDD.
>>
>> On Sun, Jul 26, 2015 at 7:38 PM Shushant Arora 
>> wrote:
>>
>>> Hi
>>>
>>> I have a requirement for processing large events but ignoring duplicate
>>> at the same time.
>>>
>>> Events are consumed from kafka and each event has a eventid. It may
>>> happen that an event is already processed and came again at some other
>>> offset.
>>>
>>> 1.Can I use Spark RDD to persist processed events and then lookup with
>>> this rdd (How to do lookup inside a RDD ?I have a
>>> JavaPairRDD )
>>> while processing new events and if event is present in  persisted rdd
>>> ignore it , else process the even. Does rdd.lookup(key) on billion of
>>> events will be efficient ?
>>>
>>> 2. update the rdd (Since RDD is immutable  how to update it)?
>>>
>>> Thanks
>>>
>>>
>


Re: spark as a lookup engine for dedup

2015-07-27 Thread Romi Kuntsman
What is the processing throughput, and for how long do you need to remember
duplicates?

You can take all the events, put them in an RDD, group by the key, and then
process each key only once; a rough sketch of that is below.
But if you have a long-running application where you want to check, for every
value, whether you have already seen it before, you probably need a key-value
store, not an RDD.
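A rough Java sketch of the per-batch grouping (eventsById is assumed to be a
JavaPairRDD of (eventId, payload) built from the Kafka stream; names are
illustrative):

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function2;

// keep a single payload per eventId, so each key is processed only once
JavaPairRDD<String, String> deduped = eventsById.reduceByKey(
    new Function2<String, String, String>() {
        @Override
        public String call(String first, String second) {
            return first; // duplicates carry the same payload, keep either one
        }
    });

Note this only removes duplicates within that one RDD; for deduplication across
the whole 24-hour window you would still need the external key-value store
mentioned above.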

On Sun, Jul 26, 2015 at 7:38 PM Shushant Arora 
wrote:

> Hi
>
> I have a requirement for processing large events but ignoring duplicate at
> the same time.
>
> Events are consumed from kafka and each event has a eventid. It may happen
> that an event is already processed and came again at some other offset.
>
> 1.Can I use Spark RDD to persist processed events and then lookup with
> this rdd (How to do lookup inside a RDD ?I have a
> JavaPairRDD )
> while processing new events and if event is present in  persisted rdd
> ignore it , else process the even. Does rdd.lookup(key) on billion of
> events will be efficient ?
>
> 2. update the rdd (Since RDD is immutable  how to update it)?
>
> Thanks
>
>


Re: Scaling spark cluster for a running application

2015-07-22 Thread Romi Kuntsman
Are you running the Spark cluster in standalone or YARN?
In standalone, the application gets the available resources when it starts.
With YARN, you can try to turn on the setting
*spark.dynamicAllocation.enabled*
See https://spark.apache.org/docs/latest/configuration.html
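A minimal sketch of turning that on (YARN only; dynamic allocation also needs
the external shuffle service on the node managers, and the executor bounds
below are illustrative):

import org.apache.spark.SparkConf;

SparkConf conf = new SparkConf()
    .set("spark.dynamicAllocation.enabled", "true")
    .set("spark.shuffle.service.enabled", "true")
    .set("spark.dynamicAllocation.minExecutors", "2")
    .set("spark.dynamicAllocation.maxExecutors", "20");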

On Wed, Jul 22, 2015 at 2:20 PM phagunbaya  wrote:

> I have a spark cluster running in client mode with the driver outside the spark
> cluster. I want to scale the cluster after an application is submitted. In
> order to do this, I'm creating new workers and they are getting registered
> with the master, but the issue I'm seeing is that the running application does
> not use the newly added worker. Hence I cannot add more resources to an
> existing running application.
>
> Is there any other way or config to deal with this use-case ? How to make
> running application to ask for executors from newly issued worker node ?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Scaling-spark-cluster-for-a-running-application-tp23951.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Applications metrics unseparatable from Master metrics?

2015-07-22 Thread Romi Kuntsman
Hi,

I tried to enable Master metrics source (to get number of running/waiting
applications etc), and connected it to Graphite.
However, when these are enabled, application metrics are also sent.

Is it possible to separate them, and send only master metrics without
applications?

I see that Master class is registering both:
https://github.com/apache/spark/blob/b9a922e260bec1b211437f020be37fab46a85db0/core/src/main/scala/org/apache/spark/deploy/master/Master.scala#L91

Thanks,
RK.


Re: Timestamp functions for sqlContext

2015-07-21 Thread Romi Kuntsman
Hi Tal,

I'm not sure there is currently a built-in function for it, but you can
easily define a UDF (user defined function) by implementing
org.apache.spark.sql.api.java.UDF1, registering it
(sqlContext.udf().register(...)), and then using it inside your query.
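For example, a sketch of a "days since" UDF (assuming Spark 1.4 Java APIs and a
Timestamp column named rowTimestamp; the UDF name is illustrative):

import java.sql.Timestamp;
import java.util.concurrent.TimeUnit;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

// register a UDF that returns the whole days between a timestamp and now
sqlContext.udf().register("days_since", new UDF1<Timestamp, Integer>() {
    @Override
    public Integer call(Timestamp ts) {
        long diffMillis = System.currentTimeMillis() - ts.getTime();
        return (int) TimeUnit.MILLISECONDS.toDays(diffMillis);
    }
}, DataTypes.IntegerType);

DataFrame dateDiff = sqlContext.sql(
    "select days_since(rowTimestamp) as date_diff from tableName");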

RK.



On Tue, Jul 21, 2015 at 7:04 PM Tal Rozen  wrote:

> Hi,
>
> I'm running a query with sql context where one of the fields is of type
> java.sql.Timestamp. I'd like to set a function similar to DATEDIFF in
> mysql, between the date given in each row, and now. So If I was able to use
> the same syntax as in mysql it would be:
>
> val date_diff_df = sqlContext.sql("select DATEDIFF(curdate(),
> rowTimestamp) date_diff from tableName")
>
> What are the relevant key words to replace curdate(), and DATEDIFF?
>
> Thanks
>
>
>
>
>
>


Spark Application stuck retrying task failed on Java heap space?

2015-07-21 Thread Romi Kuntsman
Hello,

*TL;DR: a task crashes with OOM, but the application gets stuck in an infinite
loop retrying the task over and over again instead of failing fast.*

Using Spark 1.4.0, standalone, with DataFrames on Java 7.
I have an application that does some aggregations. I played around with
shuffling settings, which led to the dreaded Java heap space error. See the
stack trace at the end of this message.

When this happens, I see tens of executors in the "EXITED" state, a couple in
"LOADING" and one in "RUNNING". All of them are retrying the same task all
over again, and keep failing with the same "Java heap space" error. This
goes on for hours!

Why doesn't the whole application fail, when the individual executors keep
failing with the same error?

Thanks,
Romi K.

---

end of the log in a failed task:

15/07/21 11:13:40 INFO executor.Executor: Finished task 117.0 in stage
218.1 (TID 305). 2000 bytes result sent to driver
15/07/21 11:13:41 INFO executor.CoarseGrainedExecutorBackend: Got assigned
task 306
15/07/21 11:13:41 INFO executor.Executor: Running task 0.0 in stage 219.1
(TID 306)
15/07/21 11:13:41 INFO spark.MapOutputTrackerWorker: Updating epoch to 420
and clearing cache
15/07/21 11:13:41 INFO broadcast.TorrentBroadcast: Started reading
broadcast variable 8
15/07/21 11:13:41 INFO storage.MemoryStore: ensureFreeSpace(5463) called
with curMem=285917, maxMem=1406164008
15/07/21 11:13:41 INFO storage.MemoryStore: Block broadcast_8_piece0 stored
as bytes in memory (estimated size 5.3 KB, free 1340.7 MB)
15/07/21 11:13:41 INFO broadcast.TorrentBroadcast: Reading broadcast
variable 8 took 22 ms
15/07/21 11:13:41 INFO storage.MemoryStore: ensureFreeSpace(10880) called
with curMem=291380, maxMem=1406164008
15/07/21 11:13:41 INFO storage.MemoryStore: Block broadcast_8 stored as
values in memory (estimated size 10.6 KB, free 1340.7 MB)
15/07/21 11:13:41 INFO spark.MapOutputTrackerWorker: Don't have map outputs
for shuffle 136, fetching them
15/07/21 11:13:41 INFO spark.MapOutputTrackerWorker: Doing the fetch;
tracker endpoint = AkkaRpcEndpointRef(Actor[akka.tcp://
sparkDriver@1.2.3.4:57490/user/MapOutputTracker#-99712578])
15/07/21 11:13:41 INFO spark.MapOutputTrackerWorker: Got the output
locations
15/07/21 11:13:41 INFO storage.ShuffleBlockFetcherIterator: Getting 182
non-empty blocks out of 182 blocks
15/07/21 11:13:41 INFO storage.ShuffleBlockFetcherIterator: Started 4
remote fetches in 28 ms
15/07/21 11:14:34 ERROR executor.Executor: Exception in task 0.0 in stage
219.1 (TID 306)
java.lang.OutOfMemoryError: Java heap space
at
scala.collection.mutable.ResizableArray$class.ensureSize(ResizableArray.scala:99)
at scala.collection.mutable.ArrayBuffer.ensureSize(ArrayBuffer.scala:47)
at scala.collection.mutable.ArrayBuffer.$plus$eq(ArrayBuffer.scala:83)
at scala.collection.mutable.ArrayBuffer.$plus$eq(ArrayBuffer.scala:47)
at
scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
at
scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at
org.apache.spark.sql.execution.Sort$$anonfun$doExecute$5$$anonfun$apply$5.apply(basicOperators.scala:192)
at
org.apache.spark.sql.execution.Sort$$anonfun$doExecute$5$$anonfun$apply$5.apply(basicOperators.scala:190)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
   

Re: Guava 11 dependency issue in Spark 1.2.0

2015-01-19 Thread Romi Kuntsman
Actually there is already someone on Hadoop-Common-Dev taking care of
removing the old Guava dependency

http://mail-archives.apache.org/mod_mbox/hadoop-common-dev/201501.mbox/browser
https://issues.apache.org/jira/browse/HADOOP-11470

*Romi Kuntsman*, *Big Data Engineer*
 http://www.totango.com

On Mon, Jan 19, 2015 at 4:03 PM, Romi Kuntsman  wrote:

> I have recently encountered a similar problem with Guava version collision
> with Hadoop.
>
> Isn't it more correct to upgrade Hadoop to use the latest Guava? Why are
> they staying in version 11, does anyone know?
>
> *Romi Kuntsman*, *Big Data Engineer*
>  http://www.totango.com
>
> On Wed, Jan 7, 2015 at 7:59 AM, Niranda Perera 
> wrote:
>
>> Hi Sean,
>>
>> I removed the hadoop dependencies from the app and ran it on the cluster.
>> It gives a java.io.EOFException
>>
>> 15/01/07 11:19:29 INFO MemoryStore: ensureFreeSpace(177166) called with
>> curMem=0, maxMem=2004174766
>> 15/01/07 11:19:29 INFO MemoryStore: Block broadcast_0 stored as values in
>> memory (estimated size 173.0 KB, free 1911.2 MB)
>> 15/01/07 11:19:29 INFO MemoryStore: ensureFreeSpace(25502) called with
>> curMem=177166, maxMem=2004174766
>> 15/01/07 11:19:29 INFO MemoryStore: Block broadcast_0_piece0 stored as
>> bytes in memory (estimated size 24.9 KB, free 1911.1 MB)
>> 15/01/07 11:19:29 INFO BlockManagerInfo: Added broadcast_0_piece0 in
>> memory on 10.100.5.109:43924 (size: 24.9 KB, free: 1911.3 MB)
>> 15/01/07 11:19:29 INFO BlockManagerMaster: Updated info of block
>> broadcast_0_piece0
>> 15/01/07 11:19:29 INFO SparkContext: Created broadcast 0 from hadoopFile
>> at AvroRelation.scala:45
>> 15/01/07 11:19:29 INFO FileInputFormat: Total input paths to process : 1
>> 15/01/07 11:19:29 INFO SparkContext: Starting job: collect at
>> SparkPlan.scala:84
>> 15/01/07 11:19:29 INFO DAGScheduler: Got job 0 (collect at
>> SparkPlan.scala:84) with 2 output partitions (allowLocal=false)
>> 15/01/07 11:19:29 INFO DAGScheduler: Final stage: Stage 0(collect at
>> SparkPlan.scala:84)
>> 15/01/07 11:19:29 INFO DAGScheduler: Parents of final stage: List()
>> 15/01/07 11:19:29 INFO DAGScheduler: Missing parents: List()
>> 15/01/07 11:19:29 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[6] at
>> map at SparkPlan.scala:84), which has no missing parents
>> 15/01/07 11:19:29 INFO MemoryStore: ensureFreeSpace(4864) called with
>> curMem=202668, maxMem=2004174766
>> 15/01/07 11:19:29 INFO MemoryStore: Block broadcast_1 stored as values in
>> memory (estimated size 4.8 KB, free 1911.1 MB)
>> 15/01/07 11:19:29 INFO MemoryStore: ensureFreeSpace(3481) called with
>> curMem=207532, maxMem=2004174766
>> 15/01/07 11:19:29 INFO MemoryStore: Block broadcast_1_piece0 stored as
>> bytes in memory (estimated size 3.4 KB, free 1911.1 MB)
>> 15/01/07 11:19:29 INFO BlockManagerInfo: Added broadcast_1_piece0 in
>> memory on 10.100.5.109:43924 (size: 3.4 KB, free: 1911.3 MB)
>> 15/01/07 11:19:29 INFO BlockManagerMaster: Updated info of block
>> broadcast_1_piece0
>> 15/01/07 11:19:29 INFO SparkContext: Created broadcast 1 from broadcast
>> at DAGScheduler.scala:838
>> 15/01/07 11:19:29 INFO DAGScheduler: Submitting 2 missing tasks from
>> Stage 0 (MappedRDD[6] at map at SparkPlan.scala:84)
>> 15/01/07 11:19:29 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
>> 15/01/07 11:19:29 INFO TaskSetManager: Starting task 0.0 in stage 0.0
>> (TID 0, 10.100.5.109, PROCESS_LOCAL, 1340 bytes)
>> 15/01/07 11:19:29 INFO TaskSetManager: Starting task 1.0 in stage 0.0
>> (TID 1, 10.100.5.109, PROCESS_LOCAL, 1340 bytes)
>> 15/01/07 11:19:29 WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1,
>> 10.100.5.109): java.io.EOFException
>> at
>> java.io.ObjectInputStream$BlockDataInputStream.readFully(ObjectInputStream.java:2722)
>> at java.io.ObjectInputStream.readFully(ObjectInputStream.java:1009)
>> at
>> org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:63)
>> at
>> org.apache.hadoop.io.DataOutputBuffer.write(DataOutputBuffer.java:101)
>> at org.apache.hadoop.io.UTF8.readChars(UTF8.java:216)
>> at org.apache.hadoop.io.UTF8.readString(UTF8.java:208)
>> at org.apache.hadoop.mapred.FileSplit.readFields(FileSplit.java:87)
>> at
>> org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:237)
>> at
>> org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:66)
>> at
>> org.apache.spark.SerializableWritable$$anonfun$readObject$1.apply$mcV$sp(SerializableWritable.scala:43)
>> at org.apache.s

Re: Guava 11 dependency issue in Spark 1.2.0

2015-01-19 Thread Romi Kuntsman
I have recently encountered a similar problem with Guava version collision
with Hadoop.

Isn't it more correct to upgrade Hadoop to use the latest Guava? Why are
they staying in version 11, does anyone know?

*Romi Kuntsman*, *Big Data Engineer*
 http://www.totango.com

On Wed, Jan 7, 2015 at 7:59 AM, Niranda Perera 
wrote:

> Hi Sean,
>
> I removed the hadoop dependencies from the app and ran it on the cluster.
> It gives a java.io.EOFException
>
> 15/01/07 11:19:29 INFO MemoryStore: ensureFreeSpace(177166) called with
> curMem=0, maxMem=2004174766
> 15/01/07 11:19:29 INFO MemoryStore: Block broadcast_0 stored as values in
> memory (estimated size 173.0 KB, free 1911.2 MB)
> 15/01/07 11:19:29 INFO MemoryStore: ensureFreeSpace(25502) called with
> curMem=177166, maxMem=2004174766
> 15/01/07 11:19:29 INFO MemoryStore: Block broadcast_0_piece0 stored as
> bytes in memory (estimated size 24.9 KB, free 1911.1 MB)
> 15/01/07 11:19:29 INFO BlockManagerInfo: Added broadcast_0_piece0 in
> memory on 10.100.5.109:43924 (size: 24.9 KB, free: 1911.3 MB)
> 15/01/07 11:19:29 INFO BlockManagerMaster: Updated info of block
> broadcast_0_piece0
> 15/01/07 11:19:29 INFO SparkContext: Created broadcast 0 from hadoopFile
> at AvroRelation.scala:45
> 15/01/07 11:19:29 INFO FileInputFormat: Total input paths to process : 1
> 15/01/07 11:19:29 INFO SparkContext: Starting job: collect at
> SparkPlan.scala:84
> 15/01/07 11:19:29 INFO DAGScheduler: Got job 0 (collect at
> SparkPlan.scala:84) with 2 output partitions (allowLocal=false)
> 15/01/07 11:19:29 INFO DAGScheduler: Final stage: Stage 0(collect at
> SparkPlan.scala:84)
> 15/01/07 11:19:29 INFO DAGScheduler: Parents of final stage: List()
> 15/01/07 11:19:29 INFO DAGScheduler: Missing parents: List()
> 15/01/07 11:19:29 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[6] at
> map at SparkPlan.scala:84), which has no missing parents
> 15/01/07 11:19:29 INFO MemoryStore: ensureFreeSpace(4864) called with
> curMem=202668, maxMem=2004174766
> 15/01/07 11:19:29 INFO MemoryStore: Block broadcast_1 stored as values in
> memory (estimated size 4.8 KB, free 1911.1 MB)
> 15/01/07 11:19:29 INFO MemoryStore: ensureFreeSpace(3481) called with
> curMem=207532, maxMem=2004174766
> 15/01/07 11:19:29 INFO MemoryStore: Block broadcast_1_piece0 stored as
> bytes in memory (estimated size 3.4 KB, free 1911.1 MB)
> 15/01/07 11:19:29 INFO BlockManagerInfo: Added broadcast_1_piece0 in
> memory on 10.100.5.109:43924 (size: 3.4 KB, free: 1911.3 MB)
> 15/01/07 11:19:29 INFO BlockManagerMaster: Updated info of block
> broadcast_1_piece0
> 15/01/07 11:19:29 INFO SparkContext: Created broadcast 1 from broadcast at
> DAGScheduler.scala:838
> 15/01/07 11:19:29 INFO DAGScheduler: Submitting 2 missing tasks from Stage
> 0 (MappedRDD[6] at map at SparkPlan.scala:84)
> 15/01/07 11:19:29 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
> 15/01/07 11:19:29 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID
> 0, 10.100.5.109, PROCESS_LOCAL, 1340 bytes)
> 15/01/07 11:19:29 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID
> 1, 10.100.5.109, PROCESS_LOCAL, 1340 bytes)
> 15/01/07 11:19:29 WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1,
> 10.100.5.109): java.io.EOFException
> at
> java.io.ObjectInputStream$BlockDataInputStream.readFully(ObjectInputStream.java:2722)
> at java.io.ObjectInputStream.readFully(ObjectInputStream.java:1009)
> at
> org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:63)
> at
> org.apache.hadoop.io.DataOutputBuffer.write(DataOutputBuffer.java:101)
> at org.apache.hadoop.io.UTF8.readChars(UTF8.java:216)
> at org.apache.hadoop.io.UTF8.readString(UTF8.java:208)
> at org.apache.hadoop.mapred.FileSplit.readFields(FileSplit.java:87)
> at
> org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:237)
> at
> org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:66)
> at
> org.apache.spark.SerializableWritable$$anonfun$readObject$1.apply$mcV$sp(SerializableWritable.scala:43)
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:985)
> at
> org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> at java.lang.reflect.Method.invoke(Method.java:597)
> at
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:969)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1871)
&g

Re: Announcing Spark 1.1.1!

2014-12-03 Thread Romi Kuntsman
About version compatibility and the upgrade path - can the Java application
dependencies and the Spark server be upgraded separately (i.e. will the 1.1.0
library work with a 1.1.1 server, and vice versa), or do they need to be
upgraded together?

Thanks!

*Romi Kuntsman*, *Big Data Engineer*
 http://www.totango.com

On Tue, Dec 2, 2014 at 11:36 PM, Andrew Or  wrote:

> I am happy to announce the availability of Spark 1.1.1! This is a
> maintenance release with many bug fixes, most of which are concentrated in
> the core. This list includes various fixes to sort-based shuffle, memory
> leak, and spilling issues. Contributions from this release came from 55
> developers.
>
> Visit the release notes [1] to read about the new features, or
> download [2] the release today.
>
> [1] http://spark.apache.org/releases/spark-release-1-1-1.html
> [2] http://spark.apache.org/downloads.html
>
> Please e-mail me directly for any typo's in the release notes or name
> listing.
>
> Thanks for everyone who contributed, and congratulations!
> -Andrew
>


ExternalAppendOnlyMap: Thread spilling in-memory map of to disk many times slowly

2014-11-26 Thread Romi Kuntsman
Hello,

I have a large data calculation in Spark, distributed across several
nodes. In the end, I want to write to a single output file.

For this I do:
   output.coalesce(1, false).saveAsTextFile(filename).

What happens is that all the data from the workers flows to a single worker,
and that one writes the data.
If the data is small enough, it all goes well.
However, for an RDD above a certain size, I get a lot of the following
messages (see below).

From what I understand, ExternalAppendOnlyMap spills the data to disk when
it can't hold it in memory.
Is there a way to tell it to stream the data straight to disk, instead of
spilling each block slowly?
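(For completeness, one possible workaround - an assumption on my part, not
something verified here - is to skip coalesce(1) and let every partition write
its own part file, then merge the part files afterwards with Hadoop's FileUtil;
sparkContext is assumed to be a JavaSparkContext and paths are illustrative:)

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

output.saveAsTextFile("s3n://mybucket/mydir/parts");  // many part-NNNNN files

Configuration conf = sparkContext.hadoopConfiguration();
Path partsDir = new Path("s3n://mybucket/mydir/parts");
Path merged = new Path("s3n://mybucket/mydir/output.txt");
FileSystem fs = partsDir.getFileSystem(conf);
FileUtil.copyMerge(fs, partsDir, fs, merged, false, conf, null);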

14/11/24 12:54:59 INFO MapOutputTrackerWorker: Got the output locations
14/11/24 12:54:59 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
maxBytesInFlight: 50331648, targetRequestSize: 10066329
14/11/24 12:54:59 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
Getting 69 non-empty blocks out of 90 blocks
14/11/24 12:54:59 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
Started 3 remote fetches in 22 ms
14/11/24 12:55:11 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
maxBytesInFlight: 50331648, targetRequestSize: 10066329
14/11/24 12:55:11 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
Getting 70 non-empty blocks out of 90 blocks
14/11/24 12:55:11 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
Started 3 remote fetches in 4 ms
14/11/24 12:55:11 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 13 MB to disk (1 time so far)
14/11/24 12:55:11 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 12 MB to disk (2 times so far)
14/11/24 12:55:11 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 12 MB to disk (3 times so far)
[...trimmed...]
14/11/24 13:13:28 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
maxBytesInFlight: 50331648, targetRequestSize: 10066329
14/11/24 13:13:28 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
Getting 69 non-empty blocks out of 90 blocks
14/11/24 13:13:28 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
Started 3 remote fetches in 2 ms
14/11/24 13:13:28 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 15 MB to disk (1 time so far)
14/11/24 13:13:28 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 16 MB to disk (2 times so far)
14/11/24 13:13:28 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 14 MB to disk (3 times so far)
[...trimmed...]
14/11/24 13:13:32 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 13 MB to disk (33 times so far)
14/11/24 13:13:32 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 13 MB to disk (34 times so far)
14/11/24 13:13:32 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 13 MB to disk (35 times so far)
14/11/24 13:13:40 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
maxBytesInFlight: 50331648, targetRequestSize: 10066329
14/11/24 13:13:40 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
Getting 69 non-empty blocks out of 90 blocks
14/11/24 13:13:40 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
Started 3 remote fetches in 4 ms
14/11/24 13:13:40 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 10 MB to disk (1 time so far)
14/11/24 13:13:41 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 10 MB to disk (2 times so far)
14/11/24 13:13:41 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 9 MB to disk (3 times so far)
[...trimmed...]
14/11/24 13:13:45 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 12 MB to disk (36 times so far)
14/11/24 13:13:45 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 11 MB to disk (37 times so far)
14/11/24 13:13:56 INFO FileOutputCommitter: Saved output of task
'attempt_201411241250__m_00_90' to s3n://mybucket/mydir/output

*Romi Kuntsman*, *Big Data Engineer*
 http://www.totango.com


ExternalAppendOnlyMap: Thread spilling in-memory map of to disk many times slowly

2014-11-24 Thread Romi Kuntsman
Hello,

I have a large data calculation in Spark, distributed across several
nodes. In the end, I want to write to a single output file.

For this I do:
   output.coalesce(1, false).saveAsTextFile(filename).

What happens is that all the data from the workers flows to a single worker,
and that one writes the data.
If the data is small enough, it all goes well.
However, for an RDD above a certain size, I get a lot of the following
messages (see below).

From what I understand, ExternalAppendOnlyMap spills the data to disk when
it can't hold it in memory.
Is there a way to tell it to stream the data straight to disk, instead of
spilling each block slowly?

14/11/24 12:54:59 INFO MapOutputTrackerWorker: Got the output locations
14/11/24 12:54:59 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
maxBytesInFlight: 50331648, targetRequestSize: 10066329
14/11/24 12:54:59 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
Getting 69 non-empty blocks out of 90 blocks
14/11/24 12:54:59 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
Started 3 remote fetches in 22 ms
14/11/24 12:55:11 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
maxBytesInFlight: 50331648, targetRequestSize: 10066329
14/11/24 12:55:11 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
Getting 70 non-empty blocks out of 90 blocks
14/11/24 12:55:11 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
Started 3 remote fetches in 4 ms
14/11/24 12:55:11 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 13 MB to disk (1 time so far)
14/11/24 12:55:11 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 12 MB to disk (2 times so far)
14/11/24 12:55:11 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 12 MB to disk (3 times so far)
[...trimmed...]
14/11/24 13:13:28 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
maxBytesInFlight: 50331648, targetRequestSize: 10066329
14/11/24 13:13:28 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
Getting 69 non-empty blocks out of 90 blocks
14/11/24 13:13:28 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
Started 3 remote fetches in 2 ms
14/11/24 13:13:28 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 15 MB to disk (1 time so far)
14/11/24 13:13:28 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 16 MB to disk (2 times so far)
14/11/24 13:13:28 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 14 MB to disk (3 times so far)
[...trimmed...]
14/11/24 13:13:32 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 13 MB to disk (33 times so far)
14/11/24 13:13:32 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 13 MB to disk (34 times so far)
14/11/24 13:13:32 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 13 MB to disk (35 times so far)
14/11/24 13:13:40 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
maxBytesInFlight: 50331648, targetRequestSize: 10066329
14/11/24 13:13:40 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
Getting 69 non-empty blocks out of 90 blocks
14/11/24 13:13:40 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
Started 3 remote fetches in 4 ms
14/11/24 13:13:40 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 10 MB to disk (1 time so far)
14/11/24 13:13:41 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 10 MB to disk (2 times so far)
14/11/24 13:13:41 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 9 MB to disk (3 times so far)
[...trimmed...]
14/11/24 13:13:45 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 12 MB to disk (36 times so far)
14/11/24 13:13:45 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 11 MB to disk (37 times so far)
14/11/24 13:13:56 INFO FileOutputCommitter: Saved output of task
'attempt_201411241250__m_00_90' to s3n://mybucket/mydir/output

*Romi Kuntsman*, *Big Data Engineer*
 http://www.totango.com


Task time measurement

2014-11-11 Thread Romi Kuntsman
Hello,

Currently, in the Spark standalone console, I can only see how long the entire
job took.
How can I know how long it was in WAITING and how long in RUNNING, and also,
while running, how long each of the jobs inside it took?

Thanks,

*Romi Kuntsman*, *Big Data Engineer*
 http://www.totango.com


Snappy temp files not cleaned up

2014-11-05 Thread Romi Kuntsman
Hello,

Spark has an internal cleanup mechanism
(defined by spark.cleaner.ttl, see
http://spark.apache.org/docs/latest/configuration.html)
which cleans up tasks and stages.
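For reference, a minimal sketch of enabling that TTL-based cleaner (the value
is illustrative, in seconds):

import org.apache.spark.SparkConf;

// periodic cleanup of old metadata (tasks, stages) generated by the driver
SparkConf conf = new SparkConf().set("spark.cleaner.ttl", "3600");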

However, in our installation, we noticed that Snappy temporary files are
never cleaned up.

Is it a misconfiguration? A missing feature? How do you deal with the build-up
of temp files?

Thanks,

*Romi Kuntsman*, *Big Data Engineer*
 http://www.totango.com


Re: Spark job resource allocation best practices

2014-11-04 Thread Romi Kuntsman
Let's say that I run Spark on Mesos in fine-grained mode, and I have 12
cores and 64GB memory.
I run application A on Spark, and some time after that (but before A
finished) application B.
How many CPUs will each of them get?

*Romi Kuntsman*, *Big Data Engineer*
 http://www.totango.com

On Tue, Nov 4, 2014 at 11:33 AM, Akhil Das 
wrote:

> You need to install mesos on your cluster. Then you will run your spark
> applications by specifying mesos master (mesos://) instead of (spark://).
>
> Spark can run over Mesos in two modes: “*fine-grained*” (default) and “
> *coarse-grained*”.
>
> In “*fine-grained*” mode (default), each Spark task runs as a separate
> Mesos task. This allows multiple instances of Spark (and other frameworks)
> to share machines at a very fine granularity, where each application gets
> more or fewer machines as it ramps up and down, but it comes with an
> additional overhead in launching each task. This mode may be inappropriate
> for low-latency requirements like interactive queries or serving web
> requests.
>
> The “*coarse-grained*” mode will instead launch only one long-running
> Spark task on each Mesos machine, and dynamically schedule its own
> “mini-tasks” within it. The benefit is much lower startup overhead, but at
> the cost of reserving the Mesos resources for the complete duration of the
> application.
>
> To run in coarse-grained mode, set the spark.mesos.coarse property in your
> SparkConf:
>  conf.set("spark.mesos.coarse", "true")
>
>
> In addition, for coarse-grained mode, you can control the maximum number
> of resources Spark will acquire. By default, it will acquire all cores in
> the cluster (that get offered by Mesos), which only makes sense if you run
> just one application at a time. You can cap the maximum number of cores
> using conf.set("spark.cores.max", "10") (for example).
>
>
> If you run your application in fine-grained mode, then mesos will take
> care of the resource allocation for you. You just tell mesos from your
> application that you are running in fine-grained mode, and it is the
> default mode.
>
> Thanks
> Best Regards
>
> On Tue, Nov 4, 2014 at 2:46 PM, Romi Kuntsman  wrote:
>
>> I have a single Spark cluster, not multiple frameworks and not multiple
>> versions. Is it relevant for my use-case?
>> Where can I find information about exactly how to make Mesos tell Spark
>> how many resources of the cluster to use? (instead of the default take-all)
>>
>> *Romi Kuntsman*, *Big Data Engineer*
>>  http://www.totango.com
>>
>> On Tue, Nov 4, 2014 at 11:00 AM, Akhil Das 
>> wrote:
>>
>>> You can look at different modes over here
>>> http://docs.sigmoidanalytics.com/index.php/Spark_On_Mesos#Mesos_Run_Modes
>>>
>>> These people has very good tutorial to get you started
>>> http://mesosphere.com/docs/tutorials/run-spark-on-mesos/#overview
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Tue, Nov 4, 2014 at 1:44 PM, Romi Kuntsman  wrote:
>>>
>>>> How can I configure Mesos allocation policy to share resources between
>>>> all current Spark applications? I can't seem to find it in the architecture
>>>> docs.
>>>>
>>>> *Romi Kuntsman*, *Big Data Engineer*
>>>>  http://www.totango.com
>>>>
>>>> On Tue, Nov 4, 2014 at 9:11 AM, Akhil Das 
>>>> wrote:
>>>>
>>>>> Yes. i believe Mesos is the right choice for you.
>>>>> http://mesos.apache.org/documentation/latest/mesos-architecture/
>>>>>
>>>>> Thanks
>>>>> Best Regards
>>>>>
>>>>> On Mon, Nov 3, 2014 at 9:33 PM, Romi Kuntsman 
>>>>> wrote:
>>>>>
>>>>>> So, as said there, static partitioning is used in "Spark’s standalone
>>>>>> and YARN modes, as well as the coarse-grained Mesos mode".
>>>>>> That leaves us only with Mesos, where there is *dynamic sharing* of
>>>>>> CPU cores.
>>>>>>
>>>>>> It says "when the application is not running tasks on a machine,
>>>>>> other applications may run tasks on those cores".
>>>>>> But my applications are short lived (seconds to minutes), and they
>>>>>> read a large dataset, process it, and write the results. They are also
>>>>>> IO-bound, meaning most of the time is spent reading input data (from S3)
>>>>>> and writing the results back.
>>>>>>
>>>>

Re: Spark job resource allocation best practices

2014-11-04 Thread Romi Kuntsman
I have a single Spark cluster, not multiple frameworks and not multiple
versions. Is it relevant for my use-case?
Where can I find information about exactly how to make Mesos tell Spark how
many resources of the cluster to use? (instead of the default take-all)

*Romi Kuntsman*, *Big Data Engineer*
 http://www.totango.com

On Tue, Nov 4, 2014 at 11:00 AM, Akhil Das 
wrote:

> You can look at different modes over here
> http://docs.sigmoidanalytics.com/index.php/Spark_On_Mesos#Mesos_Run_Modes
>
> These people has very good tutorial to get you started
> http://mesosphere.com/docs/tutorials/run-spark-on-mesos/#overview
>
> Thanks
> Best Regards
>
> On Tue, Nov 4, 2014 at 1:44 PM, Romi Kuntsman  wrote:
>
>> How can I configure Mesos allocation policy to share resources between
>> all current Spark applications? I can't seem to find it in the architecture
>> docs.
>>
>> *Romi Kuntsman*, *Big Data Engineer*
>>  http://www.totango.com
>>
>> On Tue, Nov 4, 2014 at 9:11 AM, Akhil Das 
>> wrote:
>>
>>> Yes. i believe Mesos is the right choice for you.
>>> http://mesos.apache.org/documentation/latest/mesos-architecture/
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Mon, Nov 3, 2014 at 9:33 PM, Romi Kuntsman  wrote:
>>>
>>>> So, as said there, static partitioning is used in "Spark’s standalone
>>>> and YARN modes, as well as the coarse-grained Mesos mode".
>>>> That leaves us only with Mesos, where there is *dynamic sharing* of
>>>> CPU cores.
>>>>
>>>> It says "when the application is not running tasks on a machine, other
>>>> applications may run tasks on those cores".
>>>> But my applications are short lived (seconds to minutes), and they read
>>>> a large dataset, process it, and write the results. They are also IO-bound,
>>>> meaning most of the time is spent reading input data (from S3) and writing
>>>> the results back.
>>>>
>>>> Is it possible to divide the resources between them, according to how
>>>> many are trying to run at the same time?
>>>> So for example if I have 12 cores - if one job is scheduled, it will
>>>> get 12 cores, but if 3 are scheduled, then each one will get 4 cores and
>>>> then will all start.
>>>>
>>>> Thanks!
>>>>
>>>> *Romi Kuntsman*, *Big Data Engineer*
>>>>  http://www.totango.com
>>>>
>>>> On Mon, Nov 3, 2014 at 5:46 PM, Akhil Das 
>>>> wrote:
>>>>
>>>>> Have a look at scheduling pools
>>>>> <https://spark.apache.org/docs/latest/job-scheduling.html>. If you
>>>>> want more sophisticated resource allocation, then you are better of to use
>>>>> cluster managers like mesos or yarn
>>>>>
>>>>> Thanks
>>>>> Best Regards
>>>>>
>>>>> On Mon, Nov 3, 2014 at 9:10 PM, Romi Kuntsman 
>>>>> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I have a Spark 1.1.0 standalone cluster, with several nodes, and
>>>>>> several jobs (applications) being scheduled at the same time.
>>>>>> By default, each Spark job takes up all available CPUs.
>>>>>> This way, when more than one job is scheduled, all but the first are
>>>>>> stuck in "WAITING".
>>>>>> On the other hand, if I tell each job to initially limit itself to a
>>>>>> fixed number of CPUs, and that job runs by itself, the cluster is
>>>>>> under-utilized and the job runs longer than it could have if it took all
>>>>>> the available resources.
>>>>>>
>>>>>> - How to give the tasks a more fair resource division, which lets
>>>>>> many jobs run together, and together lets them use all the available
>>>>>> resources?
>>>>>> - How do you divide resources between applications on your usecase?
>>>>>>
>>>>>> P.S. I started reading about Mesos but couldn't figure out if/how it
>>>>>> could solve the described issue.
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> *Romi Kuntsman*, *Big Data Engineer*
>>>>>>  http://www.totango.com
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>


Re: Spark job resource allocation best practices

2014-11-04 Thread Romi Kuntsman
How can I configure Mesos allocation policy to share resources between all
current Spark applications? I can't seem to find it in the architecture
docs.

*Romi Kuntsman*, *Big Data Engineer*
 http://www.totango.com

On Tue, Nov 4, 2014 at 9:11 AM, Akhil Das 
wrote:

> Yes. i believe Mesos is the right choice for you.
> http://mesos.apache.org/documentation/latest/mesos-architecture/
>
> Thanks
> Best Regards
>
> On Mon, Nov 3, 2014 at 9:33 PM, Romi Kuntsman  wrote:
>
>> So, as said there, static partitioning is used in "Spark’s standalone and
>> YARN modes, as well as the coarse-grained Mesos mode".
>> That leaves us only with Mesos, where there is *dynamic sharing* of CPU
>> cores.
>>
>> It says "when the application is not running tasks on a machine, other
>> applications may run tasks on those cores".
>> But my applications are short lived (seconds to minutes), and they read a
>> large dataset, process it, and write the results. They are also IO-bound,
>> meaning most of the time is spent reading input data (from S3) and writing
>> the results back.
>>
>> Is it possible to divide the resources between them, according to how
>> many are trying to run at the same time?
>> So for example if I have 12 cores - if one job is scheduled, it will get
>> 12 cores, but if 3 are scheduled, then each one will get 4 cores and then
>> will all start.
>>
>> Thanks!
>>
>> *Romi Kuntsman*, *Big Data Engineer*
>>  http://www.totango.com
>>
>> On Mon, Nov 3, 2014 at 5:46 PM, Akhil Das 
>> wrote:
>>
>>> Have a look at scheduling pools
>>> <https://spark.apache.org/docs/latest/job-scheduling.html>. If you want
>>> more sophisticated resource allocation, then you are better of to use
>>> cluster managers like mesos or yarn
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Mon, Nov 3, 2014 at 9:10 PM, Romi Kuntsman  wrote:
>>>
>>>> Hello,
>>>>
>>>> I have a Spark 1.1.0 standalone cluster, with several nodes, and
>>>> several jobs (applications) being scheduled at the same time.
>>>> By default, each Spark job takes up all available CPUs.
>>>> This way, when more than one job is scheduled, all but the first are
>>>> stuck in "WAITING".
>>>> On the other hand, if I tell each job to initially limit itself to a
>>>> fixed number of CPUs, and that job runs by itself, the cluster is
>>>> under-utilized and the job runs longer than it could have if it took all
>>>> the available resources.
>>>>
>>>> - How to give the tasks a more fair resource division, which lets many
>>>> jobs run together, and together lets them use all the available resources?
>>>> - How do you divide resources between applications on your usecase?
>>>>
>>>> P.S. I started reading about Mesos but couldn't figure out if/how it
>>>> could solve the described issue.
>>>>
>>>> Thanks!
>>>>
>>>> *Romi Kuntsman*, *Big Data Engineer*
>>>>  http://www.totango.com
>>>>
>>>
>>>
>>
>


Re: Dynamically switching Nr of allocated core

2014-11-03 Thread Romi Kuntsman
I didn't notice your message and asked about the same question, in the
thread with the title "Spark job resource allocation best practices".

Adding specific case to your example:
1 - There are 12 cores available in the cluster
2 - I start app B with all cores - gets 12
3 - I start app A - it needs just 2 cores (as you said, that's all it would
take even when there are 12 available), but gets nothing
4 - Until I stop app B, app A is stuck waiting, instead of app B freeing 2
cores and dropping to 10 cores.

*Romi Kuntsman*, *Big Data Engineer*
 http://www.totango.com

On Mon, Nov 3, 2014 at 3:17 PM, RodrigoB  wrote:

> Hi all,
>
> I can't seem to find a clear answer on the documentation.
>
> Does the standalone cluster support dynamic assignment of the number of
> allocated cores to an application once another app stops?
> I'm aware that we can have core sharing between active applications if we use
> Mesos, depending on the number of parallel tasks, but I believe my question is
> slightly simpler.
>
> For example:
> 1 - There are 12 cores available in the cluster
> 2 - I start app A with 2 cores - gets 2
> 3 - I start app B - gets remaining 10
> 4 - If I stop app A, app B *does not* get the now available remaining 2
> cores.
>
> Should I expect Mesos to have this scenario working?
>
> Also, the same question applies to when we add more cores to a cluster.
> Let's say ideally I want 12 cores for my app, although there are only 10.
> As
> I add more workers, they should get assigned to my app dynamically. I
> haven't tested this in a while but I think the app will not even start and
> complain about not enough resources...
>
> Would very much appreciate any knowledge share on this!
>
> tnks,
> Rod
>
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Dynamically-switching-Nr-of-allocated-core-tp17955.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark job resource allocation best practices

2014-11-03 Thread Romi Kuntsman
So, as said there, static partitioning is used in "Spark’s standalone and
YARN modes, as well as the coarse-grained Mesos mode".
That leaves us only with Mesos, where there is *dynamic sharing* of CPU
cores.

It says "when the application is not running tasks on a machine, other
applications may run tasks on those cores".
But my applications are short lived (seconds to minutes), and they read a
large dataset, process it, and write the results. They are also IO-bound,
meaning most of the time is spent reading input data (from S3) and writing
the results back.

Is it possible to divide the resources between them, according to how many
are trying to run at the same time?
So for example if I have 12 cores - if one job is scheduled, it will get 12
cores, but if 3 are scheduled, then each one will get 4 cores and then will
all start.

Thanks!

*Romi Kuntsman*, *Big Data Engineer*
 http://www.totango.com

On Mon, Nov 3, 2014 at 5:46 PM, Akhil Das 
wrote:

> Have a look at scheduling pools
> <https://spark.apache.org/docs/latest/job-scheduling.html>. If you want
> more sophisticated resource allocation, then you are better of to use
> cluster managers like mesos or yarn
>
> Thanks
> Best Regards
>
> On Mon, Nov 3, 2014 at 9:10 PM, Romi Kuntsman  wrote:
>
>> Hello,
>>
>> I have a Spark 1.1.0 standalone cluster, with several nodes, and several
>> jobs (applications) being scheduled at the same time.
>> By default, each Spark job takes up all available CPUs.
>> This way, when more than one job is scheduled, all but the first are
>> stuck in "WAITING".
>> On the other hand, if I tell each job to initially limit itself to a
>> fixed number of CPUs, and that job runs by itself, the cluster is
>> under-utilized and the job runs longer than it could have if it took all
>> the available resources.
>>
>> - How to give the tasks a more fair resource division, which lets many
>> jobs run together, and together lets them use all the available resources?
>> - How do you divide resources between applications on your usecase?
>>
>> P.S. I started reading about Mesos but couldn't figure out if/how it
>> could solve the described issue.
>>
>> Thanks!
>>
>> *Romi Kuntsman*, *Big Data Engineer*
>>  http://www.totango.com
>>
>
>


Spark job resource allocation best practices

2014-11-03 Thread Romi Kuntsman
Hello,

I have a Spark 1.1.0 standalone cluster, with several nodes, and several
jobs (applications) being scheduled at the same time.
By default, each Spark job takes up all available CPUs.
This way, when more than one job is scheduled, all but the first are stuck
in "WAITING".
On the other hand, if I tell each job to initially limit itself to a fixed
number of CPUs, and that job runs by itself, the cluster is under-utilized
and the job runs longer than it could have if it took all the available
resources.

- How to give the tasks a more fair resource division, which lets many jobs
run together, and together lets them use all the available resources?
- How do you divide resources between applications on your usecase?

P.S. I started reading about Mesos but couldn't figure out if/how it could
solve the described issue.

Thanks!

*Romi Kuntsman*, *Big Data Engineer*
 http://www.totango.com


Re: Workers disconnected from master sometimes and never reconnect back

2014-09-29 Thread Romi Kuntsman
Hi all,

Regarding a post here a few months ago
http://apache-spark-user-list.1001560.n3.nabble.com/Workers-disconnected-from-master-sometimes-and-never-reconnect-back-tp6240.html

Is there an answer to this?
I saw workers that were still active but never reconnected after they lost
their connection to the master. Using Spark 1.1.0.

If the master server is restarted, should the workers retry registering with it?

Greetings,

-- 
*Romi Kuntsman*, *Big Data Engineer*
 http://www.totango.com
​Join the Customer Success Manifesto  <http://youtu.be/XvFi2Wh6wgU>