Extremely slow shuffle writes and large job time fluctuations
I'm running into an issue with a PySpark job where I'm seeing extremely variable job times (20 min to 2 hr) and very long shuffle write times (e.g. ~2.5 minutes for 18KB/86 records).

The cluster setup is Amazon EMR 4.4.0 with Spark 1.6.0, an m4.2xlarge driver, and a single m4.10xlarge (40 vCPU, 160GB) executor node, with the following parameters:

--conf spark.driver.memory=10G
--conf spark.default.parallelism=160
--conf spark.driver.maxResultSize=4g
--num-executors 2
--executor-cores 20
--executor-memory 67G

What's odd is that sometimes the job runs in 20 minutes and sometimes it takes 2 hours - both with the same data. I'm using RDDs (not DataFrames). There's plenty of RAM, and I've looked at the GC logs (we use CMS); they look fine. The job reads some data from files and does some maps/filters/joins/etc. - nothing too special.

The only thing I've noticed that looks odd is that the slow instances of the job have unusually long shuffle write times for some tasks. For example, a .join operation has ~30 tasks out of 320 that take 2.5 minutes each, with a GC time of 0.1 seconds, a Shuffle Read Size / Records of 12KB/30, and, most interestingly, a Write Time of 2.5 minutes for a Shuffle Write Size / Records of only 18KB/86. Looking at the event timeline for the stage, it's almost all yellow (Shuffle Write).

We've been running this job on a different EMR cluster topology (12 m3.2xlarges) and have not seen the slowdown described above; we've only observed it on the m4.10xlarge. It may be worth mentioning again that this is PySpark with no DataFrames (just RDDs). When I run 'top' I sometimes see lots (e.g. 60 or 70) of python processes on the executor (I assume one per partition being processed?). It seems like this has something to do with the single-m4.10xlarge setup, as we haven't seen this behavior on the 12-node m3.2xlarge cluster.
What I really don't understand is why the job runs fine (20 minutes) for a while and then, for the same data, takes so much longer (2 hours), with such long shuffle write times.
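One experiment that might isolate the topology effect (a sketch only - the sizing below is my guess at approximating the 12-node m3.2xlarge layout on the single m4.10xlarge, not a verified fix, and my_job.py is a placeholder):

```shell
# Untested sizing sketch: run more, smaller executors on the m4.10xlarge
# instead of 2 x 20-core executors (values illustrative):
spark-submit \
  --conf spark.driver.memory=10G \
  --conf spark.default.parallelism=160 \
  --conf spark.driver.maxResultSize=4g \
  --num-executors 8 \
  --executor-cores 5 \
  --executor-memory 16G \
  my_job.py
```

If the slowdown disappears with this layout, that would point at contention among the many python workers sharing one huge executor rather than at the job logic itself.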
Re: About memory leak in spark 1.4.1
I'm seeing a similar (same?) problem on Spark 1.4.1 running on YARN (Amazon EMR, Java 8). I'm running a Spark Streaming app 24/7, and system memory eventually gets exhausted after about 3 days, at which point the JVM process dies with:

# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 12288 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /mnt/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1442933070871_0002/container_1442933070871_0002_01_02/hs_err_pid19082.log
[thread 139846843156224 also had an error]

To reiterate what Sea said, the heap is fine - this is NOT a heap memory issue. I've monitored it with scripts and also observed it via VisualVM; this is an off-heap issue. I ran pmap on the pid of the CoarseGrainedExecutor process, with runs spaced about 5 hours apart, and saw several 64MB chunks of off-heap memory allocated in that time:

7fccd400       65500K rw---   [ anon ]
7fccd7ff7000      36K -       [ anon ]
7fccd800       65528K rw---   [ anon ]
7fccdbffe000       8K -       [ anon ]
7fccdc00       65504K rw---   [ anon ]
7fccdfff8000      32K -       [ anon ]
7fcce000       65536K rw---   [ anon ]
7fcce400       65508K rw---   [ anon ]
7fcce7ff9000      28K -       [ anon ]
7fcce800       65524K rw---   [ anon ]
7fccebffd000      12K -       [ anon ]
7fccec00       65532K rw---   [ anon ]
7fcce000           4K -       [ anon ]
7fccf000       65496K rw---   [ anon ]
7fccf3ff6000      40K -       [ anon ]
7fccf400       65496K rw---   [ anon ]
7fccf7ff6000      40K -       [ anon ]
7fccf800       65532K rw---   [ anon ]
7fccfbfff000       4K -       [ anon ]
7fccfc00       65520K rw---   [ anon ]
7fccc000          16K -       [ anon ]
7fcd           65508K rw---   [ anon ]
7fcd03ff9000      28K -       [ anon ]

Over that window, total memory usage by the JVM (as reported by top) had grown by ~786MB over 5 hours - basically the sum of those 13 64MB chunks. I dumped the memory from /proc/pid/ and was able to see a bunch of lines from the data files that my Spark job is processing, but I couldn't figure out what was actually creating these 64MB chunks.
I thought it might be Netty, so I set spark.shuffle.io.preferDirectBufs to false, but that hasn't changed anything. The only thing I see on the configuration page mentioning "64mb" is spark.kryoserializer.buffer.max, which defaults to 64mb. I'll try setting that to something different, but as far as I know, Kryo is not doing anything off heap. Still wondering if this could be Netty, or maybe something Akka is doing if it's using off-heap memory. There were no ERROR messages in the executor's (or driver's) logs during this time.

Any help would be greatly appreciated. This issue continues to cause our streaming apps to die every few days, which is...less than ideal! :)

On Wed, Aug 5, 2015 at 9:10 AM, Sea <261810...@qq.com> wrote:
> No one helped me... so I helped myself: I split the cluster into two clusters,
> 1.4.1 and 1.3.0
>
> -- Original Message --
> *From:* "Ted Yu"
> *Sent:* Tuesday, August 4, 2015, 10:28 PM
> *To:* "Igor Berman"
> *Cc:* "Sea" <261810...@qq.com>; "Barak Gitsis"; "user@spark.apache.org"; "rxin"; "joshrosen"; "davies"
> *Subject:* Re: About memory leak in spark 1.4.1
>
> w.r.t. spark.deploy.spreadOut, here is the scaladoc:
>
> // As a temporary workaround before better ways of configuring memory,
> // we allow users to set a flag that will perform round-robin scheduling
> // across the nodes (spreading out each app among all the nodes) instead
> // of trying to consolidate each app onto a small # of nodes.
> private val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)
>
> Cheers
>
> On Tue, Aug 4, 2015 at 4:13 AM, Igor Berman wrote:
>
>> sorry, can't disclose info about my prod cluster
>>
>> nothing jumps into my mind regarding your config
>> we don't use lz4 compression, and I don't know what spark.deploy.spreadOut is
>> (there is no documentation regarding it)
>>
>> If you are sure that you don't have a memory leak in your business logic, I
>> would try to reset each property to its default (or just remove it from your
>> config) and run your job to see if it's not somehow connected
>>
>> my config (nothing special really)
>> spark.shuffle.consolidateFiles true
>> spark.speculation false
>> spark.executor.extraJavaOptions -XX:+UseStringCache -XX:+UseCompressedStrings -XX:+PrintGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Xloggc:gc.log -verbose:gc
>> spark.executor.logs.rolling.maxRetainedFiles 1000
>> spark.executor.logs.rolling.strategy time
>> spark.worker.cleanup.enabled true
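For reference, the 64MB off-heap chunks can be tallied mechanically from pmap output rather than by eye. A sketch, parsing an abridged snapshot in place of a live pmap run (the pid and file names are placeholders, and the snapshot rows are shortened from the listing above):

```shell
# Capture a snapshot of the executor's mappings (pid is a placeholder):
#   pmap $EXECUTOR_PID > pmap-snapshot.txt
# Here we parse an abridged snapshot instead:
cat > pmap-snapshot.txt <<'EOF'
7fccd400     65500K rw---   [ anon ]
7fccd7ff7000    36K -----   [ anon ]
7fccd800     65528K rw---   [ anon ]
7fccdbffe000     8K -----   [ anon ]
EOF
# Count and sum the ~64MB (65xxxK) anonymous mappings:
awk '$2 ~ /^65[0-9][0-9][0-9]K$/ { n++; kb += $2 } END { print n, "chunks,", kb "K total" }' pmap-snapshot.txt
```

Diffing two such counts taken hours apart gives the leak rate directly, which is easier to graph than eyeballing raw pmap listings.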
Re: Possible to combine all RDDs from a DStream batch into one?
I should note that the amount of data in each batch is very small, so I'm not concerned with the performance implications of grouping it into a single RDD.

On Wed, Jul 15, 2015 at 9:58 PM, Jon Chase jon.ch...@gmail.com wrote:

I'm currently doing something like this in my Spark Streaming program (Java):

dStream.foreachRDD((rdd, batchTime) -> {
    log.info("processing RDD from batch {}", batchTime);
    // my rdd processing code
});

Instead of having my RDD processing code called once for each RDD in the batch, is it possible to essentially group all of the RDDs from the batch into a single RDD with a single partition, and therefore operate on all of the elements in the batch at once?

My goal here is to do an operation exactly once for every batch. As I understand it, foreachRDD is going to do the operation once for each RDD in the batch, which is not what I want. I've looked at DStream.repartition(int), but the docs make it sound like it only changes the number of partitions in the batch's existing RDDs, not the number of RDDs.
Possible to combine all RDDs from a DStream batch into one?
I'm currently doing something like this in my Spark Streaming program (Java):

dStream.foreachRDD((rdd, batchTime) -> {
    log.info("processing RDD from batch {}", batchTime);
    // my rdd processing code
});

Instead of having my RDD processing code called once for each RDD in the batch, is it possible to essentially group all of the RDDs from the batch into a single RDD with a single partition, and therefore operate on all of the elements in the batch at once?

My goal here is to do an operation exactly once for every batch. As I understand it, foreachRDD is going to do the operation once for each RDD in the batch, which is not what I want. I've looked at DStream.repartition(int), but the docs make it sound like it only changes the number of partitions in the batch's existing RDDs, not the number of RDDs.
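For what it's worth, a sketch of the pattern I'm considering (untested - it requires a live StreamingContext; dStream is the DStream from above, and handle_all is a hypothetical function holding the per-batch logic; written in PySpark, though the same calls exist on the Java API):

```python
# Untested sketch; assumes each batch interval yields one RDD per DStream,
# so foreachRDD fires once per batch. coalesce(1) narrows to a single
# partition without a shuffle, so the foreachPartition body executes
# exactly once per batch with all of that batch's elements.
def process_batch(rdd):
    rdd.coalesce(1).foreachPartition(
        lambda records: handle_all(list(records)))

dStream.foreachRDD(process_batch)
```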
Re: RDD collect hangs on large input data
Zsolt - what version of Java are you running?

On Mon, Mar 30, 2015 at 7:12 AM, Zsolt Tóth toth.zsolt@gmail.com wrote:

Thanks for your answer! I don't call .collect() to trigger the execution - I call it because I need the RDD on the driver. This is not a huge RDD, and it's not larger than the one returned with the 50GB input data.

The end of the stack trace is below. The two IPs are the two worker nodes; I think they can't connect to the driver after they finish their part of the collect().

15/03/30 10:38:25 INFO executor.Executor: Finished task 872.0 in stage 1.0 (TID 1745). 1414 bytes result sent to driver
15/03/30 10:38:25 INFO storage.MemoryStore: ensureFreeSpace(200) called with curMem=405753, maxMem=4883742720
15/03/30 10:38:25 INFO storage.MemoryStore: Block rdd_4_867 stored as values in memory (estimated size 200.0 B, free 4.5 GB)
15/03/30 10:38:25 INFO storage.MemoryStore: ensureFreeSpace(80) called with curMem=405953, maxMem=4883742720
15/03/30 10:38:25 INFO storage.MemoryStore: Block rdd_4_868 stored as values in memory (estimated size 80.0 B, free 4.5 GB)
15/03/30 10:38:25 INFO storage.BlockManagerMaster: Updated info of block rdd_4_867
15/03/30 10:38:25 INFO executor.Executor: Finished task 867.0 in stage 1.0 (TID 1740). 1440 bytes result sent to driver
15/03/30 10:38:25 INFO storage.BlockManagerMaster: Updated info of block rdd_4_868
15/03/30 10:38:25 INFO executor.Executor: Finished task 868.0 in stage 1.0 (TID 1741). 1422 bytes result sent to driver
15/03/30 10:53:45 WARN server.TransportChannelHandler: Exception in connection from /10.102.129.251:42026
java.io.IOException: Connection timed out
 at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
 at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
 at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
 at sun.nio.ch.IOUtil.read(IOUtil.java:192)
 at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
 at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:311)
 at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
 at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:225)
 at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
 at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
 at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
 at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
 at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
 at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
 at java.lang.Thread.run(Thread.java:745)
15/03/30 10:53:45 WARN server.TransportChannelHandler: Exception in connection from /10.102.129.251:41703
java.io.IOException: Connection timed out
 at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
 at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
 at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
 at sun.nio.ch.IOUtil.read(IOUtil.java:192)
 at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
 at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:311)
 at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
 at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:225)
 at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
 at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
 at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
 at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
 at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
 at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
 at java.lang.Thread.run(Thread.java:745)
15/03/30 10:53:46 WARN server.TransportChannelHandler: Exception in connection from /10.99.144.92:49021
java.io.IOException: Connection timed out
 at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
 at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
 at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
 at sun.nio.ch.IOUtil.read(IOUtil.java:192)
 at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
 at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:311)
 at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
 at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:225)
 at
Re: Spark SQL lateral view explode doesn't work, and unable to save array types to Parquet
https://issues.apache.org/jira/browse/SPARK-6570

I also left in the call to saveAsParquetFile(), as it produced a similar exception (though there was no use of explode there).

On Fri, Mar 27, 2015 at 7:20 AM, Cheng Lian lian.cs@gmail.com wrote:

This should be a bug in Explode.eval(), which always assumes the underlying SQL array is represented by a Scala Seq. Would you mind opening a JIRA ticket for this? Thanks!

Cheng

On 3/27/15 7:00 PM, Jon Chase wrote:

Spark 1.3.0

Two issues:
a) I'm unable to get a lateral view explode query to work on an array type
b) I'm unable to save an array type to a Parquet file

I keep running into this:

java.lang.ClassCastException: [I cannot be cast to scala.collection.Seq

Here's a stack trace from the explode issue:

root
 |-- col1: string (nullable = false)
 |-- col2s: array (nullable = true)
 |    |-- element: integer (containsNull = true)

ERROR org.apache.spark.executor.Executor Exception in task 7.0 in stage 1.0 (TID 15)
java.lang.ClassCastException: [I cannot be cast to scala.collection.Seq
 at org.apache.spark.sql.catalyst.expressions.Explode.eval(generators.scala:125) ~[spark-catalyst_2.10-1.3.0.jar:1.3.0]
 at org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:70) ~[spark-sql_2.10-1.3.0.jar:1.3.0]
 at org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:69) ~[spark-sql_2.10-1.3.0.jar:1.3.0]
 at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) ~[scala-library-2.10.4.jar:na]
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library-2.10.4.jar:na]
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library-2.10.4.jar:na]
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library-2.10.4.jar:na]
 at scala.collection.Iterator$class.foreach(Iterator.scala:727) ~[scala-library-2.10.4.jar:na]
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) ~[scala-library-2.10.4.jar:na]
 at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) ~[scala-library-2.10.4.jar:na]
 at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) ~[scala-library-2.10.4.jar:na]
 at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) ~[scala-library-2.10.4.jar:na]
 at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) ~[scala-library-2.10.4.jar:na]
 at scala.collection.AbstractIterator.to(Iterator.scala:1157) ~[scala-library-2.10.4.jar:na]
 at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) ~[scala-library-2.10.4.jar:na]
 at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) ~[scala-library-2.10.4.jar:na]
 at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) ~[scala-library-2.10.4.jar:na]
 at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) ~[scala-library-2.10.4.jar:na]
 at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813) ~[spark-core_2.10-1.3.0.jar:1.3.0]
 at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813) ~[spark-core_2.10-1.3.0.jar:1.3.0]
 at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497) ~[spark-core_2.10-1.3.0.jar:1.3.0]
 at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497) ~[spark-core_2.10-1.3.0.jar:1.3.0]
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) ~[spark-core_2.10-1.3.0.jar:1.3.0]
 at org.apache.spark.scheduler.Task.run(Task.scala:64) ~[spark-core_2.10-1.3.0.jar:1.3.0]
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) ~[spark-core_2.10-1.3.0.jar:1.3.0]
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_31]
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_31]
 at java.lang.Thread.run(Thread.java:745) [na:1.8.0_31]
WARN o.a.spark.scheduler.TaskSetManager Lost task 7.0 in stage 1.0 (TID 15, localhost):
java.lang.ClassCastException: [I cannot be cast to scala.collection.Seq
 at org.apache.spark.sql.catalyst.expressions.Explode.eval(generators.scala:125)
 at org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:70)
 at org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:69)
 at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48
Spark SQL lateral view explode doesn't work, and unable to save array types to Parquet
Spark 1.3.0

Two issues:
a) I'm unable to get a lateral view explode query to work on an array type
b) I'm unable to save an array type to a Parquet file

I keep running into this:

java.lang.ClassCastException: [I cannot be cast to scala.collection.Seq

Here's a stack trace from the explode issue:

root
 |-- col1: string (nullable = false)
 |-- col2s: array (nullable = true)
 |    |-- element: integer (containsNull = true)

ERROR org.apache.spark.executor.Executor Exception in task 7.0 in stage 1.0 (TID 15)
java.lang.ClassCastException: [I cannot be cast to scala.collection.Seq
 at org.apache.spark.sql.catalyst.expressions.Explode.eval(generators.scala:125) ~[spark-catalyst_2.10-1.3.0.jar:1.3.0]
 at org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:70) ~[spark-sql_2.10-1.3.0.jar:1.3.0]
 at org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:69) ~[spark-sql_2.10-1.3.0.jar:1.3.0]
 at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) ~[scala-library-2.10.4.jar:na]
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library-2.10.4.jar:na]
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library-2.10.4.jar:na]
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library-2.10.4.jar:na]
 at scala.collection.Iterator$class.foreach(Iterator.scala:727) ~[scala-library-2.10.4.jar:na]
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) ~[scala-library-2.10.4.jar:na]
 at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) ~[scala-library-2.10.4.jar:na]
 at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) ~[scala-library-2.10.4.jar:na]
 at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) ~[scala-library-2.10.4.jar:na]
 at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) ~[scala-library-2.10.4.jar:na]
 at scala.collection.AbstractIterator.to(Iterator.scala:1157) ~[scala-library-2.10.4.jar:na]
 at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) ~[scala-library-2.10.4.jar:na]
 at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) ~[scala-library-2.10.4.jar:na]
 at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) ~[scala-library-2.10.4.jar:na]
 at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) ~[scala-library-2.10.4.jar:na]
 at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813) ~[spark-core_2.10-1.3.0.jar:1.3.0]
 at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813) ~[spark-core_2.10-1.3.0.jar:1.3.0]
 at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497) ~[spark-core_2.10-1.3.0.jar:1.3.0]
 at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497) ~[spark-core_2.10-1.3.0.jar:1.3.0]
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) ~[spark-core_2.10-1.3.0.jar:1.3.0]
 at org.apache.spark.scheduler.Task.run(Task.scala:64) ~[spark-core_2.10-1.3.0.jar:1.3.0]
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) ~[spark-core_2.10-1.3.0.jar:1.3.0]
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_31]
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_31]
 at java.lang.Thread.run(Thread.java:745) [na:1.8.0_31]
WARN o.a.spark.scheduler.TaskSetManager Lost task 7.0 in stage 1.0 (TID 15, localhost):
java.lang.ClassCastException: [I cannot be cast to scala.collection.Seq
 at org.apache.spark.sql.catalyst.expressions.Explode.eval(generators.scala:125)
 at org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:70)
 at org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:69)
 at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
 at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
 at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
 at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
 at scala.collection.AbstractIterator.to(Iterator.scala:1157)
 at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
 at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
 at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
 at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
 at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
 at
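A minimal sketch of the query shape that hits this (untested; I'm not certain a PySpark-built DataFrame reproduces the exact "[I" cast, since the trace suggests the original rows carried a Java int[]; it assumes an existing sc and sqlContext on Spark 1.3.0, and the column names come from the schema above):

```python
# Untested reproduction sketch for Spark 1.3.0.
from pyspark.sql import Row

df = sqlContext.createDataFrame(
    sc.parallelize([Row(col1="a", col2s=[1, 2, 3])]))
df.registerTempTable("df")

# a) lateral view explode on the array column:
sqlContext.sql(
    "select col1, c from df lateral view explode(col2s) t as c").show()

# b) saving the array column to Parquet:
df.saveAsParquetFile("/tmp/df.parquet")
```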
Re: Spark SQL lateral view explode doesn't work, and unable to save array types to Parquet
Done. I also updated the name on the ticket to include both issues:

Spark SQL arrays: explode() fails and cannot save array type to Parquet
https://issues.apache.org/jira/browse/SPARK-6570

On Fri, Mar 27, 2015 at 8:14 AM, Cheng Lian lian.cs@gmail.com wrote:

Forgot to mention that, would you mind also providing the full stack trace of the exception thrown in the saveAsParquetFile call? Thanks!

Cheng

On 3/27/15 7:35 PM, Jon Chase wrote:

https://issues.apache.org/jira/browse/SPARK-6570

I also left in the call to saveAsParquetFile(), as it produced a similar exception (though there was no use of explode there).

On Fri, Mar 27, 2015 at 7:20 AM, Cheng Lian lian.cs@gmail.com wrote:

This should be a bug in Explode.eval(), which always assumes the underlying SQL array is represented by a Scala Seq. Would you mind opening a JIRA ticket for this? Thanks!

Cheng

On 3/27/15 7:00 PM, Jon Chase wrote:

Spark 1.3.0

Two issues:
a) I'm unable to get a lateral view explode query to work on an array type
b) I'm unable to save an array type to a Parquet file

I keep running into this:

java.lang.ClassCastException: [I cannot be cast to scala.collection.Seq

Here's a stack trace from the explode issue:

root
 |-- col1: string (nullable = false)
 |-- col2s: array (nullable = true)
 |    |-- element: integer (containsNull = true)

ERROR org.apache.spark.executor.Executor Exception in task 7.0 in stage 1.0 (TID 15)
java.lang.ClassCastException: [I cannot be cast to scala.collection.Seq
 at org.apache.spark.sql.catalyst.expressions.Explode.eval(generators.scala:125) ~[spark-catalyst_2.10-1.3.0.jar:1.3.0]
 at org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:70) ~[spark-sql_2.10-1.3.0.jar:1.3.0]
 at org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:69) ~[spark-sql_2.10-1.3.0.jar:1.3.0]
 at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) ~[scala-library-2.10.4.jar:na]
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library-2.10.4.jar:na]
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library-2.10.4.jar:na]
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library-2.10.4.jar:na]
 at scala.collection.Iterator$class.foreach(Iterator.scala:727) ~[scala-library-2.10.4.jar:na]
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) ~[scala-library-2.10.4.jar:na]
 at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) ~[scala-library-2.10.4.jar:na]
 at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) ~[scala-library-2.10.4.jar:na]
 at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) ~[scala-library-2.10.4.jar:na]
 at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) ~[scala-library-2.10.4.jar:na]
 at scala.collection.AbstractIterator.to(Iterator.scala:1157) ~[scala-library-2.10.4.jar:na]
 at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) ~[scala-library-2.10.4.jar:na]
 at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) ~[scala-library-2.10.4.jar:na]
 at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) ~[scala-library-2.10.4.jar:na]
 at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) ~[scala-library-2.10.4.jar:na]
 at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813) ~[spark-core_2.10-1.3.0.jar:1.3.0]
 at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813) ~[spark-core_2.10-1.3.0.jar:1.3.0]
 at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497) ~[spark-core_2.10-1.3.0.jar:1.3.0]
 at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497) ~[spark-core_2.10-1.3.0.jar:1.3.0]
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) ~[spark-core_2.10-1.3.0.jar:1.3.0]
 at org.apache.spark.scheduler.Task.run(Task.scala:64) ~[spark-core_2.10-1.3.0.jar:1.3.0]
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) ~[spark-core_2.10-1.3.0.jar:1.3.0]
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_31]
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_31]
 at java.lang.Thread.run(Thread.java:745) [na:1.8.0_31]
WARN o.a.spark.scheduler.TaskSetManager Lost task 7.0 in stage 1.0 (TID 15, localhost):
java.lang.ClassCastException: [I cannot be cast to scala.collection.Seq
 at org.apache.spark.sql.catalyst.expressions.Explode.eval(generators.scala:125)
 at org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:70)
 at org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:69)
 at scala.collection.Iterator
Column not found in schema when querying partitioned table
Spark 1.3.0, Parquet

I'm having trouble referencing partition columns in my queries. In the following example, 'probeTypeId' is a partition column; the directory structure looks like this:

/mydata
    /probeTypeId=1
        ...files...
    /probeTypeId=2
        ...files...

I see the column when I load a DataFrame from the /mydata directory and call df.printSchema():

...
 |-- probeTypeId: integer (nullable = true)
...

Parquet is also aware of the column:

optional int32 probeTypeId;

And this works fine:

sqlContext.sql("select probeTypeId from df limit 1");

...as does df.show() - it shows the correct values for the partition column.

However, when I try to use a partition column in a where clause, I get an exception stating that the column was not found in the schema:

sqlContext.sql("select probeTypeId from df where probeTypeId = 1 limit 1");
...
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.IllegalArgumentException: Column [probeTypeId] was not found in schema!
 at parquet.Preconditions.checkArgument(Preconditions.java:47)
 at parquet.filter2.predicate.SchemaCompatibilityValidator.getColumnDescriptor(SchemaCompatibilityValidator.java:172)
 at parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:160)
 at parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:142)
 at parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:76)
 at parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:41)
 at parquet.filter2.predicate.Operators$Eq.accept(Operators.java:162)
 at parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:46)
 at parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:41)
 at parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:22)
 at parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:108)
 at parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:28)
 at parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:158)
...

What am I doing wrong?
Here's the full stack trace:

using local[*] for master
06:05:55,675 |-INFO in ch.qos.logback.classic.joran.action.ConfigurationAction - debug attribute not set
06:05:55,683 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - About to instantiate appender of type [ch.qos.logback.core.ConsoleAppender]
06:05:55,694 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - Naming appender as [STDOUT]
06:05:55,721 |-INFO in ch.qos.logback.core.joran.action.NestedComplexPropertyIA - Assuming default type [ch.qos.logback.classic.encoder.PatternLayoutEncoder] for [encoder] property
06:05:55,768 |-INFO in ch.qos.logback.classic.joran.action.RootLoggerAction - Setting level of ROOT logger to INFO
06:05:55,768 |-INFO in ch.qos.logback.core.joran.action.AppenderRefAction - Attaching appender named [STDOUT] to Logger[ROOT]
06:05:55,769 |-INFO in ch.qos.logback.classic.joran.action.ConfigurationAction - End of configuration.
06:05:55,770 |-INFO in ch.qos.logback.classic.joran.JoranConfigurator@6aaceffd - Registering current configuration as safe fallback point
INFO org.apache.spark.SparkContext Running Spark version 1.3.0
WARN o.a.hadoop.util.NativeCodeLoader Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
INFO org.apache.spark.SecurityManager Changing view acls to: jon
INFO org.apache.spark.SecurityManager Changing modify acls to: jon
INFO org.apache.spark.SecurityManager SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(jon); users with modify permissions: Set(jon)
INFO akka.event.slf4j.Slf4jLogger Slf4jLogger started
INFO Remoting Starting remoting
INFO Remoting Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.1.134:62493]
INFO org.apache.spark.util.Utils Successfully started service 'sparkDriver' on port 62493.
INFO org.apache.spark.SparkEnv Registering MapOutputTracker
INFO org.apache.spark.SparkEnv Registering BlockManagerMaster
INFO o.a.spark.storage.DiskBlockManager Created local directory at /var/folders/x7/9hdp8kw9569864088tsl4jmmgn/T/spark-150e23b2-ff19-4a51-8cfc-25fb8e1b3f2b/blockmgr-6eea286c-7473-4bda-8886-7250156b68f4
INFO org.apache.spark.storage.MemoryStore MemoryStore started with capacity 1966.1 MB
INFO org.apache.spark.HttpFileServer HTTP File server directory is /var/folders/x7/9hdp8kw9569864088tsl4jmmgn/T/spark-cf4687bd-1563-4ddf-b697-21c96fd95561/httpd-6343b9c9-bb66-43ac-ac43-6da80c7a1f95
INFO org.apache.spark.HttpServer Starting HTTP Server
INFO o.spark-project.jetty.server.Server
Re: Column not found in schema when querying partitioned table
I've filed this as https://issues.apache.org/jira/browse/SPARK-6554 On Thu, Mar 26, 2015 at 6:29 AM, Jon Chase jon.ch...@gmail.com wrote: Spark 1.3.0, Parquet. I'm having trouble referencing partition columns in my queries. In the following example, 'probeTypeId' is a partition column. The directory structure looks like this:

/mydata
  /probeTypeId=1
    ...files...
  /probeTypeId=2
    ...files...

I see the column when I load a DF using the /mydata directory and call df.printSchema():

... |-- probeTypeId: integer (nullable = true) ...

Parquet is also aware of the column:

optional int32 probeTypeId;

And this works fine:

sqlContext.sql("select probeTypeId from df limit 1");

...as does df.show() - it shows the correct values for the partition column. However, when I try to use a partition column in a where clause, I get an exception stating that the column was not found in the schema:

sqlContext.sql("select probeTypeId from df where probeTypeId = 1 limit 1");

... ... org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.IllegalArgumentException: Column [probeTypeId] was not found in schema! 
at parquet.Preconditions.checkArgument(Preconditions.java:47)
at parquet.filter2.predicate.SchemaCompatibilityValidator.getColumnDescriptor(SchemaCompatibilityValidator.java:172)
at parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:160)
at parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:142)
at parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:76)
at parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:41)
at parquet.filter2.predicate.Operators$Eq.accept(Operators.java:162)
at parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:46)
at parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:41)
at parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:22)
at parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:108)
at parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:28)
at parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:158)
... ...
What am I doing wrong? 
Here's the full stack trace (using local[*] for master):

06:05:55,675 |-INFO in ch.qos.logback.classic.joran.action.ConfigurationAction - debug attribute not set
06:05:55,683 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - About to instantiate appender of type [ch.qos.logback.core.ConsoleAppender]
06:05:55,694 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - Naming appender as [STDOUT]
06:05:55,721 |-INFO in ch.qos.logback.core.joran.action.NestedComplexPropertyIA - Assuming default type [ch.qos.logback.classic.encoder.PatternLayoutEncoder] for [encoder] property
06:05:55,768 |-INFO in ch.qos.logback.classic.joran.action.RootLoggerAction - Setting level of ROOT logger to INFO
06:05:55,768 |-INFO in ch.qos.logback.core.joran.action.AppenderRefAction - Attaching appender named [STDOUT] to Logger[ROOT]
06:05:55,769 |-INFO in ch.qos.logback.classic.joran.action.ConfigurationAction - End of configuration.
06:05:55,770 |-INFO in ch.qos.logback.classic.joran.JoranConfigurator@6aaceffd - Registering current configuration as safe fallback point
INFO org.apache.spark.SparkContext Running Spark version 1.3.0
WARN o.a.hadoop.util.NativeCodeLoader Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
INFO org.apache.spark.SecurityManager Changing view acls to: jon
INFO org.apache.spark.SecurityManager Changing modify acls to: jon
INFO org.apache.spark.SecurityManager SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(jon); users with modify permissions: Set(jon)
INFO akka.event.slf4j.Slf4jLogger Slf4jLogger started
INFO Remoting Starting remoting
INFO Remoting Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.1.134:62493]
INFO org.apache.spark.util.Utils Successfully started service 'sparkDriver' on port 62493.
INFO org.apache.spark.SparkEnv Registering MapOutputTracker
INFO org.apache.spark.SparkEnv Registering BlockManagerMaster
INFO o.a.spark.storage.DiskBlockManager Created local directory at /var/folders/x7/9hdp8kw9569864088tsl4jmmgn/T/spark-150e23b2-ff19-4a51-8cfc-25fb8e1b3f2b/blockmgr-6eea286c-7473-4bda-8886-7250156b68f4
INFO org.apache.spark.storage.MemoryStore MemoryStore started with capacity 1966.1 MB
INFO org.apache.spark.HttpFileServer HTTP File server directory is /var
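For readers hitting the same wall: the root of the mismatch is that a Hive-style partition column lives only in the directory names, not inside the Parquet files themselves, so a predicate pushed down into the Parquet row-group filter can't find the column in the file schema. A plain-Python illustration of that split (hypothetical paths, no Spark required):

```python
import re

def partition_values(path):
    """Extract Hive-style partition key/value pairs (e.g. probeTypeId=1)
    from a file path. These columns exist only in directory names, never
    in the Parquet file schema - which is why a pushed-down filter on
    them fails with "Column ... was not found in schema"."""
    return {k: int(v) if v.isdigit() else v
            for k, v in re.findall(r"/([^/=]+)=([^/]+)", path)}

# The file schema knows nothing about probeTypeId; only the path does.
print(partition_values("/mydata/probeTypeId=1/part-00000.parquet"))
```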
Spark SQL queries hang forever
Spark 1.3.0 on YARN (Amazon EMR), cluster of 10 m3.2xlarge (8cpu, 30GB), executor memory 20GB, driver memory 10GB.

I'm using Spark SQL, mainly via spark-shell, to query 15GB of data spread out over roughly 2,000 Parquet files and my queries frequently hang. Simple queries like select count(*) from ... on the entire data set work ok. Slightly more demanding ones with group by's and some aggregate functions (percentile_approx, avg, etc.) work ok as well, as long as I have some criteria in my where clause to keep the number of rows down. Once I hit some limit on query complexity and rows processed, my queries start to hang. I've left them for up to an hour without seeing any progress. No OOM's either - the job is just stuck. I've tried setting spark.sql.shuffle.partitions to 400 and even 800, but with the same results: usually near the end of the tasks (like 780 of 800 complete), progress just stops:

15/03/26 20:53:29 INFO scheduler.TaskSetManager: Finished task 788.0 in stage 1.0 (TID 1618) in 800 ms on ip-10-209-22-211.eu-west-1.compute.internal (748/800)
15/03/26 20:53:29 INFO scheduler.TaskSetManager: Finished task 793.0 in stage 1.0 (TID 1623) in 622 ms on ip-10-105-12-41.eu-west-1.compute.internal (749/800)
15/03/26 20:53:29 INFO scheduler.TaskSetManager: Finished task 797.0 in stage 1.0 (TID 1627) in 616 ms on ip-10-90-2-201.eu-west-1.compute.internal (750/800)
15/03/26 20:53:29 INFO scheduler.TaskSetManager: Finished task 799.0 in stage 1.0 (TID 1629) in 611 ms on ip-10-90-2-201.eu-west-1.compute.internal (751/800)
15/03/26 20:53:29 INFO scheduler.TaskSetManager: Finished task 795.0 in stage 1.0 (TID 1625) in 669 ms on ip-10-105-12-41.eu-west-1.compute.internal (752/800)

^^^ this is where it stays forever

Looking at the Spark UI, several of the executors still list active tasks. I do see that the Shuffle Read for executors that don't have any tasks remaining is around 100MB, whereas it's more like 10MB for the executors that still have tasks. 
The first stage, mapPartitions, always completes fine. It's the second stage (takeOrdered) that hangs. I've had this issue in 1.2.0 and 1.2.1 as well as 1.3.0. I've also encountered it when using JSON files (instead of Parquet). Thoughts? I'm blocked on using Spark SQL b/c most of the queries I run are hitting this issue.
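One thing worth checking when more shuffle partitions don't help: if a few hot keys dominate a group-by or join, every row for a given key still lands in the same reducer partition no matter how high spark.sql.shuffle.partitions is set, so one straggler task does almost all the work. A plain-Python sketch of that effect using synthetic keys (not the poster's data):

```python
from collections import Counter

def partition_sizes(keys, num_partitions):
    # Same idea as Spark's HashPartitioner: partition = hash(key) % n.
    # Rows sharing a key always land in the same partition.
    return Counter(hash(k) % num_partitions for k in keys)

# 1,000 rows for one hot key plus 100 distinct rare keys.
keys = ["hot"] * 1000 + [f"rare-{i}" for i in range(100)]
for n in (8, 800):
    sizes = partition_sizes(keys, n)
    # The hot key's partition holds ~1000 rows at any partition count.
    print(n, max(sizes.values()))
```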
Re: Registering custom UDAFs with HiveContext in SparkSQL, how?
Shahab - This should do the trick until Hao's changes are out:

sqlContext.sql("create temporary function foobar as 'com.myco.FoobarUDAF'");
sqlContext.sql("select foobar(some_column) from some_table");

This works without requiring you to 'deploy' a JAR with the UDAF in it - just make sure the UDAF is on your project's classpath.

On Tue, Mar 10, 2015 at 8:21 PM, Cheng, Hao hao.ch...@intel.com wrote: Oh, sorry, my bad - currently Spark SQL doesn't provide a user interface for UDAFs, but it can work seamlessly with Hive UDAFs (via HiveContext). I am also working on the UDAF interface refactoring; after that we can provide a custom interface for extension. https://github.com/apache/spark/pull/3247

*From:* shahab [mailto:shahab.mok...@gmail.com] *Sent:* Wednesday, March 11, 2015 1:44 AM *To:* Cheng, Hao *Cc:* user@spark.apache.org *Subject:* Re: Registering custom UDAFs with HiveContext in SparkSQL, how? Thanks Hao, but my question concerns UDAFs (user defined aggregation functions), not UDTFs (user defined table functions). I'd appreciate it if you could point me to a starting point for UDAF development in Spark. Thanks, Shahab

On Tuesday, March 10, 2015, Cheng, Hao hao.ch...@intel.com wrote: Currently, Spark SQL doesn't provide an interface for developing custom UDTFs, but it can work seamlessly with Hive UDTFs. I am working on the UDTF refactoring for Spark SQL, and hope to provide a Hive-independent UDTF soon after that.

*From:* shahab [mailto:shahab.mok...@gmail.com] *Sent:* Tuesday, March 10, 2015 5:44 PM *To:* user@spark.apache.org *Subject:* Registering custom UDAFs with HiveContext in SparkSQL, how? Hi, I need to develop a couple of UDAFs and use them in Spark SQL. While UDFs can be registered as a function in HiveContext, I could not find any documentation on how UDAFs can be registered in the HiveContext. So far, what I have found is to make a JAR file out of the developed UDAF class and then deploy the JAR file to Spark SQL. 
But is there any way to avoid deploying the JAR file and instead register it programmatically? best, /Shahab
Re: ReceiverInputDStream#saveAsTextFiles with a S3 URL results in double forward slash key names in S3
I've had a lot of difficulties with using the s3:// prefix; s3n:// seems to work much better. Can't find the link ATM, but I seem to recall that s3:// (Hadoop's original block format for S3) is no longer recommended for use. Amazon's EMR goes so far as to remap s3:// to s3n:// behind the scenes.

On Tue, Dec 23, 2014 at 9:29 AM, Enno Shioji eshi...@gmail.com wrote: I filed a new issue, HADOOP-11444. According to HADOOP-10372, s3 is likely to be deprecated anyway in favor of s3n. Also, the comment section notes that Amazon has implemented an EmrFileSystem for S3 which is built using the AWS SDK rather than JetS3t.

On Tue, Dec 23, 2014 at 2:06 PM, Enno Shioji eshi...@gmail.com wrote: Hey Jay :) I tried s3n, which uses the Jets3tNativeFileSystemStore, and the double slash went away. As far as I can see, it does look like a bug in hadoop-common; I'll file a ticket for it. Hope you are doing well, by the way!

PS: Jets3tNativeFileSystemStore's implementation of pathToKey is:

==
private static String pathToKey(Path path) {
  if (path.toUri().getScheme() != null && path.toUri().getPath().isEmpty()) {
    // allow uris without trailing slash after bucket to refer to root,
    // like s3n://mybucket
    return "";
  }
  if (!path.isAbsolute()) {
    throw new IllegalArgumentException("Path must be absolute: " + path);
  }
  String ret = path.toUri().getPath().substring(1); // remove initial slash
  if (ret.endsWith("/") && (ret.indexOf("/") != ret.length() - 1)) {
    ret = ret.substring(0, ret.length() - 1);
  }
  return ret;
}
==

whereas Jets3tFileSystemStore uses:

==
private String pathToKey(Path path) {
  if (!path.isAbsolute()) {
    throw new IllegalArgumentException("Path must be absolute: " + path);
  }
  return path.toUri().getPath();
}
==

On Tue, Dec 23, 2014 at 1:07 PM, Jay Vyas jayunit100.apa...@gmail.com wrote: Hi Enno. Might be worthwhile to cross post this on dev@hadoop... 
Obviously a simple Spark way to test this would be to change the URI to write to hdfs:// or maybe file://, and confirm that the extra slash goes away. - If it's indeed a JetS3t issue, we should add a new unit test for this, if the HCFS tests are passing for Jets3tFileSystem yet this error still exists. - To learn how to run HCFS tests against any FileSystem, see the wiki page: https://wiki.apache.org/hadoop/HCFS/Progress (see the July 14th entry on that page). - Is there another S3FileSystem implementation for AbstractFileSystem, or is JetS3t the only one? That would be an easy way to test this, and also a good workaround. I'm also wondering why Jets3tFileSystem is the AbstractFileSystem used by so many - is that the standard impl for storing via the AbstractFileSystem interface? On Dec 23, 2014, at 6:06 AM, Enno Shioji eshi...@gmail.com wrote: Is anybody experiencing this? It looks like a bug in JetS3t to me, but thought I'd sanity check before filing an issue. I'm writing to S3 using ReceiverInputDStream#saveAsTextFiles with an S3 URL (s3://fake-test/1234). The code does write to S3, but with double forward slashes (e.g. s3://fake-test//1234/-141933428/). I did a debug and it seems like the culprit is Jets3tFileSystemStore#pathToKey(path), which returns /fake-test/1234/... for the input s3://fake-test/1234/ when it should hack off the first forward slash. However, I couldn't find any bug report for JetS3t for this. Am I missing something, or is this likely a JetS3t bug?
Re: JavaRDD (Data Aggregation) based on key
Have a look at RDD.groupBy(...) and reduceByKey(...) On Tue, Dec 23, 2014 at 4:47 AM, sachin Singh sachin.sha...@gmail.com wrote: Hi, I have a csv file having fields as a,b,c . I want to do aggregation(sum,average..) based on any field(a,b or c) as per user input, using Apache Spark Java API,Please Help Urgent! Thanks in advance, Regards Sachin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/JavaRDD-Data-Aggregation-based-on-key-tp20828.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
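To make the suggestion concrete: reduceByKey folds the values for each key with an associative function, which covers sums directly and averages via (sum, count) pairs. A plain-Python model of the semantics (the real job would go through the Java API's mapToPair/reduceByKey; the field names here are made up):

```python
def reduce_by_key(pairs, fn):
    """Plain-Python model of RDD.reduceByKey: fold values per key."""
    out = {}
    for k, v in pairs:
        out[k] = fn(out[k], v) if k in out else v
    return out

# Rows keyed on one csv field, aggregating another.
rows = [("a", 10), ("b", 5), ("a", 20)]

sums = reduce_by_key(rows, lambda x, y: x + y)
# Average: reduce (sum, count) pairs, then divide once at the end -
# the per-key function must stay associative, so no dividing inside it.
sc = reduce_by_key([(k, (v, 1)) for k, v in rows],
                   lambda x, y: (x[0] + y[0], x[1] + y[1]))
avgs = {k: s / c for k, (s, c) in sc.items()}
print(sums, avgs)
```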
Re: S3 files , Spark job hungsup
http://www.jets3t.org/toolkit/configuration.html

Put the following properties in a file named jets3t.properties and make sure it is available while your Spark job runs (just place it in ~/ and pass a reference to it when calling spark-submit with --files ~/jets3t.properties):

httpclient.connection-timeout-ms - How many milliseconds to wait before a connection times out. 0 means infinity. Default: 6
httpclient.socket-timeout-ms - How many milliseconds to wait before a socket connection times out. 0 means infinity. Default: 6

You will also probably want to increase this value substantially:

httpclient.max-connections - The maximum number of simultaneous connections to allow globally. Default: 20

Note: If you have a fast Internet connection, you can improve the performance of your S3 client by increasing this setting and the corresponding S3 service properties s3service.max-thread-count and s3service.admin-max-thread-count. However, be careful: if you increase this value too much for your connection, you may exceed your available bandwidth and cause communications errors.

On Mon, Dec 22, 2014 at 1:20 PM, durga katakam durgak...@gmail.com wrote: Yes, I am reading thousands of files every hour. Is there any way I can tell Spark to time out? Thanks for your help. -D

On Mon, Dec 22, 2014 at 4:57 AM, Shuai Zheng szheng.c...@gmail.com wrote: Is it possible too many connections are open to read from S3 from one node? I had this issue before because I opened a few hundred files on S3 to read from one node. It just blocked without error until timing out later.

On Monday, December 22, 2014, durga durgak...@gmail.com wrote: Hi All, I am facing a strange issue sporadically: occasionally my Spark job hangs on reading S3 files. It is not throwing an exception or making any progress - it just hangs there. Is this a known issue? Please let me know how I could solve it. 
Thanks, -D
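A sketch of the setup described above; the property names come from the JetS3t configuration page, but the specific values here are illustrative, not tuned recommendations:

```shell
# Write a jets3t.properties and ship it alongside the job with spark-submit.
cat > ~/jets3t.properties <<'EOF'
# Values are milliseconds; 0 means wait forever.
httpclient.connection-timeout-ms=60000
httpclient.socket-timeout-ms=60000
# Raise the global connection cap (default is 20) along with the
# matching S3 service thread counts.
httpclient.max-connections=100
s3service.max-thread-count=100
s3service.admin-max-thread-count=100
EOF

# Then reference it on submit, e.g.:
#   spark-submit --files ~/jets3t.properties ...
grep -c '=' ~/jets3t.properties
```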
Re: Downloads from S3 exceedingly slow when running on spark-ec2
Turns out I was using the s3:// prefix (in a standalone Spark cluster). It was writing a LOT of block_* files to my S3 bucket, which was the cause for the slowness. I was coming from Amazon EMR, where Amazon's underlying FS implementation has re-mapped s3:// to s3n://, which doesn't use the block_* files. On Sat, Dec 20, 2014 at 8:17 PM, Paul Brown p...@mult.ifario.us wrote: I would suggest checking out disk IO on the nodes in your cluster and then reading up on the limiting behaviors that accompany different kinds of EC2 storage. Depending on how things are configured for your nodes, you may have a local storage configuration that provides bursty IOPS where you get apparently good performance at first and then limiting kicks in and slows down the rate at which you can write data to local storage. -- p...@mult.ifario.us | Multifarious, Inc. | http://mult.ifario.us/ On Thu, Dec 18, 2014 at 5:56 AM, Jon Chase jon.ch...@gmail.com wrote: I'm running a very simple Spark application that downloads files from S3, does a bit of mapping, then uploads new files. Each file is roughly 2MB and is gzip'd. I was running the same code on Amazon's EMR w/Spark and not having any download speed issues (Amazon's EMR provides a custom implementation of the s3n:// file system, FWIW). When I say exceedingly slow, I mean that it takes about 2 minutes to download and process a 2MB file (this was taking ~2 seconds on the same instance types in Amazon's EMR). When I download the same file from the EC2 machine with wget or curl, it downloads in ~ 1 second. I've also done other bandwidth checks for downloads from other external hosts - no speed problems there. Tried this w/Spark 1.1.0 and 1.1.1. 
When I do a thread dump on a worker, I typically see this a lot:

"Executor task launch worker-7" daemon prio=10 tid=0x7fd174039000 nid=0x59e9 runnable [0x7fd1f7dfb000]
java.lang.Thread.State: RUNNABLE
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.read(SocketInputStream.java:152)
at java.net.SocketInputStream.read(SocketInputStream.java:122)
at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
at sun.security.ssl.InputRecord.read(InputRecord.java:480)
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
- locked 0x0007e44dd140 (a java.lang.Object)
at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884)
at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
- locked 0x0007e44e1350 (a sun.security.ssl.AppInputStream)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
at java.io.BufferedInputStream.read(BufferedInputStream.java:254)
- locked 0x0007e44ea800 (a java.io.BufferedInputStream)
at org.apache.commons.httpclient.HttpParser.readRawLine(HttpParser.java:78)
at org.apache.commons.httpclient.HttpParser.readLine(HttpParser.java:106)
at org.apache.commons.httpclient.HttpConnection.readLine(HttpConnection.java:1116)
at org.apache.commons.httpclient.MultiThreadedHttpConnectionManager$HttpConnectionAdapter.readLine(MultiThreadedHttpConnectionManager.java:1413)
at org.apache.commons.httpclient.HttpMethodBase.readStatusLine(HttpMethodBase.java:1973)
at org.apache.commons.httpclient.HttpMethodBase.readResponse(HttpMethodBase.java:1735)
at org.apache.commons.httpclient.HttpMethodBase.execute(HttpMethodBase.java:1098)
at org.apache.commons.httpclient.HttpMethodDirector.executeWithRetry(HttpMethodDirector.java:398)
at org.apache.commons.httpclient.HttpMethodDirector.executeMethod(HttpMethodDirector.java:171)
at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:397)
at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:323)
at org.jets3t.service.impl.rest.httpclient.RestS3Service.performRequest(RestS3Service.java:342)
at org.jets3t.service.impl.rest.httpclient.RestS3Service.performRestHead(RestS3Service.java:718)
at org.jets3t.service.impl.rest.httpclient.RestS3Service.getObjectImpl(RestS3Service.java:1599)
at org.jets3t.service.impl.rest.httpclient.RestS3Service.getObjectDetailsImpl(RestS3Service.java:1535)
at org.jets3t.service.S3Service.getObjectDetails(S3Service.java:1987)
at org.jets3t.service.S3Service.getObjectDetails(S3Service.java:1332)
at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:111)
at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
at org.apache.hadoop.fs.s3native.$Proxy6.retrieveMetadata(Unknown Source)
at org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus
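Given the s3:// vs s3n:// distinction at the heart of this thread, one defensive habit is to normalize URIs before handing them to Hadoop/Spark. A tiny sketch mirroring what EMR reportedly does behind the scenes (not an actual EMR or Hadoop API):

```python
def prefer_s3n(uri):
    """Rewrite s3:// URIs to s3n:// (the native store), leaving every
    other scheme - including URIs already using s3n:// - untouched."""
    if uri.startswith("s3://"):
        return "s3n://" + uri[len("s3://"):]
    return uri

print(prefer_s3n("s3://bucket/key.gz"))   # rewritten to the native scheme
print(prefer_s3n("hdfs://nn/path"))       # unchanged
```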
Re: Fetch Failure
I'm getting the same error (ExecutorLostFailure) - input RDD is 100k small files (~2MB each). I do a simple map, then keyBy(), and then rdd.saveAsHadoopDataset(...). Depending on the memory settings given to spark-submit, the time before the first ExecutorLostFailure varies (more memory == longer until failure) - but this usually happens after about 100 files being processed. I'm running Spark 1.1.0 on AWS EMR w/Yarn. It appears that Yarn is killing the executor b/c it thinks it's exceeding memory. However, I can't repro any OOM issues when running locally, no matter the size of the data set.

It seems like Yarn thinks the heap size is increasing according to the Yarn logs:

2014-12-18 22:06:43,505 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl (Container Monitor): Memory usage of ProcessTree 24273 for container-id container_1418928607193_0011_01_02: 6.1 GB of 6.5 GB physical memory used; 13.8 GB of 32.5 GB virtual memory used
2014-12-18 22:06:46,516 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl (Container Monitor): Memory usage of ProcessTree 24273 for container-id container_1418928607193_0011_01_02: 6.2 GB of 6.5 GB physical memory used; 13.9 GB of 32.5 GB virtual memory used
2014-12-18 22:06:49,524 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl (Container Monitor): Memory usage of ProcessTree 24273 for container-id container_1418928607193_0011_01_02: 6.2 GB of 6.5 GB physical memory used; 14.0 GB of 32.5 GB virtual memory used
2014-12-18 22:06:52,531 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl (Container Monitor): Memory usage of ProcessTree 24273 for container-id container_1418928607193_0011_01_02: 6.4 GB of 6.5 GB physical memory used; 14.1 GB of 32.5 GB virtual memory used
2014-12-18 22:06:55,538 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl (Container Monitor): Memory usage of ProcessTree 24273 for container-id container_1418928607193_0011_01_02: 6.5 GB of 6.5 GB physical memory used; 14.2 GB of 32.5 GB virtual memory used
2014-12-18 22:06:58,549 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl (Container Monitor): Memory usage of ProcessTree 24273 for container-id container_1418928607193_0011_01_02: 6.5 GB of 6.5 GB physical memory used; 14.3 GB of 32.5 GB virtual memory used
2014-12-18 22:06:58,549 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl (Container Monitor): Process tree for container: container_1418928607193_0011_01_02 has processes older than 1 iteration running over the configured limit. Limit=6979321856, current usage = 6995812352
2014-12-18 22:06:58,549 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl (Container Monitor): Container [pid=24273,containerID=container_1418928607193_0011_01_02] is running beyond physical memory limits. Current usage: 6.5 GB of 6.5 GB physical memory used; 14.3 GB of 32.5 GB virtual memory used. Killing container.

Dump of the process-tree for container_1418928607193_0011_01_02:
|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
|- 24273 4304 24273 24273 (bash) 0 0 115630080 302 /bin/bash -c /usr/java/latest/bin/java -server -XX:OnOutOfMemoryError='kill %p' -Xms6144m -Xmx6144m -verbose:gc -XX:+HeapDumpOnOutOfMemoryError -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -Djava.io.tmpdir=/mnt1/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1418928607193_0011/container_1418928607193_0011_01_02/tmp org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://sparkdri...@ip-xx-xxx-xxx-xxx.eu-west-1.compute.internal:54357/user/CoarseGrainedScheduler 1 ip-xx-xxx-xxx-xxx.eu-west-1.compute.internal 4 1 /mnt/var/log/hadoop/userlogs/application_1418928607193_0011/container_1418928607193_0011_01_02/stdout 2 /mnt/var/log/hadoop/userlogs/application_1418928607193_0011/container_1418928607193_0011_01_02/stderr
|- 24277 24273 24273 24273 (java) 13808 1730 15204556800 1707660 /usr/java/latest/bin/java -server -XX:OnOutOfMemoryError=kill %p -Xms6144m -Xmx6144m -verbose:gc -XX:+HeapDumpOnOutOfMemoryError -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -Djava.io.tmpdir=/mnt1/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1418928607193_0011/container_1418928607193_0011_01_02/tmp org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://sparkdri...@ip-xx-xxx-xxx-xxx.eu-west-1.compute.internal:54357/user/CoarseGrainedScheduler 1
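The numbers in the log above line up with how YARN sizes the container: the 6.5 GB limit is the executor heap plus spark.yarn.executor.memoryOverhead, and the JVM's off-heap usage (metaspace, thread stacks, NIO buffers) is what pushes the process past it. A quick arithmetic check (the 512 MB overhead here is inferred from the log numbers, not a quoted setting):

```python
MiB = 1024 * 1024

heap_mb = 6144               # -Xmx6144m from the container command line
limit_bytes = 6979321856     # "Limit=" in the NodeManager log
usage_bytes = 6995812352     # "current usage" when the container was killed

# Container limit minus heap = the memoryOverhead YARN allowed.
overhead_mb = limit_bytes // MiB - heap_mb
print("implied memoryOverhead (MB):", overhead_mb)

# The process exceeded the limit even though the heap never could:
# off-heap memory made up the difference.
print("over budget by (KB):", (usage_bytes - limit_bytes) // 1024)
```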
Re: Fetch Failure
I'm actually already running 1.1.1. I also just tried --conf spark.yarn.executor.memoryOverhead=4096, but no luck. Still getting ExecutorLostFailure (executor lost). On Fri, Dec 19, 2014 at 10:43 AM, Rafal Kwasny rafal.kwa...@gmail.com wrote: Hi, Just upgrade to 1.1.1 - it was fixed some time ago /Raf sandy.r...@cloudera.com wrote: Hi Jon, The fix for this is to increase spark.yarn.executor.memoryOverhead to something greater than it's default of 384. This will increase the gap between the executors heap size and what it requests from yarn. It's required because jvms take up some memory beyond their heap size. -Sandy On Dec 19, 2014, at 9:04 AM, Jon Chase jon.ch...@gmail.com wrote: I'm getting the same error (ExecutorLostFailure) - input RDD is 100k small files (~2MB each). I do a simple map, then keyBy(), and then rdd.saveAsHadoopDataset(...). Depending on the memory settings given to spark-submit, the time before the first ExecutorLostFailure varies (more memory == longer until failure) - but this usually happens after about 100 files being processed. I'm running Spark 1.1.0 on AWS EMR w/Yarn.It appears that Yarn is killing the executor b/c it thinks it's exceeding memory. However, I can't repro any OOM issues when running locally, no matter the size of the data set. 
It seems like Yarn thinks the heap size is increasing according to the Yarn logs: 2014-12-18 22:06:43,505 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl (Container Monitor): Memory usage of ProcessTree 24273 for container-id container_1418928607193_0011_01_02: 6.1 GB of 6.5 GB physical memory used; 13.8 GB of 32.5 GB virtual memory used 2014-12-18 22:06:46,516 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl (Container Monitor): Memory usage of ProcessTree 24273 for container-id container_1418928607193_0011_01_02: 6.2 GB of 6.5 GB physical memory used; 13.9 GB of 32.5 GB virtual memory used 2014-12-18 22:06:49,524 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl (Container Monitor): Memory usage of ProcessTree 24273 for container-id container_1418928607193_0011_01_02: 6.2 GB of 6.5 GB physical memory used; 14.0 GB of 32.5 GB virtual memory used 2014-12-18 22:06:52,531 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl (Container Monitor): Memory usage of ProcessTree 24273 for container-id container_1418928607193_0011_01_02: 6.4 GB of 6.5 GB physical memory used; 14.1 GB of 32.5 GB virtual memory used 2014-12-18 22:06:55,538 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl (Container Monitor): Memory usage of ProcessTree 24273 for container-id container_1418928607193_0011_01_02: 6.5 GB of 6.5 GB physical memory used; 14.2 GB of 32.5 GB virtual memory used 2014-12-18 22:06:58,549 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl (Container Monitor): Memory usage of ProcessTree 24273 for container-id container_1418928607193_0011_01_02: 6.5 GB of 6.5 GB physical memory used; 14.3 GB of 32.5 GB virtual memory used 2014-12-18 22:06:58,549 WARN 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl (Container Monitor): Process tree for container: container_1418928607193_0011_01_02 has processes older than 1 iteration running over the configured limit. Limit=6979321856, current usage = 6995812352 2014-12-18 22:06:58,549 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl (Container Monitor): Container [pid=24273,containerID=container_1418928607193_0011_01_02] is running beyond physical memory limits. Current usage: 6.5 GB of 6.5 GB physical memory used; 14.3 GB of 32.5 GB virtual memory used. Killing container. Dump of the process-tree for container_1418928607193_0011_01_02 : |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE |- 24273 4304 24273 24273 (bash) 0 0 115630080 302 /bin/bash -c /usr/java/latest/bin/java -server -XX:OnOutOfMemoryError='kill %p' -Xms6144m -Xmx6144m -verbose:gc -XX:+HeapDumpOnOutOfMemoryError -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -Djava.io.tmpdir=/mnt1/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1418928607193_0011/container_1418928607193_0011_01_02/tmp org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://sparkdri...@ip-xx-xxx-xxx-xxx.eu-west-1.compute.internal:54357/user/CoarseGrainedScheduler 1 ip-xx-xxx-xxx-xxx.eu-west-1.compute.internal 4 1 /mnt/var/log/hadoop/userlogs/application_1418928607193_0011
Re: Fetch Failure
Hmmm, I see this a lot (multiple times per second) in the stdout logs of my application: 2014-12-19T16:12:35.748+: [GC (Allocation Failure) [ParNew: 286663K-12530K(306688K), 0.0074579 secs] 1470813K-1198034K(2063104K), 0.0075189 secs] [Times: user=0.03 sys=0.00, real=0.01 secs] And finally I see 2014-12-19 16:12:36,116 ERROR [SIGTERM handler] executor.CoarseGrainedExecutorBackend (SignalLogger.scala:handle(57)) - RECEIVED SIGNAL 15: SIGTERM which I assume is coming from Yarn, after which the log contains this and then ends: Heap par new generation total 306688K, used 23468K [0x8000, 0x94cc, 0x94cc) eden space 272640K, 4% used [0x8000, 0x80abff10, 0x90a4) from space 34048K, 36% used [0x92b8, 0x937ab488, 0x94cc) to space 34048K, 0% used [0x90a4, 0x90a4, 0x92b8) concurrent mark-sweep generation total 1756416K, used 1186756K [0x94cc, 0x0001, 0x0001) Metaspace used 52016K, capacity 52683K, committed 52848K, reserved 1095680K class spaceused 7149K, capacity 7311K, committed 7392K, reserved 1048576K On Fri, Dec 19, 2014 at 11:16 AM, Jon Chase jon.ch...@gmail.com wrote: I'm actually already running 1.1.1. I also just tried --conf spark.yarn.executor.memoryOverhead=4096, but no luck. Still getting ExecutorLostFailure (executor lost). On Fri, Dec 19, 2014 at 10:43 AM, Rafal Kwasny rafal.kwa...@gmail.com wrote: Hi, Just upgrade to 1.1.1 - it was fixed some time ago /Raf sandy.r...@cloudera.com wrote: Hi Jon, The fix for this is to increase spark.yarn.executor.memoryOverhead to something greater than it's default of 384. This will increase the gap between the executors heap size and what it requests from yarn. It's required because jvms take up some memory beyond their heap size. -Sandy On Dec 19, 2014, at 9:04 AM, Jon Chase jon.ch...@gmail.com wrote: I'm getting the same error (ExecutorLostFailure) - input RDD is 100k small files (~2MB each). I do a simple map, then keyBy(), and then rdd.saveAsHadoopDataset(...). 
Depending on the memory settings given to spark-submit, the time before the first ExecutorLostFailure varies (more memory == longer until failure) - but this usually happens after about 100 files being processed. I'm running Spark 1.1.0 on AWS EMR w/Yarn. It appears that Yarn is killing the executor b/c it thinks it's exceeding memory. However, I can't repro any OOM issues when running locally, no matter the size of the data set. It seems like Yarn thinks the heap size is increasing, according to the Yarn logs:

2014-12-18 22:06:43,505 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl (Container Monitor): Memory usage of ProcessTree 24273 for container-id container_1418928607193_0011_01_02: 6.1 GB of 6.5 GB physical memory used; 13.8 GB of 32.5 GB virtual memory used
2014-12-18 22:06:46,516 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl (Container Monitor): Memory usage of ProcessTree 24273 for container-id container_1418928607193_0011_01_02: 6.2 GB of 6.5 GB physical memory used; 13.9 GB of 32.5 GB virtual memory used
2014-12-18 22:06:49,524 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl (Container Monitor): Memory usage of ProcessTree 24273 for container-id container_1418928607193_0011_01_02: 6.2 GB of 6.5 GB physical memory used; 14.0 GB of 32.5 GB virtual memory used
2014-12-18 22:06:52,531 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl (Container Monitor): Memory usage of ProcessTree 24273 for container-id container_1418928607193_0011_01_02: 6.4 GB of 6.5 GB physical memory used; 14.1 GB of 32.5 GB virtual memory used
2014-12-18 22:06:55,538 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl (Container Monitor): Memory usage of ProcessTree 24273 for container-id container_1418928607193_0011_01_02: 6.5 GB of 6.5 GB physical memory used; 14.2 GB of 32.5 GB virtual memory used
2014-12-18 22:06:58,549 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl (Container Monitor): Memory usage of ProcessTree 24273 for container-id container_1418928607193_0011_01_02: 6.5 GB of 6.5 GB physical memory used; 14.3 GB of 32.5 GB virtual memory used
2014-12-18 22:06:58,549 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl (Container Monitor): Process tree for container: container_1418928607193_0011_01_02 has processes older than 1 iteration running over the configured limit. Limit=6979321856, current usage = 6995812352
2014-12-18 22:06:58,549 WARN
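To see why the container dies at exactly "6.5 GB", it helps to work out what YARN was asked for. The sketch below is my own reasoning, not anything from the thread: it assumes the Spark 1.1-era flat 384 MB default for spark.yarn.executor.memoryOverhead and assumes YARN rounds the request up to a multiple of yarn.scheduler.minimum-allocation-mb (256 MB here is a guess), but the arithmetic does line up with the Limit= value in the NodeManager logs above.

```python
# Rough sketch (assumptions, not authoritative) of how a Spark-on-YARN
# executor's container request is sized:
#   request = executor memory (-Xmx) + spark.yarn.executor.memoryOverhead,
# rounded up to a multiple of yarn.scheduler.minimum-allocation-mb.

def container_request_mb(executor_memory_mb, memory_overhead_mb=384,
                         yarn_min_alloc_mb=256):
    """Physical-memory limit YARN enforces for one executor container (MB)."""
    requested = executor_memory_mb + memory_overhead_mb
    # Ceiling division: round up to the next multiple of the min allocation.
    return -(-requested // yarn_min_alloc_mb) * yarn_min_alloc_mb

# The executor above runs with -Xmx6144m and the default 384 MB overhead:
limit_mb = container_request_mb(6144)
print(limit_mb)                    # 6656 MB, i.e. the "6.5 GB" in the logs
print(limit_mb * 1024 * 1024)      # 6979321856, the exact Limit= value logged

# Bumping the overhead to 4096 (as tried later in the thread) would push the
# enforced limit to ~10 GB, which is why Sandy asks about "~10 GB" containers:
print(container_request_mb(6144, memory_overhead_mb=4096))   # 10240
```

Since the JVM's off-heap usage (metaspace, thread stacks, direct buffers) sits on top of -Xmx, a heap that never OOMs locally can still push the process past the container limit on YARN.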
Re: Fetch Failure
Yes, same problem.

On Fri, Dec 19, 2014 at 11:29 AM, Sandy Ryza sandy.r...@cloudera.com wrote:
Do you hit the same errors? Is it now saying your containers are exceeding ~10 GB?
Yarn not running as many executors as I'd like
Running on Amazon EMR w/Yarn and Spark 1.1.1, I'm having trouble getting Yarn to run the number of executors that I specify in spark-submit. For example, --num-executors 2 on a cluster with two core nodes typically results in only one executor running at a time. I can play with the memory settings and num-cores-per-executor, and sometimes I can get 2 executors running at once, but I'm not sure what the secret formula is to make this happen consistently.
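One plausible explanation (my speculation, with hypothetical numbers, not EMR-specific facts): each executor needs a full container of executor-memory + memoryOverhead on a single NodeManager, and one node also has to host the YARN ApplicationMaster. If the executor container is sized close to a node's capacity, the AM's node can't fit one, and you get one executor instead of two:

```python
# Sketch: how many executors fit on a YARN cluster, assuming (hypothetically)
# that node 0 also hosts the ApplicationMaster container. All numbers below
# are illustrative, not actual EMR NodeManager capacities.

def executors_that_fit(node_mem_mb, num_nodes, executor_mem_mb,
                       overhead_mb=384, am_mem_mb=1024):
    container_mb = executor_mem_mb + overhead_mb
    total = 0
    for node in range(num_nodes):
        # Reserve room for the AM on the first node only.
        available = node_mem_mb - (am_mem_mb if node == 0 else 0)
        total += available // container_mb
    return total

# 8 GB usable per node, 7 GB executors: the AM crowds out node 0's executor.
print(executors_that_fit(8192, 2, 7168))   # 1
# Shrink the executor and both nodes can host one each.
print(executors_that_fit(8192, 2, 6144))   # 2
```

If this model is right, the "secret formula" is just making executor-memory + overhead small enough that it fits alongside the AM on its node.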
Downloads from S3 exceedingly slow when running on spark-ec2
I'm running a very simple Spark application that downloads files from S3, does a bit of mapping, then uploads new files. Each file is roughly 2MB and is gzip'd. I was running the same code on Amazon's EMR w/Spark and not having any download speed issues (Amazon's EMR provides a custom implementation of the s3n:// file system, FWIW). When I say exceedingly slow, I mean that it takes about 2 minutes to download and process a 2MB file (this was taking ~2 seconds on the same instance types in Amazon's EMR). When I download the same file from the EC2 machine with wget or curl, it downloads in ~ 1 second. I've also done other bandwidth checks for downloads from other external hosts - no speed problems there. Tried this w/Spark 1.1.0 and 1.1.1. When I do a thread dump on a worker, I typically see this a lot:

"Executor task launch worker-7" daemon prio=10 tid=0x7fd174039000 nid=0x59e9 runnable [0x7fd1f7dfb000]
   java.lang.Thread.State: RUNNABLE
	at java.net.SocketInputStream.socketRead0(Native Method)
	at java.net.SocketInputStream.read(SocketInputStream.java:152)
	at java.net.SocketInputStream.read(SocketInputStream.java:122)
	at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
	at sun.security.ssl.InputRecord.read(InputRecord.java:480)
	at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
	- locked 0x0007e44dd140 (a java.lang.Object)
	at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884)
	at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
	- locked 0x0007e44e1350 (a sun.security.ssl.AppInputStream)
	at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
	at java.io.BufferedInputStream.read(BufferedInputStream.java:254)
	- locked 0x0007e44ea800 (a java.io.BufferedInputStream)
	at org.apache.commons.httpclient.HttpParser.readRawLine(HttpParser.java:78)
	at org.apache.commons.httpclient.HttpParser.readLine(HttpParser.java:106)
	at org.apache.commons.httpclient.HttpConnection.readLine(HttpConnection.java:1116)
	at org.apache.commons.httpclient.MultiThreadedHttpConnectionManager$HttpConnectionAdapter.readLine(MultiThreadedHttpConnectionManager.java:1413)
	at org.apache.commons.httpclient.HttpMethodBase.readStatusLine(HttpMethodBase.java:1973)
	at org.apache.commons.httpclient.HttpMethodBase.readResponse(HttpMethodBase.java:1735)
	at org.apache.commons.httpclient.HttpMethodBase.execute(HttpMethodBase.java:1098)
	at org.apache.commons.httpclient.HttpMethodDirector.executeWithRetry(HttpMethodDirector.java:398)
	at org.apache.commons.httpclient.HttpMethodDirector.executeMethod(HttpMethodDirector.java:171)
	at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:397)
	at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:323)
	at org.jets3t.service.impl.rest.httpclient.RestS3Service.performRequest(RestS3Service.java:342)
	at org.jets3t.service.impl.rest.httpclient.RestS3Service.performRestHead(RestS3Service.java:718)
	at org.jets3t.service.impl.rest.httpclient.RestS3Service.getObjectImpl(RestS3Service.java:1599)
	at org.jets3t.service.impl.rest.httpclient.RestS3Service.getObjectDetailsImpl(RestS3Service.java:1535)
	at org.jets3t.service.S3Service.getObjectDetails(S3Service.java:1987)
	at org.jets3t.service.S3Service.getObjectDetails(S3Service.java:1332)
	at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:111)
	at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
	at org.apache.hadoop.fs.s3native.$Proxy6.retrieveMetadata(Unknown Source)
	at org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:330)
	at org.apache.hadoop.fs.s3native.NativeS3FileSystem.mkdir(NativeS3FileSystem.java:432)
	at org.apache.hadoop.fs.s3native.NativeS3FileSystem.mkdirs(NativeS3FileSystem.java:425)
	at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1126)
	at org.apache.hadoop.mapred.FileOutputCommitter.getWorkPath(FileOutputCommitter.java:256)
	at org.apache.hadoop.mapred.FileOutputFormat.getTaskOutputPath(FileOutputFormat.java:244)
	at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:126)
	at org.apache.hadoop.mapred.lib.MultipleTextOutputFormat.getBaseRecordWriter(MultipleTextOutputFormat.java:44)
	at org.apache.hadoop.mapred.lib.MultipleOutputFormat$1.write(MultipleOutputFormat.java:99)
	at org.apache.spark.SparkHadoopWriter.write(SparkHadoopWriter.scala:94)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:986)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974)
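Note that the thread dump shows the worker blocked reading an SSL socket inside a jets3t HEAD request (getObjectDetails), not streaming the payload. Back-of-envelope arithmetic (all numbers below are hypothetical, purely illustrative) shows why raw bandwidth cannot explain a 2-minute download of a 2 MB file:

```python
# Naive cost model: payload transfer time plus a fixed cost per HTTP round
# trip. Every number here is hypothetical - the point is only the shape.

def transfer_time_s(size_mb, bandwidth_mbps, n_requests, per_request_s):
    """Seconds to fetch one object: bandwidth term + per-request overhead."""
    return size_mb * 8 / bandwidth_mbps + n_requests * per_request_s

# wget's ~1 s for a 2 MB file is consistent with modest bandwidth plus one
# quick round trip:
print(round(transfer_time_s(2, 20, 1, 0.2), 2))   # 1.0

# To reach ~120 s per file, each of a handful of S3 round trips (the extra
# HEAD/getObjectDetails calls visible in the thread dump) would have to stall
# for tens of seconds:
print(round(transfer_time_s(2, 20, 4, 30), 2))    # 120.8
```

If that model holds, the slowdown lives in connection handling (SSL handshakes, connection reuse, stalled reads in the httpclient/jets3t layer) rather than in the network itself, which would also explain why wget and curl on the same host are fast.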
Spark cluster with Java 8 using ./spark-ec2
I'm trying to use the spark-ec2 command to launch a Spark cluster that runs Java 8, but so far I haven't been able to get the Spark processes to use the right JVM at start up. Here's the command I use for launching the cluster. Note I'm using the user-data feature to install Java 8:

./spark-ec2 -k spark -i ~/.ssh/spark.pem \
  -t m3.large -s 1 \
  --user-data=java8.sh launch spark

After the cluster is running, I can SSH in and see that the default Java version is indeed 8:

ssh root@...
$ echo $JAVA_HOME
/usr/java/default
$ java -version
java version "1.8.0"
Java(TM) SE Runtime Environment (build 1.8.0-b132)
Java HotSpot(TM) 64-Bit Server VM (build 25.0-b70, mixed mode)

It seems that the Spark processes are still using Java 7. I've tried running sbin/stop-all.sh and start-all.sh from master, but that doesn't seem to help. What magic incantation am I missing?

java8.sh user data script:

#!/bin/bash
# Check java version
JAVA_VER=$(java -version 2>&1 | sed 's/java version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q')
if [ "$JAVA_VER" -lt 18 ]
then
  # Download jdk 8
  echo "Downloading and installing jdk 8"
  wget --no-cookies --no-check-certificate \
    --header "Cookie: gpw_e24=http%3A%2F%2Fwww.oracle.com%2F; oraclelicense=accept-securebackup-cookie" \
    http://download.oracle.com/otn-pub/java/jdk/8-b132/jdk-8-linux-x64.rpm
  # Silent install
  yum -y install jdk-8-linux-x64.rpm
  # Figure out how many versions of Java we currently have
  NR_OF_OPTIONS=$(echo 0 | alternatives --config java 2>/dev/null | grep 'There ' | awk '{print $3}' | tail -1)
  echo "Found $NR_OF_OPTIONS existing versions of java. Adding new version."
  # Make the new java version available via /etc/alternatives
  alternatives --install /usr/bin/java java /usr/java/default/bin/java 1
  # Make java 8 the default
  echo $(($NR_OF_OPTIONS + 1)) | alternatives --config java
  # Set some variables
  export JAVA_HOME=/usr/java/default/bin/java
  export JRE_HOME=/usr/java/default/jre
  export PATH=$PATH:/usr/java/default/bin
fi
# Check java version again
JAVA_VER=$(java -version 2>&1 | sed 's/java version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q')
echo "export JAVA_HOME=/usr/java/default" >> /root/.bash_profile
. ~/.bash_profile
echo "Java version is $JAVA_VER"
echo "JAVA_HOME: $JAVA_HOME"
echo "JRE_HOME: $JRE_HOME"
echo "PATH: $PATH"

Here's the stacktrace from stdout from the spark-submit command:

14/11/25 14:01:11 INFO scheduler.TaskSetManager: Lost task 0.3 in stage 1.0 (TID 7) on executor ip-xx-xx-xxx-xx.eu-west-1.compute.internal: java.lang.UnsupportedClassVersionError (foo/spark/Main : Unsupported major.minor version 52.0) [duplicate 3]
14/11/25 14:01:11 ERROR scheduler.TaskSetManager: Task 0 in stage 1.0 failed 4 times; aborting job
14/11/25 14:01:11 INFO scheduler.TaskSchedulerImpl: Cancelling stage 1
14/11/25 14:01:11 INFO scheduler.TaskSchedulerImpl: Stage 1 was cancelled
14/11/25 14:01:11 INFO scheduler.DAGScheduler: Failed to run saveAsHadoopFile at Main.java:146
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 7, ip-xx-xx-xxx-xx.eu-west-1.compute.internal): java.lang.UnsupportedClassVersionError: foo/spark/Main : Unsupported major.minor version 52.0
    java.lang.ClassLoader.defineClass1(Native Method)
    java.lang.ClassLoader.defineClass(ClassLoader.java:800)
    java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
    java.net.URLClassLoader.access$100(URLClassLoader.java:71)
    java.net.URLClassLoader$1.run(URLClassLoader.java:361)
    java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    java.security.AccessController.doPrivileged(Native Method)
    java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    java.lang.Class.forName0(Native Method)
    java.lang.Class.forName(Class.java:274)
    org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:59)
    java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
    java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
    java.io.ObjectInputStream.readClass(ObjectInputStream.java:1483)
    java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1333)
    java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
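For context, "Unsupported major.minor version 52.0" means the class was compiled for Java 8 (class-file major version 52; 51 is Java 7, 50 is Java 6) but is being loaded by an older JVM - here, the executors. A small sketch (my own illustration, not part of the thread) of how to read that major version straight from a .class file's header, which is a quick way to confirm what a jar was actually built for:

```python
import struct

# A .class file starts with magic 0xCAFEBABE, then u2 minor and u2 major
# version, all big-endian. Major 52 = Java 8, 51 = Java 7, 50 = Java 6.

def class_file_java_version(data):
    """Return the Java release (8, 7, ...) that a .class file targets."""
    magic, minor, major = struct.unpack(">IHH", data[:8])
    if magic != 0xCAFEBABE:
        raise ValueError("not a class file")
    return major - 44   # major 52 -> Java 8, 51 -> Java 7, etc.

# Header of a class compiled with -target 1.8 (major version 0x34 = 52):
sample = bytes([0xCA, 0xFE, 0xBA, 0xBE, 0x00, 0x00, 0x00, 0x34])
print(class_file_java_version(sample))   # 8
```

Running this over the first entry of the application jar (e.g. via zipfile) would confirm the jar targets Java 8, so the fix has to be on the executor side: the workers' Spark daemons are still launched with the stock Java 7 JVM despite the user-data script.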