Re: Large number of conf broadcasts

2015-12-18 Thread Anders Arpteg
Awesome, thanks for the PR Koert!

/Anders

On Thu, Dec 17, 2015 at 10:22 PM Prasad Ravilla <pras...@slalom.com> wrote:

> Thanks, Koert.
>
> Regards,
> Prasad.
>
> From: Koert Kuipers
> Date: Thursday, December 17, 2015 at 1:06 PM
> To: Prasad Ravilla
> Cc: Anders Arpteg, user
>
> Subject: Re: Large number of conf broadcasts
>
> https://github.com/databricks/spark-avro/pull/95
>
> On Thu, Dec 17, 2015 at 3:35 PM, Prasad Ravilla <pras...@slalom.com>
> wrote:
>
>> Hi Anders,
>>
>> I am running into the same issue as yours. I am trying to read about 120
>> thousand avro files into a single data frame.
>>
>> Is your patch part of a pull request from the master branch in github?
>>
>> Thanks,
>> Prasad.
>>
>> From: Anders Arpteg
>> Date: Thursday, October 22, 2015 at 10:37 AM
>> To: Koert Kuipers
>> Cc: user
>> Subject: Re: Large number of conf broadcasts
>>
>> Yes, seems unnecessary. I actually tried patching the
>> com.databricks.spark.avro reader to only broadcast once per dataset,
>> instead of for every single file/partition. It seems to work just as well,
>> and there are significantly fewer broadcasts and I'm not seeing
>> out-of-memory issues any more. Strange that more people don't react to
>> this, since the broadcasting seems completely unnecessary...
>>
>> Best,
>> Anders
>>
>> On Thu, Oct 22, 2015 at 7:03 PM Koert Kuipers <ko...@tresata.com> wrote:
>>
>>> i am seeing the same thing. it's gone completely crazy creating
>>> broadcasts for the last 15 mins or so. killing it...
>>>
>>> On Thu, Sep 24, 2015 at 1:24 PM, Anders Arpteg <arp...@spotify.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Running Spark 1.5.0 in yarn-client mode, and am curious why there are
>>>> so many broadcasts being done when loading datasets with a large number
>>>> of partitions/files. Have datasets with thousands of partitions, i.e.
>>>> hdfs files in the avro folder, and sometimes loading hundreds of these
>>>> large datasets. Believe I have located the broadcast to line
>>>> SparkContext.scala:1006. It seems to just broadcast the Hadoop
>>>> configuration, and I don't see why it should be necessary to broadcast
>>>> that for EVERY file? Wouldn't it be possible to reuse the same broadcast
>>>> configuration? It's hardly the case that the configuration would be
>>>> different between files in a single dataset. Seems to be wasting lots of
>>>> memory and persisting unnecessarily to disk (see below again).
>>>>
>>>> Thanks,
>>>> Anders
>>>>
>>>> 15/09/24 17:11:11 INFO BlockManager: Writing block
>>>> broadcast_1871_piece0 to disk
>>>>  [19/49086]15/09/24 17:11:11 INFO BlockManagerInfo: Added
>>>> broadcast_1871_piece0 on disk on 10.254.35.24:49428
>>>> (size: 23.1 KB)
>>>> 15/09/24 17:11:11 INFO MemoryStore: Block broadcast_4803_piece0 stored
>>>> as bytes in memory (estimated size 23.1 KB, free 2.4 KB)
>>>> 15/09/24 17:11:11 INFO BlockManagerInfo: Added broadcast_4803_piece0 in
>>>> memory on 10.254.35.24:49428
>>>> (size: 23.1 KB, free: 464.0 MB)
>>>> 15/09/24 17:11:11 INFO SpotifySparkContext: Created broadcast 4803 from
>>>> hadoopFile at AvroRelation.scala:121
>>>> 15/09/24 17:11:11 WARN MemoryStore: Failed to reserve initial memory
>>>> threshold of 1024.0 KB for computing block broadcast_4804 in memory
>>>> .
>>>> 15/09/24 17:11:11 WARN MemoryStore: Not enough space to cache
>>>> broadcast_4804 in memory! (computed 496.0 B so far)
>>>> 15/09/24 17:11:11 INFO MemoryStore: Memory use = 530.3 MB (blocks) +
>>>> 0.0 B (scratch space shared across 0 tasks(s)) = 530.3 MB. Storage
>>>> limit = 530.3 MB.
>>>> 15/09/24 17:11:11 WARN MemoryStore: Persisting block broadcast_4804 to
>>>> disk instead.
>>>> 15/09/24 17:11:11 INFO MemoryStore: ensureFreeSpace(23703) called with
>>>> curMem=556036460, maxMem=556038881
>>>> 15/09/24 17:11:11 INFO MemoryStore: 1 blocks selected for dropping
>>>> 15/09/24 17:11:11 INFO BlockManager: Dropping block
>>>> broadcast_1872_piece0 from memory
>>>> 15/09/24 17:11:11 INFO BlockManager: Writing block
>>>> broadcast_1872_piece0 to disk
>>>>
>>>>
>>>
>>>
>


Re: Large number of conf broadcasts

2015-10-26 Thread Anders Arpteg
Nice Koert, let's hope it gets merged soon.

/Anders

On Fri, Oct 23, 2015 at 6:32 PM Koert Kuipers <ko...@tresata.com> wrote:

> https://github.com/databricks/spark-avro/pull/95
>
> On Fri, Oct 23, 2015 at 5:01 AM, Koert Kuipers <ko...@tresata.com> wrote:
>
>> oh no wonder... it undoes the glob (i was reading from /some/path/*),
>> creates a HadoopRDD for every path, and then creates a union of them using
>> UnionRDD.
>>
>> that's not what i want... no need to do a union. AvroInputFormat already
>> has the ability to handle globs (or multiple comma-separated paths) very
>> efficiently. AvroRelation should just pass the paths (comma-separated).
>>
>>
>>
>>
>> On Thu, Oct 22, 2015 at 1:37 PM, Anders Arpteg <arp...@spotify.com>
>> wrote:
>>
>>> Yes, seems unnecessary. I actually tried patching the
>>> com.databricks.spark.avro reader to only broadcast once per dataset,
>>> instead of for every single file/partition. It seems to work just as well,
>>> and there are significantly fewer broadcasts and I'm not seeing
>>> out-of-memory issues any more. Strange that more people don't react to
>>> this, since the broadcasting seems completely unnecessary...
>>>
>>> Best,
>>> Anders
>>>
>>>
>>> On Thu, Oct 22, 2015 at 7:03 PM Koert Kuipers <ko...@tresata.com> wrote:
>>>
>>>> i am seeing the same thing. it's gone completely crazy creating
>>>> broadcasts for the last 15 mins or so. killing it...
>>>>
>>>> On Thu, Sep 24, 2015 at 1:24 PM, Anders Arpteg <arp...@spotify.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Running Spark 1.5.0 in yarn-client mode, and am curious why there are
>>>>> so many broadcasts being done when loading datasets with a large number
>>>>> of partitions/files. Have datasets with thousands of partitions, i.e.
>>>>> hdfs files in the avro folder, and sometimes loading hundreds of these
>>>>> large datasets. Believe I have located the broadcast to line
>>>>> SparkContext.scala:1006. It seems to just broadcast the Hadoop
>>>>> configuration, and I don't see why it should be necessary to broadcast
>>>>> that for EVERY file? Wouldn't it be possible to reuse the same broadcast
>>>>> configuration? It's hardly the case that the configuration would be
>>>>> different between files in a single dataset. Seems to be wasting lots of
>>>>> memory and persisting unnecessarily to disk (see below again).
>>>>>
>>>>> Thanks,
>>>>> Anders
>>>>>
>>>>> 15/09/24 17:11:11 INFO BlockManager: Writing block
>>>>> broadcast_1871_piece0 to disk
>>>>>  [19/49086]15/09/24 17:11:11 INFO BlockManagerInfo: Added
>>>>> broadcast_1871_piece0 on disk on 10.254.35.24:49428 (size: 23.1 KB)
>>>>> 15/09/24 17:11:11 INFO MemoryStore: Block broadcast_4803_piece0 stored
>>>>> as bytes in memory (estimated size 23.1 KB, free 2.4 KB)
>>>>> 15/09/24 17:11:11 INFO BlockManagerInfo: Added broadcast_4803_piece0
>>>>> in memory on 10.254.35.24:49428 (size: 23.1 KB, free: 464.0 MB)
>>>>> 15/09/24 17:11:11 INFO SpotifySparkContext: Created broadcast 4803
>>>>> from hadoopFile at AvroRelation.scala:121
>>>>> 15/09/24 17:11:11 WARN MemoryStore: Failed to reserve initial memory
>>>>> threshold of 1024.0 KB for computing block broadcast_4804 in memory
>>>>> .
>>>>> 15/09/24 17:11:11 WARN MemoryStore: Not enough space to cache
>>>>> broadcast_4804 in memory! (computed 496.0 B so far)
>>>>> 15/09/24 17:11:11 INFO MemoryStore: Memory use = 530.3 MB (blocks) +
>>>>> 0.0 B (scratch space shared across 0 tasks(s)) = 530.3 MB. Storage
>>>>> limit = 530.3 MB.
>>>>> 15/09/24 17:11:11 WARN MemoryStore: Persisting block broadcast_4804 to
>>>>> disk instead.
>>>>> 15/09/24 17:11:11 INFO MemoryStore: ensureFreeSpace(23703) called with
>>>>> curMem=556036460, maxMem=556038881
>>>>> 15/09/24 17:11:11 INFO MemoryStore: 1 blocks selected for dropping
>>>>> 15/09/24 17:11:11 INFO BlockManager: Dropping block
>>>>> broadcast_1872_piece0 from memory
>>>>> 15/09/24 17:11:11 INFO BlockManager: Writing block
>>>>> broadcast_1872_piece0 to disk
>>>>>
>>>>>
>>>>
>>>>
>>
>


Re: Large number of conf broadcasts

2015-10-22 Thread Anders Arpteg
Yes, seems unnecessary. I actually tried patching the
com.databricks.spark.avro reader to only broadcast once per dataset, instead
of for every single file/partition. It seems to work just as well, and there
are significantly fewer broadcasts and I'm not seeing out-of-memory issues
any more. Strange that more people don't react to this, since the
broadcasting seems completely unnecessary...

Best,
Anders

On Thu, Oct 22, 2015 at 7:03 PM Koert Kuipers <ko...@tresata.com> wrote:

> i am seeing the same thing. it's gone completely crazy creating broadcasts
> for the last 15 mins or so. killing it...
>
> On Thu, Sep 24, 2015 at 1:24 PM, Anders Arpteg <arp...@spotify.com> wrote:
>
>> Hi,
>>
>> Running Spark 1.5.0 in yarn-client mode, and am curious why there are so
>> many broadcasts being done when loading datasets with a large number of
>> partitions/files. Have datasets with thousands of partitions, i.e. hdfs
>> files in the avro folder, and sometimes loading hundreds of these large
>> datasets. Believe I have located the broadcast to line
>> SparkContext.scala:1006. It seems to just broadcast the Hadoop
>> configuration, and I don't see why it should be necessary to broadcast that
>> for EVERY file? Wouldn't it be possible to reuse the same broadcast
>> configuration? It's hardly the case that the configuration would be
>> different between files in a single dataset. Seems to be wasting lots of
>> memory and persisting unnecessarily to disk (see below again).
>>
>> Thanks,
>> Anders
>>
>> 15/09/24 17:11:11 INFO BlockManager: Writing block broadcast_1871_piece0
>> to disk  [19/49086]15/09/24
>> 17:11:11 INFO BlockManagerInfo: Added broadcast_1871_piece0 on disk on
>> 10.254.35.24:49428 (size: 23.1 KB)
>> 15/09/24 17:11:11 INFO MemoryStore: Block broadcast_4803_piece0 stored as
>> bytes in memory (estimated size 23.1 KB, free 2.4 KB)
>> 15/09/24 17:11:11 INFO BlockManagerInfo: Added broadcast_4803_piece0 in
>> memory on 10.254.35.24:49428 (size: 23.1 KB, free: 464.0 MB)
>> 15/09/24 17:11:11 INFO SpotifySparkContext: Created broadcast 4803 from
>> hadoopFile at AvroRelation.scala:121
>> 15/09/24 17:11:11 WARN MemoryStore: Failed to reserve initial memory
>> threshold of 1024.0 KB for computing block broadcast_4804 in memory
>> .
>> 15/09/24 17:11:11 WARN MemoryStore: Not enough space to cache
>> broadcast_4804 in memory! (computed 496.0 B so far)
>> 15/09/24 17:11:11 INFO MemoryStore: Memory use = 530.3 MB (blocks) + 0.0
>> B (scratch space shared across 0 tasks(s)) = 530.3 MB. Storage
>> limit = 530.3 MB.
>> 15/09/24 17:11:11 WARN MemoryStore: Persisting block broadcast_4804 to
>> disk instead.
>> 15/09/24 17:11:11 INFO MemoryStore: ensureFreeSpace(23703) called with
>> curMem=556036460, maxMem=556038881
>> 15/09/24 17:11:11 INFO MemoryStore: 1 blocks selected for dropping
>> 15/09/24 17:11:11 INFO BlockManager: Dropping block broadcast_1872_piece0
>> from memory
>> 15/09/24 17:11:11 INFO BlockManager: Writing block broadcast_1872_piece0
>> to disk
>>
>>
>
>


Problem with multiple fields with same name in Avro

2015-09-26 Thread Anders Arpteg
Hi,

Received the following error when reading an Avro source with Spark 1.5.0
and the com.databricks.spark.avro reader. In the data source, there is one
nested field named "UserActivity.history.activity" and another named
"UserActivity.activity". This seems to be the reason for the exception,
since the two fields have the same name but at different levels in the
hierarchy.
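
To illustrate, a minimal, hypothetical schema along the lines described above
(the real UserActivity schema is of course larger), which plain Avro happily
parses since field names only need to be unique within each record:

import org.apache.avro.Schema

val schemaJson = """
{
  "type": "record", "name": "UserActivity",
  "fields": [
    {"name": "activity", "type": "string"},
    {"name": "history", "type": {"type": "array", "items": {
      "type": "record", "name": "HistoryEntry",
      "fields": [{"name": "activity", "type": "string"}]
    }}}
  ]
}
"""
// legal Avro, but "activity" now exists both at the top level and inside
// the nested history records
val schema = new Schema.Parser().parse(schemaJson)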

Any ideas of how to get around this? The exception occurs directly when
trying to load the data.

Thanks,
Anders

15/09/26 11:42:41 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
lon4-hadoopslave-a148.lon4.spotify.net):
org.apache.avro.AvroRuntimeException: Bad index
at
com.spotify.analytics.schema.UserActivity.put(UserActivity.java:60)
at
org.apache.avro.generic.GenericData.setField(GenericData.java:573)
at
org.apache.avro.generic.GenericData.setField(GenericData.java:590)
at
org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
at
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
at
org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
at
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
at
org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:66)
at
org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:32)
at
org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:248)
at
org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:216)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$class.isEmpty(Iterator.scala:256)
at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1157)
at
com.databricks.spark.avro.AvroRelation$$anonfun$buildScan$1$$anonfun$4.apply(AvroRelation.scala:127)
at
com.databricks.spark.avro.AvroRelation$$anonfun$buildScan$1$$anonfun$4.apply(AvroRelation.scala:126)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)


Large number of conf broadcasts

2015-09-24 Thread Anders Arpteg
Hi,

Running Spark 1.5.0 in yarn-client mode, and am curious why there are so many
broadcasts being done when loading datasets with a large number of
partitions/files. Have datasets with thousands of partitions, i.e. hdfs files
in the avro folder, and sometimes loading hundreds of these large datasets.
Believe I have located the broadcast to line SparkContext.scala:1006. It seems
to just broadcast the Hadoop configuration, and I don't see why it should be
necessary to broadcast that for EVERY file? Wouldn't it be possible to reuse
the same broadcast configuration? It's hardly the case that the configuration
would be different between files in a single dataset. Seems to be wasting lots
of memory and persisting unnecessarily to disk (see below again).
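
For reference, a minimal sketch of the alternative: one hadoopFile call over a
comma-separated list of paths gives a single broadcast of the Hadoop
configuration for the whole dataset, instead of one per file (paths, sc and
the generic-record classes below are illustrative, not the exact spark-avro
internals):

import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper}
import org.apache.hadoop.io.NullWritable

// paths is assumed to be the already resolved list of avro files/directories
val joined = paths.mkString(",")

// a single hadoopFile call means the Hadoop conf is broadcast once,
// not once per file in the dataset
val records = sc.hadoopFile(
  joined,
  classOf[AvroInputFormat[GenericRecord]],
  classOf[AvroWrapper[GenericRecord]],
  classOf[NullWritable],
  sc.defaultMinPartitions)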

Thanks,
Anders

15/09/24 17:11:11 INFO BlockManager: Writing block broadcast_1871_piece0 to
disk  [19/49086]15/09/24
17:11:11 INFO BlockManagerInfo: Added broadcast_1871_piece0 on disk on
10.254.35.24:49428 (size: 23.1 KB)
15/09/24 17:11:11 INFO MemoryStore: Block broadcast_4803_piece0 stored as
bytes in memory (estimated size 23.1 KB, free 2.4 KB)
15/09/24 17:11:11 INFO BlockManagerInfo: Added broadcast_4803_piece0 in
memory on 10.254.35.24:49428 (size: 23.1 KB, free: 464.0 MB)
15/09/24 17:11:11 INFO SpotifySparkContext: Created broadcast 4803 from
hadoopFile at AvroRelation.scala:121
15/09/24 17:11:11 WARN MemoryStore: Failed to reserve initial memory
threshold of 1024.0 KB for computing block broadcast_4804 in memory
.
15/09/24 17:11:11 WARN MemoryStore: Not enough space to cache
broadcast_4804 in memory! (computed 496.0 B so far)
15/09/24 17:11:11 INFO MemoryStore: Memory use = 530.3 MB (blocks) + 0.0 B
(scratch space shared across 0 tasks(s)) = 530.3 MB. Storage
limit = 530.3 MB.
15/09/24 17:11:11 WARN MemoryStore: Persisting block broadcast_4804 to disk
instead.
15/09/24 17:11:11 INFO MemoryStore: ensureFreeSpace(23703) called with
curMem=556036460, maxMem=556038881
15/09/24 17:11:11 INFO MemoryStore: 1 blocks selected for dropping
15/09/24 17:11:11 INFO BlockManager: Dropping block broadcast_1872_piece0
from memory
15/09/24 17:11:11 INFO BlockManager: Writing block broadcast_1872_piece0 to
disk


Re: Problems with Tungsten in Spark 1.5.0-rc2

2015-09-08 Thread Anders Arpteg
Ok, thanks Reynold. When I tested dynamic allocation with Spark 1.4, it
complained saying that it was not Tungsten compliant. Let's hope it works
with 1.5 then!

On Tue, Sep 8, 2015 at 5:49 AM Reynold Xin <r...@databricks.com> wrote:

>
> On Wed, Sep 2, 2015 at 12:03 AM, Anders Arpteg <arp...@spotify.com> wrote:
>
>>
>> BTW, is it possible (or will it be) to use Tungsten with dynamic
>> allocation and the external shuffle manager?
>>
>>
> Yes - I think this already works. There isn't anything specific here
> related to Tungsten.
>
>


Re: Problems with Tungsten in Spark 1.5.0-rc2

2015-09-02 Thread Anders Arpteg
I haven't done any comparative benchmarking between the two, and it would
involve some work to do so. A single run with each shuffler would probably
not say that much, since we have a rather busy cluster and the performance
heavily depends on what's currently running in the cluster. I have seen
fewer problems/exceptions though, and possibilities to decrease memory
requirements (or increase cores), which is of great help.

BTW, is it possible (or will it be) to use Tungsten with dynamic allocation
and the external shuffle manager?
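
For what it's worth, a minimal sketch of the settings involved on the Spark
side (the YARN NodeManagers additionally need the spark_shuffle aux-service
configured in yarn-site.xml, and the executor counts below are only
illustrative):

val conf = new org.apache.spark.SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "10")
  .set("spark.dynamicAllocation.maxExecutors", "400")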

Best,
Anders

On Tue, Sep 1, 2015 at 7:07 PM Davies Liu <dav...@databricks.com> wrote:

> Thanks for the confirmation. The tungsten-sort is not the default
> ShuffleManager, so this fix will not block the 1.5 release; it may be in
> 1.5.1.
>
> BTW, what is the difference between the sort and tungsten-sort
> ShuffleManager for this large job?
>
> On Tue, Sep 1, 2015 at 8:03 AM, Anders Arpteg <arp...@spotify.com> wrote:
> > A fix submitted less than one hour after my mail, very impressive Davies!
> > I've compiled your PR and tested it with the large job that failed
> before,
> > and it seems to work fine now without any exceptions. Awesome, thanks!
> >
> > Best,
> > Anders
> >
> > On Tue, Sep 1, 2015 at 1:38 AM Davies Liu <dav...@databricks.com> wrote:
> >>
> >> I had sent out a PR [1] to fix 2), could you help to test that?
> >>
> >> [1]  https://github.com/apache/spark/pull/8543
> >>
> >> On Mon, Aug 31, 2015 at 12:34 PM, Anders Arpteg <arp...@spotify.com>
> >> wrote:
> >> > Was trying out 1.5 rc2 and noticed some issues with the Tungsten
> shuffle
> >> > manager. One problem was when using the com.databricks.spark.avro
> reader
> >> > and
> >> > the error(1) was received, see stack trace below. The problem does not
> >> > occur
> >> > with the "sort" shuffle manager.
> >> >
> >> > Another problem was in a large complex job with lots of
> transformations
> >> > occurring simultaneously, i.e. 50+ or more maps each shuffling data.
> >> > Received error(2) about inability to acquire memory which seems to
> also
> >> > have
> >> > to do with Tungsten. Possibly some setting available to increase that
> >> > memory, because there's lots of heap memory available.
> >> >
> >> > Am running on Yarn 2.2 with about 400 executors. Hoping this will give
> >> > some
> >> > hints for improving the upcoming release, or for me to get some hints
> to
> >> > fix
> >> > the problems.
> >> >
> >> > Thanks,
> >> > Anders
> >> >
> >> > Error(1)
> >> >
> >> > 15/08/31 18:30:57 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID
> >> > 3387,
> >> > lon4-hadoopslave-c245.lon4.spotify.net): java.io.EOFException
> >> >
> >> >at java.io.DataInputStream.readInt(DataInputStream.java:392)
> >> >
> >> >at
> >> >
> >> >
> org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:121)
> >> >
> >> >at
> >> >
> >> >
> org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:109)
> >> >
> >> >at scala.collection.Iterator$$anon$13.next(Iterator.scala:372)
> >> >
> >> >at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> >> >
> >> >at
> >> >
> >> >
> org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
> >> >
> >> >at
> >> >
> >> >
> org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
> >> >
> >> >at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> >> >
> >> >at
> >> >
> >> >
> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:366)
> >> >
> >> >at
> >> >
> >> >
> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622)
> >> >
> >> >at
> >> >
> >> >
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$
> 1.org$apache$spark$sql$execution$aggregate$Tung
> >> >
> 

Re: Problems with Tungsten in Spark 1.5.0-rc2

2015-09-01 Thread Anders Arpteg
A fix submitted less than one hour after my mail, very impressive Davies!
I've compiled your PR and tested it with the large job that failed before,
and it seems to work fine now without any exceptions. Awesome, thanks!

Best,
Anders

On Tue, Sep 1, 2015 at 1:38 AM Davies Liu <dav...@databricks.com> wrote:

> I had sent out a PR [1] to fix 2), could you help to test that?
>
> [1]  https://github.com/apache/spark/pull/8543
>
> On Mon, Aug 31, 2015 at 12:34 PM, Anders Arpteg <arp...@spotify.com>
> wrote:
> > Was trying out 1.5 rc2 and noticed some issues with the Tungsten shuffle
> > manager. One problem was when using the com.databricks.spark.avro reader
> and
> > the error(1) was received, see stack trace below. The problem does not
> occur
> > with the "sort" shuffle manager.
> >
> > Another problem was in a large complex job with lots of transformations
> > occurring simultaneously, i.e. 50+ or more maps each shuffling data.
> > Received error(2) about inability to acquire memory which seems to also
> have
> > to do with Tungsten. Possibly some setting available to increase that
> > memory, because there's lots of heap memory available.
> >
> > Am running on Yarn 2.2 with about 400 executors. Hoping this will give
> some
> > hints for improving the upcoming release, or for me to get some hints to
> fix
> > the problems.
> >
> > Thanks,
> > Anders
> >
> > Error(1)
> >
> > 15/08/31 18:30:57 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID
> 3387,
> > lon4-hadoopslave-c245.lon4.spotify.net): java.io.EOFException
> >
> >at java.io.DataInputStream.readInt(DataInputStream.java:392)
> >
> >at
> >
> org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:121)
> >
> >at
> >
> org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:109)
> >
> >at scala.collection.Iterator$$anon$13.next(Iterator.scala:372)
> >
> >at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> >
> >at
> >
> org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
> >
> >at
> >
> org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
> >
> >at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> >
> >at
> >
> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:366)
> >
> >at
> >
> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622)
> >
> >at
> >
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$
> 1.org$apache$spark$sql$execution$aggregate$Tung
> >
> > stenAggregate$$anonfun$$executePartition$1(TungstenAggregate.scala:110)
> >
> >at
> >
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
> >
> >at
> >
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
> >
> >at
> >
> org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:47)
> >
> >at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> >
> >at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> >
> >at
> > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> >
> >at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> >
> >at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> >
> >at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> >
> >at org.apache.spark.scheduler.Task.run(Task.scala:88)
> >
> >at
> > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> >
> >at
> >
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> >
> >at
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> >
> >at java.lang.Thread.run(Thread.java:745)
> >
> >
> > Error(2)
> >
> > 5/08/31 18:41:25 WARN TaskSetManager: Lost task 16.1 in stage 316.0 (TID
> > 32686, lon4-hadoopslave-b925.lon4.spotify.net): jav

Problems with Tungsten in Spark 1.5.0-rc2

2015-08-31 Thread Anders Arpteg
Was trying out 1.5 rc2 and noticed some issues with the Tungsten shuffle
manager. One problem occurred when using the com.databricks.spark.avro
reader, where error(1) was received, see stack trace below. The problem does
not occur with the "sort" shuffle manager.

Another problem was in a large complex job with lots of transformations
occurring simultaneously, i.e. 50 or more maps each shuffling data. Received
error(2) about the inability to acquire memory, which also seems to have to
do with Tungsten. Possibly there is some setting available to increase that
memory, because there's lots of heap memory available.
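
For reference, the shuffle manager can be switched back explicitly, and
spark.shuffle.memoryFraction is presumably the setting that governs how much
of the heap the shuffle can acquire (values below are only examples, not
recommendations):

val conf = new org.apache.spark.SparkConf()
  .set("spark.shuffle.manager", "sort")        // fall back from "tungsten-sort"
  .set("spark.shuffle.memoryFraction", "0.4")  // default is 0.2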

Am running on Yarn 2.2 with about 400 executors. Hoping this will give some
hints for improving the upcoming release, or for me to get some hints to
fix the problems.

Thanks,
Anders

*Error(1) *

15/08/31 18:30:57 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID
3387, lon4-hadoopslave-c245.lon4.spotify.net): java.io.EOFException

   at java.io.DataInputStream.readInt(DataInputStream.java:392)

   at
org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:121)

   at
org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:109)

   at scala.collection.Iterator$$anon$13.next(Iterator.scala:372)

   at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)

   at
org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)

   at
org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)

   at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)

   at
org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:366)

   at
org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622)

   at
org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$
1.org$apache$spark$sql$execution$aggregate$Tung

stenAggregate$$anonfun$$executePartition$1(TungstenAggregate.scala:110)

   at
org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)

   at
org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)

   at
org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:47)

   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)

   at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)

   at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)

   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)

   at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)

   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)

   at org.apache.spark.scheduler.Task.run(Task.scala:88)

   at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)

   at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

   at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

   at java.lang.Thread.run(Thread.java:745)

*Error(2) *

5/08/31 18:41:25 WARN TaskSetManager: Lost task 16.1 in stage 316.0 (TID
32686, lon4-hadoopslave-b925.lon4.spotify.net): java.io.IOException: Unable
to acquire 67108864 bytes of memory

   at
org.apache.spark.shuffle.unsafe.UnsafeShuffleExternalSorter.acquireNewPageIfNecessary(UnsafeShuffleExternalSorter.java:385)

   at
org.apache.spark.shuffle.unsafe.UnsafeShuffleExternalSorter.insertRecord(UnsafeShuffleExternalSorter.java:435)

   at
org.apache.spark.shuffle.unsafe.UnsafeShuffleWriter.insertRecordIntoSorter(UnsafeShuffleWriter.java:246)

   at
org.apache.spark.shuffle.unsafe.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:174)

   at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)

   at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)

   at org.apache.spark.scheduler.Task.run(Task.scala:88)

   at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)

   at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

   at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
   at java.lang.Thread.run(Thread.java:745)


Re: Parquet problems

2015-07-22 Thread Anders Arpteg
No, never really resolved the problem, except by increasing the permgen
space, which only partially solved it. Still have to restart the job
multiple times to make the whole job complete (it stores intermediate
results).
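
The resume pattern is roughly the following (a simplified sketch; the helper
name and path handling are made up):

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{DataFrame, SQLContext}

// reuse an already materialized intermediate result if it exists,
// otherwise compute it, write it out, and read it back before continuing
def loadOrCompute(sqlContext: SQLContext, path: String)
                 (compute: => DataFrame): DataFrame = {
  val fs = FileSystem.get(sqlContext.sparkContext.hadoopConfiguration)
  if (!fs.exists(new Path(path))) {
    compute.write.parquet(path)
  }
  sqlContext.read.parquet(path)
}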

The parquet data sources have about 70 columns, and yes Cheng, it works
fine when only loading a small sample of the data.

Thankful for any hints,
Anders

On Wed, Jul 22, 2015 at 5:29 PM Cheng Lian lian.cs@gmail.com wrote:

  How many columns are there in these Parquet files? Could you load a small
 portion of the original large dataset successfully?

 Cheng


 On 6/25/15 5:52 PM, Anders Arpteg wrote:

 Yes, both the driver and the executors. Works a little bit better with
 more space, but still a leak that will cause failure after a number of
 reads. There are about 700 different data sources that need to be loaded,
 lots of data...

 Thu 25 Jun 2015 08:02 Sabarish Sasidharan
 sabarish.sasidha...@manthan.com wrote:

 Did you try increasing the perm gen for the driver?

 Regards
 Sab

 On 24-Jun-2015 4:40 pm, Anders Arpteg arp...@spotify.com wrote:

 When reading large (and many) datasets with the Spark 1.4.0 DataFrames
 parquet reader (the org.apache.spark.sql.parquet format), the following
 exceptions are thrown:

 Exception in thread task-result-getter-0

 Exception: java.lang.OutOfMemoryError thrown from the
 UncaughtExceptionHandler in thread task-result-getter-0
 Exception in thread task-result-getter-3 java.lang.OutOfMemoryError:
 PermGen space
 Exception in thread task-result-getter-1 java.lang.OutOfMemoryError:
 PermGen space
 Exception in thread task-result-getter-2 java.lang.OutOfMemoryError:
 PermGen space


  and many more like these from different threads. I've tried increasing
 the PermGen space using the -XX:MaxPermSize VM setting, but even after
 tripling the space, the same errors occur. I've also tried storing
 intermediate results, and am able to get the full job completed by running
 it multiple times and starting from the last successful intermediate result.
 There seems to be some memory leak in the parquet format. Any hints on how
 to fix this problem?

  Thanks,
 Anders




Re: Parquet problems

2015-06-25 Thread Anders Arpteg
Yes, both the driver and the executors. Works a little bit better with more
space, but still a leak that will cause failure after a number of reads.
There are about 700 different data sources that need to be loaded, lots of
data...

Thu 25 Jun 2015 08:02 Sabarish Sasidharan sabarish.sasidha...@manthan.com
wrote:

 Did you try increasing the perm gen for the driver?

 Regards
 Sab
 On 24-Jun-2015 4:40 pm, Anders Arpteg arp...@spotify.com wrote:

 When reading large (and many) datasets with the Spark 1.4.0 DataFrames
 parquet reader (the org.apache.spark.sql.parquet format), the following
 exceptions are thrown:

 Exception in thread task-result-getter-0
 Exception: java.lang.OutOfMemoryError thrown from the
 UncaughtExceptionHandler in thread task-result-getter-0
 Exception in thread task-result-getter-3 java.lang.OutOfMemoryError:
 PermGen space
 Exception in thread task-result-getter-1 java.lang.OutOfMemoryError:
 PermGen space
 Exception in thread task-result-getter-2 java.lang.OutOfMemoryError:
 PermGen space

 and many more like these from different threads. I've tried increasing
 the PermGen space using the -XX:MaxPermSize VM setting, but even after
 tripling the space, the same errors occur. I've also tried storing
 intermediate results, and am able to get the full job completed by running
 it multiple times and starting from the last successful intermediate result.
 There seems to be some memory leak in the parquet format. Any hints on how
 to fix this problem?

 Thanks,
 Anders




Parquet problems

2015-06-24 Thread Anders Arpteg
When reading large (and many) datasets with the Spark 1.4.0 DataFrames
parquet reader (the org.apache.spark.sql.parquet format), the following
exceptions are thrown:

Exception in thread task-result-getter-0
Exception: java.lang.OutOfMemoryError thrown from the
UncaughtExceptionHandler in thread task-result-getter-0
Exception in thread task-result-getter-3 java.lang.OutOfMemoryError:
PermGen space
Exception in thread task-result-getter-1 java.lang.OutOfMemoryError:
PermGen space
Exception in thread task-result-getter-2 java.lang.OutOfMemoryError:
PermGen space

and many more like these from different threads. I've tried increasing the
PermGen space using the -XX:MaxPermSize VM setting, but even after tripling
the space, the same errors occur. I've also tried storing intermediate
results, and am able to get the full job completed by running it multiple
times and starting for the last successful intermediate result. There seems
to be some memory leak in the parquet format. Any hints on how to fix this
problem?
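
For completeness, this is roughly how the flag is being passed (the size is
just an example; in yarn-client mode the driver flag has to go via
--driver-java-options or spark-defaults.conf rather than being set in code):

val conf = new org.apache.spark.SparkConf()
  .set("spark.executor.extraJavaOptions", "-XX:MaxPermSize=512m")
  .set("spark.driver.extraJavaOptions", "-XX:MaxPermSize=512m")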

Thanks,
Anders


Re: Spark 1.4.0-rc3: Actor not found

2015-06-03 Thread Anders Arpteg
Tried on some other data sources as well, and it actually works for some
parquet sources. There are potentially some specific problems with that
first parquet source that I tried with, rather than a Spark 1.4 problem.
I'll get back with more info if I find any new information.

Thanks,
Anders

On Tue, Jun 2, 2015 at 8:45 PM, Yin Huai yh...@databricks.com wrote:

 Does it happen every time you read a parquet source?

 On Tue, Jun 2, 2015 at 3:42 AM, Anders Arpteg arp...@spotify.com wrote:

 The log is from the log aggregation tool (hortonworks, yarn logs ...),
 so both executors and driver. I'll send a private mail to you with the full
 logs. Also, tried another job as you suggested, and it actually worked
 fine. The first job was reading from a parquet source, and the second from
 an avro source. Could there be some issues with the parquet reader?

 Thanks,
 Anders

 On Tue, Jun 2, 2015 at 11:53 AM, Shixiong Zhu zsxw...@gmail.com wrote:

 How about other jobs? Is it an executor log, or a driver log? Could you
 post other logs near this error, please? Thank you.

 Best Regards,
 Shixiong Zhu

 2015-06-02 17:11 GMT+08:00 Anders Arpteg arp...@spotify.com:

 Just compiled Spark 1.4.0-rc3 for Yarn 2.2 and tried running a job that
 worked fine for Spark 1.3. The job starts on the cluster (yarn-cluster
 mode), initial stage starts, but the job fails before any task succeeds
 with the following error. Any hints?

 [ERROR] [06/02/2015 09:05:36.962] [Executor task launch worker-0]
 [akka.tcp://sparkDriver@10.254.6.15:33986/user/CoarseGrainedScheduler]
 swallowing exception during message send
 (akka.remote.RemoteTransportExceptionNoStackTrace)
 Exception in thread main akka.actor.ActorNotFound: Actor not found
 for: ActorSelection[Anchor(akka.tcp://sparkDriver@10.254.6.15:33986/),
 Path(/user/OutputCommitCoordinator)]
 at
 akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
 at
 akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
 at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
 at
 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
 at
 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
 at
 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
 at
 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
 at
 scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
 at
 akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
 at
 akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
 at
 akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
 at
 akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
 at
 scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
 at
 scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
 at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
 at
 akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508)
 at
 akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541)
 at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531)
 at
 akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
 at
 akka.remote.EndpointManager$$anonfun$1.applyOrElse(Remoting.scala:575)
 at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
 at akka.remote.EndpointManager.aroundReceive(Remoting.scala:395)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
 at akka.actor.ActorCell.invoke(ActorCell.scala:487)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
 at akka.dispatch.Mailbox.run(Mailbox.scala:220)
 at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
 at
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)







Spark 1.4.0-rc3: Actor not found

2015-06-02 Thread Anders Arpteg
Just compiled Spark 1.4.0-rc3 for Yarn 2.2 and tried running a job that
worked fine for Spark 1.3. The job starts on the cluster (yarn-cluster
mode), initial stage starts, but the job fails before any task succeeds
with the following error. Any hints?

[ERROR] [06/02/2015 09:05:36.962] [Executor task launch worker-0]
[akka.tcp://sparkDriver@10.254.6.15:33986/user/CoarseGrainedScheduler]
swallowing exception during message send
(akka.remote.RemoteTransportExceptionNoStackTrace)
Exception in thread main akka.actor.ActorNotFound: Actor not found for:
ActorSelection[Anchor(akka.tcp://sparkDriver@10.254.6.15:33986/),
Path(/user/OutputCommitCoordinator)]
at
akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
at
akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
at
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
at
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
at
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
at
scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at
akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
at
akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
at
akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
at
akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
at
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
at
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508)
at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541)
at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531)
at
akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
at
akka.remote.EndpointManager$$anonfun$1.applyOrElse(Remoting.scala:575)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at akka.remote.EndpointManager.aroundReceive(Remoting.scala:395)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)


Re: Spark 1.4.0-rc3: Actor not found

2015-06-02 Thread Anders Arpteg
The log is from the log aggregation tool (hortonworks, yarn logs ...), so
both executors and driver. I'll send a private mail to you with the full
logs. Also, tried another job as you suggested, and it actually worked
fine. The first job was reading from a parquet source, and the second from
an avro source. Could there be some issues with the parquet reader?

Thanks,
Anders

On Tue, Jun 2, 2015 at 11:53 AM, Shixiong Zhu zsxw...@gmail.com wrote:

 How about other jobs? Is it an executor log, or a driver log? Could you
 post other logs near this error, please? Thank you.

 Best Regards,
 Shixiong Zhu

 2015-06-02 17:11 GMT+08:00 Anders Arpteg arp...@spotify.com:

 Just compiled Spark 1.4.0-rc3 for Yarn 2.2 and tried running a job that
 worked fine for Spark 1.3. The job starts on the cluster (yarn-cluster
 mode), initial stage starts, but the job fails before any task succeeds
 with the following error. Any hints?

 [ERROR] [06/02/2015 09:05:36.962] [Executor task launch worker-0]
 [akka.tcp://sparkDriver@10.254.6.15:33986/user/CoarseGrainedScheduler]
 swallowing exception during message send
 (akka.remote.RemoteTransportExceptionNoStackTrace)
 Exception in thread main akka.actor.ActorNotFound: Actor not found for:
 ActorSelection[Anchor(akka.tcp://sparkDriver@10.254.6.15:33986/),
 Path(/user/OutputCommitCoordinator)]
 at
 akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
 at
 akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
 at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
 at
 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
 at
 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
 at
 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
 at
 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
 at
 scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
 at
 akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
 at
 akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
 at
 akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
 at
 akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
 at
 scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
 at
 scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
 at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
 at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508)
 at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541)
 at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531)
 at
 akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
 at
 akka.remote.EndpointManager$$anonfun$1.applyOrElse(Remoting.scala:575)
 at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
 at akka.remote.EndpointManager.aroundReceive(Remoting.scala:395)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
 at akka.actor.ActorCell.invoke(ActorCell.scala:487)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
 at akka.dispatch.Mailbox.run(Mailbox.scala:220)
 at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
 at
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)





Re: No executors allocated on yarn with latest master branch

2015-02-25 Thread Anders Arpteg
We're using the capacity scheduler, to the best of my knowledge. Unsure if
multi resource scheduling is used, but if you know of an easy way to figure
that out, then let me know.

Thanks,
Anders

On Sat, Feb 21, 2015 at 12:05 AM, Sandy Ryza sandy.r...@cloudera.com
wrote:

 Are you using the capacity scheduler or fifo scheduler without multi
 resource scheduling by any chance?

 On Thu, Feb 12, 2015 at 1:51 PM, Anders Arpteg arp...@spotify.com wrote:

 The nm logs only seem to contain entries similar to the following. Nothing
 else in the same time range. Any help?

 2015-02-12 20:47:31,245 WARN
 org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:
 Event EventType: KILL_CONTAINER sent to absent container
 container_1422406067005_0053_01_02
 2015-02-12 20:47:31,246 WARN
 org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:
 Event EventType: KILL_CONTAINER sent to absent container
 container_1422406067005_0053_01_12
 2015-02-12 20:47:31,246 WARN
 org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:
 Event EventType: KILL_CONTAINER sent to absent container
 container_1422406067005_0053_01_22
 2015-02-12 20:47:31,246 WARN
 org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:
 Event EventType: KILL_CONTAINER sent to absent container
 container_1422406067005_0053_01_32
 2015-02-12 20:47:31,246 WARN
 org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:
 Event EventType: KILL_CONTAINER sent to absent container
 container_1422406067005_0053_01_42
 2015-02-12 21:24:30,515 WARN
 org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:
 Event EventType: FINISH_APPLICATION sent to absent application
 application_1422406067005_0053

 On Thu, Feb 12, 2015 at 10:38 PM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 It seems unlikely to me that it would be a 2.2 issue, though not
 entirely impossible.  Are you able to find any of the container logs?  Is
 the NodeManager launching containers and reporting some exit code?

 -Sandy

 On Thu, Feb 12, 2015 at 1:21 PM, Anders Arpteg arp...@spotify.com
 wrote:

 No, not submitting from windows, from a debian distribution. Had a
 quick look at the rm logs, and it seems some containers are allocated but
 then released again for some reason. Not easy to make sense of the logs,
 but here is a snippet from the logs (from a test in our small test cluster)
 if you'd like to have a closer look: http://pastebin.com/8WU9ivqC

 Sandy, sounds like it could possible be a 2.2 issue then, or what do
 you think?

 Thanks,
 Anders

 On Thu, Feb 12, 2015 at 3:11 PM, Aniket Bhatnagar 
 aniket.bhatna...@gmail.com wrote:

 This is tricky to debug. Check the logs of the node and resource manager of
 YARN to see if you can trace the error. In the past I have had to closely
 look at the arguments getting passed to the YARN container (they get logged
 before attempting to launch containers). If I still didn't get a clue, I had
 to check the script generated by YARN to execute the container and even run
 it manually to trace at what line the error occurred.

 BTW are you submitting the job from windows?

 On Thu, Feb 12, 2015, 3:34 PM Anders Arpteg arp...@spotify.com
 wrote:

 Interesting to hear that it works for you. Are you using Yarn 2.2 as
 well? No strange log message during startup, and can't see any other log
 messages since no executor gets launched. Does not seem to work in
 yarn-client mode either, failing with the exception below.

 Exception in thread main org.apache.spark.SparkException: Yarn
 application has already ended! It might have been killed or unable to
 launch application master.
 at
 org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:119)
 at
 org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:59)
 at
 org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:141)
 at
 org.apache.spark.SparkContext.init(SparkContext.scala:370)
 at
 com.spotify.analytics.AnalyticsSparkContext.init(AnalyticsSparkContext.scala:8)
 at
 com.spotify.analytics.DataSampler$.main(DataSampler.scala:42)
 at com.spotify.analytics.DataSampler.main(DataSampler.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
 at java.lang.reflect.Method.invoke(Method.java:597)
 at
 org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:551)
 at
 org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:155)
 at
 org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:178

Re: Missing shuffle files

2015-02-24 Thread Anders Arpteg
If you're thinking of the YARN memory overhead, then yes, I have increased
that as well. However, I'm glad to say that my job finally finished
successfully. Besides the timeout and memory settings, performing
repartitioning (with shuffling) at the right time seems to be the key to
making this large job succeed. With all the transformations in the job, the
partition distribution was becoming increasingly skewed. Not easy to figure
out when to repartition and to what number of partitions, and it takes
forever to tweak these settings since it works perfectly for small datasets
and you'll have to experiment with large time-consuming jobs. Imagine if
there was an automatic partition reconfiguration function that automagically
did that...
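
For reference, the knobs mentioned in this thread look roughly like this (all
values are illustrative, not recommendations):

val conf = new org.apache.spark.SparkConf()
  .set("spark.yarn.executor.memoryOverhead", "1024")     // MB
  .set("spark.akka.askTimeout", "700")                   // seconds
  .set("spark.core.connection.ack.wait.timeout", "700")  // seconds

// plus an explicit repartition before the heavy, skew-prone stages,
// e.g. someRdd.repartition(2000)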

On Tue, Feb 24, 2015 at 3:20 AM, Corey Nolet cjno...@gmail.com wrote:

 I *think* this may have been related to the default memory overhead
 setting being too low. I raised the value to 1G and tried my job again
 but i had to leave the office before it finished. It did get further but
 I'm not exactly sure if that's just because i raised the memory. I'll see
 tomorrow- but i have a suspicion this may have been the cause of the
 executors being killed by the application master.
 On Feb 23, 2015 5:25 PM, Corey Nolet cjno...@gmail.com wrote:

 I've got the opposite problem with regards to partitioning. I've got over
 6000 partitions for some of these RDDs which immediately blows the heap
 somehow- I'm still not exactly sure how. If I coalesce them down to about
 600-800 partitions, I get the problems where the executors are dying
 without any other error messages (other than telling me the executor was
 lost in the UI). If I don't coalesce, I pretty immediately get Java heap
 space exceptions that kill the job altogether.

 Putting in the timeouts didn't seem to help the case where I am
 coalescing. Also, I don't see any differences between 'disk only' and
 'memory and disk' storage levels- both of them are having the same
 problems. I notice large shuffle files (30-40gb) that only seem to spill a
 few hundred mb.

 On Mon, Feb 23, 2015 at 4:28 PM, Anders Arpteg arp...@spotify.com
 wrote:

 Sounds very similar to what I experienced Corey. Something that seems to
 at least help with my problems is to have more partitions. Am already
 fighting between ending up with too many partitions in the end and having
 too few in the beginning. By coalescing as late as possible and avoiding
 too few in the beginning, the problems seem to decrease. Also, by increasing
 spark.akka.askTimeout and spark.core.connection.ack.wait.timeout
 significantly (~700 secs), the problems seem to almost disappear. Don't
 want to celebrate yet, still a long way left before the job completes but
 it's looking better...

 On Mon, Feb 23, 2015 at 9:54 PM, Corey Nolet cjno...@gmail.com wrote:

 I'm looking @ my yarn container logs for some of the executors which
 appear to be failing (with the missing shuffle files). I see exceptions
 that say client.TransportClientFactory: Found inactive connection to
 host/ip:port, closing it.

 Right after that I see shuffle.RetryingBlockFetcher: Exception while
 beginning fetch of 1 outstanding blocks. java.io.IOException: Failed to
 connect to host/ip:port

 Right after that exception I see RECEIVED SIGNAL 15: SIGTERM

 Finally, following the sigterm, I see FileNotFoundException:
 /hdfs/01/yarn/nm/usercache../spark-local-uuid/shuffle_5_09_0.data (No
 such file or directory)

 I'm looking @ the nodemanager and application master logs and I see no
 indications whatsoever that there were any memory issues during this period
 of time. The Spark UI is telling me none of the executors are really using
 too much memory when this happens. It is a big job that's caching several
 100's of GB but each node manager on the cluster has 64gb of ram just for
 yarn containers (physical nodes have 128gb). On this cluster, we have 128
 nodes. I've also tried using DISK_ONLY storage level but to no avail.

 Any further ideas on how to track this down? Again, we're able to run
 this same job on about 1/5th of the data just fine.The only thing that's
 pointing me towards a memory issue is that it seems to be happening in the
 same stages each time and when I lower the memory that each executor has
 allocated it happens in earlier stages but I can't seem to find anything
 that says an executor (or container for that matter) has run low on memory.



 On Mon, Feb 23, 2015 at 9:24 AM, Anders Arpteg arp...@spotify.com
 wrote:

 No, unfortunately we're not making use of dynamic allocation or the
 external shuffle service. Hoping that we could reconfigure our cluster to
 make use of it, but since it requires changes to the cluster itself (and
 not just the Spark app), it could take some time.

 Unsure if task 450 was acting as a reducer or not, but seems possible.
 Probably due to a crashed executor as you say. Seems like I need to do some
 more advanced partition tuning to make this job work, as it's

Re: Missing shuffle files

2015-02-23 Thread Anders Arpteg
No, unfortunately we're not making use of dynamic allocation or the
external shuffle service. Hoping that we could reconfigure our cluster to
make use of it, but since it requires changes to the cluster itself (and
not just the Spark app), it could take some time.

Unsure if task 450 was acting as a reducer or not, but seems possible.
Probably due to a crashed executor as you say. Seems like I need to do some
more advanced partition tuning to make this job work, as it currently has a
rather high number of partitions.

Thanks for the help so far! It's certainly a frustrating task to debug when
everything's working perfectly on sample data locally but crashes hard when
running on the full dataset on the cluster...

On Sun, Feb 22, 2015 at 9:27 AM, Sameer Farooqui same...@databricks.com
wrote:

 Do you guys have dynamic allocation turned on for YARN?

 Anders, was Task 450 in your job acting like a Reducer and fetching the
 Map spill output data from a different node?

 If a Reducer task can't read the remote data it needs, that could cause
 the stage to fail. Sometimes this forces the previous stage to also be
 re-computed if it's a wide dependency.

 But like Petar said, if you turn the external shuffle service on, the YARN
 NodeManager process on the slave machines will serve out the map spill
 data instead of the Executor JVMs (by default, unless you turn external
 shuffle on, the Executor JVM itself serves out the shuffle data, which
 causes problems if an Executor dies).

 Corey, how often are Executors crashing in your app? How many Executors do
 you have total? And what is the memory size for each? You can change what
 fraction of the Executor heap will be used for your user code vs the
 shuffle vs RDD caching with the spark.storage.memoryFraction setting.
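
 As a rough illustration (the values here are just examples, not a
 recommendation), that fraction can be shifted in the application's
 SparkConf, e.g. lowering the storage fraction and raising the shuffle
 fraction when shuffles dominate:

   import org.apache.spark.SparkConf

   // Spark 1.x defaults: 0.6 for spark.storage.memoryFraction and
   // 0.2 for spark.shuffle.memoryFraction.
   val conf = new SparkConf()
     .set("spark.storage.memoryFraction", "0.4")
     .set("spark.shuffle.memoryFraction", "0.3")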

 On Sat, Feb 21, 2015 at 2:58 PM, Petar Zecevic petar.zece...@gmail.com
 wrote:


 Could you try to turn on the external shuffle service?

 spark.shuffle.service.enabled = true
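
 (For completeness, a minimal sketch of how that would look in the app's
 SparkConf, typically paired with dynamic allocation; the NodeManager side
 also needs the YARN shuffle aux-service configured, as discussed later in
 the thread:)

   import org.apache.spark.SparkConf

   val conf = new SparkConf()
     .set("spark.shuffle.service.enabled", "true")
     .set("spark.dynamicAllocation.enabled", "true")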


 On 21.2.2015. 17:50, Corey Nolet wrote:

 I'm experiencing the same issue. Upon closer inspection I'm noticing that
 executors are being lost as well. Thing is, I can't figure out how they are
 dying. I'm using MEMORY_AND_DISK_SER and I've got over 1.3 TB of memory
 allocated for the application. I was thinking perhaps it was possible that
 a single executor was getting a single or a couple of large partitions, but
 shouldn't the disk persistence kick in at that point?

 On Sat, Feb 21, 2015 at 11:20 AM, Anders Arpteg arp...@spotify.com
 wrote:

 For large jobs, the following error message is shown that seems to
 indicate that shuffle files for some reason are missing. It's a rather
 large job with many partitions. If the data size is reduced, the problem
 disappears. I'm running a build from Spark master post 1.2 (build at
 2015-01-16) and running on Yarn 2.2. Any idea of how to resolve this
 problem?

  User class threw exception: Job aborted due to stage failure: Task 450
 in stage 450.1 failed 4 times, most recent failure: Lost task 450.3 in
 stage 450.1 (TID 167370, lon4-hadoopslave-b77.lon4.spotify.net):
 java.io.FileNotFoundException:
 /disk/hd06/yarn/local/usercache/arpteg/appcache/application_1424333823218_21217/spark-local-20150221154811-998c/03/rdd_675_450
 (No such file or directory)
  at java.io.FileOutputStream.open(Native Method)
  at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
  at java.io.FileOutputStream.<init>(FileOutputStream.java:171)
  at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:76)
  at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:786)
  at
 org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:637)
  at
 org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:149)
  at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:74)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
  at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:264)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:231)
  at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
  at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
  at org.apache.spark.scheduler.Task.run(Task.scala:64)
  at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:192)
  at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

  at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

  at java.lang.Thread.run(Thread.java:745)

  TIA,
 Anders







Re: Missing shuffle files

2015-02-23 Thread Anders Arpteg
Sounds very similar to what I experienced, Corey. Something that seems to at
least help with my problems is to have more partitions. Am already fighting
between ending up with too many partitions in the end and having too few in
the beginning. By coalescing as late as possible and avoiding too few in
the beginning, the problems seem to decrease. Also, by increasing
spark.akka.askTimeout and spark.core.connection.ack.wait.timeout
significantly (~700 secs), the problems seem to almost disappear. Don't
want to celebrate yet, still a long way left before the job completes, but it's
looking better...
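
For reference, a minimal sketch of how those two timeouts could be set
programmatically; the values just mirror the ~700 seconds mentioned above,
and the app name is a placeholder:

  import org.apache.spark.{SparkConf, SparkContext}

  // Both properties are interpreted as seconds by Spark 1.2-era configs.
  val conf = new SparkConf()
    .setAppName("shuffle-heavy-job")
    .set("spark.akka.askTimeout", "700")
    .set("spark.core.connection.ack.wait.timeout", "700")
  val sc = new SparkContext(conf)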

On Mon, Feb 23, 2015 at 9:54 PM, Corey Nolet cjno...@gmail.com wrote:

 I'm looking @ my yarn container logs for some of the executors which
 appear to be failing (with the missing shuffle files). I see exceptions
 that say client.TransportClientFactory: Found inactive connection to
 host/ip:port, closing it.

 Right after that I see shuffle.RetryingBlockFetcher: Exception while
 beginning fetch of 1 outstanding blocks. java.io.IOException: Failed to
 connect to host/ip:port

 Right after that exception I see RECEIVED SIGNAL 15: SIGTERM

 Finally, following the sigterm, I see FileNotFoundException:
 /hdfs/01/yarn/nm/usercache../spark-local-uuid/shuffle_5_09_0.data (No
 such file or directory)

 I'm looking @ the nodemanager and application master logs and I see no
 indications whatsoever that there were any memory issues during this period
 of time. The Spark UI is telling me none of the executors are really using
 too much memory when this happens. It is a big job that's caching several
 hundreds of GB, but each node manager on the cluster has 64 GB of RAM just for
 YARN containers (the physical nodes have 128 GB). On this cluster, we have 128
 nodes. I've also tried using the DISK_ONLY storage level, but to no avail.

 Any further ideas on how to track this down? Again, we're able to run this
 same job on about 1/5th of the data just fine. The only thing that's
 pointing me towards a memory issue is that it seems to be happening in the
 same stages each time, and when I lower the memory that each executor has
 allocated it happens in earlier stages, but I can't seem to find anything
 that says an executor (or container for that matter) has run low on memory.



 On Mon, Feb 23, 2015 at 9:24 AM, Anders Arpteg arp...@spotify.com wrote:

 No, unfortunately we're not making use of dynamic allocation or the
 external shuffle service. Hoping that we could reconfigure our cluster to
 make use of it, but since it requires changes to the cluster itself (and
 not just the Spark app), it could take some time.

 Unsure if task 450 was acting as a reducer or not, but seems possible.
 Probably due to a crashed executor as you say. Seems like I need to do some
 more advanced partition tuning to make this job work, as it currently has a
 rather high number of partitions.

 Thanks for the help so far! It's certainly a frustrating task to debug
 when everything's working perfectly on sample data locally but crashes hard
 when running on the full dataset on the cluster...

 On Sun, Feb 22, 2015 at 9:27 AM, Sameer Farooqui same...@databricks.com
 wrote:

 Do you guys have dynamic allocation turned on for YARN?

 Anders, was Task 450 in your job acting like a Reducer and fetching the
 Map spill output data from a different node?

 If a Reducer task can't read the remote data it needs, that could cause
 the stage to fail. Sometimes this forces the previous stage to also be
 re-computed if it's a wide dependency.

 But like Petar said, if you turn the external shuffle service on, the YARN
 NodeManager process on the slave machines will serve out the map spill
 data instead of the Executor JVMs (by default, unless you turn external
 shuffle on, the Executor JVM itself serves out the shuffle data, which
 causes problems if an Executor dies).

 Corey, how often are Executors crashing in your app? How many Executors
 do you have total? And what is the memory size for each? You can change
 what fraction of the Executor heap will be used for your user code vs the
 shuffle vs RDD caching with the spark.storage.memoryFraction setting.

 On Sat, Feb 21, 2015 at 2:58 PM, Petar Zecevic petar.zece...@gmail.com
 wrote:


 Could you try to turn on the external shuffle service?

 spark.shuffle.service.enabled = true


 On 21.2.2015. 17:50, Corey Nolet wrote:

 I'm experiencing the same issue. Upon closer inspection I'm noticing
 that executors are being lost as well. Thing is, I can't figure out how
 they are dying. I'm using MEMORY_AND_DISK_SER and I've got over 1.3 TB of
 memory allocated for the application. I was thinking perhaps it was
 possible that a single executor was getting a single or a couple of large
 partitions, but shouldn't the disk persistence kick in at that point?

 On Sat, Feb 21, 2015 at 11:20 AM, Anders Arpteg arp...@spotify.com
 wrote:

 For large jobs, the following error message is shown that seems to
 indicate that shuffle files for some reason

Missing shuffle files

2015-02-21 Thread Anders Arpteg
For large jobs, the following error message is shown that seems to indicate
that shuffle files for some reason are missing. It's a rather large job
with many partitions. If the data size is reduced, the problem disappears.
I'm running a build from Spark master post 1.2 (build at 2015-01-16) and
running on Yarn 2.2. Any idea of how to resolve this problem?

User class threw exception: Job aborted due to stage failure: Task 450 in
stage 450.1 failed 4 times, most recent failure: Lost task 450.3 in stage
450.1 (TID 167370, lon4-hadoopslave-b77.lon4.spotify.net):
java.io.FileNotFoundException:
/disk/hd06/yarn/local/usercache/arpteg/appcache/application_1424333823218_21217/spark-local-20150221154811-998c/03/rdd_675_450
(No such file or directory)
 at java.io.FileOutputStream.open(Native Method)
 at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
 at java.io.FileOutputStream.<init>(FileOutputStream.java:171)
 at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:76)
 at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:786)
 at
org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:637)
 at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:149)
 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:74)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:264)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:231)
 at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
 at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:64)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:192)
 at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

 at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

 at java.lang.Thread.run(Thread.java:745)

TIA,
Anders


Re: No executors allocated on yarn with latest master branch

2015-02-12 Thread Anders Arpteg
The NM logs only seem to contain entries similar to the following. Nothing else in
the same time range. Any help?

2015-02-12 20:47:31,245 WARN
org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:
Event EventType: KILL_CONTAINER sent to absent container
container_1422406067005_0053_01_02
2015-02-12 20:47:31,246 WARN
org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:
Event EventType: KILL_CONTAINER sent to absent container
container_1422406067005_0053_01_12
2015-02-12 20:47:31,246 WARN
org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:
Event EventType: KILL_CONTAINER sent to absent container
container_1422406067005_0053_01_22
2015-02-12 20:47:31,246 WARN
org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:
Event EventType: KILL_CONTAINER sent to absent container
container_1422406067005_0053_01_32
2015-02-12 20:47:31,246 WARN
org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:
Event EventType: KILL_CONTAINER sent to absent container
container_1422406067005_0053_01_42
2015-02-12 21:24:30,515 WARN
org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:
Event EventType: FINISH_APPLICATION sent to absent application
application_1422406067005_0053

On Thu, Feb 12, 2015 at 10:38 PM, Sandy Ryza sandy.r...@cloudera.com
wrote:

 It seems unlikely to me that it would be a 2.2 issue, though not entirely
 impossible.  Are you able to find any of the container logs?  Is the
 NodeManager launching containers and reporting some exit code?

 -Sandy

 On Thu, Feb 12, 2015 at 1:21 PM, Anders Arpteg arp...@spotify.com wrote:

 No, not submitting from Windows, from a Debian distribution. Had a quick
 look at the RM logs, and it seems some containers are allocated but then
 released again for some reason. Not easy to make sense of the logs, but
 here is a snippet from the logs (from a test in our small test cluster) if
 you'd like to have a closer look: http://pastebin.com/8WU9ivqC

 Sandy, sounds like it could possibly be a 2.2 issue then, or what do you
 think?

 Thanks,
 Anders

 On Thu, Feb 12, 2015 at 3:11 PM, Aniket Bhatnagar 
 aniket.bhatna...@gmail.com wrote:

 This is tricky to debug. Check the logs of the YARN NodeManager and
 ResourceManager to see if you can trace the error. In the past I have had to
 closely look at the arguments getting passed to the YARN container (they get
 logged before attempting to launch containers). If I still didn't get a clue,
 I had to check the script generated by YARN to execute the container and even
 run it manually to trace at what line the error occurred.

 BTW, are you submitting the job from Windows?

 On Thu, Feb 12, 2015, 3:34 PM Anders Arpteg arp...@spotify.com wrote:

 Interesting to hear that it works for you. Are you using Yarn 2.2 as
 well? No strange log messages during startup, and I can't see any other log
 messages since no executor gets launched. It does not seem to work in
 yarn-client mode either, failing with the exception below.

 Exception in thread "main" org.apache.spark.SparkException: Yarn
 application has already ended! It might have been killed or unable to
 launch application master.
 at
 org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:119)
 at
 org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:59)
 at
 org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:141)
 at org.apache.spark.SparkContext.<init>(SparkContext.scala:370)
 at
 com.spotify.analytics.AnalyticsSparkContext.<init>(AnalyticsSparkContext.scala:8)
 at com.spotify.analytics.DataSampler$.main(DataSampler.scala:42)
 at com.spotify.analytics.DataSampler.main(DataSampler.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
 at java.lang.reflect.Method.invoke(Method.java:597)
 at
 org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:551)
 at
 org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:155)
 at
 org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:178)
 at
 org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:99)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

 /Anders


 On Thu, Feb 12, 2015 at 1:33 AM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Hi Anders,

 I just tried this out and was able to successfully acquire executors.
 Any strange log messages or additional color you can provide on your
 setup?  Does yarn-client mode work?

 -Sandy

 On Wed, Feb 11, 2015 at 1:28 PM, Anders Arpteg arp

Re: No executors allocated on yarn with latest master branch

2015-02-12 Thread Anders Arpteg
Interesting to hear that it works for you. Are you using Yarn 2.2 as well?
No strange log messages during startup, and I can't see any other log messages
since no executor gets launched. It does not seem to work in yarn-client mode
either, failing with the exception below.

Exception in thread "main" org.apache.spark.SparkException: Yarn
application has already ended! It might have been killed or unable to
launch application master.
at
org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:119)
at
org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:59)
at
org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:141)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:370)
at
com.spotify.analytics.AnalyticsSparkContext.<init>(AnalyticsSparkContext.scala:8)
at com.spotify.analytics.DataSampler$.main(DataSampler.scala:42)
at com.spotify.analytics.DataSampler.main(DataSampler.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:551)
at
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:155)
at
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:178)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:99)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

/Anders


On Thu, Feb 12, 2015 at 1:33 AM, Sandy Ryza sandy.r...@cloudera.com wrote:

 Hi Anders,

 I just tried this out and was able to successfully acquire executors.  Any
 strange log messages or additional color you can provide on your setup?
 Does yarn-client mode work?

 -Sandy

 On Wed, Feb 11, 2015 at 1:28 PM, Anders Arpteg arp...@spotify.com wrote:

 Hi,

 Compiled the latest master of Spark yesterday (2015-02-10) for Hadoop
 2.2 and failed executing jobs in yarn-cluster mode for that build. Works
 successfully with spark 1.2 (and also master from 2015-01-16), so something
 has changed since then that prevents the job from receiving any executors
 on the cluster.

 Basic symptoms are that the job fires up the AM, but after examining the
 executors page in the web UI, only the driver is listed; no executors
 are ever received, and the driver keeps waiting forever. Has anyone seen
 similar problems?

 Thanks for any insights,
 Anders





Re: No executors allocated on yarn with latest master branch

2015-02-12 Thread Anders Arpteg
No, not submitting from Windows, from a Debian distribution. Had a quick
look at the RM logs, and it seems some containers are allocated but then
released again for some reason. Not easy to make sense of the logs, but
here is a snippet from the logs (from a test in our small test cluster) if
you'd like to have a closer look: http://pastebin.com/8WU9ivqC

Sandy, sounds like it could possibly be a 2.2 issue then, or what do you
think?

Thanks,
Anders

On Thu, Feb 12, 2015 at 3:11 PM, Aniket Bhatnagar 
aniket.bhatna...@gmail.com wrote:

 This is tricky to debug. Check the logs of the YARN NodeManager and
 ResourceManager to see if you can trace the error. In the past I have had to
 closely look at the arguments getting passed to the YARN container (they get
 logged before attempting to launch containers). If I still didn't get a clue,
 I had to check the script generated by YARN to execute the container and even
 run it manually to trace at what line the error occurred.

 BTW, are you submitting the job from Windows?

 On Thu, Feb 12, 2015, 3:34 PM Anders Arpteg arp...@spotify.com wrote:

 Interesting to hear that it works for you. Are you using Yarn 2.2 as
 well? No strange log messages during startup, and I can't see any other log
 messages since no executor gets launched. It does not seem to work in
 yarn-client mode either, failing with the exception below.

 Exception in thread "main" org.apache.spark.SparkException: Yarn
 application has already ended! It might have been killed or unable to
 launch application master.
 at
 org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:119)
 at
 org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:59)
 at
 org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:141)
 at org.apache.spark.SparkContext.<init>(SparkContext.scala:370)
 at
 com.spotify.analytics.AnalyticsSparkContext.<init>(AnalyticsSparkContext.scala:8)
 at com.spotify.analytics.DataSampler$.main(DataSampler.scala:42)
 at com.spotify.analytics.DataSampler.main(DataSampler.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
 at java.lang.reflect.Method.invoke(Method.java:597)
 at
 org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:551)
 at
 org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:155)
 at
 org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:178)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:99)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

 /Anders


 On Thu, Feb 12, 2015 at 1:33 AM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Hi Anders,

 I just tried this out and was able to successfully acquire executors.
 Any strange log messages or additional color you can provide on your
 setup?  Does yarn-client mode work?

 -Sandy

 On Wed, Feb 11, 2015 at 1:28 PM, Anders Arpteg arp...@spotify.com
 wrote:

 Hi,

 Compiled the latest master of Spark yesterday (2015-02-10) for Hadoop
 2.2 and failed executing jobs in yarn-cluster mode for that build. Works
 successfully with spark 1.2 (and also master from 2015-01-16), so something
 has changed since then that prevents the job from receiving any executors
 on the cluster.

 Basic symptoms are that the job fires up the AM, but after examining
 the executors page in the web UI, only the driver is listed; no
 executors are ever received, and the driver keeps waiting forever. Has
 anyone seen similar problems?

 Thanks for any insights,
 Anders






No executors allocated on yarn with latest master branch

2015-02-11 Thread Anders Arpteg
Hi,

Compiled the latest master of Spark yesterday (2015-02-10) for Hadoop 2.2
and failed executing jobs in yarn-cluster mode for that build. Works
successfully with spark 1.2 (and also master from 2015-01-16), so something
has changed since then that prevents the job from receiving any executors
on the cluster.

Basic symptoms are that the job fires up the AM, but after examining the
executors page in the web UI, only the driver is listed; no executors are
ever received, and the driver keeps waiting forever. Has anyone seen
similar problems?

Thanks for any insights,
Anders


Re: Failing jobs runs twice

2015-01-16 Thread Anders Arpteg
FYI, I just confirmed with the latest Spark 1.3 snapshot that the
spark.yarn.maxAppAttempts setting that SPARK-2165 refers to works
perfectly. Great to finally get rid of this problem. The relaunch also caused
an issue when the event logs were enabled, since the spark-events/appXXX folder
already exists the second time the app gets launched.
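
For reference, a minimal sketch of using that setting (Spark 1.3+ on YARN);
the rest of the conf is omitted:

  import org.apache.spark.SparkConf

  // Limit the app to a single attempt so YARN does not relaunch it
  // (and overwrite the logs/event logs) after a failure.
  val conf = new SparkConf()
    .set("spark.yarn.maxAppAttempts", "1")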

On Thu, Jan 15, 2015 at 3:01 PM, Anders Arpteg arp...@spotify.com wrote:

 Found a setting that seems to fix this problem, but it does not seem to
 be available until Spark 1.3. See
 https://issues.apache.org/jira/browse/SPARK-2165

 However, glad to see that work is being done on the issue.

 On Tue, Jan 13, 2015 at 8:00 PM, Anders Arpteg arp...@spotify.com wrote:

 Yes Andrew, I am. Tried setting spark.yarn.applicationMaster.waitTries to
 1 (thanks Sean), but with no luck. Any ideas?

 On Tue, Jan 13, 2015 at 7:58 PM, Andrew Or and...@databricks.com wrote:

 Hi Anders, are you using YARN by any chance?

 2015-01-13 0:32 GMT-08:00 Anders Arpteg arp...@spotify.com:

 Since starting to use Spark 1.2, I've experienced an annoying issue with
 failing apps that get executed twice. I'm not talking about tasks inside a
 job, which should be executed multiple times before failing the whole app.
 I'm talking about the whole app, which seems to close the previous Spark
 context, start a new one, and rerun the app again.

 This is annoying since it overwrites the log files as well, and it
 becomes hard to troubleshoot the failing app. Does anyone know how to turn
 this feature off?

 Thanks,
 Anders







Re: Failing jobs runs twice

2015-01-15 Thread Anders Arpteg
Found a setting that seems to fix this problem, but it does not seem to be
available until Spark 1.3. See
https://issues.apache.org/jira/browse/SPARK-2165

However, glad to see that work is being done on the issue.

On Tue, Jan 13, 2015 at 8:00 PM, Anders Arpteg arp...@spotify.com wrote:

 Yes Andrew, I am. Tried setting spark.yarn.applicationMaster.waitTries to
 1 (thanks Sean), but with no luck. Any ideas?

 On Tue, Jan 13, 2015 at 7:58 PM, Andrew Or and...@databricks.com wrote:

 Hi Anders, are you using YARN by any chance?

 2015-01-13 0:32 GMT-08:00 Anders Arpteg arp...@spotify.com:

 Since starting to use Spark 1.2, I've experienced an annoying issue with
 failing apps that get executed twice. I'm not talking about tasks inside a
 job, which should be executed multiple times before failing the whole app.
 I'm talking about the whole app, which seems to close the previous Spark
 context, start a new one, and rerun the app again.

 This is annoying since it overwrites the log files as well, and it becomes
 hard to troubleshoot the failing app. Does anyone know how to turn this
 feature off?

 Thanks,
 Anders






Re: Trouble with large Yarn job

2015-01-14 Thread Anders Arpteg
Interesting, sounds plausible. Another way to avoid the problem has been to
cache intermediate output for large jobs (i.e. split large jobs into
smaller ones and then union them together). Unfortunate that this type of
tweaking should be necessary though; hopefully it will be better in 1.2.1.
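
As a rough sketch of that split-and-union workaround (processDay and
sum_vectors stand in for the real per-day loading and reduce functions, and
the batch size of 10 days is arbitrary):

  import org.apache.spark.storage.StorageLevel

  // Process the date range in smaller batches, persist each batch's
  // intermediate result, then union the batches at the end.
  val batches = dates.grouped(10).map { batch =>
    sc.union(batch.map(processDay))
      .reduceByKey(sum_vectors)
      .persist(StorageLevel.MEMORY_AND_DISK_SER)
  }.toSeq

  val all = sc.union(batches).reduceByKey(sum_vectors)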

On Tue, Jan 13, 2015 at 3:29 AM, Sven Krasser kras...@gmail.com wrote:

 Anders,

 This could be related to this open ticket:
 https://issues.apache.org/jira/browse/SPARK-5077. A call to coalesce()
 also fixed that for us as a stopgap.
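
 (In other words, something along these lines as a stopgap, where someRdd
 and the target partition count are placeholders:)

   // Reduce the number of partitions without a full shuffle before the
   // step that otherwise trips over the issue above.
   val fewer = someRdd.coalesce(6000, shuffle = false)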

 Best,
 -Sven


 On Mon, Jan 12, 2015 at 10:18 AM, Anders Arpteg arp...@spotify.com
 wrote:

 Yes sure Sandy, I've checked the logs and it's not an OOM issue. I've
 actually been able to solve the problem finally, and it seems to be an
 issue with too many partitions. The repartitioning I tried initially did so
 after the union, and then it's too late. By repartitioning as early as
 possible, and significantly reducing the number of partitions (going from
 100,000+ to ~6,000 partitions), the job succeeds and there are no more "Error
 communicating with MapOutputTracker" issues. Seems like an issue with
 handling too many partitions and executors at the same time.

 Would be awesome to have an auto-repartition function that checks the sizes
 of existing partitions and compares them with the HDFS block size. If too
 small (or too large), it would repartition to partition sizes similar to the
 block size...

 Hope this helps others with similar issues.

 Best,
 Anders

 On Mon, Jan 12, 2015 at 6:32 AM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Hi Anders,

 Have you checked your NodeManager logs to make sure YARN isn't killing
 executors for exceeding memory limits?

 -Sandy

 On Tue, Jan 6, 2015 at 8:20 AM, Anders Arpteg arp...@spotify.com
 wrote:

 Hey,

 I have a job that keeps failing if too much data is processed, and I
 can't see how to get it working. I've tried repartitioning with more
 partitions and increasing the amount of memory for the executors (now about 12G
 and 400 executors). Here is a snippet of the first part of the code, which
 succeeds without any problems:

 val all_days = sc.union(
   ds.dateInterval(startDate, date).map(date =>
     sc.avroFile[LrDailyEndSong](daily_end_song_path + date)
       .map(s => (
         (s.getUsername, s.getTrackUri),
         UserItemData(s.getUsername, s.getTrackUri,
           build_vector1(date, s),
           build_vector2(s))
       ))
   )
 ).reduceByKey(sum_vectors)

 I want to process 30 days of data or more, but am only able to process
 about 10 days. With more days of data (lower startDate in the code
 above), the union above succeeds but the code below fails with "Error
 communicating with MapOutputTracker" (see http://pastebin.com/fGDCXPkL
 for more detailed error messages). Here is a snippet of the code that
 fails:

 val top_tracks = all_days.map(t => (t._1._2.toString, 1))
   .reduceByKey(_ + _)
   .filter(trackFilter)
   .repartition(4)
   .persist(StorageLevel.MEMORY_AND_DISK_SER)

 val observation_data = all_days
   .mapPartitions(_.map(o => (o._1._2.toString, o._2)))
   .join(top_tracks)

 The calculation of top_tracks works, but the last mapPartitions task
 fails with the given error message if given more than 10 days of data. Also
 tried increasing the spark.akka.askTimeout setting, but it still fails
 even after a ten-fold increase of the timeout to 300 seconds. I'm using Spark
 1.2 and the kryo serialization.

 Realize that this is a rather long message, but I'm stuck and would
 appreciate any help or clues for resolving this issue. Seems to be an
 out-of-memory issue, but it does not seem to help to increase the number
 of partitions.

 Thanks,
 Anders






 --
 http://sites.google.com/site/krasser/?utm_source=sig



Re: Failing jobs runs twice

2015-01-13 Thread Anders Arpteg
Yes Andrew, I am. Tried setting spark.yarn.applicationMaster.waitTries to 1
(thanks Sean), but with no luck. Any ideas?

On Tue, Jan 13, 2015 at 7:58 PM, Andrew Or and...@databricks.com wrote:

 Hi Anders, are you using YARN by any chance?

 2015-01-13 0:32 GMT-08:00 Anders Arpteg arp...@spotify.com:

 Since starting using Spark 1.2, I've experienced an annoying issue with
 failing apps that gets executed twice. I'm not talking about tasks inside a
 job, that should be executed multiple times before failing the whole app.
 I'm talking about the whole app, that seems to close the previous Spark
 context, start a new, and rerun the app again.

 This is annoying since it overwrite the log files as well and it becomes
 hard to troubleshoot the failing app. Does anyone know how to turn this
 feature off?

 Thanks,
 Anders





Failing jobs runs twice

2015-01-13 Thread Anders Arpteg
Since starting using Spark 1.2, I've experienced an annoying issue with
failing apps that gets executed twice. I'm not talking about tasks inside a
job, that should be executed multiple times before failing the whole app.
I'm talking about the whole app, that seems to close the previous Spark
context, start a new, and rerun the app again.

This is annoying since it overwrite the log files as well and it becomes
hard to troubleshoot the failing app. Does anyone know how to turn this
feature off?

Thanks,
Anders


Re: Trouble with large Yarn job

2015-01-12 Thread Anders Arpteg
Yes sure Sandy, I've checked the logs and it's not an OOM issue. I've
actually been able to solve the problem finally, and it seems to be an
issue with too many partitions. The repartitioning I tried initially did so
after the union, and then it's too late. By repartitioning as early as
possible, and significantly reducing the number of partitions (going from
100,000+ to ~6,000 partitions), the job succeeds and there are no more "Error
communicating with MapOutputTracker" issues. Seems like an issue with
handling too many partitions and executors at the same time.

Would be awesome to have an auto-repartition function that checks the sizes of
existing partitions and compares them with the HDFS block size. If too small
(or too large), it would repartition to partition sizes similar to the block
size...
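
A hedged sketch of that idea (everything here is illustrative: inputBytes
would have to be obtained from e.g. the Hadoop FileSystem API, and 128 MB is
just an assumed block size):

  // Derive a partition count from the input size and the HDFS block size,
  // then repartition towards roughly block-sized partitions.
  val blockSize = 128L * 1024 * 1024
  val targetPartitions = math.max(1, (inputBytes / blockSize).toInt)
  val tuned = bigRdd.repartition(targetPartitions)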

Hope this helps others with similar issues.

Best,
Anders

On Mon, Jan 12, 2015 at 6:32 AM, Sandy Ryza sandy.r...@cloudera.com wrote:

 Hi Anders,

 Have you checked your NodeManager logs to make sure YARN isn't killing
 executors for exceeding memory limits?

 -Sandy

 On Tue, Jan 6, 2015 at 8:20 AM, Anders Arpteg arp...@spotify.com wrote:

 Hey,

 I have a job that keeps failing if too much data is processed, and I
 can't see how to get it working. I've tried repartitioning with more
 partitions and increasing the amount of memory for the executors (now about 12G
 and 400 executors). Here is a snippet of the first part of the code, which
 succeeds without any problems:

 val all_days = sc.union(
   ds.dateInterval(startDate, date).map(date =>
     sc.avroFile[LrDailyEndSong](daily_end_song_path + date)
       .map(s => (
         (s.getUsername, s.getTrackUri),
         UserItemData(s.getUsername, s.getTrackUri,
           build_vector1(date, s),
           build_vector2(s))
       ))
   )
 ).reduceByKey(sum_vectors)

 I want to process 30 days of data or more, but am only able to process
 about 10 days. With more days of data (lower startDate in the code
 above), the union above succeeds but the code below fails with "Error
 communicating with MapOutputTracker" (see http://pastebin.com/fGDCXPkL
 for more detailed error messages). Here is a snippet of the code that fails:

 val top_tracks = all_days.map(t => (t._1._2.toString, 1))
   .reduceByKey(_ + _)
   .filter(trackFilter)
   .repartition(4)
   .persist(StorageLevel.MEMORY_AND_DISK_SER)

 val observation_data = all_days
   .mapPartitions(_.map(o => (o._1._2.toString, o._2)))
   .join(top_tracks)

 The calculation of top_tracks works, but the last mapPartitions task
 fails with the given error message if given more than 10 days of data. Also
 tried increasing the spark.akka.askTimeout setting, but it still fails
 even after a ten-fold increase of the timeout to 300 seconds. I'm using Spark
 1.2 and the kryo serialization.

 Realize that this is a rather long message, but I'm stuck and would
 appreciate any help or clues for resolving this issue. Seems to be an
 out-of-memory issue, but it does not seem to help to increase the number
 of partitions.

 Thanks,
 Anders





Re: Queue independent jobs

2015-01-09 Thread Anders Arpteg
Awesome, it actually seems to work. Amazing how simple it can be
sometimes...

Thanks Sean!

On Fri, Jan 9, 2015 at 12:42 PM, Sean Owen so...@cloudera.com wrote:

 You can parallelize on the driver side. The way to do it is almost
 exactly what you have here, where you're iterating over a local Scala
 collection of dates and invoking a Spark operation for each. Simply
 write dateList.par.map(...) to make the local map proceed in
 parallel. It should invoke the Spark jobs simultaneously.
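
 A minimal sketch of that suggestion applied to the pseudo code below (the
 load, transform and output-path helpers are placeholders for whatever the
 real job uses):

   // .par turns the local date list into a parallel collection, so the
   // independent Spark actions are submitted concurrently from the driver.
   dateList.par.foreach { date =>
     sc.textFile(inputPath(date)).map(transform).saveAsTextFile(outputPath(date))
   }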

 On Fri, Jan 9, 2015 at 10:46 AM, Anders Arpteg arp...@spotify.com wrote:
  Hey,
 
  Let's say we have multiple independent jobs that each transform some data and
  store in distinct HDFS locations. Is there a nice way to run them in
  parallel? See the following pseudo code snippet:
 
  dateList.map(date =>
  sc.hdfsFile(date).map(transform).saveAsHadoopFile(date))
 
  It's unfortunate if they run in sequence, since all the executors are not
  used efficiently. What's the best way to parallelize execution of these
  jobs?
 
  Thanks,
  Anders



Queue independent jobs

2015-01-09 Thread Anders Arpteg
Hey,

Let's say we have multiple independent jobs that each transform some data
and store in distinct HDFS locations. Is there a nice way to run them in
parallel? See the following pseudo code snippet:

dateList.map(date =>
sc.hdfsFile(date).map(transform).saveAsHadoopFile(date))

It's unfortunate if they run in sequence, since all the executors are not
used efficiently. What's the best way to parallelize execution of these
jobs?

Thanks,
Anders


Trouble with large Yarn job

2015-01-07 Thread Anders Arpteg
Hey,

I have a job that keeps failing if too much data is processed, and I can't
see how to get it working. I've tried repartitioning with more partitions
and increasing the amount of memory for the executors (now about 12G and 400
executors). Here is a snippet of the first part of the code, which succeeds
without any problems:

val all_days = sc.union(
  ds.dateInterval(startDate, date).map(date =>
    sc.avroFile[LrDailyEndSong](daily_end_song_path + date)
      .map(s => (
        (s.getUsername, s.getTrackUri),
        UserItemData(s.getUsername, s.getTrackUri,
          build_vector1(date, s),
          build_vector2(s))
      ))
  )
).reduceByKey(sum_vectors)

I want to process 30 days of data or more, but am only able to process
about 10 days. With more days of data (lower startDate in the code above),
the union above succeeds but the code below fails with "Error communicating
with MapOutputTracker" (see http://pastebin.com/fGDCXPkL for more detailed
error messages). Here is a snippet of the code that fails:

val top_tracks = all_days.map(t => (t._1._2.toString, 1))
  .reduceByKey(_ + _)
  .filter(trackFilter)
  .repartition(4)
  .persist(StorageLevel.MEMORY_AND_DISK_SER)

val observation_data = all_days
  .mapPartitions(_.map(o => (o._1._2.toString, o._2)))
  .join(top_tracks)

The calculation of top_tracks works, but the last mapPartitions task fails
with the given error message if given more than 10 days of data. Also tried
increasing the spark.akka.askTimeout setting, but it still fails even after
a ten-fold increase of the timeout to 300 seconds. I'm using Spark 1.2 and
the kryo serialization.

Realize that this is a rather long message, but I'm stuck and would
appreciate any help or clues for resolving this issue. Seems to be an
out-of-memory issue, but it does not seem to help to increase the number
of partitions.

Thanks,
Anders


Trouble with large Yarn job

2015-01-06 Thread Anders Arpteg
Hey,

I have a job that keeps failing if too much data is processed, and I can't
see how to get it working. I've tried repartitioning with more partitions
and increasing the amount of memory for the executors (now about 12G and 400
executors). Here is a snippet of the first part of the code, which succeeds
without any problems:

val all_days = sc.union(
  ds.dateInterval(startDate, date).map(date =>
    sc.avroFile[LrDailyEndSong](daily_end_song_path + date)
      .map(s => (
        (s.getUsername, s.getTrackUri),
        UserItemData(s.getUsername, s.getTrackUri,
          build_vector1(date, s),
          build_vector2(s))
      ))
  )
).reduceByKey(sum_vectors)

I want to process 30 days of data or more, but am only able to process
about 10 days. With more days of data (lower startDate in the code above),
the union above succeeds but the code below fails with "Error communicating
with MapOutputTracker" (see http://pastebin.com/fGDCXPkL for more detailed
error messages). Here is a snippet of the code that fails:

val top_tracks = all_days.map(t => (t._1._2.toString, 1))
  .reduceByKey(_ + _)
  .filter(trackFilter)
  .repartition(4)
  .persist(StorageLevel.MEMORY_AND_DISK_SER)

val observation_data = all_days
  .mapPartitions(_.map(o => (o._1._2.toString, o._2)))
  .join(top_tracks)

The calculation of top_tracks works, but the last mapPartitions task fails
with the given error message if given more than 10 days of data. Also tried
increasing the spark.akka.askTimeout setting, but it still fails even after
a ten-fold increase of the timeout to 300 seconds. I'm using Spark 1.2 and the
kryo serialization.

Realize that this is a rather long message, but I'm stuck and would
appreciate any help or clues for resolving this issue. Seems to be an
out-of-memory issue, but it does not seem to help to increase the number
of partitions.

Thanks,
Anders


Re: Dynamic Allocation in Spark 1.2.0

2014-12-29 Thread Anders Arpteg
Thanks Tsuyoshi and Shixiong for the info. Awesome with more documentation
about the feature!

Was afraid that the node manager needed reconfiguration (and a restart). Any
idea of how many resources the shuffle service will take on the node
manager? In a multi-tenant Hadoop cluster environment, it would be
undesirable to have a Spark-specific long-running service taking up
resources from other types of jobs on the cluster.

Thanks again,
Anders

On Sun, Dec 28, 2014 at 8:08 AM, Shixiong Zhu zsxw...@gmail.com wrote:

 I encountered the following issue when enabling dynamicAllocation. You may
 want to take a look at it.

 https://issues.apache.org/jira/browse/SPARK-4951

 Best Regards,
 Shixiong Zhu

 2014-12-28 2:07 GMT+08:00 Tsuyoshi OZAWA ozawa.tsuyo...@gmail.com:

 Hi Anders,

 I faced the same issue as you mentioned. Yes, you need to install the
 Spark shuffle plugin for YARN. Please check the following PRs, which add
 docs on how to enable dynamicAllocation:

 https://github.com/apache/spark/pull/3731
 https://github.com/apache/spark/pull/3757

 I could run Spark on YARN with dynamicAllocation by following the
 instructions described in the docs.
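
 For reference, a rough sketch of the NodeManager side of that setup as
 described in those docs (treat this as a summary rather than a verified
 config; the spark-<version>-yarn-shuffle jar also has to be on the
 NodeManager classpath):

   <!-- yarn-site.xml on every NodeManager -->
   <property>
     <name>yarn.nodemanager.aux-services</name>
     <value>mapreduce_shuffle,spark_shuffle</value>
   </property>
   <property>
     <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
     <value>org.apache.spark.network.yarn.YarnShuffleService</value>
   </property>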

 Thanks,
 - Tsuyoshi

 On Sat, Dec 27, 2014 at 11:06 PM, Anders Arpteg arp...@spotify.com
 wrote:
  Hey,
 
  Tried to get the new spark.dynamicAllocation.enabled feature working on Yarn
  (Hadoop 2.2), but am unsuccessful so far. I've tested with the following
  settings:
 
  conf
    .set("spark.dynamicAllocation.enabled", "true")
    .set("spark.shuffle.service.enabled", "true")
    .set("spark.dynamicAllocation.minExecutors", "10")
    .set("spark.dynamicAllocation.maxExecutors", "700")
 
  The app works fine on Spark 1.2 if dynamicAllocation is not enabled, but
  with the settings above, it will start the app and the first job is listed
  in the web UI. However, no tasks are started and it seems to be stuck
  waiting for a container to be allocated forever.
 
  Any help would be appreciated. Need to do something specific to get the
  external yarn shuffle service running in the node manager?
 
  TIA,
  Anders



 --
 - Tsuyoshi

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Dynamic Allocation in Spark 1.2.0

2014-12-27 Thread Anders Arpteg
Hey,

Tried to get the new spark.dynamicAllocation.enabled feature working on
Yarn (Hadoop 2.2), but am unsuccessful so far. I've tested with the
following settings:

  conf
    .set("spark.dynamicAllocation.enabled", "true")
    .set("spark.shuffle.service.enabled", "true")
    .set("spark.dynamicAllocation.minExecutors", "10")
    .set("spark.dynamicAllocation.maxExecutors", "700")

The app works fine on Spark 1.2 if dynamicAllocation is not enabled, but
with the settings above, it will start the app and the first job is listed
in the web ui. However, no tasks are started and it seems to be stuck
waiting for a container to be allocated forever.

Any help would be appreciated. Need to do something specific to get the
external yarn shuffle service running in the node manager?

TIA,
Anders


Dynamic Allocation in Spark 1.2.0

2014-12-27 Thread Anders Arpteg
Hey,

Tried to get the new spark.dynamicAllocation.enabled feature working on
Yarn (Hadoop 2.2), but am unsuccessful so far. I've tested with the
following settings:

  conf
    .set("spark.dynamicAllocation.enabled", "true")
    .set("spark.shuffle.service.enabled", "true")
    .set("spark.dynamicAllocation.minExecutors", "10")
    .set("spark.dynamicAllocation.maxExecutors", "700")

The app works fine on Spark 1.2 if dynamicAllocation is not enabled, but
with the settings above, it will start the app and the first job is listed
in the web ui. However, no tasks are started and it seems to be stuck
waiting for a container to be allocated forever.

Any help would be appreciated. Need to do something specific to get the
external yarn shuffle service running in the node manager?

TIA,
Anders