Israel Spark Meetup
Hello, please add a link on the Spark Community page (https://spark.apache.org/community.html) to the Israel Spark Meetup (https://www.meetup.com/israel-spark-users/). We're an active meetup group, unifying the local Spark user community and holding regular meetups. Thanks! Romi K.
diff between apps and waitingApps?
Hello, I'm collecting the metrics master.apps and master.waitingApps, and I see the values always match.

ref:
https://github.com/apache/spark/blob/branch-1.5/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
https://github.com/apache/spark/blob/branch-1.5/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala

What is the meaning of "waitingApps"? And if the only place it's used is "startExecutorsOnWorkers", where apps are filtered by "app.coresLeft > 0", shouldn't the reported metric apply the same filter?

*Romi Kuntsman*, *Big Data Engineer*
http://www.totango.com
Re: Shuffle FileNotFound Exception
Take the executor memory times spark.shuffle.memoryFraction, and divide the data so that each partition is smaller than that.

*Romi Kuntsman*, *Big Data Engineer*
http://www.totango.com

On Wed, Nov 18, 2015 at 2:09 PM, Tom Arnfeld wrote:
> Hi Romi,
>
> Thanks! Could you give me an indication of how much to increase the
> partitions by? We'll take a stab in the dark; the input data is around 5M
> records (though each record is fairly small). We've had trouble both with
> DataFrames and RDDs.
>
> Tom.
>
> On 18 Nov 2015, at 12:04, Romi Kuntsman wrote:
>
> I had many issues with shuffles (but not this one exactly), and what
> eventually solved it was to repartition the input into more parts. Have you
> tried that?
>
> P.S. not sure if related, but there's a memory leak in the shuffle
> mechanism:
> https://issues.apache.org/jira/browse/SPARK-11293
>
> *Romi Kuntsman*, *Big Data Engineer*
> http://www.totango.com
>
> On Wed, Nov 18, 2015 at 2:00 PM, Tom Arnfeld wrote:
>
>> Hey,
>>
>> I'm wondering if anyone has run into issues with Spark 1.5 and a
>> FileNotFound exception with shuffle.index files? It's been cropping up with
>> very large joins and aggregations, and causing all of our jobs to fail
>> towards the end. The memory limit for the executors (we're running on
>> Mesos) is touching 60GB+ with ~10 cores per executor, which is way
>> oversubscribed.
>>
>> We're running Spark inside containers, and have configured
>> "spark.executorEnv.SPARK_LOCAL_DIRS" to a special directory path in the
>> container for performance/disk reasons, and since then the issue started to
>> arise. I'm wondering if there's a bug in the way Spark looks for shuffle
>> files, and one of the implementations isn't obeying the path properly?
>>
>> I don't want to set "spark.local.dir" because that requires the driver to
>> also have this directory set up, which is not the case.
>>
>> Has anyone seen this issue before?
>>
>> 15/11/18 11:32:27 ERROR storage.ShuffleBlockFetcherIterator: Failed to
>> get block(s) from XXX:50777
>> java.lang.RuntimeException: java.io.FileNotFoundException:
>> /mnt/mesos/sandbox/spark-tmp/blockmgr-7a5b85c8-7c5f-43c2-b9e2-2bc0ffdb902d/23/shuffle_2_94_0.index
>> (No such file or directory)
>> at java.io.FileInputStream.open(Native Method)
>> at java.io.FileInputStream.<init>(FileInputStream.java:146)
>> at org.apache.spark.shuffle.IndexShuffleBlockResolver.getBlockData(IndexShuffleBlockResolver.scala:98)
>> at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:300)
>> at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>> at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>> at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
>> at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
>> at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87)
>> at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101)
>> at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
>> at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>> at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
>> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>> at io.netty.channel.AbstractChannelHandlerContext.fire
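To make the sizing advice above concrete, here is a rough Java sketch (not from the thread; the executor memory, record size, and the 1.5-era default shuffle fraction are assumptions for illustration):

    // Sketch: choose a partition count so each partition's data stays below
    // the shuffle memory budget of one executor (per the rule of thumb above).
    long executorMemoryBytes = 8L * 1024 * 1024 * 1024;   // assumed --executor-memory 8g
    double shuffleFraction = 0.2;                         // spark.shuffle.memoryFraction default
    long budgetPerPartition = (long) (executorMemoryBytes * shuffleFraction);

    long records = 5_000_000L;                            // ~5M records, from the thread
    long avgRecordBytes = 512;                            // assumed average record size
    long totalBytes = records * avgRecordBytes;

    int numPartitions = (int) (totalBytes / budgetPerPartition) + 1;
    JavaRDD<String> repartitioned = input.repartition(numPartitions);  // 'input' is your existing RDD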
Re: Shuffle FileNotFound Exception
I had many issues with shuffles (but not this one exactly), and what eventually solved it was to repartition the input into more parts. Have you tried that?

P.S. not sure if related, but there's a memory leak in the shuffle mechanism:
https://issues.apache.org/jira/browse/SPARK-11293

*Romi Kuntsman*, *Big Data Engineer*
http://www.totango.com

On Wed, Nov 18, 2015 at 2:00 PM, Tom Arnfeld wrote:
> Hey,
>
> I'm wondering if anyone has run into issues with Spark 1.5 and a
> FileNotFound exception with shuffle.index files? It's been cropping up with
> very large joins and aggregations, and causing all of our jobs to fail
> towards the end. The memory limit for the executors (we're running on
> Mesos) is touching 60GB+ with ~10 cores per executor, which is way
> oversubscribed.
>
> We're running Spark inside containers, and have configured
> "spark.executorEnv.SPARK_LOCAL_DIRS" to a special directory path in the
> container for performance/disk reasons, and since then the issue started to
> arise. I'm wondering if there's a bug in the way Spark looks for shuffle
> files, and one of the implementations isn't obeying the path properly?
>
> I don't want to set "spark.local.dir" because that requires the driver to
> also have this directory set up, which is not the case.
>
> Has anyone seen this issue before?
>
> 15/11/18 11:32:27 ERROR storage.ShuffleBlockFetcherIterator: Failed to get
> block(s) from XXX:50777
> java.lang.RuntimeException: java.io.FileNotFoundException:
> /mnt/mesos/sandbox/spark-tmp/blockmgr-7a5b85c8-7c5f-43c2-b9e2-2bc0ffdb902d/23/shuffle_2_94_0.index
> (No such file or directory)
> at java.io.FileInputStream.open(Native Method)
> at java.io.FileInputStream.<init>(FileInputStream.java:146)
> at org.apache.spark.shuffle.IndexShuffleBlockResolver.getBlockData(IndexShuffleBlockResolver.scala:98)
> at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:300)
> at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
> at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
> at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
> at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
> at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87)
> at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101)
> at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
> at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
> at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
> at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
> at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
> at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
> at io.netty.channel.nio.Ab
Re: Some spark apps fail with "All masters are unresponsive", while others pass normally
I didn't see anything about an OOM. This sometimes happens before anything in the application has happened, and it happens to a few applications at the same time, so I guess it's a communication failure. The problem is that the error shown doesn't represent the actual problem (which may be a network timeout etc.)

*Romi Kuntsman*, *Big Data Engineer*
http://www.totango.com

On Mon, Nov 9, 2015 at 6:00 PM, Akhil Das wrote:
> Did you find anything regarding the OOM in the executor logs?
>
> Thanks
> Best Regards
>
> On Mon, Nov 9, 2015 at 8:44 PM, Romi Kuntsman wrote:
>
>> If they have a problem managing memory, wouldn't there be an OOM?
>> Why does AppClient throw an NPE?
>>
>> *Romi Kuntsman*, *Big Data Engineer*
>> http://www.totango.com
>>
>> On Mon, Nov 9, 2015 at 4:59 PM, Akhil Das wrote:
>>
>>> Is that all you have in the executor logs? I suspect some of those jobs
>>> are having a hard time managing the memory.
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Sun, Nov 1, 2015 at 9:38 PM, Romi Kuntsman wrote:
>>>
>>>> [adding dev list since it's probably a bug, but I'm not sure how to
>>>> reproduce it so I can open a bug about it]
>>>>
>>>> Hi,
>>>>
>>>> I have a standalone Spark 1.4.0 cluster with 100s of applications
>>>> running every day.
>>>>
>>>> From time to time, the applications crash with the following error (see
>>>> below). But at the same time (and also after that), other applications
>>>> are running, so I can safely assume the master and workers are working.
>>>>
>>>> 1. Why is there a NullPointerException? (I can't track the Scala stack
>>>> trace to the code, but anyway an NPE is usually an obvious bug, even if
>>>> there's actually a network error...)
>>>> 2. Why can't it connect to the master? (If it's a network timeout, how
>>>> to increase it? I see the values are hardcoded inside AppClient.)
>>>> 3. How to recover from this error?
>>>>
>>>> ERROR 01-11 15:32:54,991 SparkDeploySchedulerBackend - Application
>>>> has been killed. Reason: All masters are unresponsive! Giving up.
>>>> ERROR 01-11 15:32:55,087 OneForOneStrategy - logs/error.log
>>>> java.lang.NullPointerException
>>>> at org.apache.spark.deploy.client.AppClient$ClientActor$$anonfun$receiveWithLogging$1.applyOrElse(AppClient.scala:160)
>>>> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>>>> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>>>> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>>>> at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
>>>> at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
>>>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>>>> at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>>> at org.apache.spark.deploy.client.AppClient$ClientActor.aroundReceive(AppClient.scala:61)
>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>>> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
>>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>> ERROR 01-11 15:32:55,603 SparkContext - Error initializing SparkContext.
>>>> java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext
>>>> at org.apache.spark.SparkContext.org$apache$spark$SparkContext$$assertNotStopped(SparkContext.scala:103)
>>>> at org.apache.spark.SparkContext.getSchedulingMode(SparkContext.scala:1501)
>>>> at org.apache.spark.SparkContext.postEnvironmentUpdate(SparkContext.scala:2005)
>>>> at org.apache.spark.SparkContext.<init>(SparkContext.scala:543)
>>>> at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:61)
>>>>
>>>> Thanks!
>>>>
>>>> *Romi Kuntsman*, *Big Data Engineer*
>>>> http://www.totango.com
Re: Some spark apps fail with "All masters are unresponsive", while others pass normally
If they have a problem managing memory, wouldn't there be an OOM? Why does AppClient throw an NPE?

*Romi Kuntsman*, *Big Data Engineer*
http://www.totango.com

On Mon, Nov 9, 2015 at 4:59 PM, Akhil Das wrote:
> Is that all you have in the executor logs? I suspect some of those jobs
> are having a hard time managing the memory.
>
> Thanks
> Best Regards
>
> On Sun, Nov 1, 2015 at 9:38 PM, Romi Kuntsman wrote:
>
>> [adding dev list since it's probably a bug, but I'm not sure how to
>> reproduce it so I can open a bug about it]
>>
>> Hi,
>>
>> I have a standalone Spark 1.4.0 cluster with 100s of applications
>> running every day.
>>
>> From time to time, the applications crash with the following error (see
>> below). But at the same time (and also after that), other applications
>> are running, so I can safely assume the master and workers are working.
>>
>> 1. Why is there a NullPointerException? (I can't track the Scala stack
>> trace to the code, but anyway an NPE is usually an obvious bug, even if
>> there's actually a network error...)
>> 2. Why can't it connect to the master? (If it's a network timeout, how
>> to increase it? I see the values are hardcoded inside AppClient.)
>> 3. How to recover from this error?
>>
>> ERROR 01-11 15:32:54,991 SparkDeploySchedulerBackend - Application
>> has been killed. Reason: All masters are unresponsive! Giving up.
>> ERROR 01-11 15:32:55,087 OneForOneStrategy - logs/error.log
>> java.lang.NullPointerException
>> at org.apache.spark.deploy.client.AppClient$ClientActor$$anonfun$receiveWithLogging$1.applyOrElse(AppClient.scala:160)
>> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>> at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
>> at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>> at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>> at org.apache.spark.deploy.client.AppClient$ClientActor.aroundReceive(AppClient.scala:61)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> ERROR 01-11 15:32:55,603 SparkContext - Error initializing SparkContext.
>> java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext
>> at org.apache.spark.SparkContext.org$apache$spark$SparkContext$$assertNotStopped(SparkContext.scala:103)
>> at org.apache.spark.SparkContext.getSchedulingMode(SparkContext.scala:1501)
>> at org.apache.spark.SparkContext.postEnvironmentUpdate(SparkContext.scala:2005)
>> at org.apache.spark.SparkContext.<init>(SparkContext.scala:543)
>> at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:61)
>>
>> Thanks!
>>
>> *Romi Kuntsman*, *Big Data Engineer*
>> http://www.totango.com
Re: JMX with Spark
Have you read this?
https://spark.apache.org/docs/latest/monitoring.html

*Romi Kuntsman*, *Big Data Engineer*
http://www.totango.com

On Thu, Nov 5, 2015 at 2:08 PM, Yogesh Vyas wrote:
> Hi,
> How can we use JMX and JConsole to monitor our Spark applications?
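That page describes a JMX sink for exactly this. As a minimal sketch (assuming a stock Spark install; the sink class ships with Spark), enable it in conf/metrics.properties and then attach JConsole to the driver or executor JVM to browse the metrics as MBeans:

    # conf/metrics.properties -- enable the built-in JMX sink for all instances
    *.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink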
Re: DataFrame.toJavaRDD cause fetching data to driver, is it expected ?
In my program I move between RDD and DataFrame several times. I know that the entire data of the DF doesn't go into the driver because it wouldn't fit there. But calling toJavaRDD does cause computation. Check the number of partitions you have on the DF and RDD... On Nov 4, 2015 7:54 PM, "Aliaksei Tsyvunchyk" wrote: > Hello Romi, > > Do you mean that in my particular case I’m causing computation on > dataFrame or it is regular behavior of DataFrame.toJavaRDD ? > If it’s regular behavior, do you know which approach could be used to > perform make/reduce on dataFrame without causing it to load all data to > driver program ? > > On Nov 4, 2015, at 12:34 PM, Romi Kuntsman wrote: > > I noticed that toJavaRDD causes a computation on the DataFrame, so is it > considered an action, even though logically it's a transformation? > On Nov 4, 2015 6:51 PM, "Aliaksei Tsyvunchyk" > wrote: > >> Hello folks, >> >> Recently I have noticed unexpectedly big network traffic between Driver >> Program and Worker node. >> During debugging I have figured out that it is caused by following block >> of code >> >> —— Java ——— — >> DataFrame etpvRecords = context.sql(" SOME SQL query here"); >> Mapper m = new Mapper(localValue, ProgramId::toProgId); >> return etpvRecords >> .toJavaRDD() >> .map(m::mapHutPutViewingRow) >> .reduce(Reducer::reduce); >> —— Java >> >> I’m using debug breakpoint and OS X nettop to monitor traffic between >> processes. So before approaching line toJavaRDD() I have 500Kb of traffic >> and after executing this line I have 2.2 Mb of traffic. But when I check >> size of result of reduce function it is 10 Kb. >> So .toJavaRDD() seems causing worker process return dataset to driver >> process and seems further map/reduce occurs on Driver. >> >> This is definitely not expected by me, so I have 2 questions. >> 1. Is it really expected behavior that DataFrame.toJavaRDD cause whole >> dataset return to driver or I’m doing something wrong? >> 2. What is expected way to perform transformation with DataFrame using >> custom Java map\reduce functions in case if standard SQL features are not >> fit all my needs? >> >> Env: Spark 1.5.1 in standalone mode, (1 master, 1 worker sharing same >> machine). Java 1.8.0_60. >> >> CONFIDENTIALITY NOTICE: This email and files attached to it are >> confidential. If you are not the intended recipient you are hereby notified >> that using, copying, distributing or taking any action in reliance on the >> contents of this information is strictly prohibited. If you have received >> this email in error please notify the sender and delete this email. >> > > > CONFIDENTIALITY NOTICE: This email and files attached to it are > confidential. If you are not the intended recipient you are hereby notified > that using, copying, distributing or taking any action in reliance on the > contents of this information is strictly prohibited. If you have received > this email in error please notify the sender and delete this email. >
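A quick way to check the partition counts Romi mentions (a sketch; etpvRecords stands for the DataFrame from the thread):

    // Both counts should match; a very low number means little parallelism,
    // which can make the toJavaRDD computation look driver-bound.
    DataFrame etpvRecords = context.sql("SOME SQL query here");
    System.out.println("DF partitions:  " + etpvRecords.rdd().partitions().length);
    System.out.println("RDD partitions: " + etpvRecords.toJavaRDD().partitions().size());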
Re: DataFrame.toJavaRDD cause fetching data to driver, is it expected ?
I noticed that toJavaRDD causes a computation on the DataFrame, so is it considered an action, even though logically it's a transformation? On Nov 4, 2015 6:51 PM, "Aliaksei Tsyvunchyk" wrote: > Hello folks, > > Recently I have noticed unexpectedly big network traffic between Driver > Program and Worker node. > During debugging I have figured out that it is caused by following block > of code > > —— Java ——— — > DataFrame etpvRecords = context.sql(" SOME SQL query here"); > Mapper m = new Mapper(localValue, ProgramId::toProgId); > return etpvRecords > .toJavaRDD() > .map(m::mapHutPutViewingRow) > .reduce(Reducer::reduce); > —— Java > > I’m using debug breakpoint and OS X nettop to monitor traffic between > processes. So before approaching line toJavaRDD() I have 500Kb of traffic > and after executing this line I have 2.2 Mb of traffic. But when I check > size of result of reduce function it is 10 Kb. > So .toJavaRDD() seems causing worker process return dataset to driver > process and seems further map/reduce occurs on Driver. > > This is definitely not expected by me, so I have 2 questions. > 1. Is it really expected behavior that DataFrame.toJavaRDD cause whole > dataset return to driver or I’m doing something wrong? > 2. What is expected way to perform transformation with DataFrame using > custom Java map\reduce functions in case if standard SQL features are not > fit all my needs? > > Env: Spark 1.5.1 in standalone mode, (1 master, 1 worker sharing same > machine). Java 1.8.0_60. > > CONFIDENTIALITY NOTICE: This email and files attached to it are > confidential. If you are not the intended recipient you are hereby notified > that using, copying, distributing or taking any action in reliance on the > contents of this information is strictly prohibited. If you have received > this email in error please notify the sender and delete this email. >
Re: Error : - No filesystem for scheme: spark
except "spark.master", do you have "spark://" anywhere in your code or config files? *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com On Mon, Nov 2, 2015 at 11:27 AM, Balachandar R.A. wrote: > > -- Forwarded message -- > From: "Balachandar R.A." > Date: 02-Nov-2015 12:53 pm > Subject: Re: Error : - No filesystem for scheme: spark > To: "Jean-Baptiste Onofré" > Cc: > > > HI JB, > > Thanks for the response, > > Here is the content of my spark-defaults.conf > > > > > > # Default system properties included when running spark-submit. > > # This is useful for setting default environmental settings. > > > > # Example: > > spark.master spark://fdoat:7077 > > # spark.eventLog.enabled true > > spark.eventLog.dir/home/bala/spark-logs > > # spark.eventLog.dir hdfs://namenode:8021/directory > > # spark.serializer > org.apache.spark.serializer.KryoSerializer > > # spark.driver.memory 5g > > # spark.executor.extraJavaOptions -XX:+PrintGCDetails -Dkey=value > -Dnumbers="one two three" > > > > > > regards > > Bala > > > > > On 2 November 2015 at 12:21, Jean-Baptiste Onofré > wrote: > >> > >> Hi, > >> > >> do you have something special in conf/spark-defaults.conf (especially > on the eventLog directory) ? > >> > >> Regards > >> JB > >> > >> > >> On 11/02/2015 07:48 AM, Balachandar R.A. wrote: > >>> > >>> Can someone tell me at what point this error could come? > >>> > >>> In one of my use cases, I am trying to use hadoop custom input format. > >>> Here is my code. > >>> > >>> |valhConf:Configuration=sc.hadoopConfiguration > >>> > hConf.set("fs.hdfs.impl",classOf[org.apache.hadoop.hdfs.DistributedFileSystem].getName)hConf.set("fs.file.impl",classOf[org.apache.hadoop.fs.LocalFileSystem].getName)varjob > >>> > =newJob(hConf)FileInputFormat.setInputPaths(job,newPath("hdfs:///user/bala/MyBinaryFile"));varhRDD > >>> > =newNewHadoopRDD(sc,classOf[RandomAccessInputFormat],classOf[IntWritable],classOf[BytesWritable],job.getConfiguration())valcount > >>> > =hRDD.mapPartitionsWithInputSplit{(split,iter)=>myfuncPart(split,iter)}| > >>> > >>> |The moment I invoke mapPartitionsWithInputSplit() method, I get the > >>> below error in my spark-submit launch| > >>> > >>> | > >>> | > >>> > >>> |15/10/3011:11:39WARN scheduler.TaskSetManager:Losttask 0.0in stage > >>> 0.0(TID > 0,40.221.94.235):java.io.IOException:NoFileSystemforscheme:spark > >>> at > >>> > org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2584)at > >>> > org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)at > >>> org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)| > >>> > >>> Any help here to move towards fixing this will be of great help > >>> > >>> > >>> > >>> Thanks > >>> > >>> Bala > >>> > >> > >> -- > >> Jean-Baptiste Onofré > >> jbono...@apache.org > >> http://blog.nanthrax.net > >> Talend - http://www.talend.com > >> > >> - > >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > >> For additional commands, e-mail: user-h...@spark.apache.org > >> > > > >
Some spark apps fail with "All masters are unresponsive", while others pass normally
[adding dev list since it's probably a bug, but I'm not sure how to reproduce it so I can open a bug about it]

Hi,

I have a standalone Spark 1.4.0 cluster with 100s of applications running every day.

From time to time, the applications crash with the following error (see below). But at the same time (and also after that), other applications are running, so I can safely assume the master and workers are working.

1. Why is there a NullPointerException? (I can't track the Scala stack trace to the code, but anyway an NPE is usually an obvious bug, even if there's actually a network error...)
2. Why can't it connect to the master? (If it's a network timeout, how to increase it? I see the values are hardcoded inside AppClient.)
3. How to recover from this error?

ERROR 01-11 15:32:54,991 SparkDeploySchedulerBackend - Application has been killed. Reason: All masters are unresponsive! Giving up.
ERROR 01-11 15:32:55,087 OneForOneStrategy - logs/error.log
java.lang.NullPointerException
    at org.apache.spark.deploy.client.AppClient$ClientActor$$anonfun$receiveWithLogging$1.applyOrElse(AppClient.scala:160)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
    at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.spark.deploy.client.AppClient$ClientActor.aroundReceive(AppClient.scala:61)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
ERROR 01-11 15:32:55,603 SparkContext - Error initializing SparkContext.
java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext
    at org.apache.spark.SparkContext.org$apache$spark$SparkContext$$assertNotStopped(SparkContext.scala:103)
    at org.apache.spark.SparkContext.getSchedulingMode(SparkContext.scala:1501)
    at org.apache.spark.SparkContext.postEnvironmentUpdate(SparkContext.scala:2005)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:543)
    at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:61)

Thanks!

*Romi Kuntsman*, *Big Data Engineer*
http://www.totango.com
Re: NullPointerException when cache DataFrame in Java (Spark1.5.1)
> > BUT, after change limit(500) to limit(1000). The code report > NullPointerException. > I had a similar situation, and the problem was with a certain record. Try to find which records are returned when you limit to 1000 but not returned when you limit to 500. Could it be a NPE thrown from PixelObject? Are you running spark with master=local, so it's running inside your IDE and you can see the errors from the driver and worker? *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com On Thu, Oct 29, 2015 at 10:04 AM, Zhang, Jingyu wrote: > Thanks Romi, > > I resize the dataset to 7MB, however, the code show NullPointerException > exception as well. > > Did you try to cache a DataFrame with just a single row? > > Yes, I tried. But, Same problem. > . > Do you rows have any columns with null values? > > No, I had filter out null values before cache the dataframe. > > Can you post a code snippet here on how you load/generate the dataframe? > > Sure, Here is the working code 1: > > JavaRDD pixels = pixelsStr.map(new PixelGenerator()).cache(); > > System.out.println(pixels.count()); // 3000-4000 rows > > Working code 2: > > JavaRDD pixels = pixelsStr.map(new PixelGenerator()); > > DataFrame schemaPixel = sqlContext.createDataFrame(pixels, PixelObject. > class); > > DataFrame totalDF1 = > schemaPixel.select(schemaPixel.col("domain")).filter("'domain' > is not null").limit(500); > > System.out.println(totalDF1.count()); > > > BUT, after change limit(500) to limit(1000). The code report > NullPointerException. > > > JavaRDD pixels = pixelsStr.map(new PixelGenerator()); > > DataFrame schemaPixel = sqlContext.createDataFrame(pixels, PixelObject. > class); > > DataFrame totalDF = > schemaPixel.select(schemaPixel.col("domain")).filter("'domain' > is not null").limit(*1000*); > > System.out.println(totalDF.count()); // problem at this line > > 15/10/29 18:56:28 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks > have all completed, from pool > > 15/10/29 18:56:28 INFO TaskSchedulerImpl: Cancelling stage 0 > > 15/10/29 18:56:28 INFO DAGScheduler: ShuffleMapStage 0 (count at > X.java:113) failed in 3.764 s > > 15/10/29 18:56:28 INFO DAGScheduler: Job 0 failed: count at XXX.java:113, > took 3.862207 s > > org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 > in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage > 0.0 (TID 0, localhost): java.lang.NullPointerException > > at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source) > Does dataframe.rdd.cache work? > > No, I tried but same exception. > > Thanks, > > Jingyu > > On 29 October 2015 at 17:38, Romi Kuntsman wrote: > >> Did you try to cache a DataFrame with just a single row? >> Do you rows have any columns with null values? >> Can you post a code snippet here on how you load/generate the dataframe? >> Does dataframe.rdd.cache work? >> >> *Romi Kuntsman*, *Big Data Engineer* >> http://www.totango.com >> >> On Thu, Oct 29, 2015 at 4:33 AM, Zhang, Jingyu >> wrote: >> >>> It is not a problem to use JavaRDD.cache() for 200M data (all Objects >>> read form Json Format). But when I try to use DataFrame.cache(), It shown >>> exception in below. >>> >>> My machine can cache 1 G data in Avro format without any problem. 
>>> >>> 15/10/29 13:26:23 INFO GeneratePredicate: Code generated in 154.531827 ms >>> >>> 15/10/29 13:26:23 INFO GenerateUnsafeProjection: Code generated in >>> 27.832369 ms >>> >>> 15/10/29 13:26:23 ERROR Executor: Exception in task 0.0 in stage 1.0 >>> (TID 1) >>> >>> java.lang.NullPointerException >>> >>> at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source) >>> >>> at sun.reflect.DelegatingMethodAccessorImpl.invoke( >>> DelegatingMethodAccessorImpl.java:43) >>> >>> at java.lang.reflect.Method.invoke(Method.java:497) >>> >>> at >>> org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1$$anonfun$apply$2.apply( >>> SQLContext.scala:500) >>> >>> at >>> org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1$$anonfun$apply$2.apply( >>> SQLContext.scala:500) >>> >>> at scala.collection.TraversableLike$$anonfun$map$1.apply( >>> TraversableLike.scala:244) >>> >>> at scala.collection.TraversableLike$$anonfun$map$1.apply( >>> TraversableLike.scala:244) >>> >>> at scala.
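One way to follow Romi's suggestion and isolate the extra rows (a sketch; note that without an explicit ordering, limit() is not guaranteed to return a stable prefix, so treat this only as a debugging aid):

    DataFrame first500 = schemaPixel.select(schemaPixel.col("domain")).limit(500);
    DataFrame first1000 = schemaPixel.select(schemaPixel.col("domain")).limit(1000);
    // Rows in the larger sample but not in the smaller one -- NPE candidates
    DataFrame suspects = first1000.except(first500);
    suspects.show(500);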
Re: NullPointerException when cache DataFrame in Java (Spark1.5.1)
Did you try to cache a DataFrame with just a single row? Do you rows have any columns with null values? Can you post a code snippet here on how you load/generate the dataframe? Does dataframe.rdd.cache work? *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com On Thu, Oct 29, 2015 at 4:33 AM, Zhang, Jingyu wrote: > It is not a problem to use JavaRDD.cache() for 200M data (all Objects read > form Json Format). But when I try to use DataFrame.cache(), It shown > exception in below. > > My machine can cache 1 G data in Avro format without any problem. > > 15/10/29 13:26:23 INFO GeneratePredicate: Code generated in 154.531827 ms > > 15/10/29 13:26:23 INFO GenerateUnsafeProjection: Code generated in > 27.832369 ms > > 15/10/29 13:26:23 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID > 1) > > java.lang.NullPointerException > > at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source) > > at sun.reflect.DelegatingMethodAccessorImpl.invoke( > DelegatingMethodAccessorImpl.java:43) > > at java.lang.reflect.Method.invoke(Method.java:497) > > at > org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1$$anonfun$apply$2.apply( > SQLContext.scala:500) > > at > org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1$$anonfun$apply$2.apply( > SQLContext.scala:500) > > at scala.collection.TraversableLike$$anonfun$map$1.apply( > TraversableLike.scala:244) > > at scala.collection.TraversableLike$$anonfun$map$1.apply( > TraversableLike.scala:244) > > at scala.collection.IndexedSeqOptimized$class.foreach( > IndexedSeqOptimized.scala:33) > > at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) > > at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > > at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) > > at org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1.apply( > SQLContext.scala:500) > > at org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1.apply( > SQLContext.scala:498) > > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > > at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389) > > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) > > at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next( > InMemoryColumnarTableScan.scala:127) > > at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next( > InMemoryColumnarTableScan.scala:120) > > at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:278 > ) > > at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171) > > at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78) > > at org.apache.spark.rdd.RDD.iterator(RDD.scala:262) > > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38 > ) > > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) > > at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) > > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38 > ) > > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) > > at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) > > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) > > at org.apache.spark.scheduler.Task.run(Task.scala:88) > > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) > > at java.util.concurrent.ThreadPoolExecutor.runWorker( > ThreadPoolExecutor.java:1142) > > at java.util.concurrent.ThreadPoolExecutor$Worker.run( > ThreadPoolExecutor.java:617) > > at 
java.lang.Thread.run(Thread.java:745) > > 15/10/29 13:26:23 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, > localhost): java.lang.NullPointerException > > at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source) > > > Thanks, > > > Jingyu > > This message and its attachments may contain legally privileged or > confidential information. It is intended solely for the named addressee. If > you are not the addressee indicated in this message or responsible for > delivery of the message to the addressee, you may not copy or deliver this > message or its attachments to anyone. Rather, you should permanently delete > this message and its attachments and kindly notify the sender by reply > e-mail. Any content of this message and its attachments which does not > relate to the official business of the sending company must be taken not to > have been sent or endorsed by that company or any of its related entities. > No warranty is made that the e-mail or attachments are free from computer > virus or other defect.
Re: how to get RDD from two different RDDs with cross column
Hi,

If I understand correctly:
rdd1 contains keys (of type StringDate),
rdd2 contains keys and values,
and rdd3 should contain all the keys, and the values from rdd2?

I think you should make rdd1 and rdd2 PairRDDs, and then use an outer join. Does that make sense?

On Mon, Sep 21, 2015 at 8:37 PM Zhiliang Zhu wrote:
> Dear Romi, Priya, Sujt and Shivaram and all,
>
> I have spent lots of days thinking about this issue, however, without any
> good enough solution... I shall appreciate all your kind help.
>
> There is an RDD rdd1, and another RDD rdd2
> (rdd2 can be a PairRDD, or a DataFrame with two columns as <StringDate, float>).
> The StringDate column values from rdd1 and rdd2 overlap but are not the same.
>
> I would like to get a new RDD rdd3 of <StringDate, float>: the StringDate
> values in rdd3 would be exactly those from rdd1, and the float in rdd3 would
> be taken from rdd2 if its StringDate is in rdd2, or else NULL would be assigned.
> That is, each row rdd3[i] = <rdd1[i].StringDate, rdd2[j].float>, where
> rdd2[j].StringDate is the same as rdd1[i].StringDate, or NULL if there is no match.
> What kind of API or function would I use...
>
> Thanks very much!
> Zhiliang
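A minimal Java sketch of that outer join (names are illustrative; in the Spark 1.x Java API the possibly-missing side comes back as a Guava Optional):

    // rdd1: JavaRDD<String> of all StringDate keys
    // rdd2: JavaPairRDD<String, Float> of (StringDate, float) for some of the keys
    JavaPairRDD<String, Boolean> allDates =
        rdd1.mapToPair(date -> new Tuple2<>(date, Boolean.TRUE));

    // Left outer join keeps every date from rdd1; a missing value becomes null
    JavaPairRDD<String, Float> rdd3 = allDates.leftOuterJoin(rdd2)
        .mapToPair(t -> new Tuple2<>(t._1(), t._2()._2().orNull()));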
Re: passing SparkContext as parameter
Cody, that's a great reference! As shown there, the best way to connect to an external database from the workers is to create a connection pool on (each) worker. The driver must pass, via broadcast, the connection string, but not the connection object itself and not the spark context.

On Mon, Sep 21, 2015 at 5:31 PM Cody Koeninger wrote:
> That isn't accurate, I think you're confused about foreach.
>
> Look at
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
>
> On Mon, Sep 21, 2015 at 7:36 AM, Romi Kuntsman wrote:
>
>> foreach is something that runs on the driver, not the workers.
>>
>> if you want to perform some function on each record from cassandra, you
>> need to do cassandraRdd.map(func), which will run distributed on the spark
>> workers
>>
>> *Romi Kuntsman*, *Big Data Engineer*
>> http://www.totango.com
>>
>> On Mon, Sep 21, 2015 at 3:29 PM, Priya Ch wrote:
>>
>>> Yes, but I need to read from the cassandra db within a spark
>>> transformation... something like:
>>>
>>> dstream.foreachRDD {
>>>   rdd => rdd.foreach {
>>>     message =>
>>>       sc.cassandraTable()
>>>       .
>>>       .
>>>       .
>>>   }
>>> }
>>>
>>> Since rdd.foreach gets executed on workers, how can I make sparkContext
>>> available on workers?
>>>
>>> Regards,
>>> Padma Ch
>>>
>>> On Mon, Sep 21, 2015 at 5:10 PM, Ted Yu wrote:
>>>
>>>> You can use a broadcast variable for passing connection information.
>>>>
>>>> Cheers
>>>>
>>>> On Sep 21, 2015, at 4:27 AM, Priya Ch wrote:
>>>>
>>>> can I use this sparkContext on executors?
>>>> In my application, I have a scenario of reading from a db for certain
>>>> records in an rdd. Hence I need sparkContext to read from the DB
>>>> (cassandra in our case).
>>>>
>>>> If sparkContext couldn't be sent to executors, what is the workaround
>>>> for this?
>>>>
>>>> On Mon, Sep 21, 2015 at 3:06 PM, Petr Novak wrote:
>>>>
>>>>> add @transient?
>>>>>
>>>>> On Mon, Sep 21, 2015 at 11:27 AM, Priya Ch wrote:
>>>>>
>>>>>> Hello All,
>>>>>>
>>>>>> How can I pass sparkContext as a parameter to a method in an
>>>>>> object? Because passing sparkContext is giving me a TaskNotSerializable
>>>>>> exception.
>>>>>>
>>>>>> How can I achieve this?
>>>>>>
>>>>>> Thanks,
>>>>>> Padma Ch
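Sketched in Java, the pattern from that guide looks roughly like this (ConnectionPool is a hypothetical helper class; only the connection string is serialized and shipped):

    // Broadcast the connection string (a small, serializable value)...
    Broadcast<String> connStr = jsc.broadcast("cassandra-host:9042");  // placeholder address
    cassandraRdd.foreachPartition(records -> {
        // ...and build/reuse the non-serializable pool inside the executor JVM
        ConnectionPool pool = ConnectionPool.getOrCreate(connStr.value());  // hypothetical pool
        while (records.hasNext()) {
            pool.write(records.next());
        }
        // the pool is cached per JVM and reused by later tasks, so it is not closed here
    });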
Re: passing SparkContext as parameter
foreach is something that runs on the driver, not the workers. if you want to perform some function on each record from cassandra, you need to do cassandraRdd.map(func), which will run distributed on the spark workers *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com On Mon, Sep 21, 2015 at 3:29 PM, Priya Ch wrote: > Yes, but i need to read from cassandra db within a spark > transformation..something like.. > > dstream.forachRDD{ > > rdd=> rdd.foreach { > message => > sc.cassandraTable() > . > . > . > } > } > > Since rdd.foreach gets executed on workers, how can i make sparkContext > available on workers ??? > > Regards, > Padma Ch > > On Mon, Sep 21, 2015 at 5:10 PM, Ted Yu wrote: > >> You can use broadcast variable for passing connection information. >> >> Cheers >> >> On Sep 21, 2015, at 4:27 AM, Priya Ch >> wrote: >> >> can i use this sparkContext on executors ?? >> In my application, i have scenario of reading from db for certain records >> in rdd. Hence I need sparkContext to read from DB (cassandra in our case), >> >> If sparkContext couldn't be sent to executors , what is the workaround >> for this ?? >> >> On Mon, Sep 21, 2015 at 3:06 PM, Petr Novak wrote: >> >>> add @transient? >>> >>> On Mon, Sep 21, 2015 at 11:27 AM, Priya Ch >> > wrote: >>> >>>> Hello All, >>>> >>>> How can i pass sparkContext as a parameter to a method in an >>>> object. Because passing sparkContext is giving me TaskNotSerializable >>>> Exception. >>>> >>>> How can i achieve this ? >>>> >>>> Thanks, >>>> Padma Ch >>>> >>> >>> >> >
Re: passing SparkContext as parameter
sparkConext is available on the driver, not on executors. To read from Cassandra, you can use something like this: https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.md *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com On Mon, Sep 21, 2015 at 2:27 PM, Priya Ch wrote: > can i use this sparkContext on executors ?? > In my application, i have scenario of reading from db for certain records > in rdd. Hence I need sparkContext to read from DB (cassandra in our case), > > If sparkContext couldn't be sent to executors , what is the workaround for > this ?? > > On Mon, Sep 21, 2015 at 3:06 PM, Petr Novak wrote: > >> add @transient? >> >> On Mon, Sep 21, 2015 at 11:27 AM, Priya Ch >> wrote: >> >>> Hello All, >>> >>> How can i pass sparkContext as a parameter to a method in an object. >>> Because passing sparkContext is giving me TaskNotSerializable Exception. >>> >>> How can i achieve this ? >>> >>> Thanks, >>> Padma Ch >>> >> >> >
Re: How to get a new RDD by ordinarily subtract its adjacent rows
An RDD is a set of data rows (in your case numbers); there is no meaning to the order of the items. What exactly are you trying to accomplish?

*Romi Kuntsman*, *Big Data Engineer*
http://www.totango.com

On Mon, Sep 21, 2015 at 2:29 PM, Zhiliang Zhu wrote:
> Dear all,
>
> I have spent lots of days thinking about this issue, however, without any
> success... I shall appreciate all your kind help.
>
> There is an RDD rdd1; I would like to get a new RDD rdd2, where each row
> in rdd2[i] = rdd1[i] - rdd1[i - 1].
> What kind of API or function would I use...
>
> Thanks very much!
> John
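If the intent is rdd2[i] = rdd1[i] - rdd1[i-1], one way is to make the order explicit with zipWithIndex and join each element to its predecessor (a sketch, assuming the RDD's current order is the intended one and the values are doubles):

    // Key each value by its position, then shift the keys by one and join,
    // so that index i meets index i-1.
    JavaPairRDD<Long, Double> byIndex = rdd1.zipWithIndex()
        .mapToPair(t -> new Tuple2<>(t._2(), t._1()));
    JavaPairRDD<Long, Double> previous = byIndex
        .mapToPair(t -> new Tuple2<>(t._1() + 1, t._2()));
    JavaRDD<Double> diffs = byIndex.join(previous)   // index 0 has no predecessor, so it drops out
        .values()
        .map(pair -> pair._1() - pair._2());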
How to determine the value for spark.sql.shuffle.partitions?
Hi all,

The number of partitions greatly affects the speed and efficiency of computation, in my case in DataFrames/SparkSQL on Spark 1.4.0.
Too few partitions with large data cause OOM exceptions. Too many partitions on small data cause a delay due to overhead.

How do you programmatically determine the optimal number of partitions and cores in Spark, as a function of:
1. available memory per core
2. number of records in input data
3. average/maximum record size
4. cache configuration
5. shuffle configuration
6. serialization
7. etc?

Any general best practices? Thanks! Romi K.
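I don't believe there is one universal formula, but a common starting heuristic (illustrative numbers, not an official rule) is to target a bounded amount of data per partition while keeping every core busy with a few tasks:

    // Heuristic sketch: ~128MB of data per partition, and at least ~3 tasks per core.
    static int suggestShufflePartitions(long recordCount, long avgRecordBytes, int totalCores) {
        long targetPartitionBytes = 128L * 1024 * 1024;
        long bySize = (recordCount * avgRecordBytes) / targetPartitionBytes + 1;
        return (int) Math.max(bySize, totalCores * 3L);
    }
    // usage: sqlContext.setConf("spark.sql.shuffle.partitions",
    //            String.valueOf(suggestShufflePartitions(records, avgBytes, cores)));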
Re: How to remove worker node but let it finish first?
Is it only available in Mesos? I'm using a Spark standalone cluster; is there anything like that there?

On Fri, Aug 28, 2015 at 8:51 AM Akhil Das wrote:
> You can create a custom Mesos framework for your requirement; to get you
> started you can check this out:
> http://mesos.apache.org/documentation/latest/app-framework-development-guide/
>
> Thanks
> Best Regards
>
> On Mon, Aug 24, 2015 at 12:11 PM, Romi Kuntsman wrote:
>
>> Hi,
>> I have a Spark standalone cluster with 100s of applications per day, and
>> it changes size (more or fewer workers) at various hours. The driver runs
>> on a separate machine outside the Spark cluster.
>>
>> When a job is running and its worker is killed (because at that hour the
>> number of workers is reduced), it sometimes fails, instead of
>> redistributing the work to other workers.
>>
>> How is it possible to decommission a worker, so that it doesn't receive
>> any new work, but does finish all existing work before shutting down?
>>
>> Thanks!
Re: Exception when S3 path contains colons
Hello, We had the same problem. I've written a blog post with the detailed explanation and workaround: http://labs.totango.com/spark-read-file-with-colon/ Greetings, Romi K. On Tue, Aug 25, 2015 at 2:47 PM Gourav Sengupta wrote: > I am not quite sure about this but should the notation not be > s3n://redactedbucketname/* > instead of > s3a://redactedbucketname/* > > The best way is to use s3://<>/<>/* > > > Regards, > Gourav > > On Tue, Aug 25, 2015 at 10:35 AM, Akhil Das > wrote: > >> You can change the names, whatever program that is pushing the record >> must follow the naming conventions. Try to replace : with _ or something. >> >> Thanks >> Best Regards >> >> On Tue, Aug 18, 2015 at 10:20 AM, Brian Stempin >> wrote: >> >>> Hi, >>> I'm running Spark on Amazon EMR (Spark 1.4.1, Hadoop 2.6.0). I'm seeing >>> the exception below when encountering file names that contain colons. Any >>> idea on how to get around this? >>> >>> scala> val files = sc.textFile("s3a://redactedbucketname/*") >>> >>> 2015-08-18 04:38:34,567 INFO [main] storage.MemoryStore >>> (Logging.scala:logInfo(59)) - ensureFreeSpace(242224) called with >>> curMem=669367, maxMem=285203496 >>> >>> 2015-08-18 04:38:34,568 INFO [main] storage.MemoryStore >>> (Logging.scala:logInfo(59)) - Block broadcast_3 stored as values in memory >>> (estimated size 236.5 KB, free 271.1 MB) >>> >>> 2015-08-18 04:38:34,663 INFO [main] storage.MemoryStore >>> (Logging.scala:logInfo(59)) - ensureFreeSpace(21533) called with >>> curMem=911591, maxMem=285203496 >>> >>> 2015-08-18 04:38:34,664 INFO [main] storage.MemoryStore >>> (Logging.scala:logInfo(59)) - Block broadcast_3_piece0 stored as bytes in >>> memory (estimated size 21.0 KB, free 271.1 MB) >>> >>> 2015-08-18 04:38:34,665 INFO >>> [sparkDriver-akka.actor.default-dispatcher-19] storage.BlockManagerInfo >>> (Logging.scala:logInfo(59)) - Added broadcast_3_piece0 in memory on >>> 10.182.184.26:60338 (size: 21.0 KB, free: 271.9 MB) >>> >>> 2015-08-18 04:38:34,667 INFO [main] spark.SparkContext >>> (Logging.scala:logInfo(59)) - Created broadcast 3 from textFile at >>> :21 >>> >>> files: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at >>> textFile at :21 >>> >>> >>> scala> files.count >>> >>> 2015-08-18 04:38:37,262 INFO [main] s3a.S3AFileSystem >>> (S3AFileSystem.java:listStatus(533)) - List status for path: >>> s3a://redactedbucketname/ >>> >>> 2015-08-18 04:38:37,262 INFO [main] s3a.S3AFileSystem >>> (S3AFileSystem.java:getFileStatus(684)) - Getting path status for >>> s3a://redactedbucketname/ () >>> >>> java.lang.IllegalArgumentException: java.net.URISyntaxException: >>> Relative path in absolute URI: >>> [922-212-4438]-[119]-[1]-[2015-08-13T15:43:12.346193%5D-%5B2015-01-01T00:00:00%5D-redacted.csv >>> >>> at org.apache.hadoop.fs.Path.initialize(Path.java:206) >>> >>> at org.apache.hadoop.fs.Path.(Path.java:172) >>> >>> at org.apache.hadoop.fs.Path.(Path.java:94) >>> >>> at org.apache.hadoop.fs.Globber.glob(Globber.java:240) >>> >>> at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1700) >>> >>> at >>> org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:229) >>> >>> at >>> org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:200) >>> >>> at >>> org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:279) >>> >>> at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207) >>> >>> at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:219) >>> >>> at 
org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:217) >>> >>> at scala.Option.getOrElse(Option.scala:120) >>> >>> at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) >>> >>> at >>> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) >>> >>> at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:219) >>> >>> at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:217) >>> >>> at scala.Option.getOrElse(Option.scala:120) >>> >>> at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) >>> >>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1781) >>> >>> at org.apache.spark.rdd.RDD.count(RDD.scala:1099) >>> >>> at $iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC.(:24) >>> >>> at $iwC$iwC$iwC$iwC$iwC$iwC$iwC.(:29) >>> >>> at $iwC$iwC$iwC$iwC$iwC$iwC.(:31) >>> >>> at $iwC$iwC$iwC$iwC$iwC.(:33) >>> >>> at $iwC$iwC$iwC$iwC.(:35) >>> >>> at $iwC$iwC$iwC.(:37) >>> >>> at $iwC$iwC.(:39) >>> >>> at $iwC.(:41) >>> >>> at (:43) >>> >>> at .(:47) >>> >>> at .() >>> >>> at .(:7) >>> >>> at .() >>> >>> at $print() >>> >>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >>> >>> at >>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) >>> >>> at >>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >>> >>> at java.lang.reflect.Method.invoke(Method.java:606) >>> >>> at >>> org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(S
How to remove worker node but let it finish first?
Hi,
I have a Spark standalone cluster with 100s of applications per day, and it changes size (more or fewer workers) at various hours. The driver runs on a separate machine outside the Spark cluster.

When a job is running and its worker is killed (because at that hour the number of workers is reduced), it sometimes fails, instead of redistributing the work to other workers.

How is it possible to decommission a worker, so that it doesn't receive any new work, but does finish all existing work before shutting down?

Thanks!
Re: How to overwrite partition when writing Parquet?
Cheng, what if I want to overwrite a specific partition? I'll have to remove the folder, as Hemant suggested...

On Thu, Aug 20, 2015 at 1:17 PM Cheng Lian wrote:
> You can apply a filter first to filter out the data of the needed dates,
> and then append them.
>
> Cheng
>
> On 8/20/15 4:59 PM, Hemant Bhanawat wrote:
>
> How can I overwrite only a given partition or manually remove a partition
> before writing?
>
> I don't know if (and I don't think) there is a way to do that using a
> mode. But doesn't manually deleting the directory of a particular partition
> help? For the directory structure, check this out:
> http://spark.apache.org/docs/latest/sql-programming-guide.html#partition-discovery
>
> On Wed, Aug 19, 2015 at 8:18 PM, Romi Kuntsman wrote:
>
>> Hello,
>>
>> I have a DataFrame with a date column which I want to use as a partition.
>> Each day I want to write the data for the same date in Parquet, and then
>> read a dataframe for a date range.
>>
>> I'm using:
>> myDataframe.write().partitionBy("date").mode(SaveMode.Overwrite).parquet(parquetDir);
>>
>> If I use SaveMode.Append, then writing data for the same partition adds
>> the same data there again.
>> If I use SaveMode.Overwrite, then writing data for a single partition
>> removes all the data for all partitions.
>>
>> How can I overwrite only a given partition or manually remove a partition
>> before writing?
>>
>> Many thanks!
>> Romi K.
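A sketch of that manual approach in Java (hedged: the directory name follows the partition-discovery layout linked above, the variables come from the original question, and deleting data is destructive, so double-check the path first):

    // Overwrite a single date partition: delete its directory, then append it.
    Path partitionDir = new Path(parquetDir, "date=" + date);   // e.g. .../date=2015-08-20
    FileSystem fs = FileSystem.get(partitionDir.toUri(), sc.hadoopConfiguration());
    if (fs.exists(partitionDir)) {
        fs.delete(partitionDir, true);                          // recursive delete
    }
    myDataframe.filter(myDataframe.col("date").equalTo(date))
        .write().partitionBy("date").mode(SaveMode.Append).parquet(parquetDir);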
Re: How to minimize shuffling on Spark dataframe Join?
If you create a PairRDD from the DataFrame, using dataFrame.toJavaRDD().mapToPair(), then you can call partitionBy(someCustomPartitioner), which will partition the RDD by the key (of the pair). Then the operations on it (like joining with another RDD) will take this partitioning into account. I'm not sure that DataFrames already support this.

On Wed, Aug 12, 2015 at 11:16 AM Abdullah Anwar <abdullah.ibn.an...@gmail.com> wrote:
> Hi Hemant,
>
> Thank you for your reply.
>
> I think the source of my dataframe is not partitioned on the key; it's an
> Avro file where 'id' is a field .. but I don't know how to read a file and
> at the same time configure the partition key. I couldn't find anything in
> SQLContext.read.load where you can set the partition key, or in the
> dataframe where you can set the partition key. If it could partition on
> the specified key .. would Spark put the same partition range on the same
> machine for two different dataframes?
>
> What are the overall tips to join faster?
>
> Best Regards,
> Abdullah
>
> On Wed, Aug 12, 2015 at 11:02 AM, Hemant Bhanawat wrote:
>
>> Is the source of your dataframe partitioned on the key? As per your mail,
>> it looks like it is not. If that is the case, for partitioning the data,
>> you will have to shuffle the data anyway.
>>
>> Another part of your question is - how to co-group data from two
>> dataframes based on a key? I think for RDDs, cogroup in PairRDDFunctions
>> is a way. I am not sure if something similar is available for DataFrames.
>>
>> Hemant
>>
>> On Tue, Aug 11, 2015 at 2:14 PM, Abdullah Anwar wrote:
>>
>>> I have two dataframes like this:
>>>
>>> student_rdf = (studentid, name, ...)
>>> student_result_rdf = (studentid, gpa, ...)
>>>
>>> We need to join these two dataframes. We are now doing it like this:
>>>
>>> student_rdf.join(student_result_rdf, student_result_rdf["studentid"] == student_rdf["studentid"])
>>>
>>> So it is simple. But it creates lots of data shuffling across worker
>>> nodes. Since the joining key is the same on both sides, if the dataframes
>>> could be partitioned using that key (studentid), there should be no
>>> shuffling at all, as similar data (based on the partition key) would
>>> reside on the same node. Is it possible to hint Spark to do this?
>>>
>>> So, I am looking for a way to partition data based on a column while I
>>> read a dataframe from input. And if that is possible, how would Spark
>>> understand that the partition keys of the two dataframes are the same?
>>>
>>> --
>>> Abdullah
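A sketch of that idea in Java (illustrative names; both sides must use the same partitioner, and you pay the partitionBy shuffle once up front, which mainly helps when the pair RDDs are reused):

    int parts = 64;  // assumed
    HashPartitioner partitioner = new HashPartitioner(parts);

    JavaPairRDD<String, Row> students = studentDf.toJavaRDD()
        .mapToPair(r -> new Tuple2<>(r.getString(r.fieldIndex("studentid")), r))
        .partitionBy(partitioner);
    JavaPairRDD<String, Row> results = studentResultDf.toJavaRDD()
        .mapToPair(r -> new Tuple2<>(r.getString(r.fieldIndex("studentid")), r))
        .partitionBy(partitioner);

    // Co-partitioned RDDs join without another shuffle: matching keys are
    // already in the same partition on the same node.
    JavaPairRDD<String, Tuple2<Row, Row>> joined = students.join(results);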
Re: Issues with S3 paths that contain colons
I had the exact same issue, and overcame it by overriding NativeS3FileSystem with my own class, where I replaced the implementation of globStatus. It's a hack, but it works. Then I set the Hadoop config fs.myschema.impl to my class name, and accessed the files through myschema:// instead of s3n://

@Override
public FileStatus[] globStatus(final Path pathPattern, final PathFilter filter) throws IOException {
    final FileStatus[] statusList = super.listStatus(pathPattern);
    final List<FileStatus> result = Lists.newLinkedList();
    for (FileStatus fileStatus : statusList) {
        if (filter.accept(fileStatus.getPath())) {
            result.add(fileStatus);
        }
    }
    return result.toArray(new FileStatus[] {});
}

On Wed, Aug 19, 2015 at 9:14 PM Steve Loughran wrote: > you might want to think about filing a JIRA on issues.apache.org against > HADOOP here, component being fs/s3. That doesn't mean it is fixable, only > known. > > Every FS has its own set of forbidden characters & filenames; unix doesn't > allow files named "."; windows doesn't allow files called COM1, ..., so hitting > some filesystem rule is sometimes a problem. Here, though, you've got the > file in S3, the listing finds it, but other bits of the codepath are > failing -which implies that it is something in the Hadoop libs. > > > > On 18 Aug 2015, at 08:20, Brian Stempin wrote: > > > > Hi, > > I'm running Spark on Amazon EMR (Spark 1.4.1, Hadoop 2.6.0). I'm seeing > the > > exception below when encountering file names that contain colons. Any > idea > > on how to get around this? > > > > scala> val files = sc.textFile("s3a://redactedbucketname/*") > > > > 2015-08-18 04:38:34,567 INFO [main] storage.MemoryStore > > (Logging.scala:logInfo(59)) - ensureFreeSpace(242224) called with > > curMem=669367, maxMem=285203496 > > > > 2015-08-18 04:38:34,568 INFO [main] storage.MemoryStore > > (Logging.scala:logInfo(59)) - Block broadcast_3 stored as values in > memory > > (estimated size 236.5 KB, free 271.1 MB) > > > > 2015-08-18 04:38:34,663 INFO [main] storage.MemoryStore > > (Logging.scala:logInfo(59)) - ensureFreeSpace(21533) called with > > curMem=911591, maxMem=285203496 > > > > 2015-08-18 04:38:34,664 INFO [main] storage.MemoryStore > > (Logging.scala:logInfo(59)) - Block broadcast_3_piece0 stored as bytes in > > memory (estimated size 21.0 KB, free 271.1 MB) > > > > 2015-08-18 04:38:34,665 INFO > [sparkDriver-akka.actor.default-dispatcher-19] > > storage.BlockManagerInfo (Logging.scala:logInfo(59)) - Added > > broadcast_3_piece0 in memory on 10.182.184.26:60338 (size: 21.0 KB, > free: > > 271.9 MB) > > > > 2015-08-18 04:38:34,667 INFO [main] spark.SparkContext > > (Logging.scala:logInfo(59)) - Created broadcast 3 from textFile at > > <console>:21 > > > > files: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at > textFile at > > <console>:21 > > > > > > scala> files.count > > > > 2015-08-18 04:38:37,262 INFO [main] s3a.S3AFileSystem > > (S3AFileSystem.java:listStatus(533)) - List status for path: > > s3a://redactedbucketname/ > > > > 2015-08-18 04:38:37,262 INFO [main] s3a.S3AFileSystem > > (S3AFileSystem.java:getFileStatus(684)) - Getting path status for > > s3a://redactedbucketname/ () > > > > java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative > > path in absolute URI: > > > [922-212-4438]-[119]-[1]-[2015-08-13T15:43:12.346193%5D-%5B2015-01-01T00:00:00%5D-redacted.csv > > > > at org.apache.hadoop.fs.Path.initialize(Path.java:206) > > > > at org.apache.hadoop.fs.Path.<init>(Path.java:172) > > > > at org.apache.hadoop.fs.Path.<init>(Path.java:94) > > > > at
org.apache.hadoop.fs.Globber.glob(Globber.java:240) > > > > at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1700) > > > > at > > > org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:229) > > > > at > > > org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:200) > > > > at > > > org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:279) > > > > at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207) > > > > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) > > > > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) > > > > at scala.Option.getOrElse(Option.scala:120) > > > > at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) > > > > at > > > org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) > > > > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) > > > > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) > > > > at scala.Option.getOrElse(Option.scala:120) > > > > at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) > > > > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1781) > > > > at org.apache.spark.rdd.RDD.count(RDD.scala:1099) > > > > at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:24) > > > > at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:29) > > > > at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:31) > > > > at $iwC
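Wiring up a custom FileSystem like the one in the reply above is just a Hadoop configuration entry. A hedged sketch, where MyS3FileSystem and the "myschema" scheme are illustrative names:

// sc is a JavaSparkContext; MyS3FileSystem extends NativeS3FileSystem and
// carries the globStatus override shown earlier in this thread.
sc.hadoopConfiguration().set("fs.myschema.impl", "com.example.MyS3FileSystem");
JavaRDD<String> files = sc.textFile("myschema://redactedbucketname/*");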
How to overwrite partition when writing Parquet?
Hello, I have a DataFrame, with a date column which I want to use as a partition. Each day I want to write the data for the same date in Parquet, and then read a dataframe for a date range. I'm using: myDataframe.write().partitionBy("date").mode(SaveMode.Overwrite).parquet(parquetDir); If I use SaveMode.Append, then writing data for the same partition adds the same data there again. If I use SaveMode.Overwrite, then writing data for a single partition removes all the data for all partitions. How can I overwrite only a given partition or manually remove a partition before writing? Many thanks! Romi K.
Re: spark as a lookup engine for dedup
An RDD is immutable: it cannot be changed, you can only create a new one from data or from a transformation. It sounds inefficient to create a new one every 15 seconds covering the last 24 hours. I think a key-value store will be a much better fit for this purpose. On Mon, Jul 27, 2015 at 11:21 AM Shushant Arora wrote: > its for 1 day events in range of 1 billions and processing is in streaming > application of ~10-15 sec interval so lookup should be fast. RDD need to > be updated with new events and old events of current time-24 hours back > should be removed at each processing. > > So is spark RDD not fit for this requirement? > > On Mon, Jul 27, 2015 at 1:08 PM, Romi Kuntsman wrote: > >> What is the throughput of processing, and for how long do you need to >> remember duplicates? >> >> You can take all the events, put them in an RDD, group by the key, and >> then process each key only once. >> But if you have a long running application where you want to check that >> you didn't see the same value before, and check that for every value, you >> probably need a key-value store, not an RDD. >> >> On Sun, Jul 26, 2015 at 7:38 PM Shushant Arora >> wrote: >> >>> Hi >>> >>> I have a requirement for processing large events but ignoring duplicate >>> at the same time. >>> >>> Events are consumed from kafka and each event has a eventid. It may >>> happen that an event is already processed and came again at some other >>> offset. >>> >>> 1.Can I use Spark RDD to persist processed events and then lookup with >>> this rdd (How to do lookup inside a RDD ?I have a >>> JavaPairRDD ) >>> while processing new events and if event is present in persisted rdd >>> ignore it , else process the even. Does rdd.lookup(key) on billion of >>> events will be efficient ? >>> >>> 2. update the rdd (Since RDD is immutable how to update it)? >>> >>> Thanks >>> >>> >
Re: spark as a lookup engine for dedup
What is the throughput of processing, and for how long do you need to remember duplicates? You can take all the events, put them in an RDD, group by the key, and then process each key only once. But if you have a long running application where you want to check that you didn't see the same value before, and check that for every value, you probably need a key-value store, not an RDD. On Sun, Jul 26, 2015 at 7:38 PM Shushant Arora wrote: > Hi > > I have a requirement for processing large events but ignoring duplicate at > the same time. > > Events are consumed from kafka and each event has a eventid. It may happen > that an event is already processed and came again at some other offset. > > 1.Can I use Spark RDD to persist processed events and then lookup with > this rdd (How to do lookup inside a RDD ?I have a > JavaPairRDD ) > while processing new events and if event is present in persisted rdd > ignore it , else process the even. Does rdd.lookup(key) on billion of > events will be efficient ? > > 2. update the rdd (Since RDD is immutable how to update it)? > > Thanks > >
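A hedged Java sketch of the "group by the key and process each key only once" option, assuming events is a JavaPairRDD<String, String> of (eventid, payload) for one batch. This only removes duplicates within the batch; remembering duplicates across batches is where the key-value store comes in.

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function2;

// Collapse duplicate eventids, keeping a single occurrence per key
JavaPairRDD<String, String> deduped = events.reduceByKey(
    new Function2<String, String, String>() {
        @Override
        public String call(String a, String b) {
            return a; // duplicates are assumed to carry the same payload, keep either
        }
    });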
Re: Scaling spark cluster for a running application
Are you running the Spark cluster in standalone or YARN? In standalone, the application gets the available resources when it starts. With YARN, you can try to turn on the setting *spark.dynamicAllocation.enabled* See https://spark.apache.org/docs/latest/configuration.html On Wed, Jul 22, 2015 at 2:20 PM phagunbaya wrote: > I have a spark cluster running in client mode with driver outside the spark > cluster. I want to scale the cluster after an application is submitted. In > order to do this, I'm creating new workers and they are getting registered > with master but issue I'm seeing is; running application does not use the > newly added worker. Hence cannot add more resources to existing running > application. > > Is there any other way or config to deal with this use-case ? How to make > running application to ask for executors from newly issued worker node ? > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/Scaling-spark-cluster-for-a-running-application-tp23951.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > >
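For reference, a hedged sketch of turning that on; dynamic allocation also requires the external shuffle service as far as I recall, and the executor bounds here are illustrative.

import org.apache.spark.SparkConf;

SparkConf conf = new SparkConf()
    .set("spark.dynamicAllocation.enabled", "true")
    .set("spark.shuffle.service.enabled", "true") // needed by dynamic allocation
    .set("spark.dynamicAllocation.minExecutors", "2")
    .set("spark.dynamicAllocation.maxExecutors", "20");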
Application metrics inseparable from Master metrics?
Hi, I tried to enable Master metrics source (to get number of running/waiting applications etc), and connected it to Graphite. However, when these are enabled, application metrics are also sent. Is it possible to separate them, and send only master metrics without applications? I see that Master class is registering both: https://github.com/apache/spark/blob/b9a922e260bec1b211437f020be37fab46a85db0/core/src/main/scala/org/apache/spark/deploy/master/Master.scala#L91 Thanks, RK.
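If the master and application metric systems honor separate instance prefixes in metrics.properties (which is how the linked code reads to me, though I have not verified it), scoping the sink to the "master" instance instead of the "*" wildcard might send only master metrics. A sketch, with illustrative host and port:

master.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink
master.sink.graphite.host=graphite.example.com
master.sink.graphite.port=2003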
Re: Timestamp functions for sqlContext
Hi Tal, I'm not sure there is currently a built-in function for it, but you can easily define a UDF (user defined function) by implementing org.apache.spark.sql.api.java.UDF1, registering it (sqlContext.udf().register(...)), and then using it inside your query. RK. On Tue, Jul 21, 2015 at 7:04 PM Tal Rozen wrote: > Hi, > > I'm running a query with sql context where one of the fields is of type > java.sql.Timestamp. I'd like to set a function similar to DATEDIFF in > mysql, between the date given in each row, and now. So If I was able to use > the same syntax as in mysql it would be: > > val date_diff_df = sqlContext.sql("select DATEDIFF(curdate(), > rowTimestamp) date_diff from tableName") > > What are the relevant key words to replace curdate(), and DATEDIFF? > > Thanks > > > > > >
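A hedged Java sketch of such a UDF, computing whole days between a timestamp column and now; the function name and return type are illustrative, and it assumes the Spark 1.4-era Java UDF API and an existing sqlContext.

import java.sql.Timestamp;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

sqlContext.udf().register("dateDiffDays", new UDF1<Timestamp, Integer>() {
    @Override
    public Integer call(Timestamp ts) {
        long millisPerDay = 24L * 60 * 60 * 1000;
        return (int) ((System.currentTimeMillis() - ts.getTime()) / millisPerDay);
    }
}, DataTypes.IntegerType);

DataFrame dateDiffDf = sqlContext.sql(
    "select dateDiffDays(rowTimestamp) as date_diff from tableName");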
Spark application stuck retrying a task that failed on Java heap space?
Hello, *TL;DR: task crashes with OOM, but application gets stuck in infinite loop retrying the task over and over again instead of failing fast.* Using Spark 1.4.0, standalone, with DataFrames on Java 7. I have an application that does some aggregations. I played around with shuffling settings, which led to the dreaded Java heap space error. See the stack trace at the end of this message. When this happens, I see 10's of executors in "EXITED" state, a couple in "LOADING" and one in "RUNNING". All of them are retrying the same task all over again, and keep failing with the same "Java heap space" error. This goes on for hours! Why doesn't the whole application fail, when the individual executors keep failing with the same error? Thanks, Romi K. --- end of the log in a failed task: 15/07/21 11:13:40 INFO executor.Executor: Finished task 117.0 in stage 218.1 (TID 305). 2000 bytes result sent to driver 15/07/21 11:13:41 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 306 15/07/21 11:13:41 INFO executor.Executor: Running task 0.0 in stage 219.1 (TID 306) 15/07/21 11:13:41 INFO spark.MapOutputTrackerWorker: Updating epoch to 420 and clearing cache 15/07/21 11:13:41 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 8 15/07/21 11:13:41 INFO storage.MemoryStore: ensureFreeSpace(5463) called with curMem=285917, maxMem=1406164008 15/07/21 11:13:41 INFO storage.MemoryStore: Block broadcast_8_piece0 stored as bytes in memory (estimated size 5.3 KB, free 1340.7 MB) 15/07/21 11:13:41 INFO broadcast.TorrentBroadcast: Reading broadcast variable 8 took 22 ms 15/07/21 11:13:41 INFO storage.MemoryStore: ensureFreeSpace(10880) called with curMem=291380, maxMem=1406164008 15/07/21 11:13:41 INFO storage.MemoryStore: Block broadcast_8 stored as values in memory (estimated size 10.6 KB, free 1340.7 MB) 15/07/21 11:13:41 INFO spark.MapOutputTrackerWorker: Don't have map outputs for shuffle 136, fetching them 15/07/21 11:13:41 INFO spark.MapOutputTrackerWorker: Doing the fetch; tracker endpoint = AkkaRpcEndpointRef(Actor[akka.tcp:// sparkDriver@1.2.3.4:57490/user/MapOutputTracker#-99712578]) 15/07/21 11:13:41 INFO spark.MapOutputTrackerWorker: Got the output locations 15/07/21 11:13:41 INFO storage.ShuffleBlockFetcherIterator: Getting 182 non-empty blocks out of 182 blocks 15/07/21 11:13:41 INFO storage.ShuffleBlockFetcherIterator: Started 4 remote fetches in 28 ms 15/07/21 11:14:34 ERROR executor.Executor: Exception in task 0.0 in stage 219.1 (TID 306) java.lang.OutOfMemoryError: Java heap space at scala.collection.mutable.ResizableArray$class.ensureSize(ResizableArray.scala:99) at scala.collection.mutable.ArrayBuffer.ensureSize(ArrayBuffer.scala:47) at scala.collection.mutable.ArrayBuffer.$plus$eq(ArrayBuffer.scala:83) at scala.collection.mutable.ArrayBuffer.$plus$eq(ArrayBuffer.scala:47) at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48) at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at 
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.sql.execution.Sort$$anonfun$doExecute$5$$anonfun$apply$5.apply(basicOperators.scala:192) at org.apache.spark.sql.execution.Sort$$anonfun$doExecute$5$$anonfun$apply$5.apply(basicOperators.scala:190) at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686) at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
Re: Guava 11 dependency issue in Spark 1.2.0
Actually there is already someone on Hadoop-Common-Dev taking care of removing the old Guava dependency http://mail-archives.apache.org/mod_mbox/hadoop-common-dev/201501.mbox/browser https://issues.apache.org/jira/browse/HADOOP-11470 *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com On Mon, Jan 19, 2015 at 4:03 PM, Romi Kuntsman wrote: > I have recently encountered a similar problem with Guava version collision > with Hadoop. > > Isn't it more correct to upgrade Hadoop to use the latest Guava? Why are > they staying in version 11, does anyone know? > > *Romi Kuntsman*, *Big Data Engineer* > http://www.totango.com > > On Wed, Jan 7, 2015 at 7:59 AM, Niranda Perera > wrote: > >> Hi Sean, >> >> I removed the hadoop dependencies from the app and ran it on the cluster. >> It gives a java.io.EOFException >> >> 15/01/07 11:19:29 INFO MemoryStore: ensureFreeSpace(177166) called with >> curMem=0, maxMem=2004174766 >> 15/01/07 11:19:29 INFO MemoryStore: Block broadcast_0 stored as values in >> memory (estimated size 173.0 KB, free 1911.2 MB) >> 15/01/07 11:19:29 INFO MemoryStore: ensureFreeSpace(25502) called with >> curMem=177166, maxMem=2004174766 >> 15/01/07 11:19:29 INFO MemoryStore: Block broadcast_0_piece0 stored as >> bytes in memory (estimated size 24.9 KB, free 1911.1 MB) >> 15/01/07 11:19:29 INFO BlockManagerInfo: Added broadcast_0_piece0 in >> memory on 10.100.5.109:43924 (size: 24.9 KB, free: 1911.3 MB) >> 15/01/07 11:19:29 INFO BlockManagerMaster: Updated info of block >> broadcast_0_piece0 >> 15/01/07 11:19:29 INFO SparkContext: Created broadcast 0 from hadoopFile >> at AvroRelation.scala:45 >> 15/01/07 11:19:29 INFO FileInputFormat: Total input paths to process : 1 >> 15/01/07 11:19:29 INFO SparkContext: Starting job: collect at >> SparkPlan.scala:84 >> 15/01/07 11:19:29 INFO DAGScheduler: Got job 0 (collect at >> SparkPlan.scala:84) with 2 output partitions (allowLocal=false) >> 15/01/07 11:19:29 INFO DAGScheduler: Final stage: Stage 0(collect at >> SparkPlan.scala:84) >> 15/01/07 11:19:29 INFO DAGScheduler: Parents of final stage: List() >> 15/01/07 11:19:29 INFO DAGScheduler: Missing parents: List() >> 15/01/07 11:19:29 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[6] at >> map at SparkPlan.scala:84), which has no missing parents >> 15/01/07 11:19:29 INFO MemoryStore: ensureFreeSpace(4864) called with >> curMem=202668, maxMem=2004174766 >> 15/01/07 11:19:29 INFO MemoryStore: Block broadcast_1 stored as values in >> memory (estimated size 4.8 KB, free 1911.1 MB) >> 15/01/07 11:19:29 INFO MemoryStore: ensureFreeSpace(3481) called with >> curMem=207532, maxMem=2004174766 >> 15/01/07 11:19:29 INFO MemoryStore: Block broadcast_1_piece0 stored as >> bytes in memory (estimated size 3.4 KB, free 1911.1 MB) >> 15/01/07 11:19:29 INFO BlockManagerInfo: Added broadcast_1_piece0 in >> memory on 10.100.5.109:43924 (size: 3.4 KB, free: 1911.3 MB) >> 15/01/07 11:19:29 INFO BlockManagerMaster: Updated info of block >> broadcast_1_piece0 >> 15/01/07 11:19:29 INFO SparkContext: Created broadcast 1 from broadcast >> at DAGScheduler.scala:838 >> 15/01/07 11:19:29 INFO DAGScheduler: Submitting 2 missing tasks from >> Stage 0 (MappedRDD[6] at map at SparkPlan.scala:84) >> 15/01/07 11:19:29 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks >> 15/01/07 11:19:29 INFO TaskSetManager: Starting task 0.0 in stage 0.0 >> (TID 0, 10.100.5.109, PROCESS_LOCAL, 1340 bytes) >> 15/01/07 11:19:29 INFO TaskSetManager: Starting task 1.0 in stage 0.0 >> (TID 1, 10.100.5.109, PROCESS_LOCAL, 1340 bytes) 
>> 15/01/07 11:19:29 WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, >> 10.100.5.109): java.io.EOFException >> at >> java.io.ObjectInputStream$BlockDataInputStream.readFully(ObjectInputStream.java:2722) >> at java.io.ObjectInputStream.readFully(ObjectInputStream.java:1009) >> at >> org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:63) >> at >> org.apache.hadoop.io.DataOutputBuffer.write(DataOutputBuffer.java:101) >> at org.apache.hadoop.io.UTF8.readChars(UTF8.java:216) >> at org.apache.hadoop.io.UTF8.readString(UTF8.java:208) >> at org.apache.hadoop.mapred.FileSplit.readFields(FileSplit.java:87) >> at >> org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:237) >> at >> org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:66) >> at >> org.apache.spark.SerializableWritable$$anonfun$readObject$1.apply$mcV$sp(SerializableWritable.scala:43) >> at org.apache.s
Re: Guava 11 dependency issue in Spark 1.2.0
I have recently encountered a similar problem with Guava version collision with Hadoop. Isn't it more correct to upgrade Hadoop to use the latest Guava? Why are they staying in version 11, does anyone know? *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com On Wed, Jan 7, 2015 at 7:59 AM, Niranda Perera wrote: > Hi Sean, > > I removed the hadoop dependencies from the app and ran it on the cluster. > It gives a java.io.EOFException > > 15/01/07 11:19:29 INFO MemoryStore: ensureFreeSpace(177166) called with > curMem=0, maxMem=2004174766 > 15/01/07 11:19:29 INFO MemoryStore: Block broadcast_0 stored as values in > memory (estimated size 173.0 KB, free 1911.2 MB) > 15/01/07 11:19:29 INFO MemoryStore: ensureFreeSpace(25502) called with > curMem=177166, maxMem=2004174766 > 15/01/07 11:19:29 INFO MemoryStore: Block broadcast_0_piece0 stored as > bytes in memory (estimated size 24.9 KB, free 1911.1 MB) > 15/01/07 11:19:29 INFO BlockManagerInfo: Added broadcast_0_piece0 in > memory on 10.100.5.109:43924 (size: 24.9 KB, free: 1911.3 MB) > 15/01/07 11:19:29 INFO BlockManagerMaster: Updated info of block > broadcast_0_piece0 > 15/01/07 11:19:29 INFO SparkContext: Created broadcast 0 from hadoopFile > at AvroRelation.scala:45 > 15/01/07 11:19:29 INFO FileInputFormat: Total input paths to process : 1 > 15/01/07 11:19:29 INFO SparkContext: Starting job: collect at > SparkPlan.scala:84 > 15/01/07 11:19:29 INFO DAGScheduler: Got job 0 (collect at > SparkPlan.scala:84) with 2 output partitions (allowLocal=false) > 15/01/07 11:19:29 INFO DAGScheduler: Final stage: Stage 0(collect at > SparkPlan.scala:84) > 15/01/07 11:19:29 INFO DAGScheduler: Parents of final stage: List() > 15/01/07 11:19:29 INFO DAGScheduler: Missing parents: List() > 15/01/07 11:19:29 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[6] at > map at SparkPlan.scala:84), which has no missing parents > 15/01/07 11:19:29 INFO MemoryStore: ensureFreeSpace(4864) called with > curMem=202668, maxMem=2004174766 > 15/01/07 11:19:29 INFO MemoryStore: Block broadcast_1 stored as values in > memory (estimated size 4.8 KB, free 1911.1 MB) > 15/01/07 11:19:29 INFO MemoryStore: ensureFreeSpace(3481) called with > curMem=207532, maxMem=2004174766 > 15/01/07 11:19:29 INFO MemoryStore: Block broadcast_1_piece0 stored as > bytes in memory (estimated size 3.4 KB, free 1911.1 MB) > 15/01/07 11:19:29 INFO BlockManagerInfo: Added broadcast_1_piece0 in > memory on 10.100.5.109:43924 (size: 3.4 KB, free: 1911.3 MB) > 15/01/07 11:19:29 INFO BlockManagerMaster: Updated info of block > broadcast_1_piece0 > 15/01/07 11:19:29 INFO SparkContext: Created broadcast 1 from broadcast at > DAGScheduler.scala:838 > 15/01/07 11:19:29 INFO DAGScheduler: Submitting 2 missing tasks from Stage > 0 (MappedRDD[6] at map at SparkPlan.scala:84) > 15/01/07 11:19:29 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks > 15/01/07 11:19:29 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID > 0, 10.100.5.109, PROCESS_LOCAL, 1340 bytes) > 15/01/07 11:19:29 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID > 1, 10.100.5.109, PROCESS_LOCAL, 1340 bytes) > 15/01/07 11:19:29 WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, > 10.100.5.109): java.io.EOFException > at > java.io.ObjectInputStream$BlockDataInputStream.readFully(ObjectInputStream.java:2722) > at java.io.ObjectInputStream.readFully(ObjectInputStream.java:1009) > at > org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:63) > at > 
org.apache.hadoop.io.DataOutputBuffer.write(DataOutputBuffer.java:101) > at org.apache.hadoop.io.UTF8.readChars(UTF8.java:216) > at org.apache.hadoop.io.UTF8.readString(UTF8.java:208) > at org.apache.hadoop.mapred.FileSplit.readFields(FileSplit.java:87) > at > org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:237) > at > org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:66) > at > org.apache.spark.SerializableWritable$$anonfun$readObject$1.apply$mcV$sp(SerializableWritable.scala:43) > at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:985) > at > org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) > at java.lang.reflect.Method.invoke(Method.java:597) > at > java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:969) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1871) &g
Re: Announcing Spark 1.1.1!
About version compatibility and upgrade path - can the Java application dependencies and the Spark server be upgraded separately (i.e. will 1.1.0 library work with 1.1.1 server, and vice versa), or do they need to be upgraded together? Thanks! *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com On Tue, Dec 2, 2014 at 11:36 PM, Andrew Or wrote: > I am happy to announce the availability of Spark 1.1.1! This is a > maintenance release with many bug fixes, most of which are concentrated in > the core. This list includes various fixes to sort-based shuffle, memory > leak, and spilling issues. Contributions from this release came from 55 > developers. > > Visit the release notes [1] to read about the new features, or > download [2] the release today. > > [1] http://spark.apache.org/releases/spark-release-1-1-1.html > [2] http://spark.apache.org/downloads.html > > Please e-mail me directly for any typo's in the release notes or name > listing. > > Thanks for everyone who contributed, and congratulations! > -Andrew >
ExternalAppendOnlyMap: Thread spilling in-memory map to disk many times slowly
Hello, I have a large data calculation in Spark, distributed across several nodes. In the end, I want to write to a single output file. For this I do: output.coalesce(1, false).saveAsTextFile(filename). What happens is all the data from the workers flows to a single worker, and that one writes the data. If the data is small enough, it all goes well. However, for an RDD above a certain size, I get a lot of the following messages (see below). From what I understand, ExternalAppendOnlyMap spills the data to disk when it can't hold it in memory. Is there a way to tell it to stream the data right to disk, instead of spilling each block slowly? 14/11/24 12:54:59 INFO MapOutputTrackerWorker: Got the output locations 14/11/24 12:54:59 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329 14/11/24 12:54:59 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 69 non-empty blocks out of 90 blocks 14/11/24 12:54:59 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 3 remote fetches in 22 ms 14/11/24 12:55:11 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329 14/11/24 12:55:11 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 70 non-empty blocks out of 90 blocks 14/11/24 12:55:11 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 3 remote fetches in 4 ms 14/11/24 12:55:11 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory map of 13 MB to disk (1 time so far) 14/11/24 12:55:11 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory map of 12 MB to disk (2 times so far) 14/11/24 12:55:11 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory map of 12 MB to disk (3 times so far) [...trimmed...] 14/11/24 13:13:28 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329 14/11/24 13:13:28 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 69 non-empty blocks out of 90 blocks 14/11/24 13:13:28 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 3 remote fetches in 2 ms 14/11/24 13:13:28 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory map of 15 MB to disk (1 time so far) 14/11/24 13:13:28 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory map of 16 MB to disk (2 times so far) 14/11/24 13:13:28 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory map of 14 MB to disk (3 times so far) [...trimmed...] 14/11/24 13:13:32 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory map of 13 MB to disk (33 times so far) 14/11/24 13:13:32 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory map of 13 MB to disk (34 times so far) 14/11/24 13:13:32 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory map of 13 MB to disk (35 times so far) 14/11/24 13:13:40 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329 14/11/24 13:13:40 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 69 non-empty blocks out of 90 blocks 14/11/24 13:13:40 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 3 remote fetches in 4 ms 14/11/24 13:13:40 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory map of 10 MB to disk (1 time so far) 14/11/24 13:13:41 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory map of 10 MB to disk (2 times so far) 14/11/24 13:13:41 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory map of 9 MB to disk (3 times so far) [...trimmed...] 
14/11/24 13:13:45 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory map of 12 MB to disk (36 times so far) 14/11/24 13:13:45 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory map of 11 MB to disk (37 times so far) 14/11/24 13:13:56 INFO FileOutputCommitter: Saved output of task 'attempt_201411241250__m_00_90' to s3n://mybucket/mydir/output *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com
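One way to sidestep funneling all the data through a single worker is to write with full parallelism and merge the part files afterwards. A hedged Java sketch using Hadoop's FileUtil.copyMerge, with illustrative paths; merging to or from S3 implies an extra copy, so this is not a free lunch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

// Write in parallel (one part file per partition), then merge into one file.
// output is the RDD from above; sc is a JavaSparkContext.
output.saveAsTextFile("hdfs:///tmp/output-parts");
Configuration hadoopConf = sc.hadoopConfiguration();
FileSystem fs = FileSystem.get(hadoopConf);
FileUtil.copyMerge(fs, new Path("hdfs:///tmp/output-parts"),
        fs, new Path("hdfs:///tmp/output.txt"),
        true /* delete the part files */, hadoopConf, null);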
Task time measurement
Hello, Currently in the Spark standalone console, I can only see how long the entire job took. How can I know how long it was in WAITING and how long in RUNNING, and also, when running, how long each of the jobs inside took? Thanks, *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com
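Lacking a built-in breakdown in the standalone console, one low-tech way to time individual jobs is to wrap each action on the driver; a trivial sketch (myRdd is illustrative):

long start = System.currentTimeMillis();
long count = myRdd.count(); // each action corresponds to one job
System.out.println("count job took " + (System.currentTimeMillis() - start) + " ms");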
Snappy temp files not cleaned up
Hello, Spark has an internal cleanup mechanism (defined by spark.cleaner.ttl, see http://spark.apache.org/docs/latest/configuration.html) which cleans up tasks and stages. However, in our installation, we noticed that Snappy temporary files are never cleaned up. Is it a misconfiguration? A missing feature? How do you deal with the build-up of temp files? Thanks, *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com
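For reference, the cleaner interval mentioned above is set like this (the value is illustrative; whether it covers these Snappy temp files is exactly the open question):

import org.apache.spark.SparkConf;

SparkConf conf = new SparkConf().set("spark.cleaner.ttl", "3600"); // seconds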
Re: Spark job resource allocation best practices
Let's say that I run Spark on Mesos in fine-grained mode, and I have 12 cores and 64GB memory. I run application A on Spark, and some time after that (but before A finished) application B. How many CPUs will each of them get? *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com On Tue, Nov 4, 2014 at 11:33 AM, Akhil Das wrote: > You need to install mesos on your cluster. Then you will run your spark > applications by specifying mesos master (mesos://) instead of (spark://). > > Spark can run over Mesos in two modes: “*fine-grained*” (default) and “ > *coarse-grained*”. > > In “*fine-grained*” mode (default), each Spark task runs as a separate > Mesos task. This allows multiple instances of Spark (and other frameworks) > to share machines at a very fine granularity, where each application gets > more or fewer machines as it ramps up and down, but it comes with an > additional overhead in launching each task. This mode may be inappropriate > for low-latency requirements like interactive queries or serving web > requests. > > The “*coarse-grained*” mode will instead launch only one long-running > Spark task on each Mesos machine, and dynamically schedule its own > “mini-tasks” within it. The benefit is much lower startup overhead, but at > the cost of reserving the Mesos resources for the complete duration of the > application. > > To run in coarse-grained mode, set the spark.mesos.coarse property in your > SparkConf: > conf.set("spark.mesos.coarse", "true") > > > In addition, for coarse-grained mode, you can control the maximum number > of resources Spark will acquire. By default, it will acquire all cores in > the cluster (that get offered by Mesos), which only makes sense if you run > just one application at a time. You can cap the maximum number of cores > using conf.set("spark.cores.max", "10") (for example). > > > If you run your application in fine-grained mode, then mesos will take > care of the resource allocation for you. You just tell mesos from your > application that you are running in fine-grained mode, and it is the > default mode. > > Thanks > Best Regards > > On Tue, Nov 4, 2014 at 2:46 PM, Romi Kuntsman wrote: > >> I have a single Spark cluster, not multiple frameworks and not multiple >> versions. Is it relevant for my use-case? >> Where can I find information about exactly how to make Mesos tell Spark >> how many resources of the cluster to use? (instead of the default take-all) >> >> *Romi Kuntsman*, *Big Data Engineer* >> http://www.totango.com >> >> On Tue, Nov 4, 2014 at 11:00 AM, Akhil Das >> wrote: >> >>> You can look at different modes over here >>> http://docs.sigmoidanalytics.com/index.php/Spark_On_Mesos#Mesos_Run_Modes >>> >>> These people has very good tutorial to get you started >>> http://mesosphere.com/docs/tutorials/run-spark-on-mesos/#overview >>> >>> Thanks >>> Best Regards >>> >>> On Tue, Nov 4, 2014 at 1:44 PM, Romi Kuntsman wrote: >>> >>>> How can I configure Mesos allocation policy to share resources between >>>> all current Spark applications? I can't seem to find it in the architecture >>>> docs. >>>> >>>> *Romi Kuntsman*, *Big Data Engineer* >>>> http://www.totango.com >>>> >>>> On Tue, Nov 4, 2014 at 9:11 AM, Akhil Das >>>> wrote: >>>> >>>>> Yes. i believe Mesos is the right choice for you. 
>>>>> http://mesos.apache.org/documentation/latest/mesos-architecture/ >>>>> >>>>> Thanks >>>>> Best Regards >>>>> >>>>> On Mon, Nov 3, 2014 at 9:33 PM, Romi Kuntsman >>>>> wrote: >>>>> >>>>>> So, as said there, static partitioning is used in "Spark’s standalone >>>>>> and YARN modes, as well as the coarse-grained Mesos mode". >>>>>> That leaves us only with Mesos, where there is *dynamic sharing* of >>>>>> CPU cores. >>>>>> >>>>>> It says "when the application is not running tasks on a machine, >>>>>> other applications may run tasks on those cores". >>>>>> But my applications are short lived (seconds to minutes), and they >>>>>> read a large dataset, process it, and write the results. They are also >>>>>> IO-bound, meaning most of the time is spent reading input data (from S3) >>>>>> and writing the results back. >>>>>> >>>>
Re: Spark job resource allocation best practices
I have a single Spark cluster, not multiple frameworks and not multiple versions. Is it relevant for my use-case? Where can I find information about exactly how to make Mesos tell Spark how many resources of the cluster to use? (instead of the default take-all) *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com On Tue, Nov 4, 2014 at 11:00 AM, Akhil Das wrote: > You can look at different modes over here > http://docs.sigmoidanalytics.com/index.php/Spark_On_Mesos#Mesos_Run_Modes > > These people has very good tutorial to get you started > http://mesosphere.com/docs/tutorials/run-spark-on-mesos/#overview > > Thanks > Best Regards > > On Tue, Nov 4, 2014 at 1:44 PM, Romi Kuntsman wrote: > >> How can I configure Mesos allocation policy to share resources between >> all current Spark applications? I can't seem to find it in the architecture >> docs. >> >> *Romi Kuntsman*, *Big Data Engineer* >> http://www.totango.com >> >> On Tue, Nov 4, 2014 at 9:11 AM, Akhil Das >> wrote: >> >>> Yes. i believe Mesos is the right choice for you. >>> http://mesos.apache.org/documentation/latest/mesos-architecture/ >>> >>> Thanks >>> Best Regards >>> >>> On Mon, Nov 3, 2014 at 9:33 PM, Romi Kuntsman wrote: >>> >>>> So, as said there, static partitioning is used in "Spark’s standalone >>>> and YARN modes, as well as the coarse-grained Mesos mode". >>>> That leaves us only with Mesos, where there is *dynamic sharing* of >>>> CPU cores. >>>> >>>> It says "when the application is not running tasks on a machine, other >>>> applications may run tasks on those cores". >>>> But my applications are short lived (seconds to minutes), and they read >>>> a large dataset, process it, and write the results. They are also IO-bound, >>>> meaning most of the time is spent reading input data (from S3) and writing >>>> the results back. >>>> >>>> Is it possible to divide the resources between them, according to how >>>> many are trying to run at the same time? >>>> So for example if I have 12 cores - if one job is scheduled, it will >>>> get 12 cores, but if 3 are scheduled, then each one will get 4 cores and >>>> then will all start. >>>> >>>> Thanks! >>>> >>>> *Romi Kuntsman*, *Big Data Engineer* >>>> http://www.totango.com >>>> >>>> On Mon, Nov 3, 2014 at 5:46 PM, Akhil Das >>>> wrote: >>>> >>>>> Have a look at scheduling pools >>>>> <https://spark.apache.org/docs/latest/job-scheduling.html>. If you >>>>> want more sophisticated resource allocation, then you are better of to use >>>>> cluster managers like mesos or yarn >>>>> >>>>> Thanks >>>>> Best Regards >>>>> >>>>> On Mon, Nov 3, 2014 at 9:10 PM, Romi Kuntsman >>>>> wrote: >>>>> >>>>>> Hello, >>>>>> >>>>>> I have a Spark 1.1.0 standalone cluster, with several nodes, and >>>>>> several jobs (applications) being scheduled at the same time. >>>>>> By default, each Spark job takes up all available CPUs. >>>>>> This way, when more than one job is scheduled, all but the first are >>>>>> stuck in "WAITING". >>>>>> On the other hand, if I tell each job to initially limit itself to a >>>>>> fixed number of CPUs, and that job runs by itself, the cluster is >>>>>> under-utilized and the job runs longer than it could have if it took all >>>>>> the available resources. >>>>>> >>>>>> - How to give the tasks a more fair resource division, which lets >>>>>> many jobs run together, and together lets them use all the available >>>>>> resources? >>>>>> - How do you divide resources between applications on your usecase? >>>>>> >>>>>> P.S. 
I started reading about Mesos but couldn't figure out if/how it >>>>>> could solve the described issue. >>>>>> >>>>>> Thanks! >>>>>> >>>>>> *Romi Kuntsman*, *Big Data Engineer* >>>>>> http://www.totango.com >>>>>> >>>>> >>>>> >>>> >>> >> >
Re: Spark job resource allocation best practices
How can I configure Mesos allocation policy to share resources between all current Spark applications? I can't seem to find it in the architecture docs. *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com On Tue, Nov 4, 2014 at 9:11 AM, Akhil Das wrote: > Yes. i believe Mesos is the right choice for you. > http://mesos.apache.org/documentation/latest/mesos-architecture/ > > Thanks > Best Regards > > On Mon, Nov 3, 2014 at 9:33 PM, Romi Kuntsman wrote: > >> So, as said there, static partitioning is used in "Spark’s standalone and >> YARN modes, as well as the coarse-grained Mesos mode". >> That leaves us only with Mesos, where there is *dynamic sharing* of CPU >> cores. >> >> It says "when the application is not running tasks on a machine, other >> applications may run tasks on those cores". >> But my applications are short lived (seconds to minutes), and they read a >> large dataset, process it, and write the results. They are also IO-bound, >> meaning most of the time is spent reading input data (from S3) and writing >> the results back. >> >> Is it possible to divide the resources between them, according to how >> many are trying to run at the same time? >> So for example if I have 12 cores - if one job is scheduled, it will get >> 12 cores, but if 3 are scheduled, then each one will get 4 cores and then >> will all start. >> >> Thanks! >> >> *Romi Kuntsman*, *Big Data Engineer* >> http://www.totango.com >> >> On Mon, Nov 3, 2014 at 5:46 PM, Akhil Das >> wrote: >> >>> Have a look at scheduling pools >>> <https://spark.apache.org/docs/latest/job-scheduling.html>. If you want >>> more sophisticated resource allocation, then you are better of to use >>> cluster managers like mesos or yarn >>> >>> Thanks >>> Best Regards >>> >>> On Mon, Nov 3, 2014 at 9:10 PM, Romi Kuntsman wrote: >>> >>>> Hello, >>>> >>>> I have a Spark 1.1.0 standalone cluster, with several nodes, and >>>> several jobs (applications) being scheduled at the same time. >>>> By default, each Spark job takes up all available CPUs. >>>> This way, when more than one job is scheduled, all but the first are >>>> stuck in "WAITING". >>>> On the other hand, if I tell each job to initially limit itself to a >>>> fixed number of CPUs, and that job runs by itself, the cluster is >>>> under-utilized and the job runs longer than it could have if it took all >>>> the available resources. >>>> >>>> - How to give the tasks a more fair resource division, which lets many >>>> jobs run together, and together lets them use all the available resources? >>>> - How do you divide resources between applications on your usecase? >>>> >>>> P.S. I started reading about Mesos but couldn't figure out if/how it >>>> could solve the described issue. >>>> >>>> Thanks! >>>> >>>> *Romi Kuntsman*, *Big Data Engineer* >>>> http://www.totango.com >>>> >>> >>> >> >
Re: Dynamically switching Nr of allocated core
I didn't notice your message and asked about the same question in the thread with the title "Spark job resource allocation best practices". Adding a specific case to your example: 1 - There are 12 cores available in the cluster 2 - I start app B with all cores - gets 12 3 - I start app A - it needs just 2 cores (which, as you said, it would get even when 12 are available), but gets nothing 4 - Until I stop app B, app A is stuck waiting, instead of app B freeing 2 cores and dropping to 10 cores. *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com On Mon, Nov 3, 2014 at 3:17 PM, RodrigoB wrote: > Hi all, > > I can't seem to find a clear answer on the documentation. > > Does the standalone cluster support dynamic assigment of nr of allocated > cores to an application once another app stops? > I'm aware that we can have core sharding if we use Mesos between active > applications depending on the nr of parallel tasks I believe my question is > slightly simpler. > > For example: > 1 - There are 12 cores available in the cluster > 2 - I start app A with 2 cores - gets 2 > 3 - I start app B - gets remaining 10 > 4 - If I stop app A, app B *does not* get the now available remaining 2 > cores. > > Should I expect Mesos to have this scenario working? > > Also, the same question applies to when we add more cores to a cluster. > Let's say ideally I want 12 cores for my app, although there are only 10. > As > I add more workers, they should get assigned to my app dynamically. I > haven't tested this in a while but I think the app will not even start and > complain about not enough resources... > > Would very much appreciate any knowledge share on this! > > tnks, > Rod > > > > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/Dynamically-switching-Nr-of-allocated-core-tp17955.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > >
Re: Spark job resource allocation best practices
So, as said there, static partitioning is used in "Spark’s standalone and YARN modes, as well as the coarse-grained Mesos mode". That leaves us only with Mesos, where there is *dynamic sharing* of CPU cores. It says "when the application is not running tasks on a machine, other applications may run tasks on those cores". But my applications are short lived (seconds to minutes), and they read a large dataset, process it, and write the results. They are also IO-bound, meaning most of the time is spent reading input data (from S3) and writing the results back. Is it possible to divide the resources between them, according to how many are trying to run at the same time? So for example if I have 12 cores - if one job is scheduled, it will get 12 cores, but if 3 are scheduled, then each one will get 4 cores and then will all start. Thanks! *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com On Mon, Nov 3, 2014 at 5:46 PM, Akhil Das wrote: > Have a look at scheduling pools > <https://spark.apache.org/docs/latest/job-scheduling.html>. If you want > more sophisticated resource allocation, then you are better of to use > cluster managers like mesos or yarn > > Thanks > Best Regards > > On Mon, Nov 3, 2014 at 9:10 PM, Romi Kuntsman wrote: > >> Hello, >> >> I have a Spark 1.1.0 standalone cluster, with several nodes, and several >> jobs (applications) being scheduled at the same time. >> By default, each Spark job takes up all available CPUs. >> This way, when more than one job is scheduled, all but the first are >> stuck in "WAITING". >> On the other hand, if I tell each job to initially limit itself to a >> fixed number of CPUs, and that job runs by itself, the cluster is >> under-utilized and the job runs longer than it could have if it took all >> the available resources. >> >> - How to give the tasks a more fair resource division, which lets many >> jobs run together, and together lets them use all the available resources? >> - How do you divide resources between applications on your usecase? >> >> P.S. I started reading about Mesos but couldn't figure out if/how it >> could solve the described issue. >> >> Thanks! >> >> *Romi Kuntsman*, *Big Data Engineer* >> http://www.totango.com >> > >
Spark job resource allocation best practices
Hello, I have a Spark 1.1.0 standalone cluster, with several nodes, and several jobs (applications) being scheduled at the same time. By default, each Spark job takes up all available CPUs. This way, when more than one job is scheduled, all but the first are stuck in "WAITING". On the other hand, if I tell each job to initially limit itself to a fixed number of CPUs, and that job runs by itself, the cluster is under-utilized and the job runs longer than it could have if it took all the available resources. - How to give the tasks a more fair resource division, which lets many jobs run together, and together lets them use all the available resources? - How do you divide resources between applications on your usecase? P.S. I started reading about Mesos but couldn't figure out if/how it could solve the described issue. Thanks! *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com
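For reference, a hedged sketch of the fixed-cap approach mentioned above for standalone mode, with illustrative values:

import org.apache.spark.SparkConf;

SparkConf conf = new SparkConf()
    .setMaster("spark://master:7077")
    .setAppName("my-app")
    .set("spark.cores.max", "4"); // cap this app so other apps can run concurrently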
Re: Workers disconnected from master sometimes and never reconnect back
Hi all, Regarding a post here a few months ago http://apache-spark-user-list.1001560.n3.nabble.com/Workers-disconnected-from-master-sometimes-and-never-reconnect-back-tp6240.html Is there an answer to this? I saw workers still active but not reconnecting after they lost connection to the master. Using Spark 1.1.0. If the master server is restarted, should the workers retry to register with it? Greetings, -- *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com Join the Customer Success Manifesto <http://youtu.be/XvFi2Wh6wgU>