Re: Large number of conf broadcasts
Awesome, thanks for the PR Koert! /Anders

On Thu, Dec 17, 2015 at 10:22 PM Prasad Ravilla <pras...@slalom.com> wrote:
> Thanks, Koert.
>
> Regards,
> Prasad.
>
> From: Koert Kuipers
> Date: Thursday, December 17, 2015 at 1:06 PM
> To: Prasad Ravilla
> Cc: Anders Arpteg, user
> Subject: Re: Large number of conf broadcasts
>
> https://github.com/databricks/spark-avro/pull/95
>
> On Thu, Dec 17, 2015 at 3:35 PM, Prasad Ravilla <pras...@slalom.com> wrote:
>> Hi Anders,
>>
>> I am running into the same issue as you. I am trying to read about 120 thousand Avro files into a single data frame.
>>
>> Is your patch part of a pull request against the master branch on GitHub?
>>
>> Thanks,
>> Prasad.
>>
>> From: Anders Arpteg
>> Date: Thursday, October 22, 2015 at 10:37 AM
>> To: Koert Kuipers
>> Cc: user
>> Subject: Re: Large number of conf broadcasts
>>
>> Yes, it seems unnecessary. I actually tried patching the com.databricks.spark.avro reader to broadcast only once per dataset instead of once per file/partition. It seems to work just as well, there are significantly fewer broadcasts, and I'm not seeing out-of-memory issues any more. Strange that more people don't react to this, since the broadcasting seems completely unnecessary...
>>
>> Best,
>> Anders
>>
>> On Thu, Oct 22, 2015 at 7:03 PM Koert Kuipers <ko...@tresata.com> wrote:
>>> i am seeing the same thing. it's gone completely crazy creating broadcasts for the last 15 mins or so. killing it...
>>>
>>> On Thu, Sep 24, 2015 at 1:24 PM, Anders Arpteg <arp...@spotify.com> wrote:
>>>> Hi,
>>>>
>>>> Running Spark 1.5.0 in yarn-client mode, and I am curious why so many broadcasts are being done when loading datasets with a large number of partitions/files. We have datasets with thousands of partitions, i.e. HDFS files in the Avro folder, and sometimes load hundreds of these large datasets. I believe I have located the broadcast at line SparkContext.scala:1006. It seems to just broadcast the Hadoop configuration, and I don't see why it should be necessary to broadcast that for EVERY file. Wouldn't it be possible to reuse the same broadcast configuration? It's hardly the case that the configuration differs between files in a single dataset. It seems to waste lots of memory and to persist unnecessarily to disk (see below again).
>>>>
>>>> Thanks,
>>>> Anders
>>>>
>>>> 15/09/24 17:11:11 INFO BlockManager: Writing block broadcast_1871_piece0 to disk
>>>> 15/09/24 17:11:11 INFO BlockManagerInfo: Added broadcast_1871_piece0 on disk on 10.254.35.24:49428 (size: 23.1 KB)
>>>> 15/09/24 17:11:11 INFO MemoryStore: Block broadcast_4803_piece0 stored as bytes in memory (estimated size 23.1 KB, free 2.4 KB)
>>>> 15/09/24 17:11:11 INFO BlockManagerInfo: Added broadcast_4803_piece0 in memory on 10.254.35.24:49428 (size: 23.1 KB, free: 464.0 MB)
>>>> 15/09/24 17:11:11 INFO SpotifySparkContext: Created broadcast 4803 from hadoopFile at AvroRelation.scala:121
>>>> 15/09/24 17:11:11 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KB for computing block broadcast_4804 in memory.
>>>> 15/09/24 17:11:11 WARN MemoryStore: Not enough space to cache broadcast_4804 in memory! (computed 496.0 B so far)
>>>> 15/09/24 17:11:11 INFO MemoryStore: Memory use = 530.3 MB (blocks) + 0.0 B (scratch space shared across 0 tasks(s)) = 530.3 MB. Storage limit = 530.3 MB.
>>>> 15/09/24 17:11:11 WARN MemoryStore: Persisting block broadcast_4804 to disk instead.
>>>> 15/09/24 17:11:11 INFO MemoryStore: ensureFreeSpace(23703) called with curMem=556036460, maxMem=556038881
>>>> 15/09/24 17:11:11 INFO MemoryStore: 1 blocks selected for dropping
>>>> 15/09/24 17:11:11 INFO BlockManager: Dropping block broadcast_1872_piece0 from memory
>>>> 15/09/24 17:11:11 INFO BlockManager: Writing block broadcast_1872_piece0 to disk
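[The idea behind the patch is easy to sketch. The following is a minimal, hypothetical illustration, not the actual PR code; it assumes an existing SparkContext named sc. The Hadoop configuration is broadcast once per dataset, and per-file tasks read it back from the single broadcast variable:]

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.SerializableWritable

    // One broadcast for the whole dataset, created before iterating over files...
    val confBroadcast =
      sc.broadcast(new SerializableWritable(new Configuration(sc.hadoopConfiguration)))

    // ...so tasks reconstruct the configuration from the shared broadcast value
    // instead of each hadoopFile() call shipping its own copy of the conf.
    def taskSideConf: Configuration = confBroadcast.value.value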
Re: Large number of conf broadcasts
Nice Koert, let's hope it gets merged soon. /Anders

On Fri, Oct 23, 2015 at 6:32 PM Koert Kuipers <ko...@tresata.com> wrote:
> https://github.com/databricks/spark-avro/pull/95
>
> On Fri, Oct 23, 2015 at 5:01 AM, Koert Kuipers <ko...@tresata.com> wrote:
>> oh no wonder... it undoes the glob (i was reading from /some/path/*), creates a hadoopRdd for every path, and then creates a union of them using UnionRDD.
>>
>> that's not what i want... there is no need to do a union. AvroInputFormat already has the ability to handle globs (or multiple comma-separated paths) very efficiently. AvroRelation should just pass the paths along (comma separated).
>>
>> On Thu, Oct 22, 2015 at 1:37 PM, Anders Arpteg <arp...@spotify.com> wrote:
>>> Yes, it seems unnecessary. I actually tried patching the com.databricks.spark.avro reader to broadcast only once per dataset instead of once per file/partition. It seems to work just as well, there are significantly fewer broadcasts, and I'm not seeing out-of-memory issues any more. Strange that more people don't react to this, since the broadcasting seems completely unnecessary...
>>>
>>> Best,
>>> Anders
>>>
>>> On Thu, Oct 22, 2015 at 7:03 PM Koert Kuipers <ko...@tresata.com> wrote:
>>>> i am seeing the same thing. it's gone completely crazy creating broadcasts for the last 15 mins or so. killing it...
>>>>
>>>> On Thu, Sep 24, 2015 at 1:24 PM, Anders Arpteg <arp...@spotify.com> wrote:
>>>>> Hi,
>>>>>
>>>>> Running Spark 1.5.0 in yarn-client mode, and I am curious why so many broadcasts are being done when loading datasets with a large number of partitions/files. We have datasets with thousands of partitions, i.e. HDFS files in the Avro folder, and sometimes load hundreds of these large datasets. I believe I have located the broadcast at line SparkContext.scala:1006. It seems to just broadcast the Hadoop configuration, and I don't see why it should be necessary to broadcast that for EVERY file. Wouldn't it be possible to reuse the same broadcast configuration? It's hardly the case that the configuration differs between files in a single dataset. It seems to waste lots of memory and to persist unnecessarily to disk (see below again).
>>>>>
>>>>> Thanks,
>>>>> Anders
>>>>>
>>>>> 15/09/24 17:11:11 INFO BlockManager: Writing block broadcast_1871_piece0 to disk
>>>>> 15/09/24 17:11:11 INFO BlockManagerInfo: Added broadcast_1871_piece0 on disk on 10.254.35.24:49428 (size: 23.1 KB)
>>>>> 15/09/24 17:11:11 INFO MemoryStore: Block broadcast_4803_piece0 stored as bytes in memory (estimated size 23.1 KB, free 2.4 KB)
>>>>> 15/09/24 17:11:11 INFO BlockManagerInfo: Added broadcast_4803_piece0 in memory on 10.254.35.24:49428 (size: 23.1 KB, free: 464.0 MB)
>>>>> 15/09/24 17:11:11 INFO SpotifySparkContext: Created broadcast 4803 from hadoopFile at AvroRelation.scala:121
>>>>> 15/09/24 17:11:11 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KB for computing block broadcast_4804 in memory.
>>>>> 15/09/24 17:11:11 WARN MemoryStore: Not enough space to cache broadcast_4804 in memory! (computed 496.0 B so far)
>>>>> 15/09/24 17:11:11 INFO MemoryStore: Memory use = 530.3 MB (blocks) + 0.0 B (scratch space shared across 0 tasks(s)) = 530.3 MB. Storage limit = 530.3 MB.
>>>>> 15/09/24 17:11:11 WARN MemoryStore: Persisting block broadcast_4804 to disk instead.
>>>>> 15/09/24 17:11:11 INFO MemoryStore: ensureFreeSpace(23703) called with curMem=556036460, maxMem=556038881
>>>>> 15/09/24 17:11:11 INFO MemoryStore: 1 blocks selected for dropping
>>>>> 15/09/24 17:11:11 INFO BlockManager: Dropping block broadcast_1872_piece0 from memory
>>>>> 15/09/24 17:11:11 INFO BlockManager: Writing block broadcast_1872_piece0 to disk
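[Koert's observation suggests a simple alternative to the per-path UnionRDD. A hedged sketch, assuming an existing SparkContext sc and a hypothetical paths: Seq[String]: since AvroInputFormat is a FileInputFormat, a single hadoopFile call can take multiple comma-separated paths, which also means only one conf broadcast:]

    import org.apache.avro.generic.GenericRecord
    import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper}
    import org.apache.hadoop.io.NullWritable

    // One HadoopRDD over all paths at once, instead of a UnionRDD of one RDD per file.
    val records = sc.hadoopFile(
      paths.mkString(","),                      // FileInputFormat splits on commas
      classOf[AvroInputFormat[GenericRecord]],
      classOf[AvroWrapper[GenericRecord]],
      classOf[NullWritable])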
Re: Large number of conf broadcasts
Yes, it seems unnecessary. I actually tried patching the com.databricks.spark.avro reader to broadcast only once per dataset instead of once per file/partition. It seems to work just as well, there are significantly fewer broadcasts, and I'm not seeing out-of-memory issues any more. Strange that more people don't react to this, since the broadcasting seems completely unnecessary...

Best,
Anders

On Thu, Oct 22, 2015 at 7:03 PM Koert Kuipers <ko...@tresata.com> wrote:
> i am seeing the same thing. it's gone completely crazy creating broadcasts for the last 15 mins or so. killing it...
>
> On Thu, Sep 24, 2015 at 1:24 PM, Anders Arpteg <arp...@spotify.com> wrote:
>> Hi,
>>
>> Running Spark 1.5.0 in yarn-client mode, and I am curious why so many broadcasts are being done when loading datasets with a large number of partitions/files. We have datasets with thousands of partitions, i.e. HDFS files in the Avro folder, and sometimes load hundreds of these large datasets. I believe I have located the broadcast at line SparkContext.scala:1006. It seems to just broadcast the Hadoop configuration, and I don't see why it should be necessary to broadcast that for EVERY file. Wouldn't it be possible to reuse the same broadcast configuration? It's hardly the case that the configuration differs between files in a single dataset. It seems to waste lots of memory and to persist unnecessarily to disk (see below again).
>>
>> Thanks,
>> Anders
>>
>> 15/09/24 17:11:11 INFO BlockManager: Writing block broadcast_1871_piece0 to disk
>> 15/09/24 17:11:11 INFO BlockManagerInfo: Added broadcast_1871_piece0 on disk on 10.254.35.24:49428 (size: 23.1 KB)
>> 15/09/24 17:11:11 INFO MemoryStore: Block broadcast_4803_piece0 stored as bytes in memory (estimated size 23.1 KB, free 2.4 KB)
>> 15/09/24 17:11:11 INFO BlockManagerInfo: Added broadcast_4803_piece0 in memory on 10.254.35.24:49428 (size: 23.1 KB, free: 464.0 MB)
>> 15/09/24 17:11:11 INFO SpotifySparkContext: Created broadcast 4803 from hadoopFile at AvroRelation.scala:121
>> 15/09/24 17:11:11 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KB for computing block broadcast_4804 in memory.
>> 15/09/24 17:11:11 WARN MemoryStore: Not enough space to cache broadcast_4804 in memory! (computed 496.0 B so far)
>> 15/09/24 17:11:11 INFO MemoryStore: Memory use = 530.3 MB (blocks) + 0.0 B (scratch space shared across 0 tasks(s)) = 530.3 MB. Storage limit = 530.3 MB.
>> 15/09/24 17:11:11 WARN MemoryStore: Persisting block broadcast_4804 to disk instead.
>> 15/09/24 17:11:11 INFO MemoryStore: ensureFreeSpace(23703) called with curMem=556036460, maxMem=556038881
>> 15/09/24 17:11:11 INFO MemoryStore: 1 blocks selected for dropping
>> 15/09/24 17:11:11 INFO BlockManager: Dropping block broadcast_1872_piece0 from memory
>> 15/09/24 17:11:11 INFO BlockManager: Writing block broadcast_1872_piece0 to disk
Problem with multiple fields with same name in Avro
Hi,

Received the following error when reading an Avro source with Spark 1.5.0 and the com.databricks.spark.avro reader. In the data source, there is one nested field named "UserActivity.history.activity" and another named "UserActivity.activity". This seems to be the reason for the exception, since the two fields have the same name but sit at different levels of the hierarchy. Any ideas on how to get around this? The exception occurs directly when trying to load the data.

Thanks,
Anders

15/09/26 11:42:41 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, lon4-hadoopslave-a148.lon4.spotify.net): org.apache.avro.AvroRuntimeException: Bad index
at com.spotify.analytics.schema.UserActivity.put(UserActivity.java:60)
at org.apache.avro.generic.GenericData.setField(GenericData.java:573)
at org.apache.avro.generic.GenericData.setField(GenericData.java:590)
at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:66)
at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:32)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:248)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:216)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$class.isEmpty(Iterator.scala:256)
at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1157)
at com.databricks.spark.avro.AvroRelation$$anonfun$buildScan$1$$anonfun$4.apply(AvroRelation.scala:127)
at com.databricks.spark.avro.AvroRelation$$anonfun$buildScan$1$$anonfun$4.apply(AvroRelation.scala:126)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
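[To make the clash concrete, here is a hypothetical reconstruction of the schema shape described above, with field types assumed, built with Avro's SchemaBuilder; the generated specific record class apparently trips over the two same-named fields when the reader resolves them by name:]

    import org.apache.avro.SchemaBuilder

    // "activity" appears both at the top level and inside the "history" elements.
    val historySchema = SchemaBuilder.record("History").fields()
      .name("activity").`type`().stringType().noDefault()
      .endRecord()

    val userActivitySchema = SchemaBuilder.record("UserActivity").fields()
      .name("activity").`type`().stringType().noDefault()
      .name("history").`type`().array().items(historySchema).noDefault()
      .endRecord()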
Large number of conf broadcasts
Hi,

Running Spark 1.5.0 in yarn-client mode, and I am curious why so many broadcasts are being done when loading datasets with a large number of partitions/files. We have datasets with thousands of partitions, i.e. HDFS files in the Avro folder, and sometimes load hundreds of these large datasets. I believe I have located the broadcast at line SparkContext.scala:1006. It seems to just broadcast the Hadoop configuration, and I don't see why it should be necessary to broadcast that for EVERY file. Wouldn't it be possible to reuse the same broadcast configuration? It's hardly the case that the configuration differs between files in a single dataset. It seems to waste lots of memory and to persist unnecessarily to disk (see below again).

Thanks,
Anders

15/09/24 17:11:11 INFO BlockManager: Writing block broadcast_1871_piece0 to disk
15/09/24 17:11:11 INFO BlockManagerInfo: Added broadcast_1871_piece0 on disk on 10.254.35.24:49428 (size: 23.1 KB)
15/09/24 17:11:11 INFO MemoryStore: Block broadcast_4803_piece0 stored as bytes in memory (estimated size 23.1 KB, free 2.4 KB)
15/09/24 17:11:11 INFO BlockManagerInfo: Added broadcast_4803_piece0 in memory on 10.254.35.24:49428 (size: 23.1 KB, free: 464.0 MB)
15/09/24 17:11:11 INFO SpotifySparkContext: Created broadcast 4803 from hadoopFile at AvroRelation.scala:121
15/09/24 17:11:11 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KB for computing block broadcast_4804 in memory.
15/09/24 17:11:11 WARN MemoryStore: Not enough space to cache broadcast_4804 in memory! (computed 496.0 B so far)
15/09/24 17:11:11 INFO MemoryStore: Memory use = 530.3 MB (blocks) + 0.0 B (scratch space shared across 0 tasks(s)) = 530.3 MB. Storage limit = 530.3 MB.
15/09/24 17:11:11 WARN MemoryStore: Persisting block broadcast_4804 to disk instead.
15/09/24 17:11:11 INFO MemoryStore: ensureFreeSpace(23703) called with curMem=556036460, maxMem=556038881
15/09/24 17:11:11 INFO MemoryStore: 1 blocks selected for dropping
15/09/24 17:11:11 INFO BlockManager: Dropping block broadcast_1872_piece0 from memory
15/09/24 17:11:11 INFO BlockManager: Writing block broadcast_1872_piece0 to disk
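[For reference, a hedged sketch of the pattern that multiplies the broadcasts, with names assumed from the AvroRelation.scala:121 log line above rather than taken from the library's exact code: one sc.hadoopFile call per expanded path, each of which broadcasts the Hadoop configuration again, unioned afterwards:]

    import org.apache.avro.generic.GenericRecord
    import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper}
    import org.apache.hadoop.io.NullWritable

    // One broadcast per file: every hadoopFile() call broadcasts the Hadoop conf.
    val perFileRdds = paths.map { p =>
      sc.hadoopFile(p,
        classOf[AvroInputFormat[GenericRecord]],
        classOf[AvroWrapper[GenericRecord]],
        classOf[NullWritable])
    }
    val all = sc.union(perFileRdds)  // thousands of files => thousands of broadcasts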
Re: Problems with Tungsten in Spark 1.5.0-rc2
Ok, thanks Reynold. When I tested dynamic allocation with Spark 1.4, it complained, saying that it was not Tungsten compliant. Let's hope it works with 1.5 then!

On Tue, Sep 8, 2015 at 5:49 AM Reynold Xin <r...@databricks.com> wrote:
> On Wed, Sep 2, 2015 at 12:03 AM, Anders Arpteg <arp...@spotify.com> wrote:
>> BTW, is it possible (or will it be) to use Tungsten with dynamic allocation and the external shuffle manager?
>
> Yes - I think this already works. There isn't anything specific here related to Tungsten.
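[For anyone wanting to try the combination, a minimal sketch of enabling Tungsten's shuffle together with dynamic allocation and the external shuffle service; the keys are from memory of the 1.5 configuration docs and worth double-checking:]

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.shuffle.manager", "tungsten-sort")  // Tungsten shuffle path in 1.5
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.shuffle.service.enabled", "true")   // external shuffle service, required on YARN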
Re: Problems with Tungsten in Spark 1.5.0-rc2
I haven't done a comparative benchmark between the two, and it would involve some work to do so. A single run with each shuffle manager would probably not say that much, since we have a rather busy cluster and performance depends heavily on what else is currently running. I have seen fewer problems/exceptions though, and possibilities to decrease memory requirements (or increase cores), which is of great help.

BTW, is it possible (or will it be) to use Tungsten with dynamic allocation and the external shuffle manager?

Best,
Anders

On Tue, Sep 1, 2015 at 7:07 PM Davies Liu <dav...@databricks.com> wrote:
> Thanks for the confirmation. The tungsten-sort is not the default ShuffleManager, so this fix will not block the 1.5 release; it may be in 1.5.1.
>
> BTW, how do the sort and tungsten-sort ShuffleManagers compare for this large job?
>
> On Tue, Sep 1, 2015 at 8:03 AM, Anders Arpteg <arp...@spotify.com> wrote:
>> A fix submitted less than one hour after my mail, very impressive Davies! I've compiled your PR and tested it with the large job that failed before, and it seems to work fine now without any exceptions. Awesome, thanks!
>>
>> Best,
>> Anders
>>
>> On Tue, Sep 1, 2015 at 1:38 AM Davies Liu <dav...@databricks.com> wrote:
>>> I had sent out a PR [1] to fix 2), could you help to test that?
>>>
>>> [1] https://github.com/apache/spark/pull/8543
>>>
>>> On Mon, Aug 31, 2015 at 12:34 PM, Anders Arpteg <arp...@spotify.com> wrote:
>>>> I was trying out 1.5 rc2 and noticed some issues with the Tungsten shuffle manager. One problem appeared when using the com.databricks.spark.avro reader; error (1) was received, see stack trace below. The problem does not occur with the "sort" shuffle manager.
>>>>
>>>> Another problem was in a large, complex job with lots of transformations occurring simultaneously, i.e. 50+ maps, each shuffling data. I received error (2) about inability to acquire memory, which also seems to have to do with Tungsten. Possibly there is some setting available to increase that memory, because there's lots of heap memory available.
>>>>
>>>> I am running on Yarn 2.2 with about 400 executors. Hoping this will give some hints for improving the upcoming release, or give me some hints to fix the problems.
>>>>
>>>> Thanks,
>>>> Anders
>>>>
>>>> Error (1)
>>>>
>>>> 15/08/31 18:30:57 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 3387, lon4-hadoopslave-c245.lon4.spotify.net): java.io.EOFException
>>>> at java.io.DataInputStream.readInt(DataInputStream.java:392)
>>>> at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:121)
>>>> at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:109)
>>>> at scala.collection.Iterator$$anon$13.next(Iterator.scala:372)
>>>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>>> at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
>>>> at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
>>>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>>> at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:366)
>>>> at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622)
>>>> at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.org$apache$spark$sql$execution$aggregate$TungstenAggregate$$anonfun$$executePartition$1(TungstenAggregate.scala:110)
Re: Problems with Tungsten in Spark 1.5.0-rc2
A fix submitted less than one hour after my mail, very impressive Davies! I've compiled your PR and tested it with the large job that failed before, and it seems to work fine now without any exceptions. Awesome, thanks!

Best,
Anders

On Tue, Sep 1, 2015 at 1:38 AM Davies Liu <dav...@databricks.com> wrote:
> I had sent out a PR [1] to fix 2), could you help to test that?
>
> [1] https://github.com/apache/spark/pull/8543
>
> On Mon, Aug 31, 2015 at 12:34 PM, Anders Arpteg <arp...@spotify.com> wrote:
>> I was trying out 1.5 rc2 and noticed some issues with the Tungsten shuffle manager. One problem appeared when using the com.databricks.spark.avro reader; error (1) was received, see stack trace below. The problem does not occur with the "sort" shuffle manager.
>>
>> Another problem was in a large, complex job with lots of transformations occurring simultaneously, i.e. 50+ maps, each shuffling data. I received error (2) about inability to acquire memory, which also seems to have to do with Tungsten. Possibly there is some setting available to increase that memory, because there's lots of heap memory available.
>>
>> I am running on Yarn 2.2 with about 400 executors. Hoping this will give some hints for improving the upcoming release, or give me some hints to fix the problems.
>>
>> Thanks,
>> Anders
>>
>> Error (1)
>>
>> 15/08/31 18:30:57 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 3387, lon4-hadoopslave-c245.lon4.spotify.net): java.io.EOFException
>> at java.io.DataInputStream.readInt(DataInputStream.java:392)
>> at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:121)
>> at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:109)
>> at scala.collection.Iterator$$anon$13.next(Iterator.scala:372)
>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>> at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
>> at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>> at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:366)
>> at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622)
>> at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.org$apache$spark$sql$execution$aggregate$TungstenAggregate$$anonfun$$executePartition$1(TungstenAggregate.scala:110)
>> at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
>> at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
>> at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:47)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>> at org.apache.spark.scheduler.Task.run(Task.scala:88)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> Error (2)
>>
>> 15/08/31 18:41:25 WARN TaskSetManager: Lost task 16.1 in stage 316.0 (TID 32686, lon4-hadoopslave-b925.lon4.spotify.net): java.io.IOException: Unable to acquire 67108864 bytes of memory
Problems with Tungsten in Spark 1.5.0-rc2
Was trying out 1.5 rc2 and noticed some issues with the Tungsten shuffle manager. One problem appeared when using the com.databricks.spark.avro reader; error (1) was received, see stack trace below. The problem does not occur with the "sort" shuffle manager.

Another problem was in a large, complex job with lots of transformations occurring simultaneously, i.e. 50+ maps, each shuffling data. I received error (2) about inability to acquire memory, which also seems to have to do with Tungsten. Possibly there is some setting available to increase that memory, because there's lots of heap memory available.

I am running on Yarn 2.2 with about 400 executors. Hoping this will give some hints for improving the upcoming release, or give me some hints to fix the problems.

Thanks,
Anders

Error (1)

15/08/31 18:30:57 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 3387, lon4-hadoopslave-c245.lon4.spotify.net): java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:121)
at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:109)
at scala.collection.Iterator$$anon$13.next(Iterator.scala:372)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:366)
at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622)
at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.org$apache$spark$sql$execution$aggregate$TungstenAggregate$$anonfun$$executePartition$1(TungstenAggregate.scala:110)
at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:47)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Error (2)

15/08/31 18:41:25 WARN TaskSetManager: Lost task 16.1 in stage 316.0 (TID 32686, lon4-hadoopslave-b925.lon4.spotify.net): java.io.IOException: Unable to acquire 67108864 bytes of memory
at org.apache.spark.shuffle.unsafe.UnsafeShuffleExternalSorter.acquireNewPageIfNecessary(UnsafeShuffleExternalSorter.java:385)
at org.apache.spark.shuffle.unsafe.UnsafeShuffleExternalSorter.insertRecord(UnsafeShuffleExternalSorter.java:435)
at org.apache.spark.shuffle.unsafe.UnsafeShuffleWriter.insertRecordIntoSorter(UnsafeShuffleWriter.java:246)
at org.apache.spark.shuffle.unsafe.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:174)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
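[For error (2), a hedged guess at knobs to try, assuming 1.5-era settings and not a confirmed fix: the Tungsten shuffle writer takes its pages from the shuffle fraction of the heap, so raising that fraction, or temporarily falling back to the plain sort manager, may avoid the failed acquisition:]

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.shuffle.memoryFraction", "0.4")  // default is 0.2
      .set("spark.shuffle.manager", "sort")        // workaround: bypass tungsten-sort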
Re: Parquet problems
No, never really resolved the problem, except by increasing the PermGen space, which only partially solved it. I still have to restart the job multiple times to make the whole job complete (it stores intermediate results). The Parquet data sources have about 70 columns, and yes Cheng, it works fine when only loading a small sample of the data.

Thankful for any hints,
Anders

On Wed, Jul 22, 2015 at 5:29 PM Cheng Lian <lian.cs@gmail.com> wrote:
> How many columns are there in these Parquet files? Could you load a small portion of the original large dataset successfully?
>
> Cheng
>
> On 6/25/15 5:52 PM, Anders Arpteg wrote:
>> Yes, both the driver and the executors. It works a little bit better with more space, but there is still a leak that will cause failure after a number of reads. There are about 700 different data sources that need to be loaded, lots of data...
>>
>> On Thu, 25 Jun 2015 at 08:02, Sabarish Sasidharan <sabarish.sasidha...@manthan.com> wrote:
>>> Did you try increasing the perm gen for the driver?
>>>
>>> Regards,
>>> Sab
>>>
>>> On 24-Jun-2015 4:40 pm, Anders Arpteg <arp...@spotify.com> wrote:
>>>> When reading large (and many) datasets with the Spark 1.4.0 DataFrames Parquet reader (the org.apache.spark.sql.parquet format), the following exceptions are thrown:
>>>>
>>>> Exception in thread "task-result-getter-0" Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "task-result-getter-0"
>>>> Exception in thread "task-result-getter-3" java.lang.OutOfMemoryError: PermGen space
>>>> Exception in thread "task-result-getter-1" java.lang.OutOfMemoryError: PermGen space
>>>> Exception in thread "task-result-getter-2" java.lang.OutOfMemoryError: PermGen space
>>>>
>>>> and many more like these from different threads. I've tried increasing the PermGen space using the -XX:MaxPermSize VM setting, but even after tripling the space, the same errors occur. I've also tried storing intermediate results, and am able to get the full job completed by running it multiple times and starting from the last successful intermediate result. There seems to be some memory leak in the Parquet format. Any hints on how to fix this problem?
>>>>
>>>> Thanks,
>>>> Anders
Re: Parquet problems
Yes, both the driver and the executors. It works a little bit better with more space, but there is still a leak that will cause failure after a number of reads. There are about 700 different data sources that need to be loaded, lots of data...

On Thu, 25 Jun 2015 at 08:02, Sabarish Sasidharan <sabarish.sasidha...@manthan.com> wrote:
> Did you try increasing the perm gen for the driver?
>
> Regards,
> Sab
>
> On 24-Jun-2015 4:40 pm, Anders Arpteg <arp...@spotify.com> wrote:
>> When reading large (and many) datasets with the Spark 1.4.0 DataFrames Parquet reader (the org.apache.spark.sql.parquet format), the following exceptions are thrown:
>>
>> Exception in thread "task-result-getter-0" Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "task-result-getter-0"
>> Exception in thread "task-result-getter-3" java.lang.OutOfMemoryError: PermGen space
>> Exception in thread "task-result-getter-1" java.lang.OutOfMemoryError: PermGen space
>> Exception in thread "task-result-getter-2" java.lang.OutOfMemoryError: PermGen space
>>
>> and many more like these from different threads. I've tried increasing the PermGen space using the -XX:MaxPermSize VM setting, but even after tripling the space, the same errors occur. I've also tried storing intermediate results, and am able to get the full job completed by running it multiple times and starting from the last successful intermediate result. There seems to be some memory leak in the Parquet format. Any hints on how to fix this problem?
>>
>> Thanks,
>> Anders
Parquet problems
When reading large (and many) datasets with the Spark 1.4.0 DataFrames Parquet reader (the org.apache.spark.sql.parquet format), the following exceptions are thrown:

Exception in thread "task-result-getter-0" Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "task-result-getter-0"
Exception in thread "task-result-getter-3" java.lang.OutOfMemoryError: PermGen space
Exception in thread "task-result-getter-1" java.lang.OutOfMemoryError: PermGen space
Exception in thread "task-result-getter-2" java.lang.OutOfMemoryError: PermGen space

and many more like these from different threads. I've tried increasing the PermGen space using the -XX:MaxPermSize VM setting, but even after tripling the space, the same errors occur. I've also tried storing intermediate results, and am able to get the full job completed by running it multiple times and starting from the last successful intermediate result. There seems to be some memory leak in the Parquet format. Any hints on how to fix this problem?

Thanks,
Anders
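[A minimal sketch of how the -XX:MaxPermSize flag can be passed to both sides of a Spark 1.4 job; the 512m value is a placeholder, and in yarn-client mode the driver flag normally has to go on the spark-submit command line instead, since that JVM is already running:]

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.executor.extraJavaOptions", "-XX:MaxPermSize=512m")
      .set("spark.driver.extraJavaOptions", "-XX:MaxPermSize=512m")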
Re: Spark 1.4.0-rc3: Actor not found
Tried some other data sources as well, and it actually works for some Parquet sources. Possibly there is some specific problem with that first Parquet source I tried, and not a Spark 1.4 problem. I'll get back with more info if I find any new information.

Thanks,
Anders

On Tue, Jun 2, 2015 at 8:45 PM, Yin Huai <yh...@databricks.com> wrote:
> Does it happen every time you read a parquet source?
>
> On Tue, Jun 2, 2015 at 3:42 AM, Anders Arpteg <arp...@spotify.com> wrote:
>> The log is from the log aggregation tool (Hortonworks, yarn logs ...), so both executors and driver. I'll send a private mail to you with the full logs.
>>
>> Also, tried another job as you suggested, and it actually worked fine. The first job was reading from a Parquet source, and the second from an Avro source. Could there be some issues with the Parquet reader?
>>
>> Thanks,
>> Anders
>>
>> On Tue, Jun 2, 2015 at 11:53 AM, Shixiong Zhu <zsxw...@gmail.com> wrote:
>>> How about other jobs? Is it an executor log, or a driver log? Could you post other logs near this error, please? Thank you.
>>>
>>> Best Regards,
>>> Shixiong Zhu
>>>
>>> 2015-06-02 17:11 GMT+08:00 Anders Arpteg <arp...@spotify.com>:
>>>> Just compiled Spark 1.4.0-rc3 for Yarn 2.2 and tried running a job that worked fine with Spark 1.3. The job starts on the cluster (yarn-cluster mode) and the initial stage starts, but the job fails before any task succeeds, with the following error. Any hints?
>>>>
>>>> [ERROR] [06/02/2015 09:05:36.962] [Executor task launch worker-0] [akka.tcp://sparkDriver@10.254.6.15:33986/user/CoarseGrainedScheduler] swallowing exception during message send (akka.remote.RemoteTransportExceptionNoStackTrace)
>>>> Exception in thread "main" akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://sparkDriver@10.254.6.15:33986/), Path(/user/OutputCommitCoordinator)]
>>>> at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
>>>> at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
>>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>>>> at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
>>>> at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
>>>> at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
>>>> at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
>>>> at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>>>> at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
>>>> at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
>>>> at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
>>>> at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
>>>> at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
>>>> at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
>>>> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
>>>> at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508)
>>>> at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541)
>>>> at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531)
>>>> at akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
>>>> at akka.remote.EndpointManager$$anonfun$1.applyOrElse(Remoting.scala:575)
>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>>> at akka.remote.EndpointManager.aroundReceive(Remoting.scala:395)
>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>>> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
>>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
Spark 1.4.0-rc3: Actor not found
Just compiled Spark 1.4.0-rc3 for Yarn 2.2 and tried running a job that worked fine with Spark 1.3. The job starts on the cluster (yarn-cluster mode) and the initial stage starts, but the job fails before any task succeeds, with the following error. Any hints?

[ERROR] [06/02/2015 09:05:36.962] [Executor task launch worker-0] [akka.tcp://sparkDriver@10.254.6.15:33986/user/CoarseGrainedScheduler] swallowing exception during message send (akka.remote.RemoteTransportExceptionNoStackTrace)
Exception in thread "main" akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://sparkDriver@10.254.6.15:33986/), Path(/user/OutputCommitCoordinator)]
at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508)
at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541)
at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531)
at akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
at akka.remote.EndpointManager$$anonfun$1.applyOrElse(Remoting.scala:575)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at akka.remote.EndpointManager.aroundReceive(Remoting.scala:395)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
Re: Spark 1.4.0-rc3: Actor not found
The log is from the log aggregation tool (Hortonworks, yarn logs ...), so both executors and driver. I'll send a private mail to you with the full logs.

Also, tried another job as you suggested, and it actually worked fine. The first job was reading from a Parquet source, and the second from an Avro source. Could there be some issues with the Parquet reader?

Thanks,
Anders

On Tue, Jun 2, 2015 at 11:53 AM, Shixiong Zhu <zsxw...@gmail.com> wrote:
> How about other jobs? Is it an executor log, or a driver log? Could you post other logs near this error, please? Thank you.
>
> Best Regards,
> Shixiong Zhu
>
> 2015-06-02 17:11 GMT+08:00 Anders Arpteg <arp...@spotify.com>:
>> Just compiled Spark 1.4.0-rc3 for Yarn 2.2 and tried running a job that worked fine with Spark 1.3. The job starts on the cluster (yarn-cluster mode) and the initial stage starts, but the job fails before any task succeeds, with the following error. Any hints?
>>
>> [ERROR] [06/02/2015 09:05:36.962] [Executor task launch worker-0] [akka.tcp://sparkDriver@10.254.6.15:33986/user/CoarseGrainedScheduler] swallowing exception during message send (akka.remote.RemoteTransportExceptionNoStackTrace)
>> Exception in thread "main" akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://sparkDriver@10.254.6.15:33986/), Path(/user/OutputCommitCoordinator)]
>> at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
>> at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>> at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
>> at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
>> at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
>> at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
>> at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>> at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
>> at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
>> at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
>> at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
>> at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
>> at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
>> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
>> at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508)
>> at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541)
>> at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531)
>> at akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
>> at akka.remote.EndpointManager$$anonfun$1.applyOrElse(Remoting.scala:575)
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>> at akka.remote.EndpointManager.aroundReceive(Remoting.scala:395)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
Re: No executors allocated on yarn with latest master branch
We're using the capacity scheduler, to the best of my knowledge. Unsure if multi-resource scheduling is used, but if you know of an easy way to figure that out, then let me know.

Thanks,
Anders

On Sat, Feb 21, 2015 at 12:05 AM, Sandy Ryza <sandy.r...@cloudera.com> wrote:
> Are you using the capacity scheduler or the fifo scheduler without multi-resource scheduling, by any chance?
>
> On Thu, Feb 12, 2015 at 1:51 PM, Anders Arpteg <arp...@spotify.com> wrote:
>> The NM logs only seem to contain entries similar to the following. Nothing else in the same time range. Any help?
>>
>> 2015-02-12 20:47:31,245 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl: Event EventType: KILL_CONTAINER sent to absent container container_1422406067005_0053_01_02
>> 2015-02-12 20:47:31,246 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl: Event EventType: KILL_CONTAINER sent to absent container container_1422406067005_0053_01_12
>> 2015-02-12 20:47:31,246 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl: Event EventType: KILL_CONTAINER sent to absent container container_1422406067005_0053_01_22
>> 2015-02-12 20:47:31,246 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl: Event EventType: KILL_CONTAINER sent to absent container container_1422406067005_0053_01_32
>> 2015-02-12 20:47:31,246 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl: Event EventType: KILL_CONTAINER sent to absent container container_1422406067005_0053_01_42
>> 2015-02-12 21:24:30,515 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl: Event EventType: FINISH_APPLICATION sent to absent application application_1422406067005_0053
>>
>> On Thu, Feb 12, 2015 at 10:38 PM, Sandy Ryza <sandy.r...@cloudera.com> wrote:
>>> It seems unlikely to me that it would be a 2.2 issue, though not entirely impossible. Are you able to find any of the container logs? Is the NodeManager launching containers and reporting some exit code?
>>>
>>> -Sandy
>>>
>>> On Thu, Feb 12, 2015 at 1:21 PM, Anders Arpteg <arp...@spotify.com> wrote:
>>>> No, not submitting from windows, from a debian distribution. Had a quick look at the RM logs, and it seems some containers are allocated but then released again for some reason. Not easy to make sense of the logs, but here is a snippet from the logs (from a test in our small test cluster) if you'd like to have a closer look: http://pastebin.com/8WU9ivqC
>>>>
>>>> Sandy, sounds like it could possibly be a 2.2 issue then, or what do you think?
>>>>
>>>> Thanks,
>>>> Anders
>>>>
>>>> On Thu, Feb 12, 2015 at 3:11 PM, Aniket Bhatnagar <aniket.bhatna...@gmail.com> wrote:
>>>>> This is tricky to debug. Check the logs of the node and resource manager of YARN to see if you can trace the error. In the past I have had to look closely at the arguments getting passed to the YARN container (they get logged before attempting to launch containers). If I still didn't get a clue, I had to check the script generated by YARN to execute the container and even run it manually to trace at what line the error occurred. BTW, are you submitting the job from windows?
>>>>>
>>>>> On Thu, Feb 12, 2015, 3:34 PM Anders Arpteg <arp...@spotify.com> wrote:
>>>>>> Interesting to hear that it works for you. Are you using Yarn 2.2 as well? No strange log messages during startup, and I can't see any other log messages since no executor gets launched. It does not seem to work in yarn-client mode either, failing with the exception below.
>>>>>>
>>>>>> Exception in thread "main" org.apache.spark.SparkException: Yarn application has already ended! It might have been killed or unable to launch the application master.
>>>>>> at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:119)
>>>>>> at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:59)
>>>>>> at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:141)
>>>>>> at org.apache.spark.SparkContext.<init>(SparkContext.scala:370)
>>>>>> at com.spotify.analytics.AnalyticsSparkContext.<init>(AnalyticsSparkContext.scala:8)
>>>>>> at com.spotify.analytics.DataSampler$.main(DataSampler.scala:42)
>>>>>> at com.spotify.analytics.DataSampler.main(DataSampler.scala)
>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>>>>>> at java.lang.reflect.Method.invoke(Method.java:597)
>>>>>> at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:551)
>>>>>> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:155)
>>>>>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:178)
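[If it turns out multi-resource scheduling is off, a hedged guess at the capacity-scheduler.xml property to check; the default DefaultResourceCalculator only accounts for memory, which has been known to interact badly with Spark container requests that also ask for cores. Verify the property against your Hadoop version:]

    <property>
      <name>yarn.scheduler.capacity.resource-calculator</name>
      <value>org.apache.hadoop.yarn.util.resource.DominantResourceCalculator</value>
    </property>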
Re: Missing shuffle files
If you're thinking of the YARN memory overhead, then yes, I have increased that as well. However, I'm glad to say that my job finally finished successfully. Besides the timeout and memory settings, performing repartitioning (with shuffling) at the right time seems to be the key to making this large job succeed. With all the transformations in the job, the partition distribution was becoming increasingly skewed. It's not easy to figure out when to repartition and to what number of partitions, and it takes forever to tweak these settings, since everything works perfectly for small datasets and you have to experiment with large, time-consuming jobs. Imagine if there was an automatic partition reconfiguration function that automagically did that...

On Tue, Feb 24, 2015 at 3:20 AM, Corey Nolet <cjno...@gmail.com> wrote:
> I *think* this may have been related to the default memory overhead setting being too low. I raised the value to 1G and tried my job again, but I had to leave the office before it finished. It did get further, but I'm not exactly sure if that's just because I raised the memory. I'll see tomorrow - but I have a suspicion this may have been the cause of the executors being killed by the application master.
>
> On Feb 23, 2015 5:25 PM, Corey Nolet <cjno...@gmail.com> wrote:
>> I've got the opposite problem with regards to partitioning. I've got over 6000 partitions for some of these RDDs, which immediately blows the heap somehow - I'm still not exactly sure how. If I coalesce them down to about 600-800 partitions, I get the problems where the executors are dying without any other error messages (other than telling me the executor was lost in the UI). If I don't coalesce, I pretty immediately get Java heap space exceptions that kill the job altogether.
>>
>> Putting in the timeouts didn't seem to help the case where I am coalescing. Also, I don't see any differences between "disk only" and "memory and disk" storage levels - both of them are having the same problems. I notice large shuffle files (30-40gb) that only seem to spill a few hundred mb.
>>
>> On Mon, Feb 23, 2015 at 4:28 PM, Anders Arpteg <arp...@spotify.com> wrote:
>>> Sounds very similar to what I experienced, Corey. Something that seems to at least help with my problems is to have more partitions. I am already fighting between ending up with too many partitions in the end and having too few in the beginning. By coalescing as late as possible and avoiding too few partitions in the beginning, the problems seem to decrease. Also, increasing spark.akka.askTimeout and spark.core.connection.ack.wait.timeout significantly (~700 secs), the problems seem to almost disappear. Don't want to celebrate yet, still a long way left before the job completes, but it's looking better...
>>>
>>> On Mon, Feb 23, 2015 at 9:54 PM, Corey Nolet <cjno...@gmail.com> wrote:
>>>> I'm looking @ my yarn container logs for some of the executors which appear to be failing (with the missing shuffle files). I see exceptions that say client.TransportClientFactory: Found inactive connection to host/ip:port, closing it.
>>>>
>>>> Right after that I see shuffle.RetryingBlockFetcher: Exception while beginning fetch of 1 outstanding blocks. java.io.IOException: Failed to connect to host/ip:port
>>>>
>>>> Right after that exception I see RECEIVED SIGNAL 15: SIGTERM
>>>>
>>>> Finally, following the sigterm, I see FileNotFoundException: /hdfs/01/yarn/nm/usercache../spark-local-uuid/shuffle_5_09_0.data (No such file or directory)
>>>>
>>>> I'm looking @ the nodemanager and application master logs and I see no indications whatsoever that there were any memory issues during this period of time. The Spark UI is telling me none of the executors are really using too much memory when this happens. It is a big job that's caching several hundred GB, but each node manager on the cluster has 64gb of ram just for yarn containers (physical nodes have 128gb). On this cluster, we have 128 nodes. I've also tried using DISK_ONLY storage level, but to no avail.
>>>>
>>>> Any further ideas on how to track this down? Again, we're able to run this same job on about 1/5th of the data just fine. The only thing that's pointing me towards a memory issue is that it seems to be happening in the same stages each time, and when I lower the memory that each executor has allocated, it happens in earlier stages - but I can't seem to find anything that says an executor (or container for that matter) has run low on memory.
>>>>
>>>> On Mon, Feb 23, 2015 at 9:24 AM, Anders Arpteg <arp...@spotify.com> wrote:
>>>>> No, unfortunately we're not making use of dynamic allocation or the external shuffle service. Hoping that we could reconfigure our cluster to make use of it, but since it requires changes to the cluster itself (and not just the Spark app), it could take some time. Unsure if task 450 was acting as a reducer or not, but it seems possible. Probably due to a crashed executor, as you say. Seems like I need to do some more advanced partition tuning to make this job work, as it currently has a rather high number of partitions.
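[The "repartition at the right time" idea in sketch form; the RDD names and partition counts are placeholders, not taken from the actual job: shuffle early to even out the skew, and coalesce without a shuffle as late as possible to keep the final partition count down:]

    // Full shuffle early: redistributes skewed data evenly across 4000 partitions.
    val evened = skewedRdd.repartition(4000)

    // ...heavy transformations in between...

    // Narrow coalesce late: merges down to 500 partitions without another shuffle.
    val compacted = evened.coalesce(500)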
Re: Missing shuffle files
No, unfortunately we're not making use of dynamic allocation or the external shuffle service. Hoping that we could reconfigure our cluster to make use of it, but since it requires changes to the cluster itself (and not just the Spark app), it could take some time. Unsure if task 450 was acting as a reducer or not, but seems possible. Probably due to a crashed executor as you say. Seems like I need to do some more advanced partition tuning to make this job work, as it's currently rather high number of partitions. Thanks for the help so far! It's certainly a frustrating task to debug when everything's working perfectly on sample data locally and crashes hard when running on the full dataset on the cluster... On Sun, Feb 22, 2015 at 9:27 AM, Sameer Farooqui same...@databricks.com wrote: Do you guys have dynamic allocation turned on for YARN? Anders, was Task 450 in your job acting like a Reducer and fetching the Map spill output data from a different node? If a Reducer task can't read the remote data it needs, that could cause the stage to fail. Sometimes this forces the previous stage to also be re-computed if it's a wide dependency. But like Petar said, if you turn the external shuffle service on, YARN NodeManager process on the slave machines will serve out the map spill data, instead of the Executor JVMs (by default unless you turn external shuffle on, the Executor JVM itself serves out the shuffle data which causes problems if an Executor dies). Core, how often are Executors crashing in your app? How many Executors do you have total? And what is the memory size for each? You can change what fraction of the Executor heap will be used for your user code vs the shuffle vs RDD caching with the spark.storage.memoryFraction setting. On Sat, Feb 21, 2015 at 2:58 PM, Petar Zecevic petar.zece...@gmail.com wrote: Could you try to turn on the external shuffle service? spark.shuffle.service.enable = true On 21.2.2015. 17:50, Corey Nolet wrote: I'm experiencing the same issue. Upon closer inspection I'm noticing that executors are being lost as well. Thing is, I can't figure out how they are dying. I'm using MEMORY_AND_DISK_SER and i've got over 1.3TB of memory allocated for the application. I was thinking perhaps it was possible that a single executor was getting a single or a couple large partitions but shouldn't the disk persistence kick in at that point? On Sat, Feb 21, 2015 at 11:20 AM, Anders Arpteg arp...@spotify.com wrote: For large jobs, the following error message is shown that seems to indicate that shuffle files for some reason are missing. It's a rather large job with many partitions. If the data size is reduced, the problem disappears. I'm running a build from Spark master post 1.2 (build at 2015-01-16) and running on Yarn 2.2. Any idea of how to resolve this problem? 
User class threw exception: Job aborted due to stage failure: Task 450 in stage 450.1 failed 4 times, most recent failure: Lost task 450.3 in stage 450.1 (TID 167370, lon4-hadoopslave-b77.lon4.spotify.net): java.io.FileNotFoundException: /disk/hd06/yarn/local/usercache/arpteg/appcache/application_1424333823218_21217/spark-local-20150221154811-998c/03/rdd_675_450 (No such file or directory)
  at java.io.FileOutputStream.open(Native Method)
  at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
  at java.io.FileOutputStream.<init>(FileOutputStream.java:171)
  at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:76)
  at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:786)
  at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:637)
  at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:149)
  at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:74)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:264)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:231)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
  at org.apache.spark.scheduler.Task.run(Task.scala:64)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:192)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:745)

TIA, Anders
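[Editor's note] For anyone wanting to try Petar's and Sameer's suggestion from this thread, here is a rough sketch of the application-side configuration (a sketch only, for Spark 1.2-era versions on YARN; the NodeManagers additionally need the Spark YARN shuffle service registered as an auxiliary service, which requires cluster-side changes and a NodeManager restart — see the yarn-site.xml sketch further down in this digest):

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    // Let the YARN NodeManager serve shuffle files, so map output
    // survives an executor crash:
    .set("spark.shuffle.service.enabled", "true")
    // Optionally rebalance the executor heap between RDD caching and
    // user code (0.6 was the default storage fraction in this era):
    .set("spark.storage.memoryFraction", "0.4")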
Re: Missing shuffle files
Sounds very similar to what I experienced, Corey. Something that seems to at least help with my problems is to have more partitions. I'm already fighting between ending up with too many partitions at the end and having too few at the beginning. By coalescing as late as possible and avoiding too few partitions at the beginning, the problems seem to decrease. Also, after increasing spark.akka.askTimeout and spark.core.connection.ack.wait.timeout significantly (~700 secs), the problems seem to almost disappear. Don't want to celebrate yet, there's still a long way left before the job completes, but it's looking better...

On Mon, Feb 23, 2015 at 9:54 PM, Corey Nolet cjno...@gmail.com wrote:

I'm looking at my YARN container logs for some of the executors which appear to be failing (with the missing shuffle files). I see exceptions that say client.TransportClientFactory: Found inactive connection to host/ip:port, closing it. Right after that I see shuffle.RetryingBlockFetcher: Exception while beginning fetch of 1 outstanding blocks. java.io.IOException: Failed to connect to host/ip:port. Right after that exception I see RECEIVED SIGNAL 15: SIGTERM. Finally, following the SIGTERM, I see FileNotFoundException: /hdfs/01/yarn/nm/usercache../spark-local-uuid/shuffle_5_09_0.data (No such file or directory).

I'm looking at the NodeManager and ApplicationMaster logs, and I see no indication whatsoever that there were any memory issues during this period of time. The Spark UI is telling me that none of the executors are really using too much memory when this happens. It is a big job that's caching several hundreds of GB, but each NodeManager on the cluster has 64 GB of RAM just for YARN containers (the physical nodes have 128 GB). On this cluster, we have 128 nodes. I've also tried using the DISK_ONLY storage level, but to no avail. Any further ideas on how to track this down? Again, we're able to run this same job on about 1/5th of the data just fine. The only thing pointing me towards a memory issue is that it seems to happen in the same stages each time, and when I lower the memory that each executor has allocated, it happens in earlier stages; but I can't seem to find anything that says an executor (or container, for that matter) has run low on memory.

On Mon, Feb 23, 2015 at 9:24 AM, Anders Arpteg arp...@spotify.com wrote:

No, unfortunately we're not making use of dynamic allocation or the external shuffle service. Hoping that we can reconfigure our cluster to make use of it, but since it requires changes to the cluster itself (and not just the Spark app), it could take some time. Unsure if task 450 was acting as a reducer or not, but it seems possible. Probably due to a crashed executor, as you say. Seems like I need to do some more advanced partition tuning to make this job work, as it's currently a rather high number of partitions. Thanks for the help so far! It's certainly a frustrating task to debug when everything works perfectly on sample data locally but crashes hard when running on the full dataset on the cluster...

On Sun, Feb 22, 2015 at 9:27 AM, Sameer Farooqui same...@databricks.com wrote:

Do you guys have dynamic allocation turned on for YARN? Anders, was task 450 in your job acting like a reducer and fetching the map spill output data from a different node? If a reducer task can't read the remote data it needs, that could cause the stage to fail. Sometimes this forces the previous stage to also be re-computed if it's a wide dependency.
But like Petar said, if you turn the external shuffle service on, the YARN NodeManager process on the slave machines will serve out the map spill data instead of the Executor JVMs (by default, unless you turn the external shuffle service on, the Executor JVM itself serves out the shuffle data, which causes problems if an Executor dies). Corey, how often are Executors crashing in your app? How many Executors do you have in total, and what is the memory size of each? You can change what fraction of the Executor heap is used for your user code vs. the shuffle vs. RDD caching with the spark.storage.memoryFraction setting.

On Sat, Feb 21, 2015 at 2:58 PM, Petar Zecevic petar.zece...@gmail.com wrote:

Could you try to turn on the external shuffle service? spark.shuffle.service.enabled = true

On 21.2.2015. 17:50, Corey Nolet wrote:

I'm experiencing the same issue. Upon closer inspection I'm noticing that executors are being lost as well. Thing is, I can't figure out how they are dying. I'm using MEMORY_AND_DISK_SER and I've got over 1.3 TB of memory allocated for the application. I was thinking perhaps it was possible that a single executor was getting one or a couple of large partitions, but shouldn't the disk persistence kick in at that point?

On Sat, Feb 21, 2015 at 11:20 AM, Anders Arpteg arp...@spotify.com wrote:

For large jobs, the following error message is shown, which seems to indicate that shuffle files are missing for some reason.
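[Editor's note] The timeout increases Anders mentions at the top of this message would look roughly like this (a sketch; the ~700-second values are the figures from the thread, and these are the Akka-era property names used by Spark 1.2):

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    // Give slow shuffle fetches and block manager acknowledgements more
    // headroom before they are treated as failures:
    .set("spark.akka.askTimeout", "700")
    .set("spark.core.connection.ack.wait.timeout", "700")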
Missing shuffle files
For large jobs, the following error message is shown, which seems to indicate that shuffle files are missing for some reason. It's a rather large job with many partitions. If the data size is reduced, the problem disappears. I'm running a build from Spark master post 1.2 (built 2015-01-16) and running on YARN 2.2. Any idea of how to resolve this problem?

User class threw exception: Job aborted due to stage failure: Task 450 in stage 450.1 failed 4 times, most recent failure: Lost task 450.3 in stage 450.1 (TID 167370, lon4-hadoopslave-b77.lon4.spotify.net): java.io.FileNotFoundException: /disk/hd06/yarn/local/usercache/arpteg/appcache/application_1424333823218_21217/spark-local-20150221154811-998c/03/rdd_675_450 (No such file or directory)
  at java.io.FileOutputStream.open(Native Method)
  at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
  at java.io.FileOutputStream.<init>(FileOutputStream.java:171)
  at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:76)
  at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:786)
  at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:637)
  at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:149)
  at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:74)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:264)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:231)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
  at org.apache.spark.scheduler.Task.run(Task.scala:64)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:192)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:745)

TIA, Anders
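[Editor's note] Worth noting: the missing file here is a cached RDD block (rdd_675_450) being handled by the CacheManager, not shuffle output. One mitigation sometimes used while debugging executor loss (a sketch, not something suggested in this thread, and only relevant when the root cause is a lost executor rather than a cleaned-up local directory) is to persist with a replicated storage level, so a single executor crash does not destroy the only copy of a block:

  import org.apache.spark.storage.StorageLevel

  // The _2 suffix keeps two replicas of each serialized block:
  rdd.persist(StorageLevel.MEMORY_AND_DISK_SER_2)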
Re: No executors allocated on yarn with latest master branch
The NM logs only seem to contain something similar to the following. Nothing else in the same time range. Any help?

2015-02-12 20:47:31,245 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl: Event EventType: KILL_CONTAINER sent to absent container container_1422406067005_0053_01_02
2015-02-12 20:47:31,246 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl: Event EventType: KILL_CONTAINER sent to absent container container_1422406067005_0053_01_12
2015-02-12 20:47:31,246 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl: Event EventType: KILL_CONTAINER sent to absent container container_1422406067005_0053_01_22
2015-02-12 20:47:31,246 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl: Event EventType: KILL_CONTAINER sent to absent container container_1422406067005_0053_01_32
2015-02-12 20:47:31,246 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl: Event EventType: KILL_CONTAINER sent to absent container container_1422406067005_0053_01_42
2015-02-12 21:24:30,515 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl: Event EventType: FINISH_APPLICATION sent to absent application application_1422406067005_0053

On Thu, Feb 12, 2015 at 10:38 PM, Sandy Ryza sandy.r...@cloudera.com wrote:

It seems unlikely to me that it would be a 2.2 issue, though not entirely impossible. Are you able to find any of the container logs? Is the NodeManager launching containers and reporting some exit code? -Sandy

On Thu, Feb 12, 2015 at 1:21 PM, Anders Arpteg arp...@spotify.com wrote:

No, not submitting from Windows, from a Debian distribution. Had a quick look at the RM logs, and it seems some containers are allocated but then released again for some reason. Not easy to make sense of the logs, but here is a snippet (from a test in our small test cluster) if you'd like to have a closer look: http://pastebin.com/8WU9ivqC Sandy, sounds like it could possibly be a 2.2 issue then, or what do you think? Thanks, Anders

On Thu, Feb 12, 2015 at 3:11 PM, Aniket Bhatnagar aniket.bhatna...@gmail.com wrote:

This is tricky to debug. Check the logs of the YARN node and resource managers to see if you can trace the error. In the past I have had to look closely at the arguments getting passed to the YARN container (they get logged before attempting to launch containers). If I still didn't get a clue, I had to check the script generated by YARN to execute the container, and even run it manually to trace at what line the error occurred. BTW, are you submitting the job from Windows?

On Thu, Feb 12, 2015, 3:34 PM Anders Arpteg arp...@spotify.com wrote:

Interesting to hear that it works for you. Are you using YARN 2.2 as well? No strange log messages during startup, and I can't see any other log messages since no executor gets launched. Does not seem to work in yarn-client mode either, failing with the exception below.

Exception in thread "main" org.apache.spark.SparkException: Yarn application has already ended! It might have been killed or unable to launch application master.
  at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:119)
  at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:59)
  at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:141)
  at org.apache.spark.SparkContext.<init>(SparkContext.scala:370)
  at com.spotify.analytics.AnalyticsSparkContext.<init>(AnalyticsSparkContext.scala:8)
  at com.spotify.analytics.DataSampler$.main(DataSampler.scala:42)
  at com.spotify.analytics.DataSampler.main(DataSampler.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
  at java.lang.reflect.Method.invoke(Method.java:597)
  at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:551)
  at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:155)
  at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:178)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:99)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

/Anders

On Thu, Feb 12, 2015 at 1:33 AM, Sandy Ryza sandy.r...@cloudera.com wrote:

Hi Anders, I just tried this out and was able to successfully acquire executors. Any strange log messages or additional color you can provide on your setup? Does yarn-client mode work? -Sandy

On Wed, Feb 11, 2015 at 1:28 PM, Anders Arpteg arp...@spotify.com wrote: ...
Re: No executors allocated on yarn with latest master branch
Interesting to hear that it works for you. Are you using YARN 2.2 as well? No strange log messages during startup, and I can't see any other log messages since no executor gets launched. Does not seem to work in yarn-client mode either, failing with the exception below.

Exception in thread "main" org.apache.spark.SparkException: Yarn application has already ended! It might have been killed or unable to launch application master.
  at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:119)
  at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:59)
  at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:141)
  at org.apache.spark.SparkContext.<init>(SparkContext.scala:370)
  at com.spotify.analytics.AnalyticsSparkContext.<init>(AnalyticsSparkContext.scala:8)
  at com.spotify.analytics.DataSampler$.main(DataSampler.scala:42)
  at com.spotify.analytics.DataSampler.main(DataSampler.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
  at java.lang.reflect.Method.invoke(Method.java:597)
  at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:551)
  at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:155)
  at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:178)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:99)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

/Anders

On Thu, Feb 12, 2015 at 1:33 AM, Sandy Ryza sandy.r...@cloudera.com wrote:

Hi Anders, I just tried this out and was able to successfully acquire executors. Any strange log messages or additional color you can provide on your setup? Does yarn-client mode work? -Sandy

On Wed, Feb 11, 2015 at 1:28 PM, Anders Arpteg arp...@spotify.com wrote:

Hi, I compiled the latest master of Spark yesterday (2015-02-10) for Hadoop 2.2, and jobs fail to execute in yarn-cluster mode for that build. It works successfully with Spark 1.2 (and also master from 2015-01-16), so something has changed since then that prevents the job from receiving any executors on the cluster. The basic symptoms are that the job fires up the AM, but after examining the executors page in the web UI, only the driver is listed; no executors are ever received, and the driver keeps waiting forever. Has anyone seen similar problems? Thanks for any insights, Anders
Re: No executors allocated on yarn with latest master branch
No, not submitting from Windows, from a Debian distribution. Had a quick look at the RM logs, and it seems some containers are allocated but then released again for some reason. Not easy to make sense of the logs, but here is a snippet (from a test in our small test cluster) if you'd like to have a closer look: http://pastebin.com/8WU9ivqC Sandy, sounds like it could possibly be a 2.2 issue then, or what do you think? Thanks, Anders

On Thu, Feb 12, 2015 at 3:11 PM, Aniket Bhatnagar aniket.bhatna...@gmail.com wrote:

This is tricky to debug. Check the logs of the YARN node and resource managers to see if you can trace the error. In the past I have had to look closely at the arguments getting passed to the YARN container (they get logged before attempting to launch containers). If I still didn't get a clue, I had to check the script generated by YARN to execute the container, and even run it manually to trace at what line the error occurred. BTW, are you submitting the job from Windows?

On Thu, Feb 12, 2015, 3:34 PM Anders Arpteg arp...@spotify.com wrote:

Interesting to hear that it works for you. Are you using YARN 2.2 as well? No strange log messages during startup, and I can't see any other log messages since no executor gets launched. Does not seem to work in yarn-client mode either, failing with the exception below.

Exception in thread "main" org.apache.spark.SparkException: Yarn application has already ended! It might have been killed or unable to launch application master.
  at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:119)
  at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:59)
  at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:141)
  at org.apache.spark.SparkContext.<init>(SparkContext.scala:370)
  at com.spotify.analytics.AnalyticsSparkContext.<init>(AnalyticsSparkContext.scala:8)
  at com.spotify.analytics.DataSampler$.main(DataSampler.scala:42)
  at com.spotify.analytics.DataSampler.main(DataSampler.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
  at java.lang.reflect.Method.invoke(Method.java:597)
  at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:551)
  at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:155)
  at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:178)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:99)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

/Anders

On Thu, Feb 12, 2015 at 1:33 AM, Sandy Ryza sandy.r...@cloudera.com wrote:

Hi Anders, I just tried this out and was able to successfully acquire executors. Any strange log messages or additional color you can provide on your setup? Does yarn-client mode work? -Sandy

On Wed, Feb 11, 2015 at 1:28 PM, Anders Arpteg arp...@spotify.com wrote:

Hi, I compiled the latest master of Spark yesterday (2015-02-10) for Hadoop 2.2, and jobs fail to execute in yarn-cluster mode for that build. It works successfully with Spark 1.2 (and also master from 2015-01-16), so something has changed since then that prevents the job from receiving any executors on the cluster.
The basic symptoms are that the job fires up the AM, but after examining the executors page in the web UI, only the driver is listed; no executors are ever received, and the driver keeps waiting forever. Has anyone seen similar problems? Thanks for any insights, Anders
No executors allocated on yarn with latest master branch
Hi, I compiled the latest master of Spark yesterday (2015-02-10) for Hadoop 2.2, and jobs fail to execute in yarn-cluster mode for that build. It works successfully with Spark 1.2 (and also master from 2015-01-16), so something has changed since then that prevents the job from receiving any executors on the cluster. The basic symptoms are that the job fires up the AM, but after examining the executors page in the web UI, only the driver is listed; no executors are ever received, and the driver keeps waiting forever. Has anyone seen similar problems? Thanks for any insights, Anders
Re: Failing jobs runs twice
FYI, I just confirmed with the latest Spark 1.3 snapshot that the spark.yarn.maxAppAttempts setting that SPARK-2165 refers to works perfectly. Great to finally get rid of this problem. It also caused an issue when event logs were enabled, since the spark-events/appXXX folder already exists the second time the app gets launched.

On Thu, Jan 15, 2015 at 3:01 PM, Anders Arpteg arp...@spotify.com wrote:

Found a setting that seems to fix this problem, but it does not seem to be available until Spark 1.3. See https://issues.apache.org/jira/browse/SPARK-2165 However, glad to see work is being done on the issue.

On Tue, Jan 13, 2015 at 8:00 PM, Anders Arpteg arp...@spotify.com wrote:

Yes Andrew, I am. Tried setting spark.yarn.applicationMaster.waitTries to 1 (thanks Sean), but with no luck. Any ideas?

On Tue, Jan 13, 2015 at 7:58 PM, Andrew Or and...@databricks.com wrote:

Hi Anders, are you using YARN by any chance?

2015-01-13 0:32 GMT-08:00 Anders Arpteg arp...@spotify.com:

Since starting to use Spark 1.2, I've experienced an annoying issue with failing apps that get executed twice. I'm not talking about tasks inside a job, which should be executed multiple times before failing the whole app. I'm talking about the whole app, which seems to close the previous Spark context, start a new one, and rerun the app again. This is annoying since it overwrites the log files as well, and it becomes hard to troubleshoot the failing app. Does anyone know how to turn this feature off? Thanks, Anders
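[Editor's note] For others hitting this, a minimal sketch of what the fix above amounts to (assuming Spark 1.3+ on YARN; the app name is a hypothetical placeholder):

  import org.apache.spark.{SparkConf, SparkContext}

  // Limit the YARN ApplicationMaster to a single attempt, so a failing
  // app is not automatically relaunched (and does not clobber its
  // event logs on the second launch):
  val conf = new SparkConf()
    .setAppName("my-app")
    .set("spark.yarn.maxAppAttempts", "1")
  val sc = new SparkContext(conf)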
Re: Failing jobs runs twice
Found a setting that seems to fix this problem, but it does not seem to be available until Spark 1.3. See https://issues.apache.org/jira/browse/SPARK-2165 However, glad to see work is being done on the issue.

On Tue, Jan 13, 2015 at 8:00 PM, Anders Arpteg arp...@spotify.com wrote:

Yes Andrew, I am. Tried setting spark.yarn.applicationMaster.waitTries to 1 (thanks Sean), but with no luck. Any ideas?

On Tue, Jan 13, 2015 at 7:58 PM, Andrew Or and...@databricks.com wrote:

Hi Anders, are you using YARN by any chance?

2015-01-13 0:32 GMT-08:00 Anders Arpteg arp...@spotify.com:

Since starting to use Spark 1.2, I've experienced an annoying issue with failing apps that get executed twice. I'm not talking about tasks inside a job, which should be executed multiple times before failing the whole app. I'm talking about the whole app, which seems to close the previous Spark context, start a new one, and rerun the app again. This is annoying since it overwrites the log files as well, and it becomes hard to troubleshoot the failing app. Does anyone know how to turn this feature off? Thanks, Anders
Re: Trouble with large Yarn job
Interesting, sounds plausible. Another way to avoid the problem has been to cache intermediate output for large jobs (i.e. split large jobs into smaller ones and then union them together). It's unfortunate that this type of tweaking is necessary though; hopefully it will be better in 1.2.1.

On Tue, Jan 13, 2015 at 3:29 AM, Sven Krasser kras...@gmail.com wrote:

Anders, this could be related to this open ticket: https://issues.apache.org/jira/browse/SPARK-5077. A call to coalesce() also fixed that for us as a stopgap. Best, -Sven

On Mon, Jan 12, 2015 at 10:18 AM, Anders Arpteg arp...@spotify.com wrote:

Yes sure Sandy, I've checked the logs and it's not an OOM issue. I've actually been able to solve the problem finally, and it seems to be an issue with too many partitions. The repartitioning I tried initially did so after the union, and then it's too late. By repartitioning as early as possible, and significantly reducing the number of partitions (going from 100,000+ to ~6,000 partitions), the job succeeds, and no more "Error communicating with MapOutputTracker" issues. Seems like an issue with handling too many partitions and executors at the same time. Would be awesome with an auto-repartition function that checks the sizes of existing partitions and compares them with the HDFS block size. If too small (or too large), it would repartition to partition sizes similar to the block size... Hope this helps others with similar issues. Best, Anders

On Mon, Jan 12, 2015 at 6:32 AM, Sandy Ryza sandy.r...@cloudera.com wrote:

Hi Anders, have you checked your NodeManager logs to make sure YARN isn't killing executors for exceeding memory limits? -Sandy

On Tue, Jan 6, 2015 at 8:20 AM, Anders Arpteg arp...@spotify.com wrote:

Hey, I have a job that keeps failing if too much data is processed, and I can't see how to get it working. I've tried repartitioning with more partitions and increasing the amount of memory for the executors (now about 12 GB and 400 executors). Here is a snippet of the first part of the code, which succeeds without any problems:

val all_days = sc.union(
  ds.dateInterval(startDate, date).map(date =>
    sc.avroFile[LrDailyEndSong](daily_end_song_path + date)
      .map(s => (
        (s.getUsername, s.getTrackUri),
        UserItemData(s.getUsername, s.getTrackUri,
          build_vector1(date, s), build_vector2(s))
      ))
  )
).reduceByKey(sum_vectors)

I want to process 30 days of data or more, but am only able to process about 10 days. With more days of data (a lower startDate in the code above), the union above succeeds but the code below fails with "Error communicating with MapOutputTracker" (see http://pastebin.com/fGDCXPkL for more detailed error messages). Here is a snippet of the code that fails:

val top_tracks = all_days.map(t => (t._1._2.toString, 1))
  .reduceByKey(_ + _)
  .filter(trackFilter)
  .repartition(4)
  .persist(StorageLevel.MEMORY_AND_DISK_SER)

val observation_data = all_days
  .mapPartitions(_.map(o => (o._1._2.toString, o._2)))
  .join(top_tracks)

The calculation of top_tracks works, but the last mapPartitions task fails with the given error message if given more than 10 days of data. Also tried increasing the spark.akka.askTimeout setting, but it still fails even after 10-folding the timeout to 300 seconds. I'm using Spark 1.2 and Kryo serialization. Realize that this is a rather long message, but I'm stuck and would appreciate any help or clues for resolving this issue. Seems to be an out-of-memory issue, but increasing the number of partitions does not seem to help. Thanks, Anders
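[Editor's note] A rough illustration of the split-and-persist workaround mentioned at the top of this message (a sketch only; dateList and loadDay are hypothetical stand-ins for the per-day loading in the code above):

  import org.apache.spark.storage.StorageLevel

  // Shrink and persist each day separately, so each sub-job stays small
  // and a failure only recomputes one day instead of the whole union:
  val perDay = dateList.map { date =>
    val day = loadDay(date).reduceByKey(sum_vectors)
    day.persist(StorageLevel.MEMORY_AND_DISK_SER)
    day
  }
  val all_days = sc.union(perDay).reduceByKey(sum_vectors)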
Re: Failing jobs runs twice
Yes Andrew, I am. Tried setting spark.yarn.applicationMaster.waitTries to 1 (thanks Sean), but with no luck. Any ideas?

On Tue, Jan 13, 2015 at 7:58 PM, Andrew Or and...@databricks.com wrote:

Hi Anders, are you using YARN by any chance?

2015-01-13 0:32 GMT-08:00 Anders Arpteg arp...@spotify.com:

Since starting to use Spark 1.2, I've experienced an annoying issue with failing apps that get executed twice. I'm not talking about tasks inside a job, which should be executed multiple times before failing the whole app. I'm talking about the whole app, which seems to close the previous Spark context, start a new one, and rerun the app again. This is annoying since it overwrites the log files as well, and it becomes hard to troubleshoot the failing app. Does anyone know how to turn this feature off? Thanks, Anders
Failing jobs runs twice
Since starting to use Spark 1.2, I've experienced an annoying issue with failing apps that get executed twice. I'm not talking about tasks inside a job, which should be executed multiple times before failing the whole app. I'm talking about the whole app, which seems to close the previous Spark context, start a new one, and rerun the app again. This is annoying since it overwrites the log files as well, and it becomes hard to troubleshoot the failing app. Does anyone know how to turn this feature off? Thanks, Anders
Re: Trouble with large Yarn job
Yes sure Sandy, I've checked the logs and it's not an OOM issue. I've actually been able to solve the problem finally, and it seems to be an issue with too many partitions. The repartitioning I tried initially did so after the union, and then it's too late. By repartitioning as early as possible, and significantly reducing the number of partitions (going from 100,000+ to ~6,000 partitions), the job succeeds, and no more "Error communicating with MapOutputTracker" issues. Seems like an issue with handling too many partitions and executors at the same time. Would be awesome with an auto-repartition function that checks the sizes of existing partitions and compares them with the HDFS block size. If too small (or too large), it would repartition to partition sizes similar to the block size... Hope this helps others with similar issues. Best, Anders

On Mon, Jan 12, 2015 at 6:32 AM, Sandy Ryza sandy.r...@cloudera.com wrote:

Hi Anders, have you checked your NodeManager logs to make sure YARN isn't killing executors for exceeding memory limits? -Sandy

On Tue, Jan 6, 2015 at 8:20 AM, Anders Arpteg arp...@spotify.com wrote:

Hey, I have a job that keeps failing if too much data is processed, and I can't see how to get it working. I've tried repartitioning with more partitions and increasing the amount of memory for the executors (now about 12 GB and 400 executors). Here is a snippet of the first part of the code, which succeeds without any problems:

val all_days = sc.union(
  ds.dateInterval(startDate, date).map(date =>
    sc.avroFile[LrDailyEndSong](daily_end_song_path + date)
      .map(s => (
        (s.getUsername, s.getTrackUri),
        UserItemData(s.getUsername, s.getTrackUri,
          build_vector1(date, s), build_vector2(s))
      ))
  )
).reduceByKey(sum_vectors)

I want to process 30 days of data or more, but am only able to process about 10 days. With more days of data (a lower startDate in the code above), the union above succeeds but the code below fails with "Error communicating with MapOutputTracker" (see http://pastebin.com/fGDCXPkL for more detailed error messages). Here is a snippet of the code that fails:

val top_tracks = all_days.map(t => (t._1._2.toString, 1))
  .reduceByKey(_ + _)
  .filter(trackFilter)
  .repartition(4)
  .persist(StorageLevel.MEMORY_AND_DISK_SER)

val observation_data = all_days
  .mapPartitions(_.map(o => (o._1._2.toString, o._2)))
  .join(top_tracks)

The calculation of top_tracks works, but the last mapPartitions task fails with the given error message if given more than 10 days of data. Also tried increasing the spark.akka.askTimeout setting, but it still fails even after 10-folding the timeout to 300 seconds. I'm using Spark 1.2 and Kryo serialization. Realize that this is a rather long message, but I'm stuck and would appreciate any help or clues for resolving this issue. Seems to be an out-of-memory issue, but increasing the number of partitions does not seem to help. Thanks, Anders
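[Editor's note] A minimal sketch of the repartition-early fix described above (loadDay and dateList are hypothetical names; the point is that the repartition happens on each day's input before the union, rather than once afterwards when 100,000+ partitions have already accumulated):

  // With e.g. 30 days and 200 partitions per day, the union ends up
  // with ~6,000 partitions instead of 100,000+:
  val days = dateList.map(date => loadDay(date).repartition(200))
  val all_days = sc.union(days).reduceByKey(sum_vectors)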
Re: Queue independent jobs
Awesome, it actually seems to work. Amazing how simple it can be sometimes... Thanks Sean!

On Fri, Jan 9, 2015 at 12:42 PM, Sean Owen so...@cloudera.com wrote:

You can parallelize on the driver side. The way to do it is almost exactly what you have here, where you're iterating over a local Scala collection of dates and invoking a Spark operation for each. Simply write dateList.par.map(...) to make the local map proceed in parallel. It should invoke the Spark jobs simultaneously.

On Fri, Jan 9, 2015 at 10:46 AM, Anders Arpteg arp...@spotify.com wrote:

Hey, let's say we have multiple independent jobs that each transform some data and store it in distinct HDFS locations; is there a nice way to run them in parallel? See the following pseudo-code snippet:

dateList.map(date => sc.hdfsFile(date).map(transform).saveAsHadoopFile(date))

It's unfortunate if they run in sequence, since all the executors are not used efficiently. What's the best way to parallelize the execution of these jobs? Thanks, Anders
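[Editor's note] Sean's suggestion, spelled out as a small runnable sketch (the paths and the toUpperCase transform are hypothetical placeholders for the real pipeline):

  import org.apache.spark.{SparkConf, SparkContext}

  val sc = new SparkContext(new SparkConf().setAppName("parallel-saves"))
  val dateList = Seq("2015-01-01", "2015-01-02", "2015-01-03")

  // .par turns the local collection into a parallel one, so each save
  // job is submitted from its own driver-side thread and Spark can
  // schedule them concurrently across the executors:
  dateList.par.foreach { date =>
    sc.textFile("/data/in/" + date)
      .map(_.toUpperCase)
      .saveAsTextFile("/data/out/" + date)
  }

Spark's scheduler is thread-safe, so submitting actions from multiple driver threads is supported; with the default FIFO scheduler the jobs may still queue behind one another, and setting spark.scheduler.mode to FAIR can share the executors more evenly.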
Queue independent jobs
Hey, let's say we have multiple independent jobs that each transform some data and store it in distinct HDFS locations; is there a nice way to run them in parallel? See the following pseudo-code snippet:

dateList.map(date => sc.hdfsFile(date).map(transform).saveAsHadoopFile(date))

It's unfortunate if they run in sequence, since all the executors are not used efficiently. What's the best way to parallelize the execution of these jobs? Thanks, Anders
Trouble with large Yarn job
Hey, I have a job that keeps failing if too much data is processed, and I can't see how to get it working. I've tried repartitioning with more partitions and increasing the amount of memory for the executors (now about 12 GB and 400 executors). Here is a snippet of the first part of the code, which succeeds without any problems:

val all_days = sc.union(
  ds.dateInterval(startDate, date).map(date =>
    sc.avroFile[LrDailyEndSong](daily_end_song_path + date)
      .map(s => (
        (s.getUsername, s.getTrackUri),
        UserItemData(s.getUsername, s.getTrackUri,
          build_vector1(date, s), build_vector2(s))
      ))
  )
).reduceByKey(sum_vectors)

I want to process 30 days of data or more, but am only able to process about 10 days. With more days of data (a lower startDate in the code above), the union above succeeds but the code below fails with "Error communicating with MapOutputTracker" (see http://pastebin.com/fGDCXPkL for more detailed error messages). Here is a snippet of the code that fails:

val top_tracks = all_days.map(t => (t._1._2.toString, 1))
  .reduceByKey(_ + _)
  .filter(trackFilter)
  .repartition(4)
  .persist(StorageLevel.MEMORY_AND_DISK_SER)

val observation_data = all_days
  .mapPartitions(_.map(o => (o._1._2.toString, o._2)))
  .join(top_tracks)

The calculation of top_tracks works, but the last mapPartitions task fails with the given error message if given more than 10 days of data. Also tried increasing the spark.akka.askTimeout setting, but it still fails even after 10-folding the timeout to 300 seconds. I'm using Spark 1.2 and Kryo serialization. Realize that this is a rather long message, but I'm stuck and would appreciate any help or clues for resolving this issue. Seems to be an out-of-memory issue, but increasing the number of partitions does not seem to help. Thanks, Anders
Re: Dynamic Allocation in Spark 1.2.0
Thanks Tsuyoshi and Shixiong for the info. Awesome with more documentation about the feature! Was afraid that the node manager needed reconfiguration (and a restart). Any idea of how many resources the shuffle service will take on the node manager? In a multi-tenant Hadoop cluster environment, it would be undesirable to have a Spark-specific long-running service taking up resources from other types of jobs on the cluster. Thanks again, Anders

On Sun, Dec 28, 2014 at 8:08 AM, Shixiong Zhu zsxw...@gmail.com wrote:

I encountered the following issue when enabling dynamicAllocation. You may want to take a look at it. https://issues.apache.org/jira/browse/SPARK-4951 Best Regards, Shixiong Zhu

2014-12-28 2:07 GMT+08:00 Tsuyoshi OZAWA ozawa.tsuyo...@gmail.com:

Hi Anders, I faced the same issue as you mentioned. Yes, you need to install the Spark shuffle plugin for YARN. Please check the following PRs, which add docs on enabling dynamicAllocation:

https://github.com/apache/spark/pull/3731
https://github.com/apache/spark/pull/3757

I could run Spark on YARN with dynamicAllocation by following the instructions described in the docs. Thanks, - Tsuyoshi

On Sat, Dec 27, 2014 at 11:06 PM, Anders Arpteg arp...@spotify.com wrote:

Hey, Tried to get the new spark.dynamicAllocation.enabled feature working on YARN (Hadoop 2.2), but have been unsuccessful so far. I've tested with the following settings:

conf
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "10")
  .set("spark.dynamicAllocation.maxExecutors", "700")

The app works fine on Spark 1.2 if dynamicAllocation is not enabled, but with the settings above, it will start the app, and the first job is listed in the web UI. However, no tasks are started, and it seems to be stuck waiting for a container to be allocated, forever. Any help would be appreciated. Do I need to do something specific to get the external YARN shuffle service running in the node manager? TIA, Anders
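[Editor's note] To make the NodeManager-side setup concrete: roughly (a sketch based on the setup those PRs document; the exact jar name and location vary by Spark version and distribution), each NodeManager needs the spark-<version>-yarn-shuffle.jar on its classpath and the following properties in yarn-site.xml, followed by a NodeManager restart:

<property>
  <name>yarn.nodemanager.aux-services</name>
  <value>mapreduce_shuffle,spark_shuffle</value>
</property>
<property>
  <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
  <value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>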
Dynamic Allocation in Spark 1.2.0
Hey, Tried to get the new spark.dynamicAllocation.enabled feature working on YARN (Hadoop 2.2), but have been unsuccessful so far. I've tested with the following settings:

conf
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "10")
  .set("spark.dynamicAllocation.maxExecutors", "700")

The app works fine on Spark 1.2 if dynamicAllocation is not enabled, but with the settings above, it will start the app, and the first job is listed in the web UI. However, no tasks are started, and it seems to be stuck waiting for a container to be allocated, forever. Any help would be appreciated. Do I need to do something specific to get the external YARN shuffle service running in the node manager? TIA, Anders