Re: S3n, parallelism, partitions
This will also depend on the file format you are using. A word of advice: you would be much better off with the s3a file system. As I found out recently the hard way, s3n has some issues with reading through entire files even when just looking for headers.

On Mon, Aug 17, 2015 at 2:10 AM, Akhil Das ak...@sigmoidanalytics.com wrote: s3n underneath uses the Hadoop API, so I guess it would partition according to your Hadoop configuration (128 MB per partition by default). Thanks Best Regards

On Mon, Aug 17, 2015 at 2:29 PM, matd matd...@gmail.com wrote: Hello, I would like to understand how the work is parallelized across a Spark cluster (and what is left to the driver) when I read several files from a single folder in S3: s3n://bucket_xyz/some_folder_having_many_files_in_it/ How are files (or file parts) mapped to partitions? Thanks Mathieu
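For reference, a minimal sketch of checking how such a folder ends up partitioned (the s3a path mirrors the one above; the minPartitions value is an illustrative assumption, not something from the thread):

    // Each file is split according to the Hadoop input-split size (128 MB by default,
    // per Akhil's note above); minPartitions can only raise the count, never merge files.
    val rdd = sc.textFile("s3a://bucket_xyz/some_folder_having_many_files_in_it/", minPartitions = 200)
    println(rdd.partitions.length)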
Re: Accessing S3 files with s3n://
Hi Jerry, Akhil, Thanks for your help. With s3n, the entire file is downloaded even while just creating the RDD with sqlContext.read.parquet(). It seems like even just opening and closing the InputStream causes the entire data to get fetched. As it turned out, I was able to use s3a and avoid this problem. I was under the impression that s3a was only meant for use with EMRFS, where the metadata of the FS is kept separately. This is not true; s3a maps object keys directly to file names and directories.

On Sun, Aug 9, 2015 at 6:01 AM, Jerry Lam chiling...@gmail.com wrote: Hi Akshat, Is there a particular reason you don't use s3a? From my experience, s3a performs much better than the rest. I believe the inefficiency is from the implementation of the s3 interface. Best Regards, Jerry Sent from my iPhone

On 9 Aug, 2015, at 5:48 am, Akhil Das ak...@sigmoidanalytics.com wrote: It depends on which operation you are doing. If you are doing a .count() on a Parquet file, it might not download the entire file, I think, but if you do a .count() on a normal text file it might pull the entire file. Thanks Best Regards

On Sat, Aug 8, 2015 at 3:12 AM, Akshat Aranya aara...@gmail.com wrote: Hi, I've been trying to track down some problems with Spark reads being very slow with s3n:// URIs (NativeS3FileSystem). After some digging around, I realized that this file system implementation fetches the entire file, which isn't really a Spark problem, but it really slows things down when trying to just read headers from a Parquet file or just creating partitions in the RDD. Is this something that others have observed before, or am I doing something wrong? Thanks, Akshat
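As a reference point, a minimal sketch of switching the same read over to s3a (the credential property names are the standard Hadoop s3a keys; the bucket and path are hypothetical):

    // Standard Hadoop 2.6+ s3a properties; use your own credential mechanism in practice.
    sc.hadoopConfiguration.set("fs.s3a.access.key", "YOUR_ACCESS_KEY")
    sc.hadoopConfiguration.set("fs.s3a.secret.key", "YOUR_SECRET_KEY")

    // With s3a, creating the DataFrame only needs the Parquet footers and metadata,
    // rather than pulling each object in full as reported with s3n above.
    val df = sqlContext.read.parquet("s3a://my-bucket/path/to/parquet/")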
Accessing S3 files with s3n://
Hi, I've been trying to track down some problems with Spark reads being very slow with s3n:// URIs (NativeS3FileSystem). After some digging around, I realized that this file system implementation fetches the entire file, which isn't really a Spark problem, but it really slows things down when trying to just read headers from a Parquet file or just creating partitions in the RDD. Is this something that others have observed before, or am I doing something wrong? Thanks, Akshat
Re: Kryo serialization of classes in additional jars
I cherry-picked this commit into my local 1.2 branch. It fixed the problem with setting spark.serializer, but I get a similar problem with spark.closure.serializer org.apache.spark.SparkException: Failed to register classes with Kryo at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:100) at org.apache.spark.serializer.KryoSerializerInstance.init(KryoSerializer.scala:152) at org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:114) at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receiveWithLogging$1.applyOrElse(CoarseGrainedExecutorBackend.scala:73) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:53) at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42) at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at org.apache.spark.executor.CoarseGrainedExecutorBackend.aroundReceive(CoarseGrainedExecutorBackend.scala:36) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) at akka.dispatch.Mailbox.run(Mailbox.scala:220) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.lang.ClassNotFoundException: com.foo.Foo at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:270) at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$2.apply(KryoSerializer.scala:93) at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$2.apply(KryoSerializer.scala:93) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:93) ... 21 more On Mon, May 4, 2015 at 5:43 PM, Akshat Aranya aara...@gmail.com wrote: Actually, after some digging, I did find a JIRA for it: SPARK-5470. The fix for this has gone into master, but it isn't in 1.2. On Mon, May 4, 2015 at 2:47 PM, Imran Rashid iras...@cloudera.com wrote: Oh, this seems like a real pain. You should file a jira, I didn't see an open issue -- if nothing else just to document the issue. 
As you've noted, the problem is that the serializer is created immediately in the executors, right when the SparkEnv is created, but the other jars aren't downloaded until later. I think you could work around this with some combination of pushing the jars to the cluster manually and then using spark.executor.extraClassPath.

On Wed, Apr 29, 2015 at 6:42 PM, Akshat Aranya aara...@gmail.com wrote: Hi, Is it possible to register Kryo serialization for classes contained in jars that are added with spark.jars? In my experiment it doesn't seem to work, likely because the class registration happens before the jar is shipped to the executor and added to the classloader. Here's the general idea of what I want to do:

val sparkConf = new SparkConf(true)
  .set("spark.jars", "foo.jar")
  .setAppName("foo")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

// register classes contained in foo.jar
sparkConf.registerKryoClasses(Array(
  classOf[com.foo.Foo],
  classOf[com.foo.Bar]))
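Below is a minimal sketch of the workaround Imran describes, under the assumption that foo.jar has already been copied to the same path on every worker (the /opt/jars path is hypothetical):

    val sparkConf = new SparkConf(true)
      .setAppName("foo")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // the jar is already on every worker, so it is on the executor classpath
      // before the KryoSerializer is instantiated
      .set("spark.executor.extraClassPath", "/opt/jars/foo.jar")

    // registration can now resolve the classes on both the driver and the executors
    sparkConf.registerKryoClasses(Array(classOf[com.foo.Foo], classOf[com.foo.Bar]))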
Re: Kryo serialization of classes in additional jars
Actually, after some digging, I did find a JIRA for it: SPARK-5470. The fix for this has gone into master, but it isn't in 1.2.

On Mon, May 4, 2015 at 2:47 PM, Imran Rashid iras...@cloudera.com wrote: Oh, this seems like a real pain. You should file a jira; I didn't see an open issue -- if nothing else just to document the issue. As you've noted, the problem is that the serializer is created immediately in the executors, right when the SparkEnv is created, but the other jars aren't downloaded until later. I think you could work around this with some combination of pushing the jars to the cluster manually and then using spark.executor.extraClassPath.

On Wed, Apr 29, 2015 at 6:42 PM, Akshat Aranya aara...@gmail.com wrote: Hi, Is it possible to register Kryo serialization for classes contained in jars that are added with spark.jars? In my experiment it doesn't seem to work, likely because the class registration happens before the jar is shipped to the executor and added to the classloader. Here's the general idea of what I want to do:

val sparkConf = new SparkConf(true)
  .set("spark.jars", "foo.jar")
  .setAppName("foo")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

// register classes contained in foo.jar
sparkConf.registerKryoClasses(Array(
  classOf[com.foo.Foo],
  classOf[com.foo.Bar]))
Re: ClassNotFoundException for Kryo serialization
Now I am running up against some other problem while trying to schedule tasks: 15/05/01 22:32:03 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) java.lang.IllegalStateException: unread block data at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2419) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1380) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:180) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:724) I verified that the same configuration works without using Kryo serialization. On Fri, May 1, 2015 at 9:44 AM, Akshat Aranya aara...@gmail.com wrote: I cherry-picked the fix for SPARK-5470 and the problem has gone away. On Fri, May 1, 2015 at 9:15 AM, Akshat Aranya aara...@gmail.com wrote: Yes, this class is present in the jar that was loaded in the classpath of the executor Java process -- it wasn't even lazily added as a part of the task execution. Schema$MyRow is a protobuf-generated class. After doing some digging around, I think I might be hitting up against SPARK-5470, the fix for which hasn't been merged into 1.2, as far as I can tell. On Fri, May 1, 2015 at 9:05 AM, Ted Yu yuzhih...@gmail.com wrote: bq. Caused by: java.lang.ClassNotFoundException: com.example.Schema$MyRow So the above class is in the jar which was in the classpath ? Can you tell us a bit more about Schema$MyRow ? 
On Fri, May 1, 2015 at 8:05 AM, Akshat Aranya aara...@gmail.com wrote: Hi, I'm getting a ClassNotFoundException at the executor when trying to register a class for Kryo serialization: java.lang.reflect.InvocationTargetException at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at org.apache.spark.SparkEnv$.instantiateClass$1(SparkEnv.scala:243) at org.apache.spark.SparkEnv$.instantiateClassFromConf$1(SparkEnv.scala:254) at org.apache.spark.SparkEnv$.create(SparkEnv.scala:257) at org.apache.spark.SparkEnv$.createExecutorEnv(SparkEnv.scala:182) at org.apache.spark.executor.Executor.init(Executor.scala:87) at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receiveWithLogging$1.applyOrElse(CoarseGrainedExecutorBackend.scala:61) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:53) at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42) at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at org.apache.spark.executor.CoarseGrainedExecutorBackend.aroundReceive(CoarseGrainedExecutorBackend.scala:36) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) at akka.dispatch.Mailbox.run(Mailbox.scala:220) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: org.apache.spark.SparkException: Failed to load class to register with Kryo at org.apache.spark.serializer.KryoSerializer$$anonfun$2
ClassNotFoundException for Kryo serialization
Hi, I'm getting a ClassNotFoundException at the executor when trying to register a class for Kryo serialization: java.lang.reflect.InvocationTargetException at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at org.apache.spark.SparkEnv$.instantiateClass$1(SparkEnv.scala:243) at org.apache.spark.SparkEnv$.instantiateClassFromConf$1(SparkEnv.scala:254) at org.apache.spark.SparkEnv$.create(SparkEnv.scala:257) at org.apache.spark.SparkEnv$.createExecutorEnv(SparkEnv.scala:182) at org.apache.spark.executor.Executor.init(Executor.scala:87) at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receiveWithLogging$1.applyOrElse(CoarseGrainedExecutorBackend.scala:61) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:53) at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42) at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at org.apache.spark.executor.CoarseGrainedExecutorBackend.aroundReceive(CoarseGrainedExecutorBackend.scala:36) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) at akka.dispatch.Mailbox.run(Mailbox.scala:220) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: org.apache.spark.SparkException: Failed to load class to register with Kryo at org.apache.spark.serializer.KryoSerializer$$anonfun$2.apply(KryoSerializer.scala:66) at org.apache.spark.serializer.KryoSerializer$$anonfun$2.apply(KryoSerializer.scala:61) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) at org.apache.spark.serializer.KryoSerializer.init(KryoSerializer.scala:61) ... 
28 more Caused by: java.lang.ClassNotFoundException: com.example.Schema$MyRow at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:190) at org.apache.spark.serializer.KryoSerializer$$anonfun$2.apply(KryoSerializer.scala:63) I have verified that when the executor process is launched, my jar is in the classpath of the command line of the executor. I expect the class to be found by the default classloader being used at KryoSerializer.scala:63 Any ideas?
Re: ClassNotFoundException for Kryo serialization
Yes, this class is present in the jar that was loaded in the classpath of the executor Java process -- it wasn't even lazily added as a part of the task execution. Schema$MyRow is a protobuf-generated class. After doing some digging around, I think I might be hitting up against SPARK-5470, the fix for which hasn't been merged into 1.2, as far as I can tell. On Fri, May 1, 2015 at 9:05 AM, Ted Yu yuzhih...@gmail.com wrote: bq. Caused by: java.lang.ClassNotFoundException: com.example.Schema$MyRow So the above class is in the jar which was in the classpath ? Can you tell us a bit more about Schema$MyRow ? On Fri, May 1, 2015 at 8:05 AM, Akshat Aranya aara...@gmail.com wrote: Hi, I'm getting a ClassNotFoundException at the executor when trying to register a class for Kryo serialization: java.lang.reflect.InvocationTargetException at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at org.apache.spark.SparkEnv$.instantiateClass$1(SparkEnv.scala:243) at org.apache.spark.SparkEnv$.instantiateClassFromConf$1(SparkEnv.scala:254) at org.apache.spark.SparkEnv$.create(SparkEnv.scala:257) at org.apache.spark.SparkEnv$.createExecutorEnv(SparkEnv.scala:182) at org.apache.spark.executor.Executor.init(Executor.scala:87) at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receiveWithLogging$1.applyOrElse(CoarseGrainedExecutorBackend.scala:61) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:53) at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42) at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at org.apache.spark.executor.CoarseGrainedExecutorBackend.aroundReceive(CoarseGrainedExecutorBackend.scala:36) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) at akka.dispatch.Mailbox.run(Mailbox.scala:220) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: org.apache.spark.SparkException: Failed to load class to register with Kryo at org.apache.spark.serializer.KryoSerializer$$anonfun$2.apply(KryoSerializer.scala:66) at org.apache.spark.serializer.KryoSerializer$$anonfun$2.apply(KryoSerializer.scala:61) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) at org.apache.spark.serializer.KryoSerializer.init(KryoSerializer.scala:61) ... 28 more Caused by: java.lang.ClassNotFoundException: com.example.Schema$MyRow at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:190) at org.apache.spark.serializer.KryoSerializer$$anonfun$2.apply(KryoSerializer.scala:63) I have verified that when the executor process is launched, my jar is in the classpath
Re: ClassNotFoundException for Kryo serialization
I cherry-picked the fix for SPARK-5470 and the problem has gone away. On Fri, May 1, 2015 at 9:15 AM, Akshat Aranya aara...@gmail.com wrote: Yes, this class is present in the jar that was loaded in the classpath of the executor Java process -- it wasn't even lazily added as a part of the task execution. Schema$MyRow is a protobuf-generated class. After doing some digging around, I think I might be hitting up against SPARK-5470, the fix for which hasn't been merged into 1.2, as far as I can tell. On Fri, May 1, 2015 at 9:05 AM, Ted Yu yuzhih...@gmail.com wrote: bq. Caused by: java.lang.ClassNotFoundException: com.example.Schema$MyRow So the above class is in the jar which was in the classpath ? Can you tell us a bit more about Schema$MyRow ? On Fri, May 1, 2015 at 8:05 AM, Akshat Aranya aara...@gmail.com wrote: Hi, I'm getting a ClassNotFoundException at the executor when trying to register a class for Kryo serialization: java.lang.reflect.InvocationTargetException at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at org.apache.spark.SparkEnv$.instantiateClass$1(SparkEnv.scala:243) at org.apache.spark.SparkEnv$.instantiateClassFromConf$1(SparkEnv.scala:254) at org.apache.spark.SparkEnv$.create(SparkEnv.scala:257) at org.apache.spark.SparkEnv$.createExecutorEnv(SparkEnv.scala:182) at org.apache.spark.executor.Executor.init(Executor.scala:87) at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receiveWithLogging$1.applyOrElse(CoarseGrainedExecutorBackend.scala:61) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:53) at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42) at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at org.apache.spark.executor.CoarseGrainedExecutorBackend.aroundReceive(CoarseGrainedExecutorBackend.scala:36) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) at akka.dispatch.Mailbox.run(Mailbox.scala:220) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: org.apache.spark.SparkException: Failed to load class to register with Kryo at org.apache.spark.serializer.KryoSerializer$$anonfun$2.apply(KryoSerializer.scala:66) at org.apache.spark.serializer.KryoSerializer$$anonfun$2.apply(KryoSerializer.scala:61) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) at org.apache.spark.serializer.KryoSerializer.init(KryoSerializer.scala:61) ... 28 more Caused by: java.lang.ClassNotFoundException: com.example.Schema$MyRow at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:190
Re: spark with standalone HBase
Looking at your classpath, it looks like you've compiled Spark yourself. Depending on which version of Hadoop you've compiled against (it looks like Hadoop 2.2 in your case), Spark will have its own version of protobuf. You should try making sure both your HBase and Spark are compiled against the same version of Hadoop.

On Thu, Apr 30, 2015 at 6:54 AM, Ted Yu yuzhih...@gmail.com wrote: The error indicates incompatible protobuf versions. Please take a look at 4.1.1 under http://hbase.apache.org/book.html#basic.prerequisites Cheers

On Thu, Apr 30, 2015 at 3:49 AM, Saurabh Gupta saurabh.gu...@semusi.com wrote: I am now able to solve the issue by setting

SparkConf sconf = new SparkConf().setAppName("App").setMaster("local")

and

conf.set("zookeeper.znode.parent", "/hbase-unsecure")

The standalone HBase has a table 'test':

hbase(main):001:0> scan 'test'
ROW     COLUMN+CELL
row1    column=cf:a, timestamp=1430234895637, value=value1
row2    column=cf:b, timestamp=1430234907537, value=value2
row3    column=cf:c, timestamp=1430234918284, value=value3

Now facing this issue:

ERROR TableInputFormat: java.io.IOException: java.lang.reflect.InvocationTargetException
at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:416)
at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:393)
at org.apache.hadoop.hbase.client.HConnectionManager.getConnection(HConnectionManager.java:274)
at org.apache.hadoop.hbase.client.HTable.init(HTable.java:194)
at org.apache.hadoop.hbase.client.HTable.init(HTable.java:156)
at org.apache.hadoop.hbase.mapreduce.TableInputFormat.setConf(TableInputFormat.java:101)
at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:91)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1632)
at org.apache.spark.rdd.RDD.count(RDD.scala:1012)
at org.apache.spark.examples.HBaseTest$.main(HBaseTest.scala:58)
at org.apache.spark.examples.HBaseTest.main(HBaseTest.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:607)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:167)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:190)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:414)
...
23 more Caused by: java.lang.VerifyError: class org.apache.hadoop.hbase.protobuf.generated.ClientProtos$Result overrides final method getUnknownFields.()Lcom/google/protobuf/UnknownFieldSet; at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:800) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at java.net.URLClassLoader.defineClass(URLClassLoader.java:449) at java.net.URLClassLoader.access$100(URLClassLoader.java:71) at java.net.URLClassLoader$1.run(URLClassLoader.java:361) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at org.apache.hadoop.hbase.protobuf.ProtobufUtil.clinit(ProtobufUtil.java:176) at org.apache.hadoop.hbase.ClusterId.parseFrom(ClusterId.java:64) at org.apache.hadoop.hbase.zookeeper.ZKClusterId.readClusterIdZNode(ZKClusterId.java:69) at org.apache.hadoop.hbase.client.ZooKeeperRegistry.getClusterId(ZooKeeperRegistry.java:83) at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.retrieveClusterId(HConnectionManager.java:857) at
Kryo serialization of classes in additional jars
Hi, Is it possible to register Kryo serialization for classes contained in jars that are added with spark.jars? In my experiment it doesn't seem to work, likely because the class registration happens before the jar is shipped to the executor and added to the classloader. Here's the general idea of what I want to do:

val sparkConf = new SparkConf(true)
  .set("spark.jars", "foo.jar")
  .setAppName("foo")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

// register classes contained in foo.jar
sparkConf.registerKryoClasses(Array(
  classOf[com.foo.Foo],
  classOf[com.foo.Bar]))
When are TaskCompletionListeners called?
Hi, I'm trying to figure out when TaskCompletionListeners are called -- are they called at the end of the RDD's compute() method, or after the iteration through the iterator returned by compute() is completed? To put it another way, is this OK:

class DatabaseRDD[T] extends RDD[T] {

  def compute(...): Iterator[T] = {
    val session = // acquire a DB session
    TaskContext.get.addTaskCompletionListener { (context) =>
      session.release()
    }
    val iterator = session.query(...)
    iterator
  }
}
Re: Spark SQL code generation
Thanks for the info, Michael. Is there a reason to do so, as opposed to shipping out the bytecode and loading it via the classloader? Is it more complex? I can imagine caching to be effective for repeated queries, but not when the subsequent queries are different.

On Mon, Apr 6, 2015 at 2:41 PM, Michael Armbrust mich...@databricks.com wrote: It is generated and cached on each of the executors.

On Mon, Apr 6, 2015 at 2:32 PM, Akshat Aranya aara...@gmail.com wrote: Hi, I'm curious as to how Spark does code generation for SQL queries. Following through the code, I saw that an expression is parsed and compiled into a class using the Scala reflection toolbox. However, it's unclear to me whether the actual bytecode is generated on the master or on each of the executors. If it is generated on the master, how is the bytecode shipped out to the executors? Thanks, Akshat https://databricks.com/blog/2014/06/02/exciting-performance-improvements-on-the-horizon-for-spark-sql.html
Spark SQL code generation
Hi, I'm curious as to how Spark does code generation for SQL queries. Following through the code, I saw that an expression is parsed and compiled into a class using the Scala reflection toolbox. However, it's unclear to me whether the actual bytecode is generated on the master or on each of the executors. If it is generated on the master, how is the bytecode shipped out to the executors? Thanks, Akshat https://databricks.com/blog/2014/06/02/exciting-performance-improvements-on-the-horizon-for-spark-sql.html
Re: Getting to proto buff classes in Spark Context
My guess would be that you are packaging too many things in your job, which is causing problems with the classpath. When your jar goes in first, you get the correct version of protobuf, but some other version of something else. When your jar goes in later, other things work, but protobuf breaks. This is just a guess though; take a look at what you're packaging in your jar and look for things that Spark or Kafka could also be using.

On Thu, Feb 26, 2015 at 10:06 AM, necro351 . necro...@gmail.com wrote: Hello everyone, We are trying to decode a message inside a Spark job that we receive from Kafka. The message is encoded using Proto Buff. The problem is that when decoding we get class-not-found exceptions. We have tried remedies we found online on Stack Exchange and in mailing list archives, but nothing seems to work. (This question is a re-ask, but we really cannot figure this one out.)

We created a standalone repository with a very simple Spark job that exhibits the above issues. The Spark job reads the messages from the FS, decodes them, and prints them. It's easy to check out and try to see the exception yourself: just uncomment the code that prints the messages from within the RDD. The only sources are the generated Proto Buff Java sources and a small Spark job that decodes a message. I'd appreciate it if anyone could take a look. https://github.com/vibhav/spark-protobuf

We tried a couple of remedies already. Setting spark.files.userClassPathFirst didn't fix the problem for us. I am not very familiar with the Spark and Scala environment, so please correct any incorrect assumptions or statements I make. However, I don't believe this to be a classpath visibility issue. I wrote a small helper method to print out the classpath from both the driver and worker, and the output is identical. (I'm printing out System.getProperty("java.class.path") -- is there a better way to do this or to check the classpath?) You can print out the classpaths the same way we are from the example project above.

Furthermore, userClassPathFirst seems to have a detrimental effect on otherwise working code, which I cannot explain or do not understand. For example, I created a trivial RDD like so:

val l = List(1, 2, 3)
sc.makeRDD(l).foreach((x: Int) => { println(x.toString) })

With userClassPathFirst set, I encounter a java.lang.ClassCastException trying to execute that code. Is that to be expected? You can re-create this issue by commenting out the block of code that tries to print the above in the example project we linked to above. We also tried dynamically adding the jar with .addJar to the SparkContext, but this seemed to have no effect. Thanks in advance for any help y'all can provide.
Missing tasks
I am seeing a problem with a Spark job in standalone mode. Spark master's web interface shows a task RUNNING on a particular executor, but the logs of the executor do not show the task being ever assigned to it, that is, such a line is missing from the log: 15/02/25 16:53:36 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 27. The missing task wasn't the first task executing on the executor. Also, subsequent tasks are running on other executor threads on this executor. Any good way to figure out what happened here? -Akshat
Number of parallel tasks
I have Spark running in standalone mode with 4 executors, each executor with 5 cores (spark.executor.cores=5). However, when I'm processing an RDD with ~90,000 partitions, I only get 4 parallel tasks. Shouldn't I be getting 4x5=20 parallel task executions?
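As a rough reference (an illustrative sketch, not an answer from the thread), the expected concurrency is executors x cores per executor / cpus per task, so a setup like the one described should normally run 20 tasks at once:

    val conf = new SparkConf()
      .set("spark.executor.cores", "5") // cores per executor, as in the setup above
      .set("spark.cores.max", "20")     // standalone mode: total cores across the application
      .set("spark.task.cpus", "1")      // cores claimed by each task (the default)
    // expected concurrent tasks = 4 executors * 5 cores / 1 cpu per task = 20,
    // assuming there are at least 20 runnable partitions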
Standalone Spark program
Hi, I am building a Spark-based service which requires initialization of a SparkContext in a main():

def main(args: Array[String]) {
  val conf = new SparkConf(false)
    .setMaster("spark://foo.example.com:7077")
    .setAppName("foobar")

  val sc = new SparkContext(conf)
  val rdd = sc.parallelize(0 until 255)
  val res = rdd.mapPartitions(it => it).take(1)
  println(s"res=$res")
  sc.stop()
}

This code works fine via the REPL, but not as a standalone program; it causes a ClassNotFoundException. This has me confused about how code is shipped out to executors. When using the REPL, does the mapPartitions closure, it => it, get sent out when the REPL statement is executed? When this code is run as a standalone program (not via spark-submit), is the compiled code expected to be present at the executor? Thanks, Akshat
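One hedged way to make a standalone program behave like a spark-submit run is to ship the application jar explicitly when building the SparkConf; the jar path below is an assumption about how the service is packaged:

    val conf = new SparkConf(false)
      .setMaster("spark://foo.example.com:7077")
      .setAppName("foobar")
      // ship the jar containing the compiled closures (such as it => it) to the executors;
      // "target/scala-2.10/foobar.jar" is a hypothetical build output path
      .setJars(Seq("target/scala-2.10/foobar.jar"))

    val sc = new SparkContext(conf)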
Stateful mapPartitions
Is it possible to have some state across multiple calls to mapPartitions on each partition, for instance, if I want to keep a database connection open?
Re: Stateful mapPartitions
I want to have a database connection per partition of the RDD, and then reuse that connection whenever mapPartitions is called, which results in compute being called on the partition.

On Thu, Dec 4, 2014 at 11:07 AM, Paolo Platter paolo.plat...@agilelab.it wrote: Could you provide some further details? What do you need to do with the DB connection? Paolo Sent from my Windows Phone -- From: Akshat Aranya aara...@gmail.com Sent: 04/12/2014 18:57 To: user@spark.apache.org Subject: Stateful mapPartitions Is it possible to have some state across multiple calls to mapPartitions on each partition, for instance, if I want to keep a database connection open?
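A common pattern for this, sketched below under the assumption of a plain JDBC connection (the URL and per-record usage are hypothetical), is to hold one lazily created connection per executor JVM in a singleton object and reuse it from every mapPartitions call:

    import java.sql.{Connection, DriverManager}

    // One connection per executor JVM: initialized lazily on first use in that JVM
    // and reused across all subsequent mapPartitions invocations that run there.
    object ConnectionHolder {
      lazy val conn: Connection =
        DriverManager.getConnection("jdbc:postgresql://db.example.com/mydb") // hypothetical URL
    }

    val enriched = rdd.mapPartitions { iter =>
      val conn = ConnectionHolder.conn
      iter.map { record =>
        // use conn to look up or enrich each record (illustrative only)
        (record, conn.getMetaData.getURL)
      }
    }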
Executor failover
Hi, I have a question regarding failure of executors: how does Spark reassign partitions or tasks when executors fail? Is it necessary that new executors have the same executor IDs as the ones that were lost, or are these IDs irrelevant for failover?
Re: advantages of SparkSQL?
Parquet is a column-oriented format, which means that you need to read in less data from the file system if you're only interested in a subset of your columns. Also, Parquet pushes down selection predicates, which can eliminate needless deserialization of rows that don't match a selection criterion. Other than that, you would also get compression, and likely save processor cycles that would otherwise be spent parsing lines from text files.

On Mon, Nov 24, 2014 at 8:20 AM, mrm ma...@skimlinks.com wrote: Hi, Is there any advantage to storing data in the Parquet format, loading it using the sparkSQL context, but never registering it as a table/using SQL on it? Something like:

data = sqc.parquetFile(path)
results = data.map(lambda x: applyfunc(x.field))

Is this faster/more optimised than having the data stored as a text file and using Spark (non-SQL) to process it?
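For illustration, a hedged Scala sketch of the column-pruning benefit described above (1.x-era SchemaRDD API; the path and column name are hypothetical):

    // Only the "field" column's chunks need to be read from the Parquet file, and the
    // WHERE predicate can be pushed down into the Parquet reader.
    val data = sqlContext.parquetFile("hdfs:///data/events.parquet")
    data.registerTempTable("events")
    val results = sqlContext.sql("SELECT field FROM events WHERE field IS NOT NULL")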
Spark and Play
Hi, Sorry if this has been asked before; I didn't find a satisfactory answer when searching. How can I integrate a Play application with Spark? I'm getting into issues of akka-actor versions. Play 2.2.x uses akka-actor 2.0, whereas Play 2.3.x uses akka-actor 2.3.4, neither of which works fine with Spark 1.1.0. Is there something I should do with libraryDependencies in my build.sbt to make it work? Thanks, Akshat
Mapping SchemaRDD/Row to JSON
Hi, Does there exist a way to serialize Row objects to JSON? In the absence of such a way, is this the right way to go:
* get hold of the schema using SchemaRDD.schema
* iterate through each individual Row as a Seq and use the schema to convert the values in the row to JSON types
Thanks, Akshat
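A minimal sketch of that approach follows; the use of json4s for the final serialization step is an assumption, not something suggested in the thread, and schemaRDD stands in for the SchemaRDD in question:

    import org.json4s.jackson.Serialization

    // Pair each field name from the schema with the corresponding value in the Row,
    // then serialize the resulting Map to a JSON string.
    val fieldNames = schemaRDD.schema.fields.map(_.name)
    val jsonRDD = schemaRDD.map { row =>
      // define the formats inside the task to avoid serializing them with the closure
      implicit val formats = org.json4s.DefaultFormats
      Serialization.write(fieldNames.zip(row.toSeq).toMap)
    }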
Re: Spark as key/value store?
Spark, in general, is good for iterating through an entire dataset again and again. All operations are expressed in terms of iteration through all the records of at least one partition. You may want to look at IndexedRDD (https://issues.apache.org/jira/browse/SPARK-2365), which aims to improve point queries. In general though, Spark is unlikely to outperform KV stores because of the nature of scheduling a job for every operation.

On Wed, Oct 22, 2014 at 7:51 AM, Hajime Takase placeofnomemor...@gmail.com wrote: Hi, Is it possible to use Spark as a clustered key/value store (say, like redis-cluster or Hazelcast)? Will it outperform them in write/read or other operations? My main urge is to use the same RDD from several different SparkContexts without saving to disk or using the spark-jobserver, but I'm curious if someone has already tried using Spark as a key/value store. Thanks, Hajime
Primitive arrays in Spark
This is as much of a Scala question as a Spark question. I have an RDD:

val rdd1: RDD[(Long, Array[Long])]

This RDD has duplicate keys that I can collapse like so:

val rdd2: RDD[(Long, Array[Long])] = rdd1.reduceByKey((a, b) => a ++ b)

If I start with an Array of primitive longs in rdd1, will rdd2 also have Arrays of primitive longs? I suspect, based on my memory usage, that this is not the case. Also, would it be more efficient to do this:

val rdd1: RDD[(Long, ArrayBuffer[Long])]

and then

val rdd2: RDD[(Long, Array[Long])] = rdd1.reduceByKey((a, b) => a ++ b).mapValues(_.toArray)
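As a hedged alternative to the ArrayBuffer variant above (not something proposed in the thread), aggregateByKey can grow a single buffer per key instead of allocating a new array on every merge, converting to Array[Long] once at the end:

    import scala.collection.mutable.ArrayBuffer
    import org.apache.spark.rdd.RDD

    // Accumulate each key's values into one growable buffer, then materialize the array.
    val rdd2: RDD[(Long, Array[Long])] =
      rdd1.aggregateByKey(ArrayBuffer.empty[Long])(
        (buf, arr) => buf ++= arr, // fold one Array[Long] into the buffer
        (b1, b2) => b1 ++= b2      // merge two partial buffers
      ).mapValues(_.toArray)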
Attaching schema to RDD created from Parquet file
Hi, How can I convert an RDD loaded from a Parquet file into its original type:

case class Person(name: String, age: Int)

val rdd: RDD[Person] = ...
rdd.saveAsParquetFile("people")
val rdd2 = sqlContext.parquetFile("people")

How can I map rdd2 back into an RDD[Person]? All of the examples just show how to use the RDD loaded from Parquet using SQL.
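One hedged way back to RDD[Person] is to map each Row by position, assuming the field order in the Parquet schema matches the case class (a sketch, not the only option):

    // rdd2 is the SchemaRDD loaded above; fields come back in (name, age) order here.
    val people: RDD[Person] = rdd2.map(row => Person(row.getString(0), row.getInt(1)))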
Re: bug with MapPartitions?
There seems to be some problem with what gets captured in the closure that's passed into mapPartitions (myfunc in your case). I've had a similar problem before: http://apache-spark-user-list.1001560.n3.nabble.com/TaskNotSerializableException-when-running-through-Spark-shell-td16574.html Try putting your myFunc in an object:

object Mapper {
  def myFunc = ...
}

val r = sc.parallelize(c).mapPartitions(Mapper.myFunc).collect()

On Fri, Oct 17, 2014 at 7:33 AM, davidkl davidkl...@hotmail.com wrote: Hello, Maybe there is something I do not get to understand, but I believe this code should not throw any serialization error when I run it in the spark shell. Using similar code with map instead of mapPartitions works just fine.

import java.io.BufferedInputStream
import java.io.FileInputStream
import com.testing.DataPacket

val inStream = new BufferedInputStream(new FileInputStream(inputFile))
val p = new DataPacket(inStream)
val c = Array(p)

def myfunc[T](iter: Iterator[T]): Iterator[String] = {
  var res = List[String]()
  while (iter.hasNext) {
    val cur = iter.next
    res .::= ()
  }
  res.iterator
}

var r = sc.parallelize(c).mapPartitions(myfunc).collect()

This throws the following:

org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) at org.apache.spark.SparkContext.clean(SparkContext.scala:1242) at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597) ... ... Caused by: java.io.NotSerializableException: java.io.BufferedInputStream at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) ... ...

Why is this code failing? The constructor of DataPacket just reads data, but does not keep any reference to the BufferedInputStream. Note that this is not the real code, but a simplification while trying to isolate the cause of the error I get. Using map on this instead of mapPartitions works fine.
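For completeness, a hedged, fuller version of the "put it in an object" suggestion above (the body of myFunc is illustrative; only the structure matters):

    // The idea is that a function defined on a top-level object does not drag the REPL
    // line object -- and the non-serializable BufferedInputStream it references -- into
    // the closure that Spark has to serialize.
    object Mapper {
      def myFunc[T](iter: Iterator[T]): Iterator[String] = iter.map(_.toString)
    }

    val r = sc.parallelize(c).mapPartitions(Mapper.myFunc[DataPacket]).collect()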
Re: Larger heap leads to perf degradation due to GC
I just want to pitch in and say that I ran into the same problem when running with 64GB executors. For example, some of the tasks take 5 minutes to execute, out of which 4 minutes are spent in GC. I'll try out smaller executors.

On Mon, Oct 6, 2014 at 6:35 PM, Otis Gospodnetic otis.gospodne...@gmail.com wrote: Hi, The other option to consider is using G1 GC, which should behave better with large heaps. But pointers are not compressed in heaps 32 GB or larger in size, so you may be better off staying under 32 GB. Otis -- Monitoring * Alerting * Anomaly Detection * Centralized Log Management Solr Elasticsearch Support * http://sematext.com/

On Mon, Oct 6, 2014 at 8:08 PM, Mingyu Kim m...@palantir.com wrote: Ok, cool. This seems to be a general issue in the JVM with very large heaps. I agree that the best workaround would be to keep the heap size below 32GB. Thanks guys! Mingyu

From: Arun Ahuja aahuj...@gmail.com Date: Monday, October 6, 2014 at 7:50 AM To: Andrew Ash and...@andrewash.com Cc: Mingyu Kim m...@palantir.com, user@spark.apache.org, Dennis Lawler dlaw...@palantir.com Subject: Re: Larger heap leads to perf degradation due to GC

We have used the strategy that you suggested, Andrew - using many workers per machine and keeping the heaps small (under 20gb). Using a large heap resulted in workers hanging or not responding (leading to timeouts). The same dataset/job for us will fail (most often due to akka disassociated or fetch failure errors) with 10 cores / 100 executors, 60 gb per executor, but succeed with 1 core / 1000 executors / 6gb per executor. When the job does succeed with more cores per executor and a larger heap, it is usually much slower than with the smaller executors (the same 8-10 min job taking 15-20 min to complete). The unfortunate downside of this has been that we have had some large broadcast variables which may not fit into memory (and are unnecessarily duplicated) when using the smaller executors. Most of this is anecdotal, but for the most part we have had more success and consistency with more executors with smaller memory requirements.

On Sun, Oct 5, 2014 at 7:20 PM, Andrew Ash and...@andrewash.com wrote: Hi Mingyu, Maybe we should be limiting our heaps to 32GB max and running multiple workers per machine to avoid large GC issues. For a 128GB memory, 32 core machine, this could look like: SPARK_WORKER_INSTANCES=4 SPARK_WORKER_MEMORY=32 SPARK_WORKER_CORES=8 Are people running with large (32GB+) executor heaps in production? I'd be curious to hear if so. Cheers! Andrew

On Thu, Oct 2, 2014 at 1:30 PM, Mingyu Kim m...@palantir.com wrote: This issue definitely needs more investigation, but I just wanted to quickly check if anyone has run into this problem or has general guidance around it. We've seen a performance degradation with a large heap on a simple map task (i.e., no shuffle). We've seen the slowness starting from around a 50GB heap (i.e., spark.executor.memory=50g), and when we checked the CPU usage, there were just a lot of GCs going on. Has anyone seen a similar problem? Thanks, Mingyu
TaskNotSerializableException when running through Spark shell
Hi, Can anyone explain how things get captured in a closure when running through the REPL? For example:

def foo(..) = { .. }
rdd.map(foo)

sometimes complains about classes not being serializable that are completely unrelated to foo. This happens even when I write it like this:

object Foo {
  def foo(..) = { .. }
}
rdd.map(Foo.foo)

It also doesn't happen all the time.
One pass compute() to produce multiple RDDs
Hi, Is there a good way to materialize derivative RDDs from, say, a HadoopRDD while reading in the data only once? One way to do so would be to cache the HadoopRDD and then create the derivative RDDs, but that would require enough RAM to cache the HadoopRDD, which is not an option in my case. Thanks, Akshat
Re: Is there a way to look at RDD's lineage? Or debug a fault-tolerance error?
Using a var for RDDs in this way is not going to work. In this example, tx1.zip(tx2) would create an RDD that depends on tx2, but then soon after that you change what tx2 means, so you would end up having a circular dependency.

On Wed, Oct 8, 2014 at 12:01 PM, Sung Hwan Chung coded...@cs.stanford.edu wrote: My job is not being fault-tolerant (e.g., when there's a fetch failure or something). The lineage of the RDDs is constantly updated every iteration. However, I think that when there's a failure, the lineage information is not being correctly reapplied. It goes something like this:

val rawRDD = read(...)
val repartRDD = rawRDD.repartition(X)
val tx1 = repartRDD.map(...)
var tx2 = tx1.map(...)
while (...) {
  tx2 = tx1.zip(tx2).map(...)
}

Is there any way to monitor an RDD's lineage, maybe even including? I want to make sure that there's nothing unexpected happening.
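For inspecting lineage, one concrete (and hedged) pointer that is not from the thread itself: RDD.toDebugString prints the dependency chain of an RDD, which makes it easy to see what has accumulated across iterations.

    // Print tx2's lineage each iteration; a chain that keeps growing is a hint that
    // checkpointing or materializing intermediate results may be needed.
    println(tx2.toDebugString)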
Relation between worker memory and executor memory in standalone mode
Hi, What's the relationship between Spark worker and executor memory settings in standalone mode? Do they work independently or does the worker cap executor memory? Also, is the number of concurrent executors per worker capped by the number of CPU cores configured for the worker?
Re: Relation between worker memory and executor memory in standalone mode
On Wed, Oct 1, 2014 at 11:33 AM, Akshat Aranya aara...@gmail.com wrote:

On Wed, Oct 1, 2014 at 11:00 AM, Boromir Widas vcsub...@gmail.com wrote:
1. Worker memory caps executor memory.
2. With the default config, every job gets one executor per worker. This executor runs with all cores available to the worker.

By "the job" do you mean one SparkContext or one stage execution within a program? Does that also mean that two concurrent jobs will get one executor each at the same time?

Experimenting with this some more, I figured out that an executor takes away spark.executor.memory amount of memory from the configured worker memory. It also takes up all the cores, so even if there is still some memory left, there are no cores left for starting another executor. Is my assessment correct? Is there no way to configure the number of cores that an executor can use?

On Wed, Oct 1, 2014 at 11:04 AM, Akshat Aranya aara...@gmail.com wrote: Hi, What's the relationship between Spark worker and executor memory settings in standalone mode? Do they work independently or does the worker cap executor memory? Also, is the number of concurrent executors per worker capped by the number of CPU cores configured for the worker?
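For reference, a hedged sketch of the standalone-mode knobs available at the time for bounding what an application takes from each worker (the values are illustrative):

    val conf = new SparkConf()
      // deducted from the worker's configured memory (SPARK_WORKER_MEMORY) per executor
      .set("spark.executor.memory", "8g")
      // standalone mode: total cores the application may claim across the cluster, which
      // indirectly limits how many cores its executor occupies on each worker
      .set("spark.cores.max", "8")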
Determining number of executors within RDD
Hi, I want to implement an RDD wherein the decision on the number of partitions is based on the number of executors that have been set up. Is there some way I can determine the number of executors within the getPartitions() call?
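One hedged possibility (an assumption on my part, not an answer from the thread): the SparkContext can report the block managers it currently knows about, which approximates the executor count from the driver side.

    // getExecutorMemoryStatus has one entry per block manager, including the driver,
    // hence the "- 1"; this is an approximation rather than an official executor count.
    val numExecutors = sc.getExecutorMemoryStatus.size - 1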
Re: partitioned groupBy
Patrick, If I understand this correctly, I won't be able to do this in the closure provided to mapPartitions(), because that's going to be stateless in the sense that a hash map that I create within the closure would only be useful for one call of MapPartitionsRDD.compute(). I guess I would need to override mapPartitions() directly within my RDD. Right?

On Tue, Sep 16, 2014 at 4:57 PM, Patrick Wendell pwend...@gmail.com wrote: If each partition can fit in memory, you can do this using mapPartitions and then building an inverse mapping within each partition. You'd need to construct a hash map within each partition yourself.

On Tue, Sep 16, 2014 at 4:27 PM, Akshat Aranya aara...@gmail.com wrote: I have a use case where my RDD is set up like this:

Partition 0:
K1 -> [V1, V2]
K2 -> [V2]
Partition 1:
K3 -> [V1]
K4 -> [V3]

I want to invert this RDD, but only within a partition, so that the operation does not require a shuffle. It doesn't matter if the partitions of the inverted RDD have non-unique keys across the partitions, for example:

Partition 0:
V1 -> [K1]
V2 -> [K1, K2]
Partition 1:
V1 -> [K3]
V3 -> [K4]

Is there a way to do only a per-partition groupBy, instead of shuffling the entire data?
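A hedged sketch of Patrick's suggestion, assuming an RDD[(String, Seq[String])] (concrete types chosen only for illustration): the inverse multimap is built inside mapPartitions, so no shuffle is involved.

    import scala.collection.mutable

    // Invert key -> values into value -> keys, independently within each partition.
    val inverted = rdd.mapPartitions({ iter =>
      val inv = mutable.Map.empty[String, List[String]]
      iter.foreach { case (k, vs) =>
        vs.foreach(v => inv(v) = k :: inv.getOrElse(v, Nil))
      }
      inv.iterator
    }, preservesPartitioning = true)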
Indexed RDD
Hi, I'm trying to implement a custom RDD that essentially works as a distributed hash table, i.e. the key space is split up into partitions and, within a partition, an element can be looked up efficiently by its key. However, the RDD lookup() function (in PairRDDFunctions) is implemented in a way that iterates through all the elements of a partition and finds the matching ones. Is there a better way to do what I want to do, short of just implementing new methods on the custom RDD? Thanks, Akshat
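For illustration, a hedged sketch of the "hash table per partition" idea (the Long/String types, partition count, and the pairs RDD are assumptions; this is not how IndexedRDD itself is implemented):

    import org.apache.spark.HashPartitioner
    import org.apache.spark.rdd.RDD

    val partitioner = new HashPartitioner(16)

    // Hash-partition the pairs, then collapse each partition into a single Map so that
    // lookups within a partition are O(1) instead of a linear scan.
    val indexed: RDD[Map[Long, String]] = pairs
      .partitionBy(partitioner)
      .mapPartitions(iter => Iterator(iter.toMap), preservesPartitioning = true)
      .cache()

    // Point lookup: partitions other than the owner return an empty iterator immediately.
    def lookup(key: Long): Option[String] = {
      val owner = partitioner.getPartition(key)
      indexed
        .mapPartitionsWithIndex((i, it) => if (i == owner) it.map(_.get(key)) else Iterator.empty)
        .collect()
        .headOption
        .flatten
    }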
partitioned groupBy
I have a use case where my RDD is set up like this:

Partition 0:
K1 -> [V1, V2]
K2 -> [V2]
Partition 1:
K3 -> [V1]
K4 -> [V3]

I want to invert this RDD, but only within a partition, so that the operation does not require a shuffle. It doesn't matter if the partitions of the inverted RDD have non-unique keys across the partitions, for example:

Partition 0:
V1 -> [K1]
V2 -> [K1, K2]
Partition 1:
V1 -> [K3]
V3 -> [K4]

Is there a way to do only a per-partition groupBy, instead of shuffling the entire data?