GC problem doing fuzzy join
I'm trying to do a brute-force fuzzy join where I compare N records against N other records, for N^2 total comparisons. The table is medium-sized and fits in memory, so I collect it and put it into a broadcast variable. The other copy of the table is in an RDD. I am basically calling the RDD map operation, and for each record in the RDD I take the broadcasted table and FILTER it. There appears to be heavy GC happening, so I suspect that the repeated creation and deletion of filtered copies of the broadcast table is what is causing it. Is there a way to fix this pattern? Thanks, Arun
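A minimal sketch of the pattern being described, for reference; Rec, isFuzzyMatch, rddA and rddB are illustrative names, not from the original post:

  case class Rec(id: Long, name: String)
  def isFuzzyMatch(a: Rec, b: Rec): Boolean = a.name.take(3) == b.name.take(3) // placeholder similarity test

  val small: Array[Rec] = rddA.collect()   // the medium-sized table, collected to the driver
  val smallBc = sc.broadcast(small)        // shipped once to each executor
  val candidates = rddB.map { rec =>
    // filter() over the broadcast copy allocates a new collection for every record of rddB,
    // which is the repeated allocation/garbage the question is about
    (rec, smallBc.value.filter(other => isFuzzyMatch(rec, other)))
  }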
Re: Spark 2.0.0 OOM error at beginning of RDD map on AWS
Also for the record, turning on kryo was not able to help. On Tue, Aug 23, 2016 at 12:58 PM, Arun Luthra <arun.lut...@gmail.com> wrote: > Splitting up the Maps to separate objects did not help. > > However, I was able to work around the problem by reimplementing it with > RDD joins. > > On Aug 18, 2016 5:16 PM, "Arun Luthra" <arun.lut...@gmail.com> wrote: > >> This might be caused by a few large Map objects that Spark is trying to >> serialize. These are not broadcast variables or anything, they're just >> regular objects. >> >> Would it help if I further indexed these maps into a two-level Map i.e. >> Map[String, Map[String, Int]] ? Or would this still count against me? >> >> What if I manually split them up into numerous Map variables? >> >> On Mon, Aug 15, 2016 at 2:12 PM, Arun Luthra <arun.lut...@gmail.com> >> wrote: >> >>> I got this OOM error in Spark local mode. The error seems to have been >>> at the start of a stage (all of the stages on the UI showed as complete, >>> there were more stages to do but had not showed up on the UI yet). >>> >>> There appears to be ~100G of free memory at the time of the error. >>> >>> Spark 2.0.0 >>> 200G driver memory >>> local[30] >>> 8 /mntX/tmp directories for spark.local.dir >>> "spark.sql.shuffle.partitions", "500" >>> "spark.driver.maxResultSize","500" >>> "spark.default.parallelism", "1000" >>> >>> The line number for the error is at an RDD map operation where there are >>> some potentially large Map objects that are going to be accessed by each >>> record. Does it matter if they are broadcast variables or not? I imagine >>> not because its in local mode they should be available in memory to every >>> executor/core. >>> >>> Possibly related: >>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Cl >>> osureCleaner-or-java-serializer-OOM-when-trying-to-grow-td24796.html >>> >>> Exception in thread "main" java.lang.OutOfMemoryError >>> at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputSt >>> ream.java:123) >>> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117) >>> at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutput >>> Stream.java:93) >>> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153) >>> at java.io.ObjectOutputStream$BlockDataOutputStream.drain(Objec >>> tOutputStream.java:1877) >>> at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDat >>> aMode(ObjectOutputStream.java:1786) >>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189) >>> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) >>> at org.apache.spark.serializer.JavaSerializationStream.writeObj >>> ect(JavaSerializer.scala:43) >>> at org.apache.spark.serializer.JavaSerializerInstance.serialize >>> (JavaSerializer.scala:100) >>> at org.apache.spark.util.ClosureCleaner$.ensureSerializable(Clo >>> sureCleaner.scala:295) >>> at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ >>> ClosureCleaner$$clean(ClosureCleaner.scala:288) >>> at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108) >>> at org.apache.spark.SparkContext.clean(SparkContext.scala:2037) >>> at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:366) >>> at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:365) >>> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperati >>> onScope.scala:151) >>> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperati >>> onScope.scala:112) >>> at org.apache.spark.rdd.RDD.withScope(RDD.scala:358) >>> at 
org.apache.spark.rdd.RDD.map(RDD.scala:365) >>> at abc.Abc$.main(abc.scala:395) >>> at abc.Abc.main(abc.scala) >>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAcce >>> ssorImpl.java:62) >>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe >>> thodAccessorImpl.java:43) >>> at java.lang.reflect.Method.invoke(Method.java:498) >>> at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy >>> $SparkSubmit$$runMain(SparkSubmit.scala:729) >>> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit >>> .scala:185) >>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210) >>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124) >>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) >>> >>> >>
Re: Spark 2.0.0 OOM error at beginning of RDD map on AWS
Splitting up the Maps to separate objects did not help. However, I was able to work around the problem by reimplementing it with RDD joins. On Aug 18, 2016 5:16 PM, "Arun Luthra" <arun.lut...@gmail.com> wrote: > This might be caused by a few large Map objects that Spark is trying to > serialize. These are not broadcast variables or anything, they're just > regular objects. > > Would it help if I further indexed these maps into a two-level Map i.e. > Map[String, Map[String, Int]] ? Or would this still count against me? > > What if I manually split them up into numerous Map variables? > > On Mon, Aug 15, 2016 at 2:12 PM, Arun Luthra <arun.lut...@gmail.com> > wrote: > >> I got this OOM error in Spark local mode. The error seems to have been at >> the start of a stage (all of the stages on the UI showed as complete, there >> were more stages to do but had not showed up on the UI yet). >> >> There appears to be ~100G of free memory at the time of the error. >> >> Spark 2.0.0 >> 200G driver memory >> local[30] >> 8 /mntX/tmp directories for spark.local.dir >> "spark.sql.shuffle.partitions", "500" >> "spark.driver.maxResultSize","500" >> "spark.default.parallelism", "1000" >> >> The line number for the error is at an RDD map operation where there are >> some potentially large Map objects that are going to be accessed by each >> record. Does it matter if they are broadcast variables or not? I imagine >> not because its in local mode they should be available in memory to every >> executor/core. >> >> Possibly related: >> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Cl >> osureCleaner-or-java-serializer-OOM-when-trying-to-grow-td24796.html >> >> Exception in thread "main" java.lang.OutOfMemoryError >> at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputSt >> ream.java:123) >> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117) >> at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutput >> Stream.java:93) >> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153) >> at java.io.ObjectOutputStream$BlockDataOutputStream.drain(Objec >> tOutputStream.java:1877) >> at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDat >> aMode(ObjectOutputStream.java:1786) >> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189) >> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) >> at org.apache.spark.serializer.JavaSerializationStream.writeObj >> ect(JavaSerializer.scala:43) >> at org.apache.spark.serializer.JavaSerializerInstance.serialize >> (JavaSerializer.scala:100) >> at org.apache.spark.util.ClosureCleaner$.ensureSerializable(Clo >> sureCleaner.scala:295) >> at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ >> ClosureCleaner$$clean(ClosureCleaner.scala:288) >> at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108) >> at org.apache.spark.SparkContext.clean(SparkContext.scala:2037) >> at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:366) >> at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:365) >> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperati >> onScope.scala:151) >> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperati >> onScope.scala:112) >> at org.apache.spark.rdd.RDD.withScope(RDD.scala:358) >> at org.apache.spark.rdd.RDD.map(RDD.scala:365) >> at abc.Abc$.main(abc.scala:395) >> at abc.Abc.main(abc.scala) >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAcce >> ssorImpl.java:62) >> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe >> thodAccessorImpl.java:43) >> at java.lang.reflect.Method.invoke(Method.java:498) >> at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy >> $SparkSubmit$$runMain(SparkSubmit.scala:729) >> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit >> .scala:185) >> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210) >> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124) >> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) >> >> >
Re: Spark 2.0.0 OOM error at beginning of RDD map on AWS
This might be caused by a few large Map objects that Spark is trying to serialize. These are not broadcast variables or anything, they're just regular objects. Would it help if I further indexed these maps into a two-level Map i.e. Map[String, Map[String, Int]] ? Or would this still count against me? What if I manually split them up into numerous Map variables? On Mon, Aug 15, 2016 at 2:12 PM, Arun Luthra <arun.lut...@gmail.com> wrote: > I got this OOM error in Spark local mode. The error seems to have been at > the start of a stage (all of the stages on the UI showed as complete, there > were more stages to do but had not showed up on the UI yet). > > There appears to be ~100G of free memory at the time of the error. > > Spark 2.0.0 > 200G driver memory > local[30] > 8 /mntX/tmp directories for spark.local.dir > "spark.sql.shuffle.partitions", "500" > "spark.driver.maxResultSize","500" > "spark.default.parallelism", "1000" > > The line number for the error is at an RDD map operation where there are > some potentially large Map objects that are going to be accessed by each > record. Does it matter if they are broadcast variables or not? I imagine > not because its in local mode they should be available in memory to every > executor/core. > > Possibly related: > http://apache-spark-user-list.1001560.n3.nabble.com/Spark- > ClosureCleaner-or-java-serializer-OOM-when-trying-to-grow-td24796.html > > Exception in thread "main" java.lang.OutOfMemoryError > at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java: > 123) > at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117) > at java.io.ByteArrayOutputStream.ensureCapacity( > ByteArrayOutputStream.java:93) > at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153) > at java.io.ObjectOutputStream$BlockDataOutputStream.drain( > ObjectOutputStream.java:1877) > at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode( > ObjectOutputStream.java:1786) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) > at org.apache.spark.serializer.JavaSerializationStream. > writeObject(JavaSerializer.scala:43) > at org.apache.spark.serializer.JavaSerializerInstance. 
> serialize(JavaSerializer.scala:100) > at org.apache.spark.util.ClosureCleaner$.ensureSerializable( > ClosureCleaner.scala:295) > at org.apache.spark.util.ClosureCleaner$.org$apache$ > spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288) > at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108) > at org.apache.spark.SparkContext.clean(SparkContext.scala:2037) > at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:366) > at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:365) > at org.apache.spark.rdd.RDDOperationScope$.withScope( > RDDOperationScope.scala:151) > at org.apache.spark.rdd.RDDOperationScope$.withScope( > RDDOperationScope.scala:112) > at org.apache.spark.rdd.RDD.withScope(RDD.scala:358) > at org.apache.spark.rdd.RDD.map(RDD.scala:365) > at abc.Abc$.main(abc.scala:395) > at abc.Abc.main(abc.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at sun.reflect.NativeMethodAccessorImpl.invoke( > NativeMethodAccessorImpl.java:62) > at sun.reflect.DelegatingMethodAccessorImpl.invoke( > DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$ > deploy$SparkSubmit$$runMain(SparkSubmit.scala:729) > at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185) > at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210) > at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124) > at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) > >
Spark 2.0.0 OOM error at beginning of RDD map on AWS
I got this OOM error in Spark local mode. The error seems to have occurred at the start of a stage (all of the stages on the UI showed as complete; there were more stages to do but they had not shown up on the UI yet). There appears to be ~100G of free memory at the time of the error.

Spark 2.0.0
200G driver memory
local[30]
8 /mntX/tmp directories for spark.local.dir
"spark.sql.shuffle.partitions", "500"
"spark.driver.maxResultSize", "500"
"spark.default.parallelism", "1000"

The line number for the error points to an RDD map operation where some potentially large Map objects are going to be accessed by each record. Does it matter whether they are broadcast variables or not? I imagine not, because in local mode they should be available in memory to every executor/core.

Possibly related: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-ClosureCleaner-or-java-serializer-OOM-when-trying-to-grow-td24796.html

Exception in thread "main" java.lang.OutOfMemoryError
at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2037)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:366)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:365)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.map(RDD.scala:365)
at abc.Abc$.main(abc.scala:395)
at abc.Abc.main(abc.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:729)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Re: groupByKey() compile error after upgrading from 1.6.2 to 2.0.0
Thanks, pair_rdd.rdd.groupByKey() did the trick. On Wed, Aug 10, 2016 at 8:24 PM, Holden Karau <hol...@pigscanfly.ca> wrote: > So it looks like (despite the name) pair_rdd is actually a Dataset - my > guess is you might have a map on a dataset up above which used to return an > RDD but now returns another dataset or an unexpected implicit conversion. > Just add rdd() before the groupByKey call to push it into an RDD. That > being said - groupByKey generally is an anti-pattern so please be careful > with it. > > On Wed, Aug 10, 2016 at 8:07 PM, Arun Luthra <arun.lut...@gmail.com> > wrote: > >> Here is the offending line: >> >> val some_rdd = pair_rdd.groupByKey().flatMap { case (mk: MyKey, md_iter: >> Iterable[MyData]) => { >> ... >> >> >> [error] .scala:249: overloaded method value groupByKey with >> alternatives: >> [error] [K](func: >> org.apache.spark.api.java.function.MapFunction[(aaa.MyKey, >> aaa.MyData),K], encoder: org.apache.spark.sql.Encoder[K >> ])org.apache.spark.sql.KeyValueGroupedDataset[K,(aaa.MyKey, aaa.MyData)] >> >> [error] [K](func: ((aaa.MyKey, aaa.MyData)) => K)(implicit evidence$4: >> org.apache.spark.sql.Encoder[K])org.apache.spark.sql.KeyValueGroupedDataset[K,(aaa.MyKey, >> aaa.MyData)] >> [error] cannot be applied to () >> [error] val some_rdd = pair_rdd.groupByKey().flatMap { case (mk: >> MyKey, hd_iter: Iterable[MyData]) => { >> [error] ^ >> [warn] .scala:249: non-variable type argument aaa.MyData in >> type pattern Iterable[aaa.MyData] (the underlying of Iterable[aaa.MyData]) >> is unchecked since it is eliminated by erasure >> [warn] val some_rdd = pair_rdd.groupByKey().flatMap { case (mk: >> MyKey, hd_iter: Iterable[MyData]) => { >> [warn] >> ^ >> [warn] one warning found >> >> >> I can't see any obvious API change... what is the problem? >> >> Thanks, >> Arun >> > > > > -- > Cell : 425-233-8271 > Twitter: https://twitter.com/holdenkarau >
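A minimal sketch of the fix, using the names from the thread; the body of the flatMap is only a placeholder:

  // In 2.0 an implicit conversion can leave pair_rdd as a Dataset, which has a different
  // groupByKey signature; calling .rdd first selects the RDD version used below.
  val some_rdd = pair_rdd.rdd.groupByKey().flatMap { case (mk: MyKey, md_iter: Iterable[MyData]) =>
    // ... per-key processing ...
    Seq.empty[String] // placeholder result
  }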
groupByKey() compile error after upgrading from 1.6.2 to 2.0.0
Here is the offending line: val some_rdd = pair_rdd.groupByKey().flatMap { case (mk: MyKey, md_iter: Iterable[MyData]) => { ... [error] .scala:249: overloaded method value groupByKey with alternatives: [error] [K](func: org.apache.spark.api.java.function.MapFunction[(aaa.MyKey, aaa.MyData),K], encoder: org.apache.spark.sql.Encoder[K])org.apache.spark.sql.KeyValueGroupedDataset[K,(aaa.MyKey, aaa.MyData)] [error] [K](func: ((aaa.MyKey, aaa.MyData)) => K)(implicit evidence$4: org.apache.spark.sql.Encoder[K])org.apache.spark.sql.KeyValueGroupedDataset[K,(aaa.MyKey, aaa.MyData)] [error] cannot be applied to () [error] val some_rdd = pair_rdd.groupByKey().flatMap { case (mk: MyKey, hd_iter: Iterable[MyData]) => { [error] ^ [warn] .scala:249: non-variable type argument aaa.MyData in type pattern Iterable[aaa.MyData] (the underlying of Iterable[aaa.MyData]) is unchecked since it is eliminated by erasure [warn] val some_rdd = pair_rdd.groupByKey().flatMap { case (mk: MyKey, hd_iter: Iterable[MyData]) => { [warn] ^ [warn] one warning found I can't see any obvious API change... what is the problem? Thanks, Arun
Re: TaskCommitDenied (Driver denied task commit)
Correction. I have to use spark.yarn.am.memoryOverhead because I'm in Yarn client mode. I set it to 13% of the executor memory. Also quite helpful was increasing the total overall executor memory. It will be great when tungsten enhancements make there way into RDDs. Thanks! Arun On Thu, Jan 21, 2016 at 6:19 PM, Arun Luthra <arun.lut...@gmail.com> wrote: > Two changes I made that appear to be keeping various errors at bay: > > 1) bumped up spark.yarn.executor.memoryOverhead to 2000 in the spirit of > https://mail-archives.apache.org/mod_mbox/spark-user/201511.mbox/%3ccacbyxkld8qasymj2ghk__vttzv4gejczcqfaw++s1d5te1d...@mail.gmail.com%3E > . Even though I couldn't find the same error in my yarn log. > > 2) very important: I ran coalesce(1000) on the RDD at the start of the > DAG. I know keeping the # of partitions lower is helpful, based on past > experience with groupByKey. I haven't run this pipeline in a bit so that > rule of thumb was not forefront in my mind. > > On Thu, Jan 21, 2016 at 5:35 PM, Arun Luthra <arun.lut...@gmail.com> > wrote: > >> Looking into the yarn logs for a similar job where an executor was >> associated with the same error, I find: >> >> ... >> 16/01/22 01:17:18 INFO client.TransportClientFactory: Found inactive >> connection to (SERVER), creating a new one. >> 16/01/22 01:17:18 *ERROR shuffle.RetryingBlockFetcher: Exception while >> beginning fetch of 46 outstanding blocks* >> *java.io.IOException: Failed to connect to (SERVER)* >> at >> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193) >> at >> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156) >> at >> org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88) >> at >> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) >> at >> org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120) >> at >> org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:97) >> at >> org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:152) >> at >> org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:265) >> at >> org.apache.spark.storage.ShuffleBlockFetcherIterator.(ShuffleBlockFetcherIterator.scala:112) >> at >> org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:43) >> at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90) >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) >> at >> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) >> at >> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) >> at >> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) >> at >> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) >> at 
org.apache.spark.rdd.RDD.iterator(RDD.scala:264) >> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) >> at org.apache.spark.scheduler.Task.run(Task.scala:88) >> at >> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) >> at >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) >> at >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) >> at java.lang.Thread.run(Thread.java:745) >> *Caused by: java.net.ConnectException: Connection refused:* (SERVER) >> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) >> at >> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739) >> at >> io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224) >> at >> io.n
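For reference, a hedged sketch of how the settings described above might be passed on the command line; the 8000 MB figure is only an illustration of "13% of a 60g executor", not a value from the thread:

  spark-submit \
    --master yarn-client \
    --num-executors 90 \
    --executor-memory 60g \
    --conf spark.yarn.am.memoryOverhead=8000 \
    --conf spark.yarn.executor.memoryOverhead=2000 \
    ...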
MemoryStore: Not enough space to cache broadcast_N in memory
WARN MemoryStore: Not enough space to cache broadcast_4 in memory! (computed 60.2 MB so far)
WARN MemoryStore: Persisting block broadcast_4 to disk instead.

Can I increase the memory allocation for broadcast variables? I have a few broadcast variables that I create with sc.broadcast(). Are these labeled starting from 0 or from 1 (in reference to "broadcast_N")? I want to debug/track down which one is the offender.

As a feature request, it would be good if there were an optional argument (or perhaps a required argument) added to sc.broadcast() so that we could give it an internal label, the same as the "name" argument of sc.accumulator(). It would enable more useful warn/error messages. Arun
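One way to track down which broadcast is which: the handle returned by sc.broadcast() carries a numeric id that should correspond to the broadcast_N in the MemoryStore warning. A small sketch (lookupMap is an assumed local variable):

  val lookupBc = sc.broadcast(lookupMap)
  println("lookup table is broadcast_" + lookupBc.id)  // e.g. "lookup table is broadcast_4"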
Re: TaskCommitDenied (Driver denied task commit)
16/01/21 21:52:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 16/01/21 21:52:14 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set. spark.yarn.driver.memoryOverhead is set but does not apply in client mode. 16/01/21 21:52:16 WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded. 16/01/21 21:52:52 WARN MemoryStore: Not enough space to cache broadcast_4 in memory! (computed 60.2 MB so far) 16/01/21 21:52:52 WARN MemoryStore: Persisting block broadcast_4 to disk instead. [Stage 1:>(2260 + 7) / 2262]16/01/21 21:57:24 WARN TaskSetManager: Lost task 1440.1 in stage 1.0 (TID 4530, --): TaskCommitDenied (Driver denied task commit) for job: 1, partition: 1440, attempt: 4530 [Stage 1:>(2260 + 6) / 2262]16/01/21 21:57:27 WARN TaskSetManager: Lost task 1488.1 in stage 1.0 (TID 4531, --): TaskCommitDenied (Driver denied task commit) for job: 1, partition: 1488, attempt: 4531 [Stage 1:>(2261 + 4) / 2262]16/01/21 21:57:39 WARN TaskSetManager: Lost task 1982.1 in stage 1.0 (TID 4532, --): TaskCommitDenied (Driver denied task commit) for job: 1, partition: 1982, attempt: 4532 16/01/21 21:57:57 WARN TaskSetManager: Lost task 2214.0 in stage 1.0 (TID 4482, --): TaskCommitDenied (Driver denied task commit) for job: 1, partition: 2214, attempt: 4482 16/01/21 21:57:57 WARN TaskSetManager: Lost task 2168.0 in stage 1.0 (TID 4436, --): TaskCommitDenied (Driver denied task commit) for job: 1, partition: 2168, attempt: 4436 I am running with: spark-submit --class "myclass" \ --num-executors 90 \ --driver-memory 1g \ --executor-memory 60g \ --executor-cores 8 \ --master yarn-client \ --conf "spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \ my.jar There are 2262 input files totaling just 98.6G. The DAG is basically textFile().map().filter().groupByKey().saveAsTextFile(). On Thu, Jan 21, 2016 at 2:14 PM, Holden Karau <hol...@pigscanfly.ca> wrote: > Can you post more of your log? How big are the partitions? What is the > action you are performing? > > On Thu, Jan 21, 2016 at 2:02 PM, Arun Luthra <arun.lut...@gmail.com> > wrote: > >> Example warning: >> >> 16/01/21 21:57:57 WARN TaskSetManager: Lost task 2168.0 in stage 1.0 (TID >> 4436, XXX): TaskCommitDenied (Driver denied task commit) for job: 1, >> partition: 2168, attempt: 4436 >> >> >> Is there a solution for this? Increase driver memory? I'm using just 1G >> driver memory but ideally I won't have to increase it. >> >> The RDD being processed has 2262 partitions. >> >> Arun >> > > > > -- > Cell : 425-233-8271 > Twitter: https://twitter.com/holdenkarau >
TaskCommitDenied (Driver denied task commit)
Example warning: 16/01/21 21:57:57 WARN TaskSetManager: Lost task 2168.0 in stage 1.0 (TID 4436, XXX): TaskCommitDenied (Driver denied task commit) for job: 1, partition: 2168, attempt: 4436 Is there a solution for this? Increase driver memory? I'm using just 1G driver memory but ideally I won't have to increase it. The RDD being processed has 2262 partitions. Arun
Re: TaskCommitDenied (Driver denied task commit)
Usually the pipeline works, it just failed on this particular input data. The other data it has run on is of similar size. Speculation is enabled. I'm using Spark 1.5.0. Here is the config. Many of these may not be needed anymore, they are from trying to get things working in Spark 1.2 and 1.3. .set("spark.storage.memoryFraction","0.2") // default 0.6 .set("spark.shuffle.memoryFraction","0.2") // default 0.2 .set("spark.shuffle.manager","SORT") // preferred setting for optimized joins .set("spark.shuffle.consolidateFiles","true") // helpful for "too many files open" .set("spark.mesos.coarse", "true") // helpful for MapOutputTracker errors? .set("spark.akka.frameSize","300") // helpful when using consildateFiles=true .set("spark.shuffle.compress","false") // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html .set("spark.file.transferTo","false") // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html .set("spark.core.connection.ack.wait.timeout","600") // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html .set("spark.speculation","true") .set("spark.worker.timeout","600") // http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html .set("spark.akka.timeout","300") // http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html .set("spark.storage.blockManagerSlaveTimeoutMs","12") .set("spark.driver.maxResultSize","2048") // in response to error: Total size of serialized results of 39901 tasks (1024.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB) .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .set("spark.kryo.registrator","--.MyRegistrator") .set("spark.kryo.registrationRequired", "true") .set("spark.yarn.executor.memoryOverhead","600") On Thu, Jan 21, 2016 at 2:50 PM, Josh Rosen <joshro...@databricks.com> wrote: > Is speculation enabled? This TaskCommitDenied by driver error is thrown by > writers who lost the race to commit an output partition. I don't think this > had anything to do with key skew etc. Replacing the groupbykey with a count > will mask this exception because the coordination does not get triggered in > non save/write operations. > > On Thu, Jan 21, 2016 at 2:46 PM Holden Karau <hol...@pigscanfly.ca> wrote: > >> Before we dig too far into this, the thing which most quickly jumps out >> to me is groupByKey which could be causing some problems - whats the >> distribution of keys like? Try replacing the groupByKey with a count() and >> see if the pipeline works up until that stage. Also 1G of driver memory is >> also a bit small for something with 90 executors... >> >> On Thu, Jan 21, 2016 at 2:40 PM, Arun Luthra <arun.lut...@gmail.com> >> wrote: >> >>> >>> >>> 16/01/21 21:52:11 WARN NativeCodeLoader: Unable to load native-hadoop >>> library for your platform... using builtin-java classes where applicable >>> >>> 16/01/21 21:52:14 WARN MetricsSystem: Using default name DAGScheduler >>> for source because spark.app.id is not set. >>> >>> spark.yarn.driver.memoryOverhead is set but does not apply in client >>> mode. >>> >>> 16/01/21 21:52:16 WARN DomainSocketFactory: The short-circuit local >>> reads feature cannot be used because libhadoop cannot be loaded. >>> >>> 16/01/21 21:52:52 WARN MemoryStore: Not enough space to cache >>> broadcast_4 in memory! (computed 60.2 MB so far) >>> >>> 16/01/21 21:52:52 WARN MemoryStore: Persisting block broadcast_4 to disk >>> instead. 
>>> >>> [Stage 1:>(2260 + 7) >>> / 2262]16/01/21 21:57:24 WARN TaskSetManager: Lost task 1440.1 in stage 1.0 >>> (TID 4530, --): TaskCommitDenied (Driver denied task commit) for job: 1, >>> partition: 1440, attempt: 4530 >>> >>> [Stage 1:>(2260 + 6) >>> / 2262]16/01/21 21:57:27 WARN TaskSetManager: Lost task 1488.1 in stage 1.0 >>> (TID 4531, --): TaskCommitDenied (Driver denied task commit) for job: 1, >>> partition: 1488, attempt: 4531 >>> >>>
Re: TaskCommitDenied (Driver denied task commit)
Looking into the yarn logs for a similar job where an executor was associated with the same error, I find: ... 16/01/22 01:17:18 INFO client.TransportClientFactory: Found inactive connection to (SERVER), creating a new one. 16/01/22 01:17:18 *ERROR shuffle.RetryingBlockFetcher: Exception while beginning fetch of 46 outstanding blocks* *java.io.IOException: Failed to connect to (SERVER)* at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193) at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156) at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88) at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120) at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:97) at org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:152) at org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:265) at org.apache.spark.storage.ShuffleBlockFetcherIterator.(ShuffleBlockFetcherIterator.scala:112) at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:43) at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:88) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) *Caused by: java.net.ConnectException: Connection refused:* (SERVER) at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739) at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224) at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) at 
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) ... 1 more ... Not sure if this reveals anything at all. On Thu, Jan 21, 2016 at 2:58 PM, Holden Karau <hol...@pigscanfly.ca> wrote: > My hunch is that the TaskCommitDenied is perhaps a red hearing and the > problem is groupByKey - but I've also just seen a lot of people be bitten > by it so that might not be issue. If you just do a count at the point of > the groupByKey does the pipeline succeed? > > On Thu, Jan 21, 2016 at 2:56 PM, Arun Luthra <arun.lut...@gmail.com> > wrote: > >> Usually the pipeline works, it just failed on this particular input data. >> The other data it has run on is of similar size. >> >> Speculation is enabled. >> >> I'm using Spark 1.5.0. >> >> Here is the config. Many of these may not be needed anymore, they are >> from trying to get things working in Spark 1.2 and 1.3. >> >> .set("spark.storage.memoryFraction","0.2") // default 0.6 >> .set("spark.shuffle.memoryFraction","0.2") // default 0.2 >> .set("spark.shuffle.manager","SORT") // preferred setting for >> optimized joins >
Re: TaskCommitDenied (Driver denied task commit)
Two changes I made that appear to be keeping various errors at bay: 1) bumped up spark.yarn.executor.memoryOverhead to 2000 in the spirit of https://mail-archives.apache.org/mod_mbox/spark-user/201511.mbox/%3ccacbyxkld8qasymj2ghk__vttzv4gejczcqfaw++s1d5te1d...@mail.gmail.com%3E . Even though I couldn't find the same error in my yarn log. 2) very important: I ran coalesce(1000) on the RDD at the start of the DAG. I know keeping the # of partitions lower is helpful, based on past experience with groupByKey. I haven't run this pipeline in a bit so that rule of thumb was not forefront in my mind. On Thu, Jan 21, 2016 at 5:35 PM, Arun Luthra <arun.lut...@gmail.com> wrote: > Looking into the yarn logs for a similar job where an executor was > associated with the same error, I find: > > ... > 16/01/22 01:17:18 INFO client.TransportClientFactory: Found inactive > connection to (SERVER), creating a new one. > 16/01/22 01:17:18 *ERROR shuffle.RetryingBlockFetcher: Exception while > beginning fetch of 46 outstanding blocks* > *java.io.IOException: Failed to connect to (SERVER)* > at > org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193) > at > org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156) > at > org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88) > at > org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) > at > org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120) > at > org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:97) > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:152) > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:265) > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.(ShuffleBlockFetcherIterator.scala:112) > at > org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:43) > at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) > at org.apache.spark.scheduler.Task.run(Task.scala:88) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at 
java.lang.Thread.run(Thread.java:745) > *Caused by: java.net.ConnectException: Connection refused:* (SERVER) > at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) > at > sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739) > at > io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224) > at > io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289) > at > io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528) > at > io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) > at > io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) > at > io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) > ... 1 more > > ... > > > Not sure if this reveals anything at all. > > > On Thu, Jan 21, 2016 at 2:58 PM, Holden Karau <hol...@pigscanfly.ca>
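A minimal sketch of the two changes described above; the 2000 and 1000 values are those stated in the message, while conf, sc and inputPath are illustrative:

  import org.apache.spark.{SparkConf, SparkContext}

  // change 1: bump the YARN executor memory overhead
  val conf = new SparkConf().set("spark.yarn.executor.memoryOverhead", "2000")
  val sc = new SparkContext(conf)

  // change 2: reduce the partition count at the start of the DAG (inputPath is a placeholder)
  val input = sc.textFile(inputPath).coalesce(1000)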
groupByKey does not work?
I tried groupByKey and noticed that it did not group all values into the same group. In my test dataset (a pair RDD) I have 16 records, where there are only 4 distinct keys, so I expected there to be 4 records in the groupByKey result, but instead there were 8. Each of the 4 distinct keys appears 2 times. Is this the expected behavior? I need to be able to get ALL values associated with each key grouped into a SINGLE record. Is it possible? Arun p.s. reduceByKey will not be sufficient for me
Re: groupByKey does not work?
Spark 1.5.0

data:

p1,lo1,8,0,4,0,5,20150901|5,1,1.0
p1,lo2,8,0,4,0,5,20150901|5,1,1.0
p1,lo3,8,0,4,0,5,20150901|5,1,1.0
p1,lo4,8,0,4,0,5,20150901|5,1,1.0
p1,lo1,8,0,4,0,5,20150901|5,1,1.0
p1,lo2,8,0,4,0,5,20150901|5,1,1.0
p1,lo3,8,0,4,0,5,20150901|5,1,1.0
p1,lo4,8,0,4,0,5,20150901|5,1,1.0
p1,lo1,8,0,4,0,5,20150901|5,1,1.0
p1,lo2,8,0,4,0,5,20150901|5,1,1.0
p1,lo3,8,0,4,0,5,20150901|5,1,1.0
p1,lo4,8,0,4,0,5,20150901|5,1,1.0
p1,lo1,8,0,4,0,5,20150901|5,1,1.0
p1,lo2,8,0,4,0,5,20150901|5,1,1.0
p1,lo3,8,0,4,0,5,20150901|5,1,1.0
p1,lo4,8,0,4,0,5,20150901|5,1,1.0

spark-shell:

spark-shell \
--num-executors 2 \
--driver-memory 1g \
--executor-memory 10g \
--executor-cores 8 \
--master yarn-client

case class Mykey(uname:String, lo:String, f1:Char, f2:Char, f3:Char, f4:Char, f5:Char, f6:String)
case class Myvalue(count1:Long, count2:Long, num:Double)

val myrdd = sc.textFile("/user/al733a/mydata.txt").map { case line => {
  val spl = line.split("\\|", -1)
  val k = spl(0).split(",")
  val v = spl(1).split(",")
  (Mykey(k(0), k(1), k(2)(0).toChar, k(3)(0).toChar, k(4)(0).toChar, k(5)(0).toChar, k(6)(0).toChar, k(7)),
   Myvalue(v(0).toLong, v(1).toLong, v(2).toDouble)
  )
}}

myrdd.groupByKey().map { case (mykey, val_iterable) => (mykey, 1) }.collect().foreach(println)

(Mykey(p1,lo1,8,0,4,0,5,20150901),1)
(Mykey(p1,lo1,8,0,4,0,5,20150901),1)
(Mykey(p1,lo3,8,0,4,0,5,20150901),1)
(Mykey(p1,lo3,8,0,4,0,5,20150901),1)
(Mykey(p1,lo4,8,0,4,0,5,20150901),1)
(Mykey(p1,lo4,8,0,4,0,5,20150901),1)
(Mykey(p1,lo2,8,0,4,0,5,20150901),1)
(Mykey(p1,lo2,8,0,4,0,5,20150901),1)

You can see that each key is repeated 2 times, but each key should only appear once.

Arun

On Mon, Jan 4, 2016 at 4:07 PM, Ted Yu <yuzhih...@gmail.com> wrote: > Can you give a bit more information ? > > Release of Spark you're using > Minimal dataset that shows the problem > > Cheers > > On Mon, Jan 4, 2016 at 3:55 PM, Arun Luthra <arun.lut...@gmail.com> wrote: > >> I tried groupByKey and noticed that it did not group all values into the >> same group. >> >> In my test dataset (a Pair rdd) I have 16 records, where there are only 4 >> distinct keys, so I expected there to be 4 records in the groupByKey >> object, but instead there were 8. Each of the 4 distinct keys appear 2 >> times. >> >> Is this the expected behavior? I need to be able to get ALL values >> associated with each key grouped into a SINGLE record. Is it possible? >> >> Arun >> >> p.s. reducebykey will not be sufficient for me >> > >
Re: groupByKey does not work?
If I simplify the key to String column with values lo1, lo2, lo3, lo4, it works correctly. On Mon, Jan 4, 2016 at 4:49 PM, Daniel Imberman <daniel.imber...@gmail.com> wrote: > Could you try simplifying the key and seeing if that makes any difference? > Make it just a string or an int so we can count out any issues in object > equality. > > On Mon, Jan 4, 2016 at 4:42 PM Arun Luthra <arun.lut...@gmail.com> wrote: > >> Spark 1.5.0 >> >> data: >> >> p1,lo1,8,0,4,0,5,20150901|5,1,1.0 >> p1,lo2,8,0,4,0,5,20150901|5,1,1.0 >> p1,lo3,8,0,4,0,5,20150901|5,1,1.0 >> p1,lo4,8,0,4,0,5,20150901|5,1,1.0 >> p1,lo1,8,0,4,0,5,20150901|5,1,1.0 >> p1,lo2,8,0,4,0,5,20150901|5,1,1.0 >> p1,lo3,8,0,4,0,5,20150901|5,1,1.0 >> p1,lo4,8,0,4,0,5,20150901|5,1,1.0 >> p1,lo1,8,0,4,0,5,20150901|5,1,1.0 >> p1,lo2,8,0,4,0,5,20150901|5,1,1.0 >> p1,lo3,8,0,4,0,5,20150901|5,1,1.0 >> p1,lo4,8,0,4,0,5,20150901|5,1,1.0 >> p1,lo1,8,0,4,0,5,20150901|5,1,1.0 >> p1,lo2,8,0,4,0,5,20150901|5,1,1.0 >> p1,lo3,8,0,4,0,5,20150901|5,1,1.0 >> p1,lo4,8,0,4,0,5,20150901|5,1,1.0 >> >> spark-shell: >> >> spark-shell \ >> --num-executors 2 \ >> --driver-memory 1g \ >> --executor-memory 10g \ >> --executor-cores 8 \ >> --master yarn-client >> >> >> case class Mykey(uname:String, lo:String, f1:Char, f2:Char, f3:Char, >> f4:Char, f5:Char, f6:String) >> case class Myvalue(count1:Long, count2:Long, num:Double) >> >> val myrdd = sc.textFile("/user/al733a/mydata.txt").map { case line => { >> val spl = line.split("\\|", -1) >> val k = spl(0).split(",") >> val v = spl(1).split(",") >> (Mykey(k(0), k(1), k(2)(0).toChar, k(3)(0).toChar, k(4)(0).toChar, >> k(5)(0).toChar, k(6)(0).toChar, k(7)), >> Myvalue(v(0).toLong, v(1).toLong, v(2).toDouble) >> ) >> }} >> >> myrdd.groupByKey().map { case (mykey, val_iterable) => (mykey, 1) >> }.collect().foreach(println) >> >> (Mykey(p1,lo1,8,0,4,0,5,20150901),1) >> >> (Mykey(p1,lo1,8,0,4,0,5,20150901),1) >> (Mykey(p1,lo3,8,0,4,0,5,20150901),1) >> (Mykey(p1,lo3,8,0,4,0,5,20150901),1) >> (Mykey(p1,lo4,8,0,4,0,5,20150901),1) >> (Mykey(p1,lo4,8,0,4,0,5,20150901),1) >> (Mykey(p1,lo2,8,0,4,0,5,20150901),1) >> (Mykey(p1,lo2,8,0,4,0,5,20150901),1) >> >> >> >> You can see that each key is repeated 2 times but each key should only >> appear once. >> >> Arun >> >> On Mon, Jan 4, 2016 at 4:07 PM, Ted Yu <yuzhih...@gmail.com> wrote: >> >>> Can you give a bit more information ? >>> >>> Release of Spark you're using >>> Minimal dataset that shows the problem >>> >>> Cheers >>> >>> On Mon, Jan 4, 2016 at 3:55 PM, Arun Luthra <arun.lut...@gmail.com> >>> wrote: >>> >>>> I tried groupByKey and noticed that it did not group all values into >>>> the same group. >>>> >>>> In my test dataset (a Pair rdd) I have 16 records, where there are only >>>> 4 distinct keys, so I expected there to be 4 records in the groupByKey >>>> object, but instead there were 8. Each of the 4 distinct keys appear 2 >>>> times. >>>> >>>> Is this the expected behavior? I need to be able to get ALL values >>>> associated with each key grouped into a SINGLE record. Is it possible? >>>> >>>> Arun >>>> >>>> p.s. reducebykey will not be sufficient for me >>>> >>> >>> >>
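A sketch of the simplified test described above, assuming the same input file as in the earlier message; the expected output is one record per distinct lo value:

  val simple = sc.textFile("/user/al733a/mydata.txt").map { line =>
    val k = line.split("\\|", -1)(0).split(",")
    (k(1), 1L)                                   // key on the String column only
  }
  simple.groupByKey().map { case (lo, vals) => (lo, vals.size) }.collect().foreach(println)
  // expect 4 records (lo1..lo4), each with 4 grouped values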
types allowed for saveasobjectfile?
What types of RDD can saveAsObjectFile(path) handle? I tried a naive test with an RDD[Array[String]], but when I tried to read back the result with sc.objectFile(path).take(5).foreach(println), I got a non-promising output looking like:

[Ljava.lang.String;@46123a
[Ljava.lang.String;@76123b
[Ljava.lang.String;@13144c
[Ljava.lang.String;@75146d
[Ljava.lang.String;@79118f

Arun
Re: types allowed for saveasobjectfile?
Ah, yes, that did the trick. So more generally, can this handle any serializable object? On Thu, Aug 27, 2015 at 2:11 PM, Jonathan Coveney <jcove...@gmail.com> wrote: Array[String] doesn't pretty print by default. Use .mkString(",") for example. On Thursday, August 27, 2015, Arun Luthra <arun.lut...@gmail.com> wrote: What types of RDD can saveAsObjectFile(path) handle? I tried a naive test with an RDD[Array[String]], but when I tried to read back the result with sc.objectFile(path).take(5).foreach(println), I got a non-promising output looking like: [Ljava.lang.String;@46123a [Ljava.lang.String;@76123b [Ljava.lang.String;@13144c [Ljava.lang.String;@75146d [Ljava.lang.String;@79118f Arun
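A small sketch of the round trip being discussed; the path is illustrative:

  val arrays: org.apache.spark.rdd.RDD[Array[String]] =
    sc.parallelize(Seq(Array("a", "b"), Array("c", "d")))
  arrays.saveAsObjectFile("/tmp/arrays_obj")
  sc.objectFile[Array[String]]("/tmp/arrays_obj")
    .take(5)
    .foreach(arr => println(arr.mkString(",")))  // mkString instead of the default [Ljava.lang.String;@... toString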
How to ignore features in mllib
Is it possible to ignore features in mllib? In other words, I would like to have some 'pass-through' data, Strings for example, attached to training examples and test data. A related stackoverflow question: http://stackoverflow.com/questions/30739283/spark-mllib-how-to-ignore-features-when-training-a-classifier Arun
How to change hive database?
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.hive.HiveContext

I'm getting org.apache.spark.sql.catalyst.analysis.NoSuchTableException from:

val dataframe = hiveContext.table("other_db.mytable")

Do I have to change the current database to access it? Is it possible to do this? I'm guessing that the database.table syntax that I used in hiveContext.table is not recognized. I have no problems accessing tables in the database called "default". I can list tables in other_db with hiveContext.tableNames("other_db").

Using Spark 1.4.0.
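A hedged sketch of the "change the current database" approach asked about above, using the database and table names from the question:

  hiveContext.sql("USE other_db")
  val dataframe = hiveContext.table("mytable")
  // switch back when done:
  hiveContext.sql("USE default")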
Re: Spark launching without all of the requested YARN resources
Thanks Sandy et al, I will try that. I like that I can choose the minRegisteredResourcesRatio. On Wed, Jun 24, 2015 at 11:04 AM, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Arun, You can achieve this by setting spark.scheduler.maxRegisteredResourcesWaitingTime to some really high number and spark.scheduler.minRegisteredResourcesRatio to 1.0. -Sandy On Wed, Jun 24, 2015 at 2:21 AM, Steve Loughran ste...@hortonworks.com wrote: On 24 Jun 2015, at 05:55, canan chen ccn...@gmail.com wrote: Why do you want it start until all the resources are ready ? Make it start as early as possible should make it complete earlier and increase the utilization of resources On Tue, Jun 23, 2015 at 10:34 PM, Arun Luthra arun.lut...@gmail.com wrote: Sometimes if my Hortonworks yarn-enabled cluster is fairly busy, Spark (via spark-submit) will begin its processing even though it apparently did not get all of the requested resources; it is running very slowly. Is there a way to force Spark/YARN to only begin when it has the full set of resources that I request? Thanks, Arun The wait until there's space launch policy is known as Gang Scheduling, https://issues.apache.org/jira/browse/YARN-624 covers what would be needed there. 1. It's not in YARN 2. For analytics workloads, it's not clear you benefit. You would wait a very long time(*) for the requirements to be satisfied. The current YARN scheduling and placement algorithms assume that you'd prefer timely container launch to extended wait for containers in the right place, and expects algorithms to work in a degraded form with a reduced no. of workers 3. Where it really matters is long-lived applications where you need some quorum of container-hosted processes, or if performance collapses utterly below a threshold. Things like HBase on YARN are an example —but Spark streaming could be another. In the absence of YARN support, it can be implemented in the application by having theYARN-hosted application (here: Spark) get the containers, start up a process on each one, but not actually start accepting/performing work until a threshold of containers is reached/some timeout has occurred. If you wanted to do that in spark, you could raise the idea on the spark dev lists and see what people think. -Steve (*) i.e. forever
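For reference, a hedged sketch of the two settings from Sandy's reply; the 3600s wait is an arbitrary illustrative value:

  spark-submit \
    --master yarn-client \
    --conf spark.scheduler.minRegisteredResourcesRatio=1.0 \
    --conf spark.scheduler.maxRegisteredResourcesWaitingTime=3600s \
    ...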
Spark launching without all of the requested YARN resources
Sometimes if my Hortonworks yarn-enabled cluster is fairly busy, Spark (via spark-submit) will begin its processing even though it apparently did not get all of the requested resources; it is running very slowly. Is there a way to force Spark/YARN to only begin when it has the full set of resources that I request? Thanks, Arun
Missing values support in Mllib yet?
Hi, Is there any support for handling missing values in mllib yet, especially for decision trees where this is a natural feature? Arun
Re: Problem getting program to run on 15TB input
I found that the problem was due to garbage collection in filter(). Using Hive to do the filter solved the problem. A lot of other problems went away when I upgraded to Spark 1.2.0, which compresses various task overhead data (HighlyCompressedMapStatus etc.). It has been running very very smoothly with these two changes. I'm fairly sure that I tried coalesce(), it resulted into tasks that were too big, the code has evolved too much to easily double check it now. On Sat, Jun 6, 2015 at 12:50 AM, Kapil Malik kma...@adobe.com wrote: Very interesting and relevant thread for production level usage of spark. @Arun, can you kindly confirm if Daniel’s suggestion helped your usecase? Thanks, Kapil Malik | kma...@adobe.com | 33430 / 8800836581 *From:* Daniel Mahler [mailto:dmah...@gmail.com] *Sent:* 13 April 2015 15:42 *To:* Arun Luthra *Cc:* Aaron Davidson; Paweł Szulc; Burak Yavuz; user@spark.apache.org *Subject:* Re: Problem getting program to run on 15TB input Sometimes a large number of partitions leads to memory problems. Something like val rdd1 = sc.textFile(file1).coalesce(500). ... val rdd2 = sc.textFile(file2).coalesce(500). ... may help. On Mon, Mar 2, 2015 at 6:26 PM, Arun Luthra arun.lut...@gmail.com wrote: Everything works smoothly if I do the 99%-removal filter in Hive first. So, all the baggage from garbage collection was breaking it. Is there a way to filter() out 99% of the data without having to garbage collect 99% of the RDD? On Sun, Mar 1, 2015 at 9:56 AM, Arun Luthra arun.lut...@gmail.com wrote: I tried a shorter simper version of the program, with just 1 RDD, essentially it is: sc.textFile(..., N).map().filter().map( blah = (id, 1L)).reduceByKey().saveAsTextFile(...) Here is a typical GC log trace from one of the yarn container logs: 54.040: [GC [PSYoungGen: 9176064K-28206K(10704896K)] 9176064K-28278K(35171840K), 0.0234420 secs] [Times: user=0.15 sys=0.01, real=0.02 secs] 77.864: [GC [PSYoungGen: 9204270K-150553K(10704896K)] 9204342K-150641K(35171840K), 0.0423020 secs] [Times: user=0.30 sys=0.26, real=0.04 secs] 79.485: [GC [PSYoungGen: 9326617K-333519K(10704896K)] 9326705K-333615K(35171840K), 0.0774990 secs] [Times: user=0.35 sys=1.28, real=0.08 secs] 92.974: [GC [PSYoungGen: 9509583K-193370K(10704896K)] 9509679K-193474K(35171840K), 0.0241590 secs] [Times: user=0.35 sys=0.11, real=0.02 secs] 114.842: [GC [PSYoungGen: 9369434K-123577K(10704896K)] 9369538K-123689K(35171840K), 0.0201000 secs] [Times: user=0.31 sys=0.00, real=0.02 secs] 117.277: [GC [PSYoungGen: 9299641K-135459K(11918336K)] 9299753K-135579K(36385280K), 0.0244820 secs] [Times: user=0.19 sys=0.25, real=0.02 secs] So ~9GB is getting GC'ed every few seconds. Which seems like a lot. Question: The filter() is removing 99% of the data. Does this 99% of the data get GC'ed? Now, I was able to finally get to reduceByKey() by reducing the number of executor-cores (to 2), based on suggestions at http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-OutOfMemoryError-java-lang-OutOfMemoryError-GC-overhead-limit-exceeded-td9036.html . This makes everything before reduceByKey() run pretty smoothly. 
I ran this with more executor-memory and less executors (most important thing was fewer executor-cores): --num-executors 150 \ --driver-memory 15g \ --executor-memory 110g \ --executor-cores 32 \ But then, reduceByKey() fails with: java.lang.OutOfMemoryError: Java heap space On Sat, Feb 28, 2015 at 12:09 PM, Arun Luthra arun.lut...@gmail.com wrote: The Spark UI names the line number and name of the operation (repartition in this case) that it is performing. Only if this information is wrong (just a possibility), could it have started groupByKey already. I will try to analyze the amount of skew in the data by using reduceByKey (or simply countByKey) which is relatively inexpensive. For the purposes of this algorithm I can simply log and remove keys with huge counts, before doing groupByKey. On Sat, Feb 28, 2015 at 11:38 AM, Aaron Davidson ilike...@gmail.com wrote: All stated symptoms are consistent with GC pressure (other nodes timeout trying to connect because of a long stop-the-world), quite possibly due to groupByKey. groupByKey is a very expensive operation as it may bring all the data for a particular partition into memory (in particular, it cannot spill values for a single key, so if you have a single very skewed key you can get behavior like this). On Sat, Feb 28, 2015 at 11:33 AM, Paweł Szulc paul.sz...@gmail.com wrote: But groupbykey will repartition according to numer of keys as I understand how it works. How do you know that you haven't reached the groupbykey phase? Are you using a profiler or do yoi base that assumption only on logs? sob., 28 lut 2015, 8:12 PM Arun Luthra użytkownik arun.lut...@gmail.com napisał: A correction to my first post
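A hedged sketch of the "filter in Hive first" approach described above, assuming Spark 1.3+ and illustrative table/column names (none of these are from the thread):

  import org.apache.spark.sql.hive.HiveContext

  val hiveContext = new HiveContext(sc)
  // push the 99% filter into Hive so the dropped rows never become JVM objects in the RDD
  val kept = hiveContext.sql("SELECT id FROM my_db.events WHERE keep_flag = 1")
  val counts = kept.rdd.map(row => (row.getString(0), 1L)).reduceByKey(_ + _)
  counts.saveAsTextFile("/tmp/id_counts")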
Re: Efficient saveAsTextFile by key, directory for each key?
I ended up post-processing the result in Hive with a dynamic partition insert query to get a table partitioned by the key. Looking further, it seems that 'dynamic partition' insert is available in Spark SQL and working well as of Spark SQL 1.2.0: https://issues.apache.org/jira/browse/SPARK-3007

On Tue, Apr 21, 2015 at 5:45 PM, Arun Luthra arun.lut...@gmail.com wrote: Is there an efficient way to save an RDD with saveAsTextFile in such a way that the data gets shuffled into separate directories according to a key? (My end goal is to wrap the result in a multi-partitioned Hive table.) Suppose you have:
  case class MyData(val0: Int, val1: String, directory_name: String)
and an RDD called myrdd with type RDD[MyData]. Suppose that you already have an array of the distinct directory_name's, called distinct_directories. A very inefficient way to do this is:
  distinct_directories.foreach( dir_name =>
    myrdd.filter( mydata => mydata.directory_name == dir_name )
      .map( mydata => Array(mydata.val0.toString, mydata.val1).mkString(","))
      .coalesce(5)
      .saveAsTextFile("base_dir_name/" + f"$dir_name")
  )
I tried this solution, and it does not do the multiple myrdd.filter's in parallel. I'm guessing partitionBy might be part of the efficient solution, if an easy efficient solution exists... Thanks, Arun
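The partitionBy guess in the quoted question is close to one commonly used alternative that stays in the RDD API: partition by the key and write everything in a single job with a MultipleTextOutputFormat that routes each key to its own subdirectory. This is a hedged sketch, not the approach the thread actually settled on (Hive dynamic partition insert); it reuses MyData/myrdd from the question, and the partition count is a guess.

  import org.apache.hadoop.io.NullWritable
  import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
  import org.apache.spark.HashPartitioner

  // Route each record to a subdirectory named after its key, one Spark job total.
  class KeyBasedOutput extends MultipleTextOutputFormat[Any, Any] {
    override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
      key.toString + "/" + name          // e.g. base_dir_name/<directory_name>/part-00000
    override def generateActualKey(key: Any, value: Any): Any =
      NullWritable.get()                 // keep only the value in the file contents
  }

  myrdd
    .map(d => (d.directory_name, Array(d.val0.toString, d.val1).mkString(",")))
    .partitionBy(new HashPartitioner(100))
    .saveAsHadoopFile("base_dir_name", classOf[String], classOf[String], classOf[KeyBasedOutput])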
Efficient saveAsTextFile by key, directory for each key?
Is there an efficient way to save an RDD with saveAsTextFile in such a way that the data gets shuffled into separate directories according to a key? (My end goal is to wrap the result in a multi-partitioned Hive table.) Suppose you have:
  case class MyData(val0: Int, val1: String, directory_name: String)
and an RDD called myrdd with type RDD[MyData]. Suppose that you already have an array of the distinct directory_name's, called distinct_directories. A very inefficient way to do this is:
  distinct_directories.foreach( dir_name =>
    myrdd.filter( mydata => mydata.directory_name == dir_name )
      .map( mydata => Array(mydata.val0.toString, mydata.val1).mkString(","))
      .coalesce(5)
      .saveAsTextFile("base_dir_name/" + f"$dir_name")
  )
I tried this solution, and it does not do the multiple myrdd.filter's in parallel. I'm guessing partitionBy might be part of the efficient solution, if an easy efficient solution exists... Thanks, Arun
Re: Workaround for spark 1.2.X roaringbitmap kryo problem?
I'm using a pre-built Spark; I'm not trying to compile Spark. The compile error appears when I try to register HighlyCompressedMapStatus in my program: kryo.register(classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus]) If I don't register it, I get a runtime error saying that it needs to be registered (the error is only when I turn on kryo). However the code is running smoothly with kryo turned off. On Wed, Mar 11, 2015 at 5:38 PM, Imran Rashid iras...@cloudera.com wrote: I'm not sure what you mean. Are you asking how you can recompile all of spark and deploy it, instead of using one of the pre-built versions? https://spark.apache.org/docs/latest/building-spark.html Or are you seeing compile problems specifically w/ HighlyCompressedMapStatus? The code compiles fine, so I'm not sure what problem you are running into -- we'd need a lot more info to help On Tue, Mar 10, 2015 at 6:54 PM, Arun Luthra arun.lut...@gmail.com wrote: Does anyone know how to get the HighlyCompressedMapStatus to compile? I will try turning off kryo in 1.2.0 and hope things don't break. I want to benefit from the MapOutputTracker fix in 1.2.0. On Tue, Mar 3, 2015 at 5:41 AM, Imran Rashid iras...@cloudera.com wrote: the scala syntax for arrays is Array[T], not T[], so you want to use something: kryo.register(classOf[Array[org.roaringbitmap.RoaringArray$Element]]) kryo.register(classOf[Array[Short]]) nonetheless, the spark should take care of this itself. I'll look into it later today. On Mon, Mar 2, 2015 at 2:55 PM, Arun Luthra arun.lut...@gmail.com wrote: I think this is a Java vs scala syntax issue. Will check. On Thu, Feb 26, 2015 at 8:17 PM, Arun Luthra arun.lut...@gmail.com wrote: Problem is noted here: https://issues.apache.org/jira/browse/SPARK-5949 I tried this as a workaround: import org.apache.spark.scheduler._ import org.roaringbitmap._ ... kryo.register(classOf[org.roaringbitmap.RoaringBitmap]) kryo.register(classOf[org.roaringbitmap.RoaringArray]) kryo.register(classOf[org.roaringbitmap.ArrayContainer]) kryo.register(classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus]) kryo.register(classOf[org.roaringbitmap.RoaringArray$Element]) kryo.register(classOf[org.roaringbitmap.RoaringArray$Element[]]) kryo.register(classOf[short[]]) in build file: libraryDependencies += org.roaringbitmap % RoaringBitmap % 0.4.8 This fails to compile: ...:53: identifier expected but ']' found. [error] kryo.register(classOf[org.roaringbitmap.RoaringArray$Element[]]) also: :54: identifier expected but ']' found. [error] kryo.register(classOf[short[]]) also: :51: class HighlyCompressedMapStatus in package scheduler cannot be accessed in package org.apache.spark.scheduler [error] kryo.register(classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus]) Suggestions? Arun
Re: Workaround for spark 1.2.X roaringbitmap kryo problem?
The error is in the original post. Here's the recipe that worked for me: kryo.register(Class.forName(org.roaringbitmap.RoaringArray$Element)) kryo.register(classOf[Array[org.roaringbitmap.RoaringArray$Element]]) kryo.register(classOf[Array[Short]]) kryo.register(classOf[org.roaringbitmap.RoaringBitmap]) kryo.register(classOf[org.roaringbitmap.RoaringArray]) kryo.register(classOf[org.roaringbitmap.ArrayContainer]) kryo.register(Class.forName(org.apache.spark.scheduler.HighlyCompressedMapStatus)) So your Class.forName workaround worked, thanks! On Thu, Mar 12, 2015 at 10:56 AM, Imran Rashid iras...@cloudera.com wrote: Giving a bit more detail on the error would make it a lot easier for others to help you out. Eg., in this case, it would have helped if included your actual compile error. In any case, I'm assuming your issue is b/c that class if private to spark. You can sneak around that by using Class.forName(stringOfClassName) instead: scala classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus] console:8: error: class HighlyCompressedMapStatus in package scheduler cannot be accessed in package org.apache.spark.scheduler classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus] ^ scala Class.forName(org.apache.spark.scheduler.HighlyCompressedMapStatus) res1: Class[_] = class org.apache.spark.scheduler.HighlyCompressedMapStatus hope this helps, Imran On Thu, Mar 12, 2015 at 12:47 PM, Arun Luthra arun.lut...@gmail.com wrote: I'm using a pre-built Spark; I'm not trying to compile Spark. The compile error appears when I try to register HighlyCompressedMapStatus in my program: kryo.register(classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus]) If I don't register it, I get a runtime error saying that it needs to be registered (the error is only when I turn on kryo). However the code is running smoothly with kryo turned off. On Wed, Mar 11, 2015 at 5:38 PM, Imran Rashid iras...@cloudera.com wrote: I'm not sure what you mean. Are you asking how you can recompile all of spark and deploy it, instead of using one of the pre-built versions? https://spark.apache.org/docs/latest/building-spark.html Or are you seeing compile problems specifically w/ HighlyCompressedMapStatus? The code compiles fine, so I'm not sure what problem you are running into -- we'd need a lot more info to help On Tue, Mar 10, 2015 at 6:54 PM, Arun Luthra arun.lut...@gmail.com wrote: Does anyone know how to get the HighlyCompressedMapStatus to compile? I will try turning off kryo in 1.2.0 and hope things don't break. I want to benefit from the MapOutputTracker fix in 1.2.0. On Tue, Mar 3, 2015 at 5:41 AM, Imran Rashid iras...@cloudera.com wrote: the scala syntax for arrays is Array[T], not T[], so you want to use something: kryo.register(classOf[Array[org.roaringbitmap.RoaringArray$Element]]) kryo.register(classOf[Array[Short]]) nonetheless, the spark should take care of this itself. I'll look into it later today. On Mon, Mar 2, 2015 at 2:55 PM, Arun Luthra arun.lut...@gmail.com wrote: I think this is a Java vs scala syntax issue. Will check. On Thu, Feb 26, 2015 at 8:17 PM, Arun Luthra arun.lut...@gmail.com wrote: Problem is noted here: https://issues.apache.org/jira/browse/SPARK-5949 I tried this as a workaround: import org.apache.spark.scheduler._ import org.roaringbitmap._ ... 
kryo.register(classOf[org.roaringbitmap.RoaringBitmap]) kryo.register(classOf[org.roaringbitmap.RoaringArray]) kryo.register(classOf[org.roaringbitmap.ArrayContainer]) kryo.register(classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus]) kryo.register(classOf[org.roaringbitmap.RoaringArray$Element]) kryo.register(classOf[org.roaringbitmap.RoaringArray$Element[]]) kryo.register(classOf[short[]]) in build file: libraryDependencies += org.roaringbitmap % RoaringBitmap % 0.4.8 This fails to compile: ...:53: identifier expected but ']' found. [error] kryo.register(classOf[org.roaringbitmap.RoaringArray$Element[]]) also: :54: identifier expected but ']' found. [error] kryo.register(classOf[short[]]) also: :51: class HighlyCompressedMapStatus in package scheduler cannot be accessed in package org.apache.spark.scheduler [error] kryo.register(classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus]) Suggestions? Arun
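For reference, the working recipe from the reply above, wrapped in a KryoRegistrator. This is a hedged sketch: the string quotes that the archive stripped are restored, and the class name here is only an illustration (the 15TB thread later in this document configures its own registrator via spark.kryo.registrator).

  import com.esotericsoftware.kryo.Kryo
  import org.apache.spark.serializer.KryoRegistrator

  class MyRegistrator extends KryoRegistrator {
    override def registerClasses(kryo: Kryo): Unit = {
      // Class.forName sidesteps the "cannot be accessed" error for the
      // package-private HighlyCompressedMapStatus and the nested Element class.
      kryo.register(Class.forName("org.roaringbitmap.RoaringArray$Element"))
      kryo.register(classOf[Array[org.roaringbitmap.RoaringArray$Element]])
      kryo.register(classOf[Array[Short]])
      kryo.register(classOf[org.roaringbitmap.RoaringBitmap])
      kryo.register(classOf[org.roaringbitmap.RoaringArray])
      kryo.register(classOf[org.roaringbitmap.ArrayContainer])
      kryo.register(Class.forName("org.apache.spark.scheduler.HighlyCompressedMapStatus"))
    }
  }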
Re: Workaround for spark 1.2.X roaringbitmap kryo problem?
Does anyone know how to get the HighlyCompressedMapStatus to compile? I will try turning off kryo in 1.2.0 and hope things don't break. I want to benefit from the MapOutputTracker fix in 1.2.0. On Tue, Mar 3, 2015 at 5:41 AM, Imran Rashid iras...@cloudera.com wrote: the scala syntax for arrays is Array[T], not T[], so you want to use something: kryo.register(classOf[Array[org.roaringbitmap.RoaringArray$Element]]) kryo.register(classOf[Array[Short]]) nonetheless, the spark should take care of this itself. I'll look into it later today. On Mon, Mar 2, 2015 at 2:55 PM, Arun Luthra arun.lut...@gmail.com wrote: I think this is a Java vs scala syntax issue. Will check. On Thu, Feb 26, 2015 at 8:17 PM, Arun Luthra arun.lut...@gmail.com wrote: Problem is noted here: https://issues.apache.org/jira/browse/SPARK-5949 I tried this as a workaround: import org.apache.spark.scheduler._ import org.roaringbitmap._ ... kryo.register(classOf[org.roaringbitmap.RoaringBitmap]) kryo.register(classOf[org.roaringbitmap.RoaringArray]) kryo.register(classOf[org.roaringbitmap.ArrayContainer]) kryo.register(classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus]) kryo.register(classOf[org.roaringbitmap.RoaringArray$Element]) kryo.register(classOf[org.roaringbitmap.RoaringArray$Element[]]) kryo.register(classOf[short[]]) in build file: libraryDependencies += org.roaringbitmap % RoaringBitmap % 0.4.8 This fails to compile: ...:53: identifier expected but ']' found. [error] kryo.register(classOf[org.roaringbitmap.RoaringArray$Element[]]) also: :54: identifier expected but ']' found. [error] kryo.register(classOf[short[]]) also: :51: class HighlyCompressedMapStatus in package scheduler cannot be accessed in package org.apache.spark.scheduler [error] kryo.register(classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus]) Suggestions? Arun
Re: Problem getting program to run on 15TB input
Everything works smoothly if I do the 99%-removal filter in Hive first. So, all the baggage from garbage collection was breaking it. Is there a way to filter() out 99% of the data without having to garbage collect 99% of the RDD? On Sun, Mar 1, 2015 at 9:56 AM, Arun Luthra arun.lut...@gmail.com wrote: I tried a shorter simper version of the program, with just 1 RDD, essentially it is: sc.textFile(..., N).map().filter().map( blah = (id, 1L)).reduceByKey().saveAsTextFile(...) Here is a typical GC log trace from one of the yarn container logs: 54.040: [GC [PSYoungGen: 9176064K-28206K(10704896K)] 9176064K-28278K(35171840K), 0.0234420 secs] [Times: user=0.15 sys=0.01, real=0.02 secs] 77.864: [GC [PSYoungGen: 9204270K-150553K(10704896K)] 9204342K-150641K(35171840K), 0.0423020 secs] [Times: user=0.30 sys=0.26, real=0.04 secs] 79.485: [GC [PSYoungGen: 9326617K-333519K(10704896K)] 9326705K-333615K(35171840K), 0.0774990 secs] [Times: user=0.35 sys=1.28, real=0.08 secs] 92.974: [GC [PSYoungGen: 9509583K-193370K(10704896K)] 9509679K-193474K(35171840K), 0.0241590 secs] [Times: user=0.35 sys=0.11, real=0.02 secs] 114.842: [GC [PSYoungGen: 9369434K-123577K(10704896K)] 9369538K-123689K(35171840K), 0.0201000 secs] [Times: user=0.31 sys=0.00, real=0.02 secs] 117.277: [GC [PSYoungGen: 9299641K-135459K(11918336K)] 9299753K-135579K(36385280K), 0.0244820 secs] [Times: user=0.19 sys=0.25, real=0.02 secs] So ~9GB is getting GC'ed every few seconds. Which seems like a lot. Question: The filter() is removing 99% of the data. Does this 99% of the data get GC'ed? Now, I was able to finally get to reduceByKey() by reducing the number of executor-cores (to 2), based on suggestions at http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-OutOfMemoryError-java-lang-OutOfMemoryError-GC-overhead-limit-exceeded-td9036.html . This makes everything before reduceByKey() run pretty smoothly. I ran this with more executor-memory and less executors (most important thing was fewer executor-cores): --num-executors 150 \ --driver-memory 15g \ --executor-memory 110g \ --executor-cores 32 \ But then, reduceByKey() fails with: java.lang.OutOfMemoryError: Java heap space On Sat, Feb 28, 2015 at 12:09 PM, Arun Luthra arun.lut...@gmail.com wrote: The Spark UI names the line number and name of the operation (repartition in this case) that it is performing. Only if this information is wrong (just a possibility), could it have started groupByKey already. I will try to analyze the amount of skew in the data by using reduceByKey (or simply countByKey) which is relatively inexpensive. For the purposes of this algorithm I can simply log and remove keys with huge counts, before doing groupByKey. On Sat, Feb 28, 2015 at 11:38 AM, Aaron Davidson ilike...@gmail.com wrote: All stated symptoms are consistent with GC pressure (other nodes timeout trying to connect because of a long stop-the-world), quite possibly due to groupByKey. groupByKey is a very expensive operation as it may bring all the data for a particular partition into memory (in particular, it cannot spill values for a single key, so if you have a single very skewed key you can get behavior like this). On Sat, Feb 28, 2015 at 11:33 AM, Paweł Szulc paul.sz...@gmail.com wrote: But groupbykey will repartition according to numer of keys as I understand how it works. How do you know that you haven't reached the groupbykey phase? Are you using a profiler or do yoi base that assumption only on logs? 
sob., 28 lut 2015, 8:12 PM Arun Luthra użytkownik arun.lut...@gmail.com napisał: A correction to my first post: There is also a repartition right before groupByKey to help avoid too-many-open-files error: rdd2.union(rdd1).map(...).filter(...).repartition(15000).groupByKey().map(...).flatMap(...).saveAsTextFile() On Sat, Feb 28, 2015 at 11:10 AM, Arun Luthra arun.lut...@gmail.com wrote: The job fails before getting to groupByKey. I see a lot of timeout errors in the yarn logs, like: 15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1 attempts akka.pattern.AskTimeoutException: Timed out and 15/02/28 12:47:49 WARN util.AkkaUtils: Error sending message in 2 attempts java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] and some of these are followed by: 15/02/28 12:48:02 ERROR executor.CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkExecutor@...] - [akka.tcp://sparkDriver@...] disassociated! Shutting down. 15/02/28 12:48:02 ERROR executor.Executor: Exception in task 421027.0 in stage 1.0 (TID 336601) java.io.FileNotFoundException: /hadoop/yarn/local//spark-local-20150228123450-3a71/36/shuffle_0_421027_0 (No such file or directory) On Sat, Feb 28, 2015 at 9:33 AM, Paweł Szulc paul.sz...@gmail.com wrote: I would first check whether there is any possibility that after doing groupbykey one of the groups does not fit
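The question at the top of this message (can 99% of the data be filtered out without garbage-collecting 99% of the RDD?) was ultimately answered by doing the filter in Hive before Spark. A smaller mitigation, sketched here under the assumption that the 99% cut can be decided from the raw line with a cheap test, is to filter before split() so rejected records only ever allocate the original String, never the per-record field array. Paths, partition count, and the predicate are placeholders.

  val inputPath  = "hdfs:///data/in"    // placeholder
  val outputPath = "hdfs:///data/out"   // placeholder

  val counts = sc.textFile(inputPath, 1000)
    .filter(line => line.startsWith("KEEP"))   // placeholder for the real 99% predicate
    .map { line =>
      val fields = line.split("\\|", -1)       // only surviving lines pay for the split
      (fields(0), 1L)                          // placeholder key
    }
    .reduceByKey(_ + _)

  counts.saveAsTextFile(outputPath)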
Re: Workaround for spark 1.2.X roaringbitmap kryo problem?
I think this is a Java vs scala syntax issue. Will check. On Thu, Feb 26, 2015 at 8:17 PM, Arun Luthra arun.lut...@gmail.com wrote: Problem is noted here: https://issues.apache.org/jira/browse/SPARK-5949 I tried this as a workaround: import org.apache.spark.scheduler._ import org.roaringbitmap._ ... kryo.register(classOf[org.roaringbitmap.RoaringBitmap]) kryo.register(classOf[org.roaringbitmap.RoaringArray]) kryo.register(classOf[org.roaringbitmap.ArrayContainer]) kryo.register(classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus]) kryo.register(classOf[org.roaringbitmap.RoaringArray$Element]) kryo.register(classOf[org.roaringbitmap.RoaringArray$Element[]]) kryo.register(classOf[short[]]) in build file: libraryDependencies += org.roaringbitmap % RoaringBitmap % 0.4.8 This fails to compile: ...:53: identifier expected but ']' found. [error] kryo.register(classOf[org.roaringbitmap.RoaringArray$Element[]]) also: :54: identifier expected but ']' found. [error] kryo.register(classOf[short[]]) also: :51: class HighlyCompressedMapStatus in package scheduler cannot be accessed in package org.apache.spark.scheduler [error] kryo.register(classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus]) Suggestions? Arun
Re: Problem getting program to run on 15TB input
I tried a shorter simper version of the program, with just 1 RDD, essentially it is: sc.textFile(..., N).map().filter().map( blah = (id, 1L)).reduceByKey().saveAsTextFile(...) Here is a typical GC log trace from one of the yarn container logs: 54.040: [GC [PSYoungGen: 9176064K-28206K(10704896K)] 9176064K-28278K(35171840K), 0.0234420 secs] [Times: user=0.15 sys=0.01, real=0.02 secs] 77.864: [GC [PSYoungGen: 9204270K-150553K(10704896K)] 9204342K-150641K(35171840K), 0.0423020 secs] [Times: user=0.30 sys=0.26, real=0.04 secs] 79.485: [GC [PSYoungGen: 9326617K-333519K(10704896K)] 9326705K-333615K(35171840K), 0.0774990 secs] [Times: user=0.35 sys=1.28, real=0.08 secs] 92.974: [GC [PSYoungGen: 9509583K-193370K(10704896K)] 9509679K-193474K(35171840K), 0.0241590 secs] [Times: user=0.35 sys=0.11, real=0.02 secs] 114.842: [GC [PSYoungGen: 9369434K-123577K(10704896K)] 9369538K-123689K(35171840K), 0.0201000 secs] [Times: user=0.31 sys=0.00, real=0.02 secs] 117.277: [GC [PSYoungGen: 9299641K-135459K(11918336K)] 9299753K-135579K(36385280K), 0.0244820 secs] [Times: user=0.19 sys=0.25, real=0.02 secs] So ~9GB is getting GC'ed every few seconds. Which seems like a lot. Question: The filter() is removing 99% of the data. Does this 99% of the data get GC'ed? Now, I was able to finally get to reduceByKey() by reducing the number of executor-cores (to 2), based on suggestions at http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-OutOfMemoryError-java-lang-OutOfMemoryError-GC-overhead-limit-exceeded-td9036.html . This makes everything before reduceByKey() run pretty smoothly. I ran this with more executor-memory and less executors (most important thing was fewer executor-cores): --num-executors 150 \ --driver-memory 15g \ --executor-memory 110g \ --executor-cores 32 \ But then, reduceByKey() fails with: java.lang.OutOfMemoryError: Java heap space On Sat, Feb 28, 2015 at 12:09 PM, Arun Luthra arun.lut...@gmail.com wrote: The Spark UI names the line number and name of the operation (repartition in this case) that it is performing. Only if this information is wrong (just a possibility), could it have started groupByKey already. I will try to analyze the amount of skew in the data by using reduceByKey (or simply countByKey) which is relatively inexpensive. For the purposes of this algorithm I can simply log and remove keys with huge counts, before doing groupByKey. On Sat, Feb 28, 2015 at 11:38 AM, Aaron Davidson ilike...@gmail.com wrote: All stated symptoms are consistent with GC pressure (other nodes timeout trying to connect because of a long stop-the-world), quite possibly due to groupByKey. groupByKey is a very expensive operation as it may bring all the data for a particular partition into memory (in particular, it cannot spill values for a single key, so if you have a single very skewed key you can get behavior like this). On Sat, Feb 28, 2015 at 11:33 AM, Paweł Szulc paul.sz...@gmail.com wrote: But groupbykey will repartition according to numer of keys as I understand how it works. How do you know that you haven't reached the groupbykey phase? Are you using a profiler or do yoi base that assumption only on logs? 
sob., 28 lut 2015, 8:12 PM Arun Luthra użytkownik arun.lut...@gmail.com napisał: A correction to my first post: There is also a repartition right before groupByKey to help avoid too-many-open-files error: rdd2.union(rdd1).map(...).filter(...).repartition(15000).groupByKey().map(...).flatMap(...).saveAsTextFile() On Sat, Feb 28, 2015 at 11:10 AM, Arun Luthra arun.lut...@gmail.com wrote: The job fails before getting to groupByKey. I see a lot of timeout errors in the yarn logs, like: 15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1 attempts akka.pattern.AskTimeoutException: Timed out and 15/02/28 12:47:49 WARN util.AkkaUtils: Error sending message in 2 attempts java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] and some of these are followed by: 15/02/28 12:48:02 ERROR executor.CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkExecutor@...] - [akka.tcp://sparkDriver@...] disassociated! Shutting down. 15/02/28 12:48:02 ERROR executor.Executor: Exception in task 421027.0 in stage 1.0 (TID 336601) java.io.FileNotFoundException: /hadoop/yarn/local//spark-local-20150228123450-3a71/36/shuffle_0_421027_0 (No such file or directory) On Sat, Feb 28, 2015 at 9:33 AM, Paweł Szulc paul.sz...@gmail.com wrote: I would first check whether there is any possibility that after doing groupbykey one of the groups does not fit in one of the executors' memory. To back up my theory, instead of doing groupbykey + map try reducebykey + mapvalues. Let me know if that helped. Pawel Szulc http://rabbitonweb.com sob., 28 lut 2015, 6:22 PM Arun Luthra użytkownik arun.lut...@gmail.com napisał: So, actually I am removing the persist for now, because
Re: Problem getting program to run on 15TB input
A correction to my first post: There is also a repartition right before groupByKey to help avoid too-many-open-files error: rdd2.union(rdd1).map(...).filter(...).repartition(15000).groupByKey().map(...).flatMap(...).saveAsTextFile() On Sat, Feb 28, 2015 at 11:10 AM, Arun Luthra arun.lut...@gmail.com wrote: The job fails before getting to groupByKey. I see a lot of timeout errors in the yarn logs, like: 15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1 attempts akka.pattern.AskTimeoutException: Timed out and 15/02/28 12:47:49 WARN util.AkkaUtils: Error sending message in 2 attempts java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] and some of these are followed by: 15/02/28 12:48:02 ERROR executor.CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkExecutor@...] - [akka.tcp://sparkDriver@...] disassociated! Shutting down. 15/02/28 12:48:02 ERROR executor.Executor: Exception in task 421027.0 in stage 1.0 (TID 336601) java.io.FileNotFoundException: /hadoop/yarn/local//spark-local-20150228123450-3a71/36/shuffle_0_421027_0 (No such file or directory) On Sat, Feb 28, 2015 at 9:33 AM, Paweł Szulc paul.sz...@gmail.com wrote: I would first check whether there is any possibility that after doing groupbykey one of the groups does not fit in one of the executors' memory. To back up my theory, instead of doing groupbykey + map try reducebykey + mapvalues. Let me know if that helped. Pawel Szulc http://rabbitonweb.com sob., 28 lut 2015, 6:22 PM Arun Luthra użytkownik arun.lut...@gmail.com napisał: So, actually I am removing the persist for now, because there is significant filtering that happens after calling textFile()... but I will keep that option in mind. I just tried a few different combinations of number of executors, executor memory, and more importantly, number of tasks... *all three times it failed when approximately 75.1% of the tasks were completed (no matter how many tasks resulted from repartitioning the data in textfile(..., N))*. Surely this is a strong clue to something? On Fri, Feb 27, 2015 at 1:07 PM, Burak Yavuz brk...@gmail.com wrote: Hi, Not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER` generates many small objects that lead to very long GC time, causing the executor losts, heartbeat not received, and GC overhead limit exceeded messages. Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can also try `OFF_HEAP` (and use Tachyon). Burak On Fri, Feb 27, 2015 at 11:39 AM, Arun Luthra arun.lut...@gmail.com wrote: My program in pseudocode looks like this: val conf = new SparkConf().setAppName(Test) .set(spark.storage.memoryFraction,0.2) // default 0.6 .set(spark.shuffle.memoryFraction,0.12) // default 0.2 .set(spark.shuffle.manager,SORT) // preferred setting for optimized joins .set(spark.shuffle.consolidateFiles,true) // helpful for too many files open .set(spark.mesos.coarse, true) // helpful for MapOutputTracker errors? 
.set(spark.akka.frameSize,500) // helpful when using consildateFiles=true .set(spark.akka.askTimeout, 30) .set(spark.shuffle.compress,false) // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html .set(spark.file.transferTo,false) // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html .set(spark.core.connection.ack.wait.timeout,600) // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html .set(spark.speculation,true) .set(spark.worker.timeout,600) // http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html .set(spark.akka.timeout,300) // http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html .set(spark.storage.blockManagerSlaveTimeoutMs,12) .set(spark.driver.maxResultSize,2048) // in response to error: Total size of serialized results of 39901 tasks (1024.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB) .set(spark.serializer, org.apache.spark.serializer.KryoSerializer) .set(spark.kryo.registrator,com.att.bdcoe.cip.ooh.MyRegistrator) .set(spark.kryo.registrationRequired, true) val rdd1 = sc.textFile(file1).persist(StorageLevel .MEMORY_AND_DISK_SER).map(_.split(\\|, -1)...filter(...) val rdd2 = sc.textFile(file2).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split(\\|, -1)...filter(...) rdd2.union(rdd1).map(...).filter(...).groupByKey().map(...).flatMap(...).saveAsTextFile() I run the code with: --num-executors 500 \ --driver-memory 20g \ --executor-memory 20g \ --executor-cores 32 \ I'm using kryo serialization on everything, including broadcast variables. Spark creates 145k tasks, and the first stage includes everything before groupByKey(). It fails before getting to groupByKey. I have
Re: Problem getting program to run on 15TB input
So, actually I am removing the persist for now, because there is significant filtering that happens after calling textFile()... but I will keep that option in mind. I just tried a few different combinations of number of executors, executor memory, and more importantly, number of tasks... *all three times it failed when approximately 75.1% of the tasks were completed (no matter how many tasks resulted from repartitioning the data in textfile(..., N))*. Surely this is a strong clue to something? On Fri, Feb 27, 2015 at 1:07 PM, Burak Yavuz brk...@gmail.com wrote: Hi, Not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER` generates many small objects that lead to very long GC time, causing the executor losts, heartbeat not received, and GC overhead limit exceeded messages. Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can also try `OFF_HEAP` (and use Tachyon). Burak On Fri, Feb 27, 2015 at 11:39 AM, Arun Luthra arun.lut...@gmail.com wrote: My program in pseudocode looks like this: val conf = new SparkConf().setAppName(Test) .set(spark.storage.memoryFraction,0.2) // default 0.6 .set(spark.shuffle.memoryFraction,0.12) // default 0.2 .set(spark.shuffle.manager,SORT) // preferred setting for optimized joins .set(spark.shuffle.consolidateFiles,true) // helpful for too many files open .set(spark.mesos.coarse, true) // helpful for MapOutputTracker errors? .set(spark.akka.frameSize,500) // helpful when using consildateFiles=true .set(spark.akka.askTimeout, 30) .set(spark.shuffle.compress,false) // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html .set(spark.file.transferTo,false) // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html .set(spark.core.connection.ack.wait.timeout,600) // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html .set(spark.speculation,true) .set(spark.worker.timeout,600) // http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html .set(spark.akka.timeout,300) // http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html .set(spark.storage.blockManagerSlaveTimeoutMs,12) .set(spark.driver.maxResultSize,2048) // in response to error: Total size of serialized results of 39901 tasks (1024.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB) .set(spark.serializer, org.apache.spark.serializer.KryoSerializer) .set(spark.kryo.registrator,com.att.bdcoe.cip.ooh.MyRegistrator) .set(spark.kryo.registrationRequired, true) val rdd1 = sc.textFile(file1).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split(\\|, -1)...filter(...) val rdd2 = sc.textFile(file2).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split(\\|, -1)...filter(...) rdd2.union(rdd1).map(...).filter(...).groupByKey().map(...).flatMap(...).saveAsTextFile() I run the code with: --num-executors 500 \ --driver-memory 20g \ --executor-memory 20g \ --executor-cores 32 \ I'm using kryo serialization on everything, including broadcast variables. Spark creates 145k tasks, and the first stage includes everything before groupByKey(). It fails before getting to groupByKey. I have tried doubling and tripling the number of partitions when calling textFile, with no success. Very similar code (trivial changes, to accomodate different input) worked on a smaller input (~8TB)... Not that it was easy to get that working. Errors vary, here is what I am getting right now: ERROR SendingConnection: Exception while reading SendingConnection ... 
java.nio.channels.ClosedChannelException (^ guessing that is symptom of something else) WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(...) with no recent heart beats: 120030ms exceeds 12ms (^ guessing that is symptom of something else) ERROR ActorSystemImpl: Uncaught fatal error from thread (...) shutting down ActorSystem [sparkDriver] *java.lang.OutOfMemoryError: GC overhead limit exceeded* Other times I will get messages about executor lost... about 1 message per second, after ~~50k tasks complete, until there are almost no executors left and progress slows to nothing. I ran with verbose GC info; I do see failing yarn containers that have multiple (like 30) Full GC messages but I don't know how to interpret if that is the problem. Typical Full GC time taken seems ok: [Times: user=23.30 sys=0.06, real=1.94 secs] Suggestions, please? Huge thanks for useful suggestions, Arun
Re: Problem getting program to run on 15TB input
The job fails before getting to groupByKey. I see a lot of timeout errors in the yarn logs, like: 15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1 attempts akka.pattern.AskTimeoutException: Timed out and 15/02/28 12:47:49 WARN util.AkkaUtils: Error sending message in 2 attempts java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] and some of these are followed by: 15/02/28 12:48:02 ERROR executor.CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkExecutor@...] - [akka.tcp://sparkDriver@...] disassociated! Shutting down. 15/02/28 12:48:02 ERROR executor.Executor: Exception in task 421027.0 in stage 1.0 (TID 336601) java.io.FileNotFoundException: /hadoop/yarn/local//spark-local-20150228123450-3a71/36/shuffle_0_421027_0 (No such file or directory) On Sat, Feb 28, 2015 at 9:33 AM, Paweł Szulc paul.sz...@gmail.com wrote: I would first check whether there is any possibility that after doing groupbykey one of the groups does not fit in one of the executors' memory. To back up my theory, instead of doing groupbykey + map try reducebykey + mapvalues. Let me know if that helped. Pawel Szulc http://rabbitonweb.com sob., 28 lut 2015, 6:22 PM Arun Luthra użytkownik arun.lut...@gmail.com napisał: So, actually I am removing the persist for now, because there is significant filtering that happens after calling textFile()... but I will keep that option in mind. I just tried a few different combinations of number of executors, executor memory, and more importantly, number of tasks... *all three times it failed when approximately 75.1% of the tasks were completed (no matter how many tasks resulted from repartitioning the data in textfile(..., N))*. Surely this is a strong clue to something? On Fri, Feb 27, 2015 at 1:07 PM, Burak Yavuz brk...@gmail.com wrote: Hi, Not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER` generates many small objects that lead to very long GC time, causing the executor losts, heartbeat not received, and GC overhead limit exceeded messages. Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can also try `OFF_HEAP` (and use Tachyon). Burak On Fri, Feb 27, 2015 at 11:39 AM, Arun Luthra arun.lut...@gmail.com wrote: My program in pseudocode looks like this: val conf = new SparkConf().setAppName(Test) .set(spark.storage.memoryFraction,0.2) // default 0.6 .set(spark.shuffle.memoryFraction,0.12) // default 0.2 .set(spark.shuffle.manager,SORT) // preferred setting for optimized joins .set(spark.shuffle.consolidateFiles,true) // helpful for too many files open .set(spark.mesos.coarse, true) // helpful for MapOutputTracker errors? 
.set(spark.akka.frameSize,500) // helpful when using consildateFiles=true .set(spark.akka.askTimeout, 30) .set(spark.shuffle.compress,false) // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html .set(spark.file.transferTo,false) // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html .set(spark.core.connection.ack.wait.timeout,600) // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html .set(spark.speculation,true) .set(spark.worker.timeout,600) // http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html .set(spark.akka.timeout,300) // http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html .set(spark.storage.blockManagerSlaveTimeoutMs,12) .set(spark.driver.maxResultSize,2048) // in response to error: Total size of serialized results of 39901 tasks (1024.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB) .set(spark.serializer, org.apache.spark.serializer.KryoSerializer) .set(spark.kryo.registrator,com.att.bdcoe.cip.ooh.MyRegistrator) .set(spark.kryo.registrationRequired, true) val rdd1 = sc.textFile(file1).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split(\\|, -1)...filter(...) val rdd2 = sc.textFile(file2).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split(\\|, -1)...filter(...) rdd2.union(rdd1).map(...).filter(...).groupByKey().map(...).flatMap(...).saveAsTextFile() I run the code with: --num-executors 500 \ --driver-memory 20g \ --executor-memory 20g \ --executor-cores 32 \ I'm using kryo serialization on everything, including broadcast variables. Spark creates 145k tasks, and the first stage includes everything before groupByKey(). It fails before getting to groupByKey. I have tried doubling and tripling the number of partitions when calling textFile, with no success. Very similar code (trivial changes, to accomodate different input) worked on a smaller input (~8TB)... Not that it was easy to get that working. Errors vary, here is what I am getting right now: ERROR SendingConnection
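A minimal sketch of Paweł's "reduceByKey + mapValues instead of groupByKey + map" suggestion from the message above, using a toy count as the aggregate (the real per-key computation in the job is not stated in the thread; `sc` is an existing SparkContext):

  val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))

  // groupByKey materializes every value for a key on one executor before the
  // map runs, which is exactly what blows up on a heavily skewed key.
  val heavy = pairs.groupByKey().mapValues(vs => vs.size)

  // reduceByKey combines map-side and only ever keeps one running aggregate
  // per key in memory, so a skewed key no longer needs its full value list.
  val light = pairs.mapValues(_ => 1L).reduceByKey(_ + _)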
Re: Problem getting program to run on 15TB input
The Spark UI names the line number and name of the operation (repartition in this case) that it is performing. Only if this information is wrong (just a possibility), could it have started groupByKey already. I will try to analyze the amount of skew in the data by using reduceByKey (or simply countByKey) which is relatively inexpensive. For the purposes of this algorithm I can simply log and remove keys with huge counts, before doing groupByKey. On Sat, Feb 28, 2015 at 11:38 AM, Aaron Davidson ilike...@gmail.com wrote: All stated symptoms are consistent with GC pressure (other nodes timeout trying to connect because of a long stop-the-world), quite possibly due to groupByKey. groupByKey is a very expensive operation as it may bring all the data for a particular partition into memory (in particular, it cannot spill values for a single key, so if you have a single very skewed key you can get behavior like this). On Sat, Feb 28, 2015 at 11:33 AM, Paweł Szulc paul.sz...@gmail.com wrote: But groupbykey will repartition according to numer of keys as I understand how it works. How do you know that you haven't reached the groupbykey phase? Are you using a profiler or do yoi base that assumption only on logs? sob., 28 lut 2015, 8:12 PM Arun Luthra użytkownik arun.lut...@gmail.com napisał: A correction to my first post: There is also a repartition right before groupByKey to help avoid too-many-open-files error: rdd2.union(rdd1).map(...).filter(...).repartition(15000).groupByKey().map(...).flatMap(...).saveAsTextFile() On Sat, Feb 28, 2015 at 11:10 AM, Arun Luthra arun.lut...@gmail.com wrote: The job fails before getting to groupByKey. I see a lot of timeout errors in the yarn logs, like: 15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1 attempts akka.pattern.AskTimeoutException: Timed out and 15/02/28 12:47:49 WARN util.AkkaUtils: Error sending message in 2 attempts java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] and some of these are followed by: 15/02/28 12:48:02 ERROR executor.CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkExecutor@...] - [akka.tcp://sparkDriver@...] disassociated! Shutting down. 15/02/28 12:48:02 ERROR executor.Executor: Exception in task 421027.0 in stage 1.0 (TID 336601) java.io.FileNotFoundException: /hadoop/yarn/local//spark-local-20150228123450-3a71/36/shuffle_0_421027_0 (No such file or directory) On Sat, Feb 28, 2015 at 9:33 AM, Paweł Szulc paul.sz...@gmail.com wrote: I would first check whether there is any possibility that after doing groupbykey one of the groups does not fit in one of the executors' memory. To back up my theory, instead of doing groupbykey + map try reducebykey + mapvalues. Let me know if that helped. Pawel Szulc http://rabbitonweb.com sob., 28 lut 2015, 6:22 PM Arun Luthra użytkownik arun.lut...@gmail.com napisał: So, actually I am removing the persist for now, because there is significant filtering that happens after calling textFile()... but I will keep that option in mind. I just tried a few different combinations of number of executors, executor memory, and more importantly, number of tasks... *all three times it failed when approximately 75.1% of the tasks were completed (no matter how many tasks resulted from repartitioning the data in textfile(..., N))*. Surely this is a strong clue to something? 
On Fri, Feb 27, 2015 at 1:07 PM, Burak Yavuz brk...@gmail.com wrote: Hi, Not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER` generates many small objects that lead to very long GC time, causing the executor losts, heartbeat not received, and GC overhead limit exceeded messages. Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can also try `OFF_HEAP` (and use Tachyon). Burak On Fri, Feb 27, 2015 at 11:39 AM, Arun Luthra arun.lut...@gmail.com wrote: My program in pseudocode looks like this: val conf = new SparkConf().setAppName(Test) .set(spark.storage.memoryFraction,0.2) // default 0.6 .set(spark.shuffle.memoryFraction,0.12) // default 0.2 .set(spark.shuffle.manager,SORT) // preferred setting for optimized joins .set(spark.shuffle.consolidateFiles,true) // helpful for too many files open .set(spark.mesos.coarse, true) // helpful for MapOutputTracker errors? .set(spark.akka.frameSize,500) // helpful when using consildateFiles=true .set(spark.akka.askTimeout, 30) .set(spark.shuffle.compress,false) // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html .set(spark.file.transferTo,false) // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html .set(spark.core.connection.ack.wait.timeout,600) // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html .set(spark.speculation,true
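A hedged sketch of the skew check Arun describes above: count records per key cheaply with reduceByKey, log the heaviest keys, and drop them before groupByKey. Here `pairs` stands for the (key, value) RDD feeding groupByKey, and the threshold is a placeholder.

  val hotThreshold = 1000000L                                   // placeholder cutoff
  val keyCounts    = pairs.mapValues(_ => 1L).reduceByKey(_ + _)

  // Collect only the (presumably few) heavily skewed keys to the driver.
  val hotKeys = keyCounts.filter { case (_, n) => n > hotThreshold }.keys.collect().toSet
  hotKeys.foreach(k => println(s"Dropping skewed key: $k"))

  val hotKeysBc = sc.broadcast(hotKeys)
  val grouped   = pairs.filter { case (k, _) => !hotKeysBc.value.contains(k) }.groupByKey()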
Problem getting program to run on 15TB input
My program in pseudocode looks like this:
  val conf = new SparkConf().setAppName("Test")
    .set("spark.storage.memoryFraction","0.2") // default 0.6
    .set("spark.shuffle.memoryFraction","0.12") // default 0.2
    .set("spark.shuffle.manager","SORT") // preferred setting for optimized joins
    .set("spark.shuffle.consolidateFiles","true") // helpful for too many files open
    .set("spark.mesos.coarse", "true") // helpful for MapOutputTracker errors?
    .set("spark.akka.frameSize","500") // helpful when using consolidateFiles=true
    .set("spark.akka.askTimeout", "30")
    .set("spark.shuffle.compress","false") // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
    .set("spark.file.transferTo","false") // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
    .set("spark.core.connection.ack.wait.timeout","600") // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
    .set("spark.speculation","true")
    .set("spark.worker.timeout","600") // http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
    .set("spark.akka.timeout","300") // http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
    .set("spark.storage.blockManagerSlaveTimeoutMs","12")
    .set("spark.driver.maxResultSize","2048") // in response to error: Total size of serialized results of 39901 tasks (1024.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrator","com.att.bdcoe.cip.ooh.MyRegistrator")
    .set("spark.kryo.registrationRequired", "true")

  val rdd1 = sc.textFile(file1).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split("\\|", -1))...filter(...)
  val rdd2 = sc.textFile(file2).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split("\\|", -1))...filter(...)

  rdd2.union(rdd1).map(...).filter(...).groupByKey().map(...).flatMap(...).saveAsTextFile()

I run the code with:
  --num-executors 500 \
  --driver-memory 20g \
  --executor-memory 20g \
  --executor-cores 32 \

I'm using kryo serialization on everything, including broadcast variables. Spark creates 145k tasks, and the first stage includes everything before groupByKey(). It fails before getting to groupByKey. I have tried doubling and tripling the number of partitions when calling textFile, with no success. Very similar code (trivial changes, to accommodate different input) worked on a smaller input (~8TB)... Not that it was easy to get that working.

Errors vary; here is what I am getting right now:
  ERROR SendingConnection: Exception while reading SendingConnection ... java.nio.channels.ClosedChannelException
  (^ guessing that is a symptom of something else)
  WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(...) with no recent heart beats: 120030ms exceeds 12ms
  (^ guessing that is a symptom of something else)
  ERROR ActorSystemImpl: Uncaught fatal error from thread (...) shutting down ActorSystem [sparkDriver]
  *java.lang.OutOfMemoryError: GC overhead limit exceeded*

Other times I will get messages about executor lost... about 1 message per second, after ~50k tasks complete, until there are almost no executors left and progress slows to nothing. I ran with verbose GC info; I do see failing yarn containers that have multiple (like 30) Full GC messages, but I don't know how to interpret whether that is the problem. Typical Full GC time taken seems ok: [Times: user=23.30 sys=0.06, real=1.94 secs] Suggestions, please? Huge thanks for useful suggestions, Arun
Workaround for spark 1.2.X roaringbitmap kryo problem?
Problem is noted here: https://issues.apache.org/jira/browse/SPARK-5949 I tried this as a workaround: import org.apache.spark.scheduler._ import org.roaringbitmap._ ... kryo.register(classOf[org.roaringbitmap.RoaringBitmap]) kryo.register(classOf[org.roaringbitmap.RoaringArray]) kryo.register(classOf[org.roaringbitmap.ArrayContainer]) kryo.register(classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus]) kryo.register(classOf[org.roaringbitmap.RoaringArray$Element]) kryo.register(classOf[org.roaringbitmap.RoaringArray$Element[]]) kryo.register(classOf[short[]]) in build file: libraryDependencies += org.roaringbitmap % RoaringBitmap % 0.4.8 This fails to compile: ...:53: identifier expected but ']' found. [error] kryo.register(classOf[org.roaringbitmap.RoaringArray$Element[]]) also: :54: identifier expected but ']' found. [error] kryo.register(classOf[short[]]) also: :51: class HighlyCompressedMapStatus in package scheduler cannot be accessed in package org.apache.spark.scheduler [error] kryo.register(classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus]) Suggestions? Arun
Open file limit settings for Spark on Yarn job
Hi, I'm running Spark on Yarn from an edge node, and the tasks run on the data nodes. My job fails with the "Too many open files" error once it gets to groupByKey(). Alternatively, I can make it fail immediately if I repartition the data when I create the RDD. Where do I need to make sure that ulimit -n is high enough? On the edge node it is small, 1024, but on the data nodes the yarn user has a high limit, 32k. But is the yarn user the relevant user? And is the 1024 limit for myself on the edge node a problem, or is that limit not relevant? Arun
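Raising ulimit -n for the processes that actually run the tasks is the direct fix, but two Spark 1.x settings that appear elsewhere in these threads reduce how many shuffle files are open at once. A hedged sketch (values are the ones used in the 15TB thread, not a recommendation):

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    .set("spark.shuffle.consolidateFiles", "true")  // noted above as helpful for too many files open
    .set("spark.shuffle.manager", "SORT")           // sort-based shuffle writes far fewer files per map task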
Re: Spark job ends abruptly during setup without error message
I'm submitting this on a cluster, with my usual setting of, export YARN_CONF_DIR=/etc/hadoop/conf It is working again after a small change to the code so I will see if I can reproduce the error (later today). On Thu, Feb 5, 2015 at 9:17 AM, Arush Kharbanda ar...@sigmoidanalytics.com wrote: Are you submitting the job from your local machine or on the driver machine.? Have you set YARN_CONF_DIR. On Thu, Feb 5, 2015 at 10:43 PM, Arun Luthra arun.lut...@gmail.com wrote: While a spark-submit job is setting up, the yarnAppState goes into Running mode, then I get a flurry of typical looking INFO-level messages such as INFO BlockManagerMasterActor: ... INFO YarnClientSchedulerBackend: Registered executor: ... Then, spark-submit quits without any error message and I'm back at the command line. What could be causing this? Arun -- [image: Sigmoid Analytics] http://htmlsig.com/www.sigmoidanalytics.com *Arush Kharbanda* || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
Spark job ends abruptly during setup without error message
While a spark-submit job is setting up, the yarnAppState goes into Running mode, then I get a flurry of typical looking INFO-level messages such as INFO BlockManagerMasterActor: ... INFO YarnClientSchedulerBackend: Registered executor: ... Then, spark-submit quits without any error message and I'm back at the command line. What could be causing this? Arun
Re: SQL query in scala API
Thanks, I will try this. On Fri, Dec 5, 2014 at 1:19 AM, Cheng Lian lian.cs@gmail.com wrote: Oh, sorry. So neither SQL nor Spark SQL is preferred. Then you may write you own aggregation with aggregateByKey: users.aggregateByKey((0, Set.empty[String]))({ case ((count, seen), user) = (count + 1, seen + user) }, { case ((count0, seen0), (count1, seen1)) = (count0 + count1, seen0 ++ seen1) }).mapValues { case (count, seen) = (count, seen.size) } On 12/5/14 3:47 AM, Arun Luthra wrote: Is that Spark SQL? I'm wondering if it's possible without spark SQL. On Wed, Dec 3, 2014 at 8:08 PM, Cheng Lian lian.cs@gmail.com wrote: You may do this: table(users).groupBy('zip)('zip, count('user), countDistinct('user)) On 12/4/14 8:47 AM, Arun Luthra wrote: I'm wondering how to do this kind of SQL query with PairRDDFunctions. SELECT zip, COUNT(user), COUNT(DISTINCT user) FROM users GROUP BY zip In the Spark scala API, I can make an RDD (called users) of key-value pairs where the keys are zip (as in ZIP code) and the values are user id's. Then I can compute the count and distinct count like this: val count = users.mapValues(_ = 1).reduceByKey(_ + _) val countDistinct = users.distinct().mapValues(_ = 1).reduceByKey(_ + _) Then, if I want count and countDistinct in the same table, I have to join them on the key. Is there a way to do this without doing a join (and without using SQL or spark SQL)? Arun
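The bare `=` signs in the aggregateByKey snippet quoted above are an artifact of the archive and should read `=>`. A self-contained version with toy data, for reference (ZIP codes and user names are made up):

  val users = sc.parallelize(Seq(
    ("94105", "alice"), ("94105", "bob"), ("94105", "alice"), ("10001", "carol")))

  // Per zip: (record count, number of distinct users), in a single pass.
  val result = users.aggregateByKey((0, Set.empty[String]))(
    { case ((count, seen), user) => (count + 1, seen + user) },   // fold a user into the partial result
    { case ((c0, s0), (c1, s1)) => (c0 + c1, s0 ++ s1) }          // merge partial results across partitions
  ).mapValues { case (count, seen) => (count, seen.size) }

  // result: ("94105", (3, 2)), ("10001", (1, 1))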
Re: SQL query in scala API
Is that Spark SQL? I'm wondering if it's possible without spark SQL. On Wed, Dec 3, 2014 at 8:08 PM, Cheng Lian lian.cs@gmail.com wrote: You may do this: table(users).groupBy('zip)('zip, count('user), countDistinct('user)) On 12/4/14 8:47 AM, Arun Luthra wrote: I'm wondering how to do this kind of SQL query with PairRDDFunctions. SELECT zip, COUNT(user), COUNT(DISTINCT user) FROM users GROUP BY zip In the Spark scala API, I can make an RDD (called users) of key-value pairs where the keys are zip (as in ZIP code) and the values are user id's. Then I can compute the count and distinct count like this: val count = users.mapValues(_ = 1).reduceByKey(_ + _) val countDistinct = users.distinct().mapValues(_ = 1).reduceByKey(_ + _) Then, if I want count and countDistinct in the same table, I have to join them on the key. Is there a way to do this without doing a join (and without using SQL or spark SQL)? Arun
SQL query in scala API
I'm wondering how to do this kind of SQL query with PairRDDFunctions.
  SELECT zip, COUNT(user), COUNT(DISTINCT user) FROM users GROUP BY zip
In the Spark Scala API, I can make an RDD (called users) of key-value pairs where the keys are zip (as in ZIP code) and the values are user ids. Then I can compute the count and distinct count like this:
  val count = users.mapValues(_ => 1).reduceByKey(_ + _)
  val countDistinct = users.distinct().mapValues(_ => 1).reduceByKey(_ + _)
Then, if I want count and countDistinct in the same table, I have to join them on the key. Is there a way to do this without doing a join (and without using SQL or Spark SQL)? Arun
Re: rack-topology.sh no such file or directory
Problem was solved by having the admins put this file on the edge nodes. Thanks, Arun On Wed, Nov 19, 2014 at 12:27 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Your Hadoop configuration is set to look for this file to determine racks. Is the file present on cluster nodes? If not, look at your hdfs-site.xml and remove the setting for a rack topology script there (or it might be in core-site.xml). Matei On Nov 19, 2014, at 12:13 PM, Arun Luthra arun.lut...@gmail.com wrote: I'm trying to run Spark on Yarn on a hortonworks 2.1.5 cluster. I'm getting this error: 14/11/19 13:46:34 INFO cluster.YarnClientSchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@#/user/Executor#- 2027837001] with ID 42 14/11/19 13:46:34 WARN net.ScriptBasedMapping: Exception running /etc/hadoop/conf/rack-topology.sh 10.0.28.130 java.io.IOException: Cannot run program /etc/hadoop/conf/rack-topology.sh (in directory ###): error=2, No such file or directory The rack-topology script is not on system (find / 2/dev/null -name rack-topology). Any possibly solution? Arun Luthra
rack-topology.sh no such file or directory
I'm trying to run Spark on Yarn on a Hortonworks 2.1.5 cluster. I'm getting this error:
  14/11/19 13:46:34 INFO cluster.YarnClientSchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@#/user/Executor#-2027837001] with ID 42
  14/11/19 13:46:34 WARN net.ScriptBasedMapping: Exception running /etc/hadoop/conf/rack-topology.sh 10.0.28.130
  java.io.IOException: Cannot run program "/etc/hadoop/conf/rack-topology.sh" (in directory ###): error=2, No such file or directory
The rack-topology script is not on the system (find / 2>/dev/null -name rack-topology). Any possible solution? Arun Luthra
How to configure build.sbt for Spark 1.2.0
I built Spark 1.2.0 successfully, but was unable to build my Spark program under 1.2.0 with sbt assembly. In my build.sbt file I tried:
  "org.apache.spark" %% "spark-sql" % "1.2.0",
  "org.apache.spark" %% "spark-core" % "1.2.0",
and
  "org.apache.spark" %% "spark-sql" % "1.2.0-SNAPSHOT",
  "org.apache.spark" %% "spark-core" % "1.2.0-SNAPSHOT",
but I get errors like:
  [warn] ::
  [warn] :: UNRESOLVED DEPENDENCIES ::
  [warn] ::
  [warn] :: org.apache.spark#spark-sql_2.10;1.2.0: not found
  [warn] :: org.apache.spark#spark-core_2.10;1.2.0: not found
  [warn] ::
  sbt.ResolveException: unresolved dependency: org.apache.spark#spark-sql_2.10;1.2.0: not found
  unresolved dependency: org.apache.spark#spark-core_2.10;1.2.0: not found
  ...
  [error] (*:update) sbt.ResolveException: unresolved dependency: org.apache.spark#spark-sql_2.10;1.2.0: not found
  [error] unresolved dependency: org.apache.spark#spark-core_2.10;1.2.0: not found
Do I need to configure my build.sbt to point to my local Spark 1.2.0 repository? How? Thanks, - Arun
Re: How to configure build.sbt for Spark 1.2.0
Hi Pat, Couple of points: 1) I must have done something naive like: git clone git://github.com/apache/spark.git -b branch-1.2.0 because git branch is telling me I'm on the master branch, and I see that branch-1.2.0 doesn't exist (https://github.com/apache/spark). Nevertheless, when I compiled this master branch spark shell tells me I have 1.2.0. So I guess the master is currently 1.2.0... 2) The README on the master branch only has build instructions for maven. I built Spark successfully with mvn -Phadoop-2.2 -Dhadoop.version=2.2.0 -DskipTests clean package and it looks like the publish local solution for maven is: mvn -Phadoop-2.2 -Dhadoop.version=2.2.0 -DskipTests clean install I will report back with the result. On Wed, Oct 8, 2014 at 5:50 PM, Pat McDonough pat.mcdono...@databricks.com wrote: Hey Arun, Since this build depends on unpublished builds of spark (1.2.0-SNAPSHOT), you'll need to first build spark and publish-local so your application build can find those SNAPSHOTs in your local repo. Just append publish-local to your sbt command where you build Spark. -Pat On Wed, Oct 8, 2014 at 5:35 PM, Arun Luthra arun.lut...@gmail.com wrote: I built Spark 1.2.0 succesfully, but was unable to build my Spark program under 1.2.0 with sbt assembly my build.sbt file. It contains: I tried: org.apache.spark %% spark-sql % 1.2.0, org.apache.spark %% spark-core % 1.2.0, and org.apache.spark %% spark-sql % 1.2.0-SNAPSHOT, org.apache.spark %% spark-core % 1.2.0-SNAPSHOT, but I get errors like: [warn] :: [warn] :: UNRESOLVED DEPENDENCIES :: [warn] :: [warn] :: org.apache.spark#spark-sql_2.10;1.2.0: not found [warn] :: org.apache.spark#spark-core_2.10;1.2.0: not found [warn] :: sbt.ResolveException: unresolved dependency: org.apache.spark#spark-sql_2.10;1.2.0: not found unresolved dependency: org.apache.spark#spark-core_2.10;1.2.0: not found ... [error] (*:update) sbt.ResolveException: unresolved dependency: org.apache.spark#spark-sql_2.10;1.2.0: not found [error] unresolved dependency: org.apache.spark#spark-core_2.10;1.2.0: not found Do I need to configure my build.sbt to point to my local spark 1.2.0 repository? How? Thanks, - Arun
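Putting the pieces of this thread together, a hedged build.sbt sketch for depending on a locally built and locally published (mvn install / sbt publish-local) 1.2.0-SNAPSHOT. The Scala version, the "provided" scope, and the local resolver are assumptions, not something stated in the thread.

  name := "my-spark-app"

  scalaVersion := "2.10.4"

  resolvers += Resolver.mavenLocal

  libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core" % "1.2.0-SNAPSHOT" % "provided",
    "org.apache.spark" %% "spark-sql"  % "1.2.0-SNAPSHOT" % "provided"
  )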
Re: PrintWriter error in foreach
Ok, so I don't think the workers on the data nodes will be able to see my output directory on the edge node. I don't think stdout will work either, so I'll write to HDFS via rdd.saveAsTextFile(...).

On Wed, Sep 10, 2014 at 3:51 PM, Daniil Osipov daniil.osi...@shazam.com wrote: Try providing the full path to the file you want to write, and make sure the directory exists and is writable by the Spark process.

On Wed, Sep 10, 2014 at 3:46 PM, Arun Luthra arun.lut...@gmail.com wrote: I have a Spark program that worked in local mode, but throws an error in yarn-client mode on a cluster. On the edge node in my home directory, I have an output directory (called transout) which is ready to receive files. The Spark job I'm running is supposed to write a few hundred files into that directory, once for each iteration of a foreach function. This works in local mode, and my only guess as to why this would fail in yarn-client mode is that the RDD is distributed across many nodes and the program is trying to use the PrintWriter on the data nodes, where the output directory doesn't exist. Is this what's happening? Any proposed solution?
Abbreviation of the code:
  import java.io.PrintWriter
  ...
  rdd.foreach {
    val outFile = new PrintWriter("transoutput/output.%s".format(id))
    outFile.println("test")
    outFile.close()
  }
Error:
  14/09/10 16:57:09 WARN TaskSetManager: Lost TID 1826 (task 0.0:26)
  14/09/10 16:57:09 WARN TaskSetManager: Loss was due to java.io.FileNotFoundException
  java.io.FileNotFoundException: transoutput/input.598718 (No such file or directory)
  at java.io.FileOutputStream.open(Native Method)
  at java.io.FileOutputStream.<init>(FileOutputStream.java:194)
  at java.io.FileOutputStream.<init>(FileOutputStream.java:84)
  at java.io.PrintWriter.<init>(PrintWriter.java:146)
  at com.att.bdcoe.cip.ooh.TransformationLayer$$anonfun$main$3.apply(TransformLayer.scala:98)
  at com.att.bdcoe.cip.ooh.TransformationLayer$$anonfun$main$3.apply(TransformLayer.scala:95)
  at scala.collection.Iterator$class.foreach(Iterator.scala:727)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
  at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:703)
  at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:703)
  at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083)
  at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
  at org.apache.spark.scheduler.Task.run(Task.scala:51)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
  at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
  at java.lang.Thread.run(Thread.java:662)
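A minimal sketch of the workaround stated at the top of this message: format records on the executors and let Spark write them to HDFS, instead of opening a PrintWriter against a directory that only exists on the edge node. The HDFS path and the per-record formatting are placeholders; producing one directory per key would need something like the MultipleTextOutputFormat approach sketched in the saveAsTextFile-by-key thread above.

  rdd.map(record => record.toString)                 // stand-in for the real per-record output line
     .saveAsTextFile("hdfs:///user/arun/transoutput")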