GC problem doing fuzzy join

2019-06-18 Thread Arun Luthra
I'm trying to do a brute force fuzzy join where I compare N records against
N other records, for N^2 total comparisons.

The table is medium size and fits in memory, so I collect it and put it
into a broadcast variable.

The other copy of the table is in an RDD. I am basically calling the RDD
map operation, and each record in the RDD takes the broadcasted table and
FILTERS it. There appears to be heavy GC happening, so I suspect that the
repeated creation and deletion of filtered copies of the broadcast table is
causing it.

Is there a way to fix this pattern?
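
For concreteness, here is roughly the current pattern plus one variant I am
considering, as a hedged sketch; Record, bigTable, otherRdd and fuzzyMatch
are placeholders, not the real code:

val small: Array[Record] = bigTable.collect()
val smallBc = sc.broadcast(small)

val joined = otherRdd.mapPartitions { iter =>
  val table = smallBc.value            // fetch the broadcast value once per partition
  iter.flatMap { rec =>
    // iterating lazily avoids materializing a filtered copy of the whole
    // table for every single record, which is where I suspect the
    // short-lived garbage is coming from
    table.iterator.filter(t => fuzzyMatch(rec, t)).map(t => (rec, t))
  }
}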

Thanks,
Arun


Re: Spark 2.0.0 OOM error at beginning of RDD map on AWS

2016-08-24 Thread Arun Luthra
Also, for the record, turning on Kryo did not help.

On Tue, Aug 23, 2016 at 12:58 PM, Arun Luthra <arun.lut...@gmail.com> wrote:

> Splitting up the Maps to separate objects did not help.
>
> However, I was able to work around the problem by reimplementing it with
> RDD joins.
>
> On Aug 18, 2016 5:16 PM, "Arun Luthra" <arun.lut...@gmail.com> wrote:
>
>> This might be caused by a few large Map objects that Spark is trying to
>> serialize. These are not broadcast variables or anything, they're just
>> regular objects.
>>
>> Would it help if I further indexed these maps into a two-level Map i.e.
>> Map[String, Map[String, Int]] ? Or would this still count against me?
>>
>> What if I manually split them up into numerous Map variables?
>>
>> On Mon, Aug 15, 2016 at 2:12 PM, Arun Luthra <arun.lut...@gmail.com>
>> wrote:
>>
>>> I got this OOM error in Spark local mode. The error seems to have been
>>> at the start of a stage (all of the stages on the UI showed as complete;
>>> there were more stages to do, but they had not shown up on the UI yet).
>>>
>>> There appears to be ~100G of free memory at the time of the error.
>>>
>>> Spark 2.0.0
>>> 200G driver memory
>>> local[30]
>>> 8 /mntX/tmp directories for spark.local.dir
>>> "spark.sql.shuffle.partitions", "500"
>>> "spark.driver.maxResultSize","500"
>>> "spark.default.parallelism", "1000"
>>>
>>> The line number for the error is at an RDD map operation where there are
>>> some potentially large Map objects that are going to be accessed by each
>>> record. Does it matter whether they are broadcast variables or not? I
>>> imagine not, because in local mode they should be available in memory to
>>> every executor/core.
>>>
>>> Possibly related:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Cl
>>> osureCleaner-or-java-serializer-OOM-when-trying-to-grow-td24796.html
>>>
>>> Exception in thread "main" java.lang.OutOfMemoryError
>>> at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputSt
>>> ream.java:123)
>>> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
>>> at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutput
>>> Stream.java:93)
>>> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
>>> at java.io.ObjectOutputStream$BlockDataOutputStream.drain(Objec
>>> tOutputStream.java:1877)
>>> at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDat
>>> aMode(ObjectOutputStream.java:1786)
>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
>>> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>> at org.apache.spark.serializer.JavaSerializationStream.writeObj
>>> ect(JavaSerializer.scala:43)
>>> at org.apache.spark.serializer.JavaSerializerInstance.serialize
>>> (JavaSerializer.scala:100)
>>> at org.apache.spark.util.ClosureCleaner$.ensureSerializable(Clo
>>> sureCleaner.scala:295)
>>> at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$
>>> ClosureCleaner$$clean(ClosureCleaner.scala:288)
>>> at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
>>> at org.apache.spark.SparkContext.clean(SparkContext.scala:2037)
>>> at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:366)
>>> at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:365)
>>> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperati
>>> onScope.scala:151)
>>> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperati
>>> onScope.scala:112)
>>> at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
>>> at org.apache.spark.rdd.RDD.map(RDD.scala:365)
>>> at abc.Abc$.main(abc.scala:395)
>>> at abc.Abc.main(abc.scala)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAcce
>>> ssorImpl.java:62)
>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe
>>> thodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy
>>> $SparkSubmit$$runMain(SparkSubmit.scala:729)
>>> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit
>>> .scala:185)
>>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
>>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
>>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>>
>>>
>>


Re: Spark 2.0.0 OOM error at beginning of RDD map on AWS

2016-08-23 Thread Arun Luthra
Splitting up the Maps to separate objects did not help.

However, I was able to work around the problem by reimplementing it with
RDD joins.
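
Roughly, the reimplementation looks like this (a hedged sketch; bigMap,
records, keyOf and enrich are placeholders, not the actual code):

import org.apache.spark.rdd.RDD

// Turn the big in-closure Map[String, Int] into an RDD and join on the key,
// so the map never has to be serialized into the task closure.
val lookup: RDD[(String, Int)] = sc.parallelize(bigMap.toSeq)
val keyed = records.map(r => (keyOf(r), r))
val joined = keyed.join(lookup)                    // (key, (record, value))
  .map { case (_, (r, v)) => enrich(r, v) }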

On Aug 18, 2016 5:16 PM, "Arun Luthra" <arun.lut...@gmail.com> wrote:

> This might be caused by a few large Map objects that Spark is trying to
> serialize. These are not broadcast variables or anything, they're just
> regular objects.
>
> Would it help if I further indexed these maps into a two-level Map i.e.
> Map[String, Map[String, Int]] ? Or would this still count against me?
>
> What if I manually split them up into numerous Map variables?
>
> On Mon, Aug 15, 2016 at 2:12 PM, Arun Luthra <arun.lut...@gmail.com>
> wrote:
>
>> I got this OOM error in Spark local mode. The error seems to have been at
>> the start of a stage (all of the stages on the UI showed as complete; there
>> were more stages to do, but they had not shown up on the UI yet).
>>
>> There appears to be ~100G of free memory at the time of the error.
>>
>> Spark 2.0.0
>> 200G driver memory
>> local[30]
>> 8 /mntX/tmp directories for spark.local.dir
>> "spark.sql.shuffle.partitions", "500"
>> "spark.driver.maxResultSize","500"
>> "spark.default.parallelism", "1000"
>>
>> The line number for the error is at an RDD map operation where there are
>> some potentially large Map objects that are going to be accessed by each
>> record. Does it matter whether they are broadcast variables or not? I
>> imagine not, because in local mode they should be available in memory to
>> every executor/core.
>>
>> Possibly related:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Cl
>> osureCleaner-or-java-serializer-OOM-when-trying-to-grow-td24796.html
>>
>> Exception in thread "main" java.lang.OutOfMemoryError
>> at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputSt
>> ream.java:123)
>> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
>> at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutput
>> Stream.java:93)
>> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
>> at java.io.ObjectOutputStream$BlockDataOutputStream.drain(Objec
>> tOutputStream.java:1877)
>> at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDat
>> aMode(ObjectOutputStream.java:1786)
>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
>> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>> at org.apache.spark.serializer.JavaSerializationStream.writeObj
>> ect(JavaSerializer.scala:43)
>> at org.apache.spark.serializer.JavaSerializerInstance.serialize
>> (JavaSerializer.scala:100)
>> at org.apache.spark.util.ClosureCleaner$.ensureSerializable(Clo
>> sureCleaner.scala:295)
>> at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$
>> ClosureCleaner$$clean(ClosureCleaner.scala:288)
>> at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
>> at org.apache.spark.SparkContext.clean(SparkContext.scala:2037)
>> at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:366)
>> at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:365)
>> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperati
>> onScope.scala:151)
>> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperati
>> onScope.scala:112)
>> at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
>> at org.apache.spark.rdd.RDD.map(RDD.scala:365)
>> at abc.Abc$.main(abc.scala:395)
>> at abc.Abc.main(abc.scala)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAcce
>> ssorImpl.java:62)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe
>> thodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy
>> $SparkSubmit$$runMain(SparkSubmit.scala:729)
>> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit
>> .scala:185)
>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>
>>
>


Re: Spark 2.0.0 OOM error at beginning of RDD map on AWS

2016-08-18 Thread Arun Luthra
This might be caused by a few large Map objects that Spark is trying to
serialize. These are not broadcast variables or anything, they're just
regular objects.

Would it help if I further indexed these maps into a two-level Map i.e.
Map[String, Map[String, Int]] ? Or would this still count against me?

What if I manually split them up into numerous Map variables?

On Mon, Aug 15, 2016 at 2:12 PM, Arun Luthra <arun.lut...@gmail.com> wrote:

> I got this OOM error in Spark local mode. The error seems to have been at
> the start of a stage (all of the stages on the UI showed as complete; there
> were more stages to do, but they had not shown up on the UI yet).
>
> There appears to be ~100G of free memory at the time of the error.
>
> Spark 2.0.0
> 200G driver memory
> local[30]
> 8 /mntX/tmp directories for spark.local.dir
> "spark.sql.shuffle.partitions", "500"
> "spark.driver.maxResultSize","500"
> "spark.default.parallelism", "1000"
>
> The line number for the error is at an RDD map operation where there are
> some potentially large Map objects that are going to be accessed by each
> record. Does it matter whether they are broadcast variables or not? I
> imagine not, because in local mode they should be available in memory to
> every executor/core.
>
> Possibly related:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-
> ClosureCleaner-or-java-serializer-OOM-when-trying-to-grow-td24796.html
>
> Exception in thread "main" java.lang.OutOfMemoryError
> at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:
> 123)
> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
> at java.io.ByteArrayOutputStream.ensureCapacity(
> ByteArrayOutputStream.java:93)
> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
> at java.io.ObjectOutputStream$BlockDataOutputStream.drain(
> ObjectOutputStream.java:1877)
> at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(
> ObjectOutputStream.java:1786)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> at org.apache.spark.serializer.JavaSerializationStream.
> writeObject(JavaSerializer.scala:43)
> at org.apache.spark.serializer.JavaSerializerInstance.
> serialize(JavaSerializer.scala:100)
> at org.apache.spark.util.ClosureCleaner$.ensureSerializable(
> ClosureCleaner.scala:295)
> at org.apache.spark.util.ClosureCleaner$.org$apache$
> spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
> at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
> at org.apache.spark.SparkContext.clean(SparkContext.scala:2037)
> at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:366)
> at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:365)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(
> RDDOperationScope.scala:151)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(
> RDDOperationScope.scala:112)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
> at org.apache.spark.rdd.RDD.map(RDD.scala:365)
> at abc.Abc$.main(abc.scala:395)
> at abc.Abc.main(abc.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$
> deploy$SparkSubmit$$runMain(SparkSubmit.scala:729)
> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
>


Spark 2.0.0 OOM error at beginning of RDD map on AWS

2016-08-15 Thread Arun Luthra
I got this OOM error in Spark local mode. The error seems to have been at
the start of a stage (all of the stages on the UI showed as complete; there
were more stages to do, but they had not shown up on the UI yet).

There appears to be ~100G of free memory at the time of the error.

Spark 2.0.0
200G driver memory
local[30]
8 /mntX/tmp directories for spark.local.dir
"spark.sql.shuffle.partitions", "500"
"spark.driver.maxResultSize","500"
"spark.default.parallelism", "1000"

The line number for the error is at an RDD map operation where there are
some potentially large Map objects that are going to be accessed by each
record. Does it matter whether they are broadcast variables or not? I imagine
not, because in local mode they should be available in memory to every
executor/core.
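
For reference, a hedged sketch of the broadcast-variable version (bigMap,
rdd, keyOf and transform are placeholders): only the small broadcast handle
gets serialized into the closure, which is the thing
ClosureCleaner.ensureSerializable is choking on in the trace below.

val bigMapBc = sc.broadcast(bigMap)      // bigMap: one of the large Map objects
val out = rdd.map { rec =>
  val m = bigMapBc.value                 // resolved from the block manager at run time
  transform(rec, m.getOrElse(keyOf(rec), 0))
}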

Possibly related:
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-ClosureCleaner-or-java-serializer-OOM-when-trying-to-grow-td24796.html

Exception in thread "main" java.lang.OutOfMemoryError
at
java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
at
java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
at
java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
at
java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
at
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
at
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2037)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:366)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:365)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.map(RDD.scala:365)
at abc.Abc$.main(abc.scala:395)
at abc.Abc.main(abc.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:729)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


Re: groupByKey() compile error after upgrading from 1.6.2 to 2.0.0

2016-08-10 Thread Arun Luthra
Thanks, pair_rdd.rdd.groupByKey() did the trick.
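
For anyone else hitting this, a minimal illustration of the two APIs in
Spark 2.0 (the types here are stand-ins, not the original code):

import org.apache.spark.sql.{Dataset, SparkSession}

case class MyKey(id: String)
case class MyData(v: Int)

val spark = SparkSession.builder().master("local[*]").appName("demo").getOrCreate()
import spark.implicits._

val pairDs: Dataset[(MyKey, MyData)] =
  Seq((MyKey("a"), MyData(1)), (MyKey("a"), MyData(2))).toDS()

// Option A: drop back to the RDD API (what fixed it here)
val byKeyRdd = pairDs.rdd.groupByKey()      // RDD[(MyKey, Iterable[MyData])]

// Option B: stay with Datasets; the new groupByKey wants a key function
val byKeyDs = pairDs.groupByKey(_._1)       // KeyValueGroupedDataset[MyKey, (MyKey, MyData)]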

On Wed, Aug 10, 2016 at 8:24 PM, Holden Karau <hol...@pigscanfly.ca> wrote:

> So it looks like (despite the name) pair_rdd is actually a Dataset - my
> guess is you might have a map on a dataset up above which used to return an
> RDD but now returns another dataset or an unexpected implicit conversion.
> Just add rdd() before the groupByKey call to push it into an RDD. That
> being said - groupByKey generally is an anti-pattern so please be careful
> with it.
>
> On Wed, Aug 10, 2016 at 8:07 PM, Arun Luthra <arun.lut...@gmail.com>
> wrote:
>
>> Here is the offending line:
>>
>> val some_rdd = pair_rdd.groupByKey().flatMap { case (mk: MyKey, md_iter:
>> Iterable[MyData]) => {
>> ...
>>
>>
>> [error]  .scala:249: overloaded method value groupByKey with
>> alternatives:
>> [error]   [K](func: 
>> org.apache.spark.api.java.function.MapFunction[(aaa.MyKey,
>> aaa.MyData),K], encoder: org.apache.spark.sql.Encoder[K
>> ])org.apache.spark.sql.KeyValueGroupedDataset[K,(aaa.MyKey, aaa.MyData)]
>> 
>> [error]   [K](func: ((aaa.MyKey, aaa.MyData)) => K)(implicit evidence$4:
>> org.apache.spark.sql.Encoder[K])org.apache.spark.sql.KeyValueGroupedDataset[K,(aaa.MyKey,
>> aaa.MyData)]
>> [error]  cannot be applied to ()
>> [error] val some_rdd = pair_rdd.groupByKey().flatMap { case (mk:
>> MyKey, hd_iter: Iterable[MyData]) => {
>> [error] ^
>> [warn] .scala:249: non-variable type argument aaa.MyData in
>> type pattern Iterable[aaa.MyData] (the underlying of Iterable[aaa.MyData])
>> is unchecked since it is eliminated by erasure
>> [warn] val some_rdd = pair_rdd.groupByKey().flatMap { case (mk:
>> MyKey, hd_iter: Iterable[MyData]) => {
>> [warn]
>>   ^
>> [warn] one warning found
>>
>>
>> I can't see any obvious API change... what is the problem?
>>
>> Thanks,
>> Arun
>>
>
>
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
>


groupByKey() compile error after upgrading from 1.6.2 to 2.0.0

2016-08-10 Thread Arun Luthra
Here is the offending line:

val some_rdd = pair_rdd.groupByKey().flatMap { case (mk: MyKey, md_iter:
Iterable[MyData]) => {
...


[error]  .scala:249: overloaded method value groupByKey with
alternatives:
[error]   [K](func:
org.apache.spark.api.java.function.MapFunction[(aaa.MyKey, aaa.MyData),K],
encoder:
org.apache.spark.sql.Encoder[K])org.apache.spark.sql.KeyValueGroupedDataset[K,(aaa.MyKey,
aaa.MyData)] 
[error]   [K](func: ((aaa.MyKey, aaa.MyData)) => K)(implicit evidence$4:
org.apache.spark.sql.Encoder[K])org.apache.spark.sql.KeyValueGroupedDataset[K,(aaa.MyKey,
aaa.MyData)]
[error]  cannot be applied to ()
[error] val some_rdd = pair_rdd.groupByKey().flatMap { case (mk: MyKey,
hd_iter: Iterable[MyData]) => {
[error] ^
[warn] .scala:249: non-variable type argument aaa.MyData in
type pattern Iterable[aaa.MyData] (the underlying of Iterable[aaa.MyData])
is unchecked since it is eliminated by erasure
[warn] val some_rdd = pair_rdd.groupByKey().flatMap { case (mk: MyKey,
hd_iter: Iterable[MyData]) => {
[warn]
^
[warn] one warning found


I can't see any obvious API change... what is the problem?

Thanks,
Arun


Re: TaskCommitDenied (Driver denied task commit)

2016-01-22 Thread Arun Luthra
Correction. I have to use spark.yarn.am.memoryOverhead because I'm in Yarn
client mode. I set it to 13% of the executor memory.

Also quite helpful was increasing the total overall executor memory.
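
For reference, a hedged sketch of the relevant settings (the values are
illustrative, not the exact ones used; master and app name come from
spark-submit):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  // yarn-client mode: the AM overhead key applies here, since
  // spark.yarn.driver.memoryOverhead "does not apply in client mode"
  .set("spark.yarn.am.memoryOverhead", "8000")        // roughly 13% of the 60g executor memory
  .set("spark.yarn.executor.memoryOverhead", "2000")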

It will be great when the Tungsten enhancements make their way into RDDs.

Thanks!

Arun

On Thu, Jan 21, 2016 at 6:19 PM, Arun Luthra <arun.lut...@gmail.com> wrote:

> Two changes I made that appear to be keeping various errors at bay:
>
> 1) bumped up spark.yarn.executor.memoryOverhead to 2000 in the spirit of
> https://mail-archives.apache.org/mod_mbox/spark-user/201511.mbox/%3ccacbyxkld8qasymj2ghk__vttzv4gejczcqfaw++s1d5te1d...@mail.gmail.com%3E
> , even though I couldn't find the same error in my YARN log.
>
> 2) very important: I ran coalesce(1000) on the RDD at the start of the
> DAG. I know keeping the # of partitions lower is helpful, based on past
> experience with groupByKey. I haven't run this pipeline in a while, so that
> rule of thumb was not at the forefront of my mind.
>
> On Thu, Jan 21, 2016 at 5:35 PM, Arun Luthra <arun.lut...@gmail.com>
> wrote:
>
>> Looking into the yarn logs for a similar job where an executor was
>> associated with the same error, I find:
>>
>> ...
>> 16/01/22 01:17:18 INFO client.TransportClientFactory: Found inactive
>> connection to (SERVER), creating a new one.
>> 16/01/22 01:17:18 *ERROR shuffle.RetryingBlockFetcher: Exception while
>> beginning fetch of 46 outstanding blocks*
>> *java.io.IOException: Failed to connect to (SERVER)*
>> at
>> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
>> at
>> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
>> at
>> org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88)
>> at
>> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
>> at
>> org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
>> at
>> org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:97)
>> at
>> org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:152)
>> at
>> org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:265)
>> at
>> org.apache.spark.storage.ShuffleBlockFetcherIterator.(ShuffleBlockFetcherIterator.scala:112)
>> at
>> org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:43)
>> at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>> at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>> at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>> at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>> at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>> at org.apache.spark.scheduler.Task.run(Task.scala:88)
>> at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:745)
>> *Caused by: java.net.ConnectException: Connection refused:* (SERVER)
>> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>> at
>> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
>> at
>> io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
>> at
>> io.n

MemoryStore: Not enough space to cache broadcast_N in memory

2016-01-21 Thread Arun Luthra
WARN MemoryStore: Not enough space to cache broadcast_4 in memory!
(computed 60.2 MB so far)
WARN MemoryStore: Persisting block broadcast_4 to disk instead.


Can I increase the memory allocation for broadcast variables?

I have a few broadcast variables that I create with sc.broadcast(). Are
these labeled starting from 0 or from 1 (in reference to "broadcast_N")? I
want to debug/track down which one is offending.
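
One way to tie them back (hedged sketch; bigMap1 and bigMap2 stand in for
the real tables): the Broadcast handle returned by sc.broadcast() carries an
id, which should line up with the N in "broadcast_N".

val bigMap1 = Map("a" -> 1)
val bigMap2 = Map("b" -> 2)
val bc1 = sc.broadcast(bigMap1)
val bc2 = sc.broadcast(bigMap2)
println(s"table1 -> broadcast_${bc1.id}, table2 -> broadcast_${bc2.id}")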

As a feature request, it would be good if there were an optional argument
(or perhaps a required argument) added to sc.broadcast() so that we could
give it an internal label. Then it would work the same as the
sc.accumulator() "name" argument. It would enable more useful warn/error
messages.

Arun


Re: TaskCommitDenied (Driver denied task commit)

2016-01-21 Thread Arun Luthra
16/01/21 21:52:11 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable

16/01/21 21:52:14 WARN MetricsSystem: Using default name DAGScheduler for
source because spark.app.id is not set.

spark.yarn.driver.memoryOverhead is set but does not apply in client mode.

16/01/21 21:52:16 WARN DomainSocketFactory: The short-circuit local reads
feature cannot be used because libhadoop cannot be loaded.

16/01/21 21:52:52 WARN MemoryStore: Not enough space to cache broadcast_4
in memory! (computed 60.2 MB so far)

16/01/21 21:52:52 WARN MemoryStore: Persisting block broadcast_4 to disk
instead.

[Stage 1:>(2260 + 7) /
2262]16/01/21 21:57:24 WARN TaskSetManager: Lost task 1440.1 in stage 1.0
(TID 4530, --): TaskCommitDenied (Driver denied task commit) for job: 1,
partition: 1440, attempt: 4530

[Stage 1:>(2260 + 6) /
2262]16/01/21 21:57:27 WARN TaskSetManager: Lost task 1488.1 in stage 1.0
(TID 4531, --): TaskCommitDenied (Driver denied task commit) for job: 1,
partition: 1488, attempt: 4531

[Stage 1:>(2261 + 4) /
2262]16/01/21 21:57:39 WARN TaskSetManager: Lost task 1982.1 in stage 1.0
(TID 4532, --): TaskCommitDenied (Driver denied task commit) for job: 1,
partition: 1982, attempt: 4532

16/01/21 21:57:57 WARN TaskSetManager: Lost task 2214.0 in stage 1.0 (TID
4482, --): TaskCommitDenied (Driver denied task commit) for job: 1,
partition: 2214, attempt: 4482

16/01/21 21:57:57 WARN TaskSetManager: Lost task 2168.0 in stage 1.0 (TID
4436, --): TaskCommitDenied (Driver denied task commit) for job: 1,
partition: 2168, attempt: 4436


I am running with:

spark-submit --class "myclass" \

  --num-executors 90 \

  --driver-memory 1g \

  --executor-memory 60g \

  --executor-cores 8 \

  --master yarn-client \

  --conf "spark.executor.extraJavaOptions=-verbose:gc
-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \

  my.jar


There are 2262 input files totaling just 98.6G. The DAG is basically
textFile().map().filter().groupByKey().saveAsTextFile().

On Thu, Jan 21, 2016 at 2:14 PM, Holden Karau <hol...@pigscanfly.ca> wrote:

> Can you post more of your log? How big are the partitions? What is the
> action you are performing?
>
> On Thu, Jan 21, 2016 at 2:02 PM, Arun Luthra <arun.lut...@gmail.com>
> wrote:
>
>> Example warning:
>>
>> 16/01/21 21:57:57 WARN TaskSetManager: Lost task 2168.0 in stage 1.0 (TID
>> 4436, XXX): TaskCommitDenied (Driver denied task commit) for job: 1,
>> partition: 2168, attempt: 4436
>>
>>
>> Is there a solution for this? Increase driver memory? I'm using just 1G
>> driver memory but ideally I won't have to increase it.
>>
>> The RDD being processed has 2262 partitions.
>>
>> Arun
>>
>
>
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
>


TaskCommitDenied (Driver denied task commit)

2016-01-21 Thread Arun Luthra
Example warning:

16/01/21 21:57:57 WARN TaskSetManager: Lost task 2168.0 in stage 1.0 (TID
4436, XXX): TaskCommitDenied (Driver denied task commit) for job: 1,
partition: 2168, attempt: 4436


Is there a solution for this? Increase driver memory? I'm using just 1G
driver memory but ideally I won't have to increase it.

The RDD being processed has 2262 partitions.

Arun


Re: TaskCommitDenied (Driver denied task commit)

2016-01-21 Thread Arun Luthra
Usually the pipeline works; it just failed on this particular input data.
The other data it has run on is of similar size.

Speculation is enabled.

I'm using Spark 1.5.0.

Here is the config. Many of these may not be needed anymore, they are from
trying to get things working in Spark 1.2 and 1.3.

.set("spark.storage.memoryFraction","0.2") // default 0.6
.set("spark.shuffle.memoryFraction","0.2") // default 0.2
.set("spark.shuffle.manager","SORT") // preferred setting for
optimized joins
.set("spark.shuffle.consolidateFiles","true") // helpful for "too
many files open"
.set("spark.mesos.coarse", "true") // helpful for MapOutputTracker
errors?
.set("spark.akka.frameSize","300") // helpful when using
consildateFiles=true
.set("spark.shuffle.compress","false") //
http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
.set("spark.file.transferTo","false") //
http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
.set("spark.core.connection.ack.wait.timeout","600") //
http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
.set("spark.speculation","true")
.set("spark.worker.timeout","600") //
http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
.set("spark.akka.timeout","300") //
http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
.set("spark.storage.blockManagerSlaveTimeoutMs","12")
.set("spark.driver.maxResultSize","2048") // in response to error:
Total size of serialized results of 39901 tasks (1024.0 MB) is bigger than
spark.driver.maxResultSize (1024.0 MB)
.set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")
.set("spark.kryo.registrator","--.MyRegistrator")
.set("spark.kryo.registrationRequired", "true")
.set("spark.yarn.executor.memoryOverhead","600")

On Thu, Jan 21, 2016 at 2:50 PM, Josh Rosen <joshro...@databricks.com>
wrote:

> Is speculation enabled? This TaskCommitDenied by driver error is thrown by
> writers who lost the race to commit an output partition. I don't think this
> had anything to do with key skew etc. Replacing the groupbykey with a count
> will mask this exception because the coordination does not get triggered in
> non save/write operations.
>
> On Thu, Jan 21, 2016 at 2:46 PM Holden Karau <hol...@pigscanfly.ca> wrote:
>
>> Before we dig too far into this, the thing which most quickly jumps out
>> to me is groupByKey which could be causing some problems - whats the
>> distribution of keys like? Try replacing the groupByKey with a count() and
>> see if the pipeline works up until that stage. Also 1G of driver memory is
>> also a bit small for something with 90 executors...
>>
>> On Thu, Jan 21, 2016 at 2:40 PM, Arun Luthra <arun.lut...@gmail.com>
>> wrote:
>>
>>>
>>>
>>> 16/01/21 21:52:11 WARN NativeCodeLoader: Unable to load native-hadoop
>>> library for your platform... using builtin-java classes where applicable
>>>
>>> 16/01/21 21:52:14 WARN MetricsSystem: Using default name DAGScheduler
>>> for source because spark.app.id is not set.
>>>
>>> spark.yarn.driver.memoryOverhead is set but does not apply in client
>>> mode.
>>>
>>> 16/01/21 21:52:16 WARN DomainSocketFactory: The short-circuit local
>>> reads feature cannot be used because libhadoop cannot be loaded.
>>>
>>> 16/01/21 21:52:52 WARN MemoryStore: Not enough space to cache
>>> broadcast_4 in memory! (computed 60.2 MB so far)
>>>
>>> 16/01/21 21:52:52 WARN MemoryStore: Persisting block broadcast_4 to disk
>>> instead.
>>>
>>> [Stage 1:>(2260 + 7)
>>> / 2262]16/01/21 21:57:24 WARN TaskSetManager: Lost task 1440.1 in stage 1.0
>>> (TID 4530, --): TaskCommitDenied (Driver denied task commit) for job: 1,
>>> partition: 1440, attempt: 4530
>>>
>>> [Stage 1:>(2260 + 6)
>>> / 2262]16/01/21 21:57:27 WARN TaskSetManager: Lost task 1488.1 in stage 1.0
>>> (TID 4531, --): TaskCommitDenied (Driver denied task commit) for job: 1,
>>> partition: 1488, attempt: 4531
>>>
>>> 

Re: TaskCommitDenied (Driver denied task commit)

2016-01-21 Thread Arun Luthra
Looking into the yarn logs for a similar job where an executor was
associated with the same error, I find:

...
16/01/22 01:17:18 INFO client.TransportClientFactory: Found inactive
connection to (SERVER), creating a new one.
16/01/22 01:17:18 *ERROR shuffle.RetryingBlockFetcher: Exception while
beginning fetch of 46 outstanding blocks*
*java.io.IOException: Failed to connect to (SERVER)*
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
at
org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
at
org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:97)
at
org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:152)
at
org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:265)
at
org.apache.spark.storage.ShuffleBlockFetcherIterator.(ShuffleBlockFetcherIterator.scala:112)
at
org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:43)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
*Caused by: java.net.ConnectException: Connection refused:* (SERVER)
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
at
io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
at
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
... 1 more

...


Not sure if this reveals anything at all.


On Thu, Jan 21, 2016 at 2:58 PM, Holden Karau <hol...@pigscanfly.ca> wrote:

> My hunch is that the TaskCommitDenied is perhaps a red herring and the
> problem is groupByKey - but I've also just seen a lot of people be bitten
> by it so that might not be issue. If you just do a count at the point of
> the groupByKey does the pipeline succeed?
>
> On Thu, Jan 21, 2016 at 2:56 PM, Arun Luthra <arun.lut...@gmail.com>
> wrote:
>
>> Usually the pipeline works, it just failed on this particular input data.
>> The other data it has run on is of similar size.
>>
>> Speculation is enabled.
>>
>> I'm using Spark 1.5.0.
>>
>> Here is the config. Many of these may not be needed anymore, they are
>> from trying to get things working in Spark 1.2 and 1.3.
>>
>> .set("spark.storage.memoryFraction","0.2") // default 0.6
>> .set("spark.shuffle.memoryFraction","0.2") // default 0.2
>> .set("spark.shuffle.manager","SORT") // preferred setting for
>> optimized joins
>

Re: TaskCommitDenied (Driver denied task commit)

2016-01-21 Thread Arun Luthra
Two changes I made that appear to be keeping various errors at bay:

1) bumped up spark.yarn.executor.memoryOverhead to 2000 in the spirit of
https://mail-archives.apache.org/mod_mbox/spark-user/201511.mbox/%3ccacbyxkld8qasymj2ghk__vttzv4gejczcqfaw++s1d5te1d...@mail.gmail.com%3E
, even though I couldn't find the same error in my YARN log.

2) very important: I ran coalesce(1000) on the RDD at the start of the DAG.
I know keeping the # of partitions lower is helpful, based on past
experience with groupByKey. I haven't run this pipeline in a while, so that
rule of thumb was not at the forefront of my mind.
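
In code, the change is roughly this (a hedged sketch mirroring the
textFile().map().filter().groupByKey().saveAsTextFile() pipeline described
earlier in the thread; inputGlob, parse, keep and outPath are placeholders):

sc.textFile(inputGlob)
  .coalesce(1000)          // cap the number of partitions right at the start of the DAG
  .map(parse)              // parse returns a (key, value) pair
  .filter(keep)
  .groupByKey()
  .saveAsTextFile(outPath)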

On Thu, Jan 21, 2016 at 5:35 PM, Arun Luthra <arun.lut...@gmail.com> wrote:

> Looking into the yarn logs for a similar job where an executor was
> associated with the same error, I find:
>
> ...
> 16/01/22 01:17:18 INFO client.TransportClientFactory: Found inactive
> connection to (SERVER), creating a new one.
> 16/01/22 01:17:18 *ERROR shuffle.RetryingBlockFetcher: Exception while
> beginning fetch of 46 outstanding blocks*
> *java.io.IOException: Failed to connect to (SERVER)*
> at
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
> at
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
> at
> org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88)
> at
> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
> at
> org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
> at
> org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:97)
> at
> org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:152)
> at
> org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:265)
> at
> org.apache.spark.storage.ShuffleBlockFetcherIterator.(ShuffleBlockFetcherIterator.scala:112)
> at
> org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:43)
> at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:88)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> *Caused by: java.net.ConnectException: Connection refused:* (SERVER)
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> at
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
> at
> io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
> at
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> at
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> ... 1 more
>
> ...
>
>
> Not sure if this reveals anything at all.
>
>
> On Thu, Jan 21, 2016 at 2:58 PM, Holden Karau <hol...@pigscanfly.ca>

groupByKey does not work?

2016-01-04 Thread Arun Luthra
I tried groupByKey and noticed that it did not group all values into the
same group.

In my test dataset (a Pair rdd) I have 16 records, where there are only 4
distinct keys, so I expected there to be 4 records in the groupByKey
object, but instead there were 8. Each of the 4 distinct keys appear 2
times.

Is this the expected behavior? I need to be able to get ALL values
associated with each key grouped into a SINGLE record. Is it possible?

Arun

p.s. reducebykey will not be sufficient for me


Re: groupByKey does not work?

2016-01-04 Thread Arun Luthra
Spark 1.5.0

data:

p1,lo1,8,0,4,0,5,20150901|5,1,1.0
p1,lo2,8,0,4,0,5,20150901|5,1,1.0
p1,lo3,8,0,4,0,5,20150901|5,1,1.0
p1,lo4,8,0,4,0,5,20150901|5,1,1.0
p1,lo1,8,0,4,0,5,20150901|5,1,1.0
p1,lo2,8,0,4,0,5,20150901|5,1,1.0
p1,lo3,8,0,4,0,5,20150901|5,1,1.0
p1,lo4,8,0,4,0,5,20150901|5,1,1.0
p1,lo1,8,0,4,0,5,20150901|5,1,1.0
p1,lo2,8,0,4,0,5,20150901|5,1,1.0
p1,lo3,8,0,4,0,5,20150901|5,1,1.0
p1,lo4,8,0,4,0,5,20150901|5,1,1.0
p1,lo1,8,0,4,0,5,20150901|5,1,1.0
p1,lo2,8,0,4,0,5,20150901|5,1,1.0
p1,lo3,8,0,4,0,5,20150901|5,1,1.0
p1,lo4,8,0,4,0,5,20150901|5,1,1.0

spark-shell:

spark-shell \
--num-executors 2 \
--driver-memory 1g \
--executor-memory 10g \
--executor-cores 8 \
--master yarn-client


case class Mykey(uname:String, lo:String, f1:Char, f2:Char, f3:Char,
f4:Char, f5:Char, f6:String)
case class Myvalue(count1:Long, count2:Long, num:Double)

val myrdd = sc.textFile("/user/al733a/mydata.txt").map { case line => {
val spl = line.split("\\|", -1)
val k = spl(0).split(",")
val v = spl(1).split(",")
(Mykey(k(0), k(1), k(2)(0).toChar, k(3)(0).toChar, k(4)(0).toChar,
k(5)(0).toChar, k(6)(0).toChar, k(7)),
 Myvalue(v(0).toLong, v(1).toLong, v(2).toDouble)
)
}}

myrdd.groupByKey().map { case (mykey, val_iterable) => (mykey, 1)
}.collect().foreach(println)

(Mykey(p1,lo1,8,0,4,0,5,20150901),1)

(Mykey(p1,lo1,8,0,4,0,5,20150901),1)
(Mykey(p1,lo3,8,0,4,0,5,20150901),1)
(Mykey(p1,lo3,8,0,4,0,5,20150901),1)
(Mykey(p1,lo4,8,0,4,0,5,20150901),1)
(Mykey(p1,lo4,8,0,4,0,5,20150901),1)
(Mykey(p1,lo2,8,0,4,0,5,20150901),1)
(Mykey(p1,lo2,8,0,4,0,5,20150901),1)



You can see that each key is repeated 2 times but each key should only
appear once.

Arun

On Mon, Jan 4, 2016 at 4:07 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> Can you give a bit more information ?
>
> Release of Spark you're using
> Minimal dataset that shows the problem
>
> Cheers
>
> On Mon, Jan 4, 2016 at 3:55 PM, Arun Luthra <arun.lut...@gmail.com> wrote:
>
>> I tried groupByKey and noticed that it did not group all values into the
>> same group.
>>
>> In my test dataset (a Pair rdd) I have 16 records, where there are only 4
>> distinct keys, so I expected there to be 4 records in the groupByKey
>> object, but instead there were 8. Each of the 4 distinct keys appear 2
>> times.
>>
>> Is this the expected behavior? I need to be able to get ALL values
>> associated with each key grouped into a SINGLE record. Is it possible?
>>
>> Arun
>>
>> p.s. reducebykey will not be sufficient for me
>>
>
>


Re: groupByKey does not work?

2016-01-04 Thread Arun Luthra
If I simplify the key to a plain String column with values lo1, lo2, lo3,
lo4, it works correctly.
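
In other words, something along these lines (a hedged sketch built from the
Mykey fields quoted below):

val stringKeyed = myrdd.map { case (k, v) =>
  (Seq(k.uname, k.lo, k.f1, k.f2, k.f3, k.f4, k.f5, k.f6).mkString(","), v)
}
stringKeyed.groupByKey()
  .map { case (key, vals) => (key, vals.size) }
  .collect().foreach(println)      // now 4 groups, one per distinct key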

On Mon, Jan 4, 2016 at 4:49 PM, Daniel Imberman <daniel.imber...@gmail.com>
wrote:

> Could you try simplifying the key and seeing if that makes any difference?
> Make it just a string or an int so we can count out any issues in object
> equality.
>
> On Mon, Jan 4, 2016 at 4:42 PM Arun Luthra <arun.lut...@gmail.com> wrote:
>
>> Spark 1.5.0
>>
>> data:
>>
>> p1,lo1,8,0,4,0,5,20150901|5,1,1.0
>> p1,lo2,8,0,4,0,5,20150901|5,1,1.0
>> p1,lo3,8,0,4,0,5,20150901|5,1,1.0
>> p1,lo4,8,0,4,0,5,20150901|5,1,1.0
>> p1,lo1,8,0,4,0,5,20150901|5,1,1.0
>> p1,lo2,8,0,4,0,5,20150901|5,1,1.0
>> p1,lo3,8,0,4,0,5,20150901|5,1,1.0
>> p1,lo4,8,0,4,0,5,20150901|5,1,1.0
>> p1,lo1,8,0,4,0,5,20150901|5,1,1.0
>> p1,lo2,8,0,4,0,5,20150901|5,1,1.0
>> p1,lo3,8,0,4,0,5,20150901|5,1,1.0
>> p1,lo4,8,0,4,0,5,20150901|5,1,1.0
>> p1,lo1,8,0,4,0,5,20150901|5,1,1.0
>> p1,lo2,8,0,4,0,5,20150901|5,1,1.0
>> p1,lo3,8,0,4,0,5,20150901|5,1,1.0
>> p1,lo4,8,0,4,0,5,20150901|5,1,1.0
>>
>> spark-shell:
>>
>> spark-shell \
>> --num-executors 2 \
>> --driver-memory 1g \
>> --executor-memory 10g \
>> --executor-cores 8 \
>> --master yarn-client
>>
>>
>> case class Mykey(uname:String, lo:String, f1:Char, f2:Char, f3:Char,
>> f4:Char, f5:Char, f6:String)
>> case class Myvalue(count1:Long, count2:Long, num:Double)
>>
>> val myrdd = sc.textFile("/user/al733a/mydata.txt").map { case line => {
>> val spl = line.split("\\|", -1)
>> val k = spl(0).split(",")
>> val v = spl(1).split(",")
>> (Mykey(k(0), k(1), k(2)(0).toChar, k(3)(0).toChar, k(4)(0).toChar,
>> k(5)(0).toChar, k(6)(0).toChar, k(7)),
>>  Myvalue(v(0).toLong, v(1).toLong, v(2).toDouble)
>> )
>> }}
>>
>> myrdd.groupByKey().map { case (mykey, val_iterable) => (mykey, 1)
>> }.collect().foreach(println)
>>
>> (Mykey(p1,lo1,8,0,4,0,5,20150901),1)
>>
>> (Mykey(p1,lo1,8,0,4,0,5,20150901),1)
>> (Mykey(p1,lo3,8,0,4,0,5,20150901),1)
>> (Mykey(p1,lo3,8,0,4,0,5,20150901),1)
>> (Mykey(p1,lo4,8,0,4,0,5,20150901),1)
>> (Mykey(p1,lo4,8,0,4,0,5,20150901),1)
>> (Mykey(p1,lo2,8,0,4,0,5,20150901),1)
>> (Mykey(p1,lo2,8,0,4,0,5,20150901),1)
>>
>>
>>
>> You can see that each key is repeated 2 times but each key should only
>> appear once.
>>
>> Arun
>>
>> On Mon, Jan 4, 2016 at 4:07 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> Can you give a bit more information ?
>>>
>>> Release of Spark you're using
>>> Minimal dataset that shows the problem
>>>
>>> Cheers
>>>
>>> On Mon, Jan 4, 2016 at 3:55 PM, Arun Luthra <arun.lut...@gmail.com>
>>> wrote:
>>>
>>>> I tried groupByKey and noticed that it did not group all values into
>>>> the same group.
>>>>
>>>> In my test dataset (a Pair rdd) I have 16 records, where there are only
>>>> 4 distinct keys, so I expected there to be 4 records in the groupByKey
>>>> object, but instead there were 8. Each of the 4 distinct keys appear 2
>>>> times.
>>>>
>>>> Is this the expected behavior? I need to be able to get ALL values
>>>> associated with each key grouped into a SINGLE record. Is it possible?
>>>>
>>>> Arun
>>>>
>>>> p.s. reducebykey will not be sufficient for me
>>>>
>>>
>>>
>>


types allowed for saveasobjectfile?

2015-08-27 Thread Arun Luthra
What types of RDD can saveAsObjectFile(path) handle? I tried a naive test
with an RDD[Array[String]], but when I tried to read back the result with
sc.objectFile(path).take(5).foreach(println), I got a non-promising output
looking like:

[Ljava.lang.String;@46123a
[Ljava.lang.String;@76123b
[Ljava.lang.String;@13144c
[Ljava.lang.String;@75146d
[Ljava.lang.String;@79118f


Arun


Re: types allowed for saveasobjectfile?

2015-08-27 Thread Arun Luthra
Ah, yes, that did the trick.

So more generally, can this handle any serializable object?
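
For the record, the round trip itself was fine; it was just the printing.
A hedged sketch (rdd and path are placeholders):

rdd.saveAsObjectFile(path)                           // rdd: RDD[Array[String]]
val back = sc.objectFile[Array[String]](path)
back.take(5).foreach(a => println(a.mkString(",")))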

On Thu, Aug 27, 2015 at 2:11 PM, Jonathan Coveney jcove...@gmail.com
wrote:

 Array[String] doesn't pretty-print by default. Use .mkString(",") for
 example.


 On Thursday, August 27, 2015, Arun Luthra arun.lut...@gmail.com
 wrote:

 What types of RDD can saveAsObjectFile(path) handle? I tried a naive test
 with an RDD[Array[String]], but when I tried to read back the result with
 sc.objectFile(path).take(5).foreach(println), I got a non-promising output
 looking like:

 [Ljava.lang.String;@46123a
 [Ljava.lang.String;@76123b
 [Ljava.lang.String;@13144c
 [Ljava.lang.String;@75146d
 [Ljava.lang.String;@79118f


 Arun




How to ignore features in mllib

2015-07-09 Thread Arun Luthra
Is it possible to ignore features in mllib? In other words, I would like to
have some 'pass-through' data, Strings for example, attached to training
examples and test data.
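
One common workaround sketch (this is not an MLlib feature; rows is a
hypothetical RDD of (passThroughTag, label, featureArray) triples):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.DecisionTree

// Keep the pass-through String keyed next to the LabeledPoint, train on the
// values only, then predict per record so the String stays attached.
val keyed = rows.map { case (tag, label, feats) =>
  (tag, LabeledPoint(label, Vectors.dense(feats)))
}
val model = DecisionTree.trainClassifier(
  keyed.values, numClasses = 2, categoricalFeaturesInfo = Map[Int, Int](),
  impurity = "gini", maxDepth = 5, maxBins = 32)
val tagged = keyed.mapValues(lp => model.predict(lp.features))   // (tag, prediction)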

A related stackoverflow question:
http://stackoverflow.com/questions/30739283/spark-mllib-how-to-ignore-features-when-training-a-classifier

Arun


How to change hive database?

2015-07-07 Thread Arun Luthra
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.hive.HiveContext

I'm getting org.apache.spark.sql.catalyst.analysis.NoSuchTableException
from:

val dataframe = hiveContext.table("other_db.mytable")

Do I have to change the current database to access it? Is that possible? I'm
guessing that the database.table syntax that I used in hiveContext.table is
not recognized.

I have no problems accessing tables in the database called default.

I can list tables in other_db with hiveContext.tableNames("other_db").

Using Spark 1.4.0.


Re: Spark launching without all of the requested YARN resources

2015-07-02 Thread Arun Luthra
Thanks Sandy et al, I will try that. I like that I can choose the
minRegisteredResourcesRatio.
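
For reference, a hedged sketch of what I'll try (Spark 1.x; as far as I can
tell the waiting time is given in milliseconds in this version):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.scheduler.minRegisteredResourcesRatio", "1.0")           // wait for 100% of executors
  .set("spark.scheduler.maxRegisteredResourcesWaitingTime", "3600000") // give up waiting after ~1 hour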

On Wed, Jun 24, 2015 at 11:04 AM, Sandy Ryza sandy.r...@cloudera.com
wrote:

 Hi Arun,

 You can achieve this by
 setting spark.scheduler.maxRegisteredResourcesWaitingTime to some really
 high number and spark.scheduler.minRegisteredResourcesRatio to 1.0.

 -Sandy

 On Wed, Jun 24, 2015 at 2:21 AM, Steve Loughran ste...@hortonworks.com
 wrote:


  On 24 Jun 2015, at 05:55, canan chen ccn...@gmail.com wrote:

  Why do you want it to wait until all the resources are ready? Making it
 start as early as possible should let it complete earlier and increase the
 utilization of resources.

 On Tue, Jun 23, 2015 at 10:34 PM, Arun Luthra arun.lut...@gmail.com
 wrote:

 Sometimes if my Hortonworks yarn-enabled cluster is fairly busy, Spark
 (via spark-submit) will begin its processing even though it apparently did
 not get all of the requested resources; it is running very slowly.

  Is there a way to force Spark/YARN to only begin when it has the full
 set of resources that I request?

  Thanks,
 Arun




   The "wait until there's space" launch policy is known as Gang
  Scheduling; https://issues.apache.org/jira/browse/YARN-624 covers what
  would be needed there.

  1. It's not in YARN

  2. For analytics workloads, it's not clear you benefit. You would wait
 a very long time(*) for the requirements to be satisfied. The current YARN
 scheduling and placement algorithms assume that you'd prefer timely
 container launch to extended wait for containers in the right place, and
 expects algorithms to work in a degraded form with a reduced no. of workers

  3. Where it really matters is long-lived applications where you need
 some quorum of container-hosted processes, or if performance collapses
 utterly below a threshold. Things like HBase on YARN are an example —but
 Spark streaming could be another.

  In the absence of YARN support, it can be implemented in the
 application by having the YARN-hosted application (here: Spark) get the
 containers, start up a process on each one, but not actually start
 accepting/performing work until a threshold of containers is reached/some
 timeout has occurred.

  If you wanted to do that in spark, you could raise the idea on the
 spark dev lists and see what people think.

  -Steve

  (*) i.e. forever





Spark launching without all of the requested YARN resources

2015-06-23 Thread Arun Luthra
Sometimes if my Hortonworks yarn-enabled cluster is fairly busy, Spark (via
spark-submit) will begin its processing even though it apparently did not
get all of the requested resources; it is running very slowly.

Is there a way to force Spark/YARN to only begin when it has the full set
of resources that I request?

Thanks,
Arun


Missing values support in Mllib yet?

2015-06-19 Thread Arun Luthra
Hi,

Is there any support for handling missing values in mllib yet, especially
for decision trees where this is a natural feature?

Arun


Re: Problem getting program to run on 15TB input

2015-06-09 Thread Arun Luthra
I found that the problem was due to garbage collection in filter(). Using
Hive to do the filter solved the problem.

A lot of other problems went away when I upgraded to Spark 1.2.0, which
compresses various task overhead data (HighlyCompressedMapStatus etc.).

It has been running very very smoothly with these two changes.

I'm fairly sure that I tried coalesce(); it resulted in tasks that were too
big, but the code has evolved too much to easily double-check it now.
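
Roughly, the Hive-side filter looks like this (a hedged sketch; the table
and column names are placeholders):

// Let Hive drop ~99% of the rows before they ever become an RDD, instead of
// rdd.filter() creating and discarding all of those objects on the Spark side.
val kept = hiveContext.sql("SELECT * FROM mydb.events WHERE keep_flag = 1")
val rdd = kept.rdd       // back to a plain RDD[Row] for the rest of the pipeline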

On Sat, Jun 6, 2015 at 12:50 AM, Kapil Malik kma...@adobe.com wrote:

  Very interesting and relevant thread for production level usage of spark.



 @Arun, can you kindly confirm if Daniel’s suggestion helped your usecase?



 Thanks,



 Kapil Malik | kma...@adobe.com | 33430 / 8800836581



 *From:* Daniel Mahler [mailto:dmah...@gmail.com]
 *Sent:* 13 April 2015 15:42
 *To:* Arun Luthra
 *Cc:* Aaron Davidson; Paweł Szulc; Burak Yavuz; user@spark.apache.org
 *Subject:* Re: Problem getting program to run on 15TB input



 Sometimes a large number of partitions leads to memory problems.

 Something like



 val rdd1 = sc.textFile(file1).coalesce(500). ...

 val rdd2 = sc.textFile(file2).coalesce(500). ...



 may help.





 On Mon, Mar 2, 2015 at 6:26 PM, Arun Luthra arun.lut...@gmail.com wrote:

 Everything works smoothly if I do the 99%-removal filter in Hive first.
 So, all the baggage from garbage collection was breaking it.



 Is there a way to filter() out 99% of the data without having to garbage
 collect 99% of the RDD?



 On Sun, Mar 1, 2015 at 9:56 AM, Arun Luthra arun.lut...@gmail.com wrote:

 I tried a shorter, simpler version of the program, with just 1 RDD;
  essentially it is:



 sc.textFile(..., N).map().filter().map( blah => (id,
 1L)).reduceByKey().saveAsTextFile(...)



 Here is a typical GC log trace from one of the yarn container logs:



 54.040: [GC [PSYoungGen: 9176064K->28206K(10704896K)]
 9176064K->28278K(35171840K), 0.0234420 secs] [Times: user=0.15 sys=0.01,
 real=0.02 secs]

 77.864: [GC [PSYoungGen: 9204270K->150553K(10704896K)]
 9204342K->150641K(35171840K), 0.0423020 secs] [Times: user=0.30 sys=0.26,
 real=0.04 secs]

 79.485: [GC [PSYoungGen: 9326617K->333519K(10704896K)]
 9326705K->333615K(35171840K), 0.0774990 secs] [Times: user=0.35 sys=1.28,
 real=0.08 secs]

 92.974: [GC [PSYoungGen: 9509583K->193370K(10704896K)]
 9509679K->193474K(35171840K), 0.0241590 secs] [Times: user=0.35 sys=0.11,
 real=0.02 secs]

 114.842: [GC [PSYoungGen: 9369434K->123577K(10704896K)]
 9369538K->123689K(35171840K), 0.0201000 secs] [Times: user=0.31 sys=0.00,
 real=0.02 secs]

 117.277: [GC [PSYoungGen: 9299641K->135459K(11918336K)]
 9299753K->135579K(36385280K), 0.0244820 secs] [Times: user=0.19 sys=0.25,
 real=0.02 secs]



 So ~9GB is getting GC'ed every few seconds, which seems like a lot.



 Question: The filter() is removing 99% of the data. Does this 99% of the
 data get GC'ed?



 Now, I was able to finally get to reduceByKey() by reducing the number of
 executor-cores (to 2), based on suggestions at
 http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-OutOfMemoryError-java-lang-OutOfMemoryError-GC-overhead-limit-exceeded-td9036.html
 . This makes everything before reduceByKey() run pretty smoothly.



 I ran this with more executor-memory and less executors (most important
 thing was fewer executor-cores):



 --num-executors 150 \

 --driver-memory 15g \

 --executor-memory 110g \

 --executor-cores 32 \



 But then, reduceByKey() fails with:

 java.lang.OutOfMemoryError: Java heap space









 On Sat, Feb 28, 2015 at 12:09 PM, Arun Luthra arun.lut...@gmail.com
 wrote:

 The Spark UI names the line number and name of the operation (repartition
 in this case) that it is performing. Only if this information is wrong
 (just a possibility), could it have started groupByKey already.



 I will try to analyze the amount of skew in the data by using reduceByKey
 (or simply countByKey) which is relatively inexpensive. For the purposes of
 this algorithm I can simply log and remove keys with huge counts, before
 doing groupByKey.



 On Sat, Feb 28, 2015 at 11:38 AM, Aaron Davidson ilike...@gmail.com
 wrote:

 All stated symptoms are consistent with GC pressure (other nodes timeout
 trying to connect because of a long stop-the-world), quite possibly due to
 groupByKey. groupByKey is a very expensive operation as it may bring all
 the data for a particular partition into memory (in particular, it cannot
 spill values for a single key, so if you have a single very skewed key you
 can get behavior like this).



 On Sat, Feb 28, 2015 at 11:33 AM, Paweł Szulc paul.sz...@gmail.com
 wrote:

 But groupbykey will repartition according to numer of keys as I understand
 how it works. How do you know that you haven't reached the groupbykey
 phase? Are you using a profiler or do yoi base that assumption only on logs?



 Sat, 28 Feb 2015, 8:12 PM Arun Luthra (user arun.lut...@gmail.com)
 wrote:



 A correction to my first post

Re: Efficient saveAsTextFile by key, directory for each key?

2015-04-22 Thread Arun Luthra
I ended up post-processing the result in hive with a dynamic partition
insert query to get a table partitioned by the key.

Looking further, it seems that 'dynamic partition' insert is in Spark SQL
and working well in Spark SQL versions >= 1.2.0:
https://issues.apache.org/jira/browse/SPARK-3007
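
For reference, a minimal sketch of issuing such a dynamic partition insert
through Spark SQL; the table and column names are hypothetical:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

// Sketch only: "my_table_by_key" and "my_flat_result" are made-up names.
// "directory_name" is the partition column, so it must come last in the
// SELECT list for a dynamic partition insert.
object DynamicPartitionInsert {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("DynamicPartitionInsert"))
    val hiveContext = new HiveContext(sc)

    hiveContext.sql("SET hive.exec.dynamic.partition = true")
    hiveContext.sql("SET hive.exec.dynamic.partition.mode = nonstrict")
    hiveContext.sql(
      """INSERT OVERWRITE TABLE my_table_by_key PARTITION (directory_name)
        |SELECT val0, val1, directory_name FROM my_flat_result
      """.stripMargin)
  }
}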

On Tue, Apr 21, 2015 at 5:45 PM, Arun Luthra arun.lut...@gmail.com wrote:

 Is there an efficient way to save an RDD with saveAsTextFile in such a way
 that the data gets shuffled into separated directories according to a key?
 (My end goal is to wrap the result in a multi-partitioned Hive table)

 Suppose you have:

 case class MyData(val0: Int, val1: String, directory_name: String)

 and an RDD called myrdd with type RDD[MyData]. Suppose that you already
 have an array of the distinct directory_name's, called distinct_directories.

 A very inefficient way to do this is:

 distinct_directories.foreach(
   dir_name => myrdd.filter( mydata => mydata.directory_name == dir_name )
     .map( mydata => Array(mydata.val0.toString, mydata.val1).mkString(","))
     .coalesce(5)
     .saveAsTextFile("base_dir_name/" + f"$dir_name")
 )

 I tried this solution, and it does not do the multiple myrdd.filter's in
 parallel.

 I'm guessing partitionBy might be in the efficient solution if an easy
 efficient solution exists...

 Thanks,
 Arun



Efficient saveAsTextFile by key, directory for each key?

2015-04-21 Thread Arun Luthra
Is there an efficient way to save an RDD with saveAsTextFile in such a way
that the data gets shuffled into separated directories according to a key?
(My end goal is to wrap the result in a multi-partitioned Hive table)

Suppose you have:

case class MyData(val0: Int, val1: String, directory_name: String)

and an RDD called myrdd with type RDD[MyData]. Suppose that you already
have an array of the distinct directory_name's, called distinct_directories.

A very inefficient way to do this is:

distinct_directories.foreach(
  dir_name => myrdd.filter( mydata => mydata.directory_name == dir_name )
    .map( mydata => Array(mydata.val0.toString, mydata.val1).mkString(","))
    .coalesce(5)
    .saveAsTextFile("base_dir_name/" + f"$dir_name")
)

I tried this solution, and it does not do the multiple myrdd.filter's in
parallel.

I'm guessing partitionBy might be in the efficient solution if an easy
efficient solution exists...

Thanks,
Arun
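
One commonly used alternative, not discussed in this thread, is to write
through Hadoop's MultipleTextOutputFormat so each key lands in its own
subdirectory. A sketch, reusing the MyData example above (the partition
count is arbitrary):

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

case class MyData(val0: Int, val1: String, directory_name: String) // as above

// Routes each record to baseDir/<directory_name>/part-NNNNN in a single pass.
class DirectoryNameOutput extends MultipleTextOutputFormat[Any, Any] {
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.toString + "/" + name          // the key becomes the subdirectory
  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get()                 // write only the value text into the files
}

def saveByDirectoryName(myrdd: RDD[MyData], baseDir: String): Unit =
  myrdd
    .map(d => (d.directory_name, Array(d.val0.toString, d.val1).mkString(",")))
    .partitionBy(new HashPartitioner(500)) // 500 partitions is an arbitrary choice
    .saveAsHadoopFile(baseDir, classOf[String], classOf[String], classOf[DirectoryNameOutput])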


Re: Workaround for spark 1.2.X roaringbitmap kryo problem?

2015-03-12 Thread Arun Luthra
I'm using a pre-built Spark; I'm not trying to compile Spark.

The compile error appears when I try to register HighlyCompressedMapStatus
in my program:

kryo.register(classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus])

If I don't register it, I get a runtime error saying that it needs to be
registered (the error is only when I turn on kryo).

However the code is running smoothly with kryo turned off.

On Wed, Mar 11, 2015 at 5:38 PM, Imran Rashid iras...@cloudera.com wrote:

 I'm not sure what you mean.   Are you asking how you can recompile all of
 spark and deploy it, instead of using one of the pre-built versions?

 https://spark.apache.org/docs/latest/building-spark.html

 Or are you seeing compile problems specifically w/
 HighlyCompressedMapStatus?   The code compiles fine, so I'm not sure what
 problem you are running into -- we'd need a lot more info to help

 On Tue, Mar 10, 2015 at 6:54 PM, Arun Luthra arun.lut...@gmail.com
 wrote:

 Does anyone know how to get the HighlyCompressedMapStatus to compile?

 I will try turning off kryo in 1.2.0 and hope things don't break.  I want
 to benefit from the MapOutputTracker fix in 1.2.0.

 On Tue, Mar 3, 2015 at 5:41 AM, Imran Rashid iras...@cloudera.com
 wrote:

 the scala syntax for arrays is Array[T], not T[], so you want to use
 something:

 kryo.register(classOf[Array[org.roaringbitmap.RoaringArray$Element]])
 kryo.register(classOf[Array[Short]])

 nonetheless, the spark should take care of this itself.  I'll look into
 it later today.


 On Mon, Mar 2, 2015 at 2:55 PM, Arun Luthra arun.lut...@gmail.com
 wrote:

 I think this is a Java vs scala syntax issue. Will check.

 On Thu, Feb 26, 2015 at 8:17 PM, Arun Luthra arun.lut...@gmail.com
 wrote:

 Problem is noted here:
 https://issues.apache.org/jira/browse/SPARK-5949

 I tried this as a workaround:

 import org.apache.spark.scheduler._
 import org.roaringbitmap._

 ...


 kryo.register(classOf[org.roaringbitmap.RoaringBitmap])
 kryo.register(classOf[org.roaringbitmap.RoaringArray])
 kryo.register(classOf[org.roaringbitmap.ArrayContainer])

 kryo.register(classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus])
 kryo.register(classOf[org.roaringbitmap.RoaringArray$Element])
 kryo.register(classOf[org.roaringbitmap.RoaringArray$Element[]])
 kryo.register(classOf[short[]])


 in build file:

 libraryDependencies += org.roaringbitmap % RoaringBitmap % 0.4.8


 This fails to compile:

 ...:53: identifier expected but ']' found.

 [error]
 kryo.register(classOf[org.roaringbitmap.RoaringArray$Element[]])

 also:

 :54: identifier expected but ']' found.

 [error] kryo.register(classOf[short[]])
 also:

 :51: class HighlyCompressedMapStatus in package scheduler cannot be
 accessed in package org.apache.spark.scheduler
 [error]
 kryo.register(classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus])


 Suggestions?

 Arun








Re: Workaround for spark 1.2.X roaringbitmap kryo problem?

2015-03-12 Thread Arun Luthra
The error is in the original post.

Here's the recipe that worked for me:
kryo.register(Class.forName("org.roaringbitmap.RoaringArray$Element"))
kryo.register(classOf[Array[org.roaringbitmap.RoaringArray$Element]])
kryo.register(classOf[Array[Short]])
kryo.register(classOf[org.roaringbitmap.RoaringBitmap])
kryo.register(classOf[org.roaringbitmap.RoaringArray])
kryo.register(classOf[org.roaringbitmap.ArrayContainer])

kryo.register(Class.forName("org.apache.spark.scheduler.HighlyCompressedMapStatus"))

So your Class.forName workaround worked, thanks!
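
For context, a sketch of a KryoRegistrator wrapping registrations like the
ones above. The class name is illustrative (the real class referenced by
spark.kryo.registrator elsewhere in this archive may differ), and the array
registration uses the JVM binary name so the code never refers to the
non-public Element type directly:

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

// Illustrative only: assembled from the registrations in this thread.
class RoaringBitmapRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[org.roaringbitmap.RoaringBitmap])
    kryo.register(classOf[org.roaringbitmap.RoaringArray])
    kryo.register(classOf[org.roaringbitmap.ArrayContainer])
    kryo.register(classOf[Array[Short]])
    kryo.register(Class.forName("org.roaringbitmap.RoaringArray$Element"))
    kryo.register(Class.forName("[Lorg.roaringbitmap.RoaringArray$Element;")) // Array of Element, by JVM name
    kryo.register(Class.forName("org.apache.spark.scheduler.HighlyCompressedMapStatus"))
  }
}

It would then be wired in with .set("spark.kryo.registrator", "yourpackage.RoaringBitmapRegistrator"),
matching the spark.kryo.registrator setting used elsewhere in this archive.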

On Thu, Mar 12, 2015 at 10:56 AM, Imran Rashid iras...@cloudera.com wrote:

 Giving a bit more detail on the error would make it a lot easier for
 others to help you out. Eg., in this case, it would have helped if included
 your actual compile error.

 In any case, I'm assuming your issue is b/c that class if private to
 spark.  You can sneak around that by using
 Class.forName("stringOfClassName") instead:

 scala classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus]
 console:8: error: class HighlyCompressedMapStatus in package scheduler
 cannot be accessed in package org.apache.spark.scheduler

   classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus]
  ^
 scala
 Class.forName("org.apache.spark.scheduler.HighlyCompressedMapStatus")
 res1: Class[_] = class
 org.apache.spark.scheduler.HighlyCompressedMapStatus



 hope this helps,
 Imran


 On Thu, Mar 12, 2015 at 12:47 PM, Arun Luthra arun.lut...@gmail.com
 wrote:

 I'm using a pre-built Spark; I'm not trying to compile Spark.

 The compile error appears when I try to register
 HighlyCompressedMapStatus in my program:

 kryo.register(classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus])

 If I don't register it, I get a runtime error saying that it needs to be
 registered (the error is only when I turn on kryo).

 However the code is running smoothly with kryo turned off.

 On Wed, Mar 11, 2015 at 5:38 PM, Imran Rashid iras...@cloudera.com
 wrote:

 I'm not sure what you mean.   Are you asking how you can recompile all
 of spark and deploy it, instead of using one of the pre-built versions?

 https://spark.apache.org/docs/latest/building-spark.html

 Or are you seeing compile problems specifically w/
 HighlyCompressedMapStatus?   The code compiles fine, so I'm not sure what
 problem you are running into -- we'd need a lot more info to help

 On Tue, Mar 10, 2015 at 6:54 PM, Arun Luthra arun.lut...@gmail.com
 wrote:

 Does anyone know how to get the HighlyCompressedMapStatus to compile?

 I will try turning off kryo in 1.2.0 and hope things don't break.  I
 want to benefit from the MapOutputTracker fix in 1.2.0.

 On Tue, Mar 3, 2015 at 5:41 AM, Imran Rashid iras...@cloudera.com
 wrote:

 the scala syntax for arrays is Array[T], not T[], so you want to use
 something:

 kryo.register(classOf[Array[org.roaringbitmap.RoaringArray$Element]])
 kryo.register(classOf[Array[Short]])

 nonetheless, the spark should take care of this itself.  I'll look
 into it later today.


 On Mon, Mar 2, 2015 at 2:55 PM, Arun Luthra arun.lut...@gmail.com
 wrote:

 I think this is a Java vs scala syntax issue. Will check.

 On Thu, Feb 26, 2015 at 8:17 PM, Arun Luthra arun.lut...@gmail.com
 wrote:

 Problem is noted here:
 https://issues.apache.org/jira/browse/SPARK-5949

 I tried this as a workaround:

 import org.apache.spark.scheduler._
 import org.roaringbitmap._

 ...


 kryo.register(classOf[org.roaringbitmap.RoaringBitmap])
 kryo.register(classOf[org.roaringbitmap.RoaringArray])
 kryo.register(classOf[org.roaringbitmap.ArrayContainer])

 kryo.register(classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus])
 kryo.register(classOf[org.roaringbitmap.RoaringArray$Element])
 kryo.register(classOf[org.roaringbitmap.RoaringArray$Element[]])
 kryo.register(classOf[short[]])


 in build file:

 libraryDependencies += org.roaringbitmap % RoaringBitmap %
 0.4.8


 This fails to compile:

 ...:53: identifier expected but ']' found.

 [error]
 kryo.register(classOf[org.roaringbitmap.RoaringArray$Element[]])

 also:

 :54: identifier expected but ']' found.

 [error] kryo.register(classOf[short[]])
 also:

 :51: class HighlyCompressedMapStatus in package scheduler cannot be
 accessed in package org.apache.spark.scheduler
 [error]
 kryo.register(classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus])


 Suggestions?

 Arun










Re: Workaround for spark 1.2.X roaringbitmap kryo problem?

2015-03-10 Thread Arun Luthra
Does anyone know how to get the HighlyCompressedMapStatus to compile?

I will try turning off kryo in 1.2.0 and hope things don't break.  I want
to benefit from the MapOutputTracker fix in 1.2.0.

On Tue, Mar 3, 2015 at 5:41 AM, Imran Rashid iras...@cloudera.com wrote:

 the scala syntax for arrays is Array[T], not T[], so you want to use
 something:

 kryo.register(classOf[Array[org.roaringbitmap.RoaringArray$Element]])
 kryo.register(classOf[Array[Short]])

 nonetheless, the spark should take care of this itself.  I'll look into it
 later today.


 On Mon, Mar 2, 2015 at 2:55 PM, Arun Luthra arun.lut...@gmail.com wrote:

 I think this is a Java vs scala syntax issue. Will check.

 On Thu, Feb 26, 2015 at 8:17 PM, Arun Luthra arun.lut...@gmail.com
 wrote:

 Problem is noted here: https://issues.apache.org/jira/browse/SPARK-5949

 I tried this as a workaround:

 import org.apache.spark.scheduler._
 import org.roaringbitmap._

 ...


 kryo.register(classOf[org.roaringbitmap.RoaringBitmap])
 kryo.register(classOf[org.roaringbitmap.RoaringArray])
 kryo.register(classOf[org.roaringbitmap.ArrayContainer])

 kryo.register(classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus])
 kryo.register(classOf[org.roaringbitmap.RoaringArray$Element])
 kryo.register(classOf[org.roaringbitmap.RoaringArray$Element[]])
 kryo.register(classOf[short[]])


 in build file:

 libraryDependencies += org.roaringbitmap % RoaringBitmap % 0.4.8


 This fails to compile:

 ...:53: identifier expected but ']' found.

 [error]
 kryo.register(classOf[org.roaringbitmap.RoaringArray$Element[]])

 also:

 :54: identifier expected but ']' found.

 [error] kryo.register(classOf[short[]])
 also:

 :51: class HighlyCompressedMapStatus in package scheduler cannot be
 accessed in package org.apache.spark.scheduler
 [error]
 kryo.register(classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus])


 Suggestions?

 Arun






Re: Problem getting program to run on 15TB input

2015-03-02 Thread Arun Luthra
Everything works smoothly if I do the 99%-removal filter in Hive first. So,
all the baggage from garbage collection was breaking it.

Is there a way to filter() out 99% of the data without having to garbage
collect 99% of the RDD?
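
One general way to reduce that garbage (a sketch, not the original job: the
"KEEP|" prefix test stands in for the real predicate) is to reject records on
the raw line before parsing them, so dropped lines never allocate split
arrays or record objects. The per-line String is still read, so this shrinks
the garbage rather than eliminating it:

import org.apache.spark.{SparkConf, SparkContext}

object FilterBeforeParse {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("FilterBeforeParse"))
    val kept = sc.textFile("/data/input")
      .filter(line => line.startsWith("KEEP|"))  // cheap test on the raw String
      .map(line => line.split("\\|", -1))        // parse only the survivors
    println(kept.count())
  }
}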

On Sun, Mar 1, 2015 at 9:56 AM, Arun Luthra arun.lut...@gmail.com wrote:

 I tried a shorter, simpler version of the program, with just 1 RDD,
  essentially it is:

 sc.textFile(..., N).map().filter().map( blah => (id,
 1L)).reduceByKey().saveAsTextFile(...)

 Here is a typical GC log trace from one of the yarn container logs:

 54.040: [GC [PSYoungGen: 9176064K-28206K(10704896K)]
 9176064K-28278K(35171840K), 0.0234420 secs] [Times: user=0.15 sys=0.01,
 real=0.02 secs]
 77.864: [GC [PSYoungGen: 9204270K-150553K(10704896K)]
 9204342K-150641K(35171840K), 0.0423020 secs] [Times: user=0.30 sys=0.26,
 real=0.04 secs]
 79.485: [GC [PSYoungGen: 9326617K-333519K(10704896K)]
 9326705K-333615K(35171840K), 0.0774990 secs] [Times: user=0.35 sys=1.28,
 real=0.08 secs]
 92.974: [GC [PSYoungGen: 9509583K-193370K(10704896K)]
 9509679K-193474K(35171840K), 0.0241590 secs] [Times: user=0.35 sys=0.11,
 real=0.02 secs]
 114.842: [GC [PSYoungGen: 9369434K-123577K(10704896K)]
 9369538K-123689K(35171840K), 0.0201000 secs] [Times: user=0.31 sys=0.00,
 real=0.02 secs]
 117.277: [GC [PSYoungGen: 9299641K-135459K(11918336K)]
 9299753K-135579K(36385280K), 0.0244820 secs] [Times: user=0.19 sys=0.25,
 real=0.02 secs]

 So ~9GB is getting GC'ed every few seconds. Which seems like a lot.

 Question: The filter() is removing 99% of the data. Does this 99% of the
 data get GC'ed?

 Now, I was able to finally get to reduceByKey() by reducing the number of
 executor-cores (to 2), based on suggestions at
 http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-OutOfMemoryError-java-lang-OutOfMemoryError-GC-overhead-limit-exceeded-td9036.html
 . This makes everything before reduceByKey() run pretty smoothly.

 I ran this with more executor-memory and less executors (most important
 thing was fewer executor-cores):

 --num-executors 150 \
 --driver-memory 15g \
 --executor-memory 110g \
 --executor-cores 32 \

 But then, reduceByKey() fails with:

 java.lang.OutOfMemoryError: Java heap space




 On Sat, Feb 28, 2015 at 12:09 PM, Arun Luthra arun.lut...@gmail.com
 wrote:

 The Spark UI names the line number and name of the operation (repartition
 in this case) that it is performing. Only if this information is wrong
 (just a possibility), could it have started groupByKey already.

 I will try to analyze the amount of skew in the data by using reduceByKey
 (or simply countByKey) which is relatively inexpensive. For the purposes of
 this algorithm I can simply log and remove keys with huge counts, before
 doing groupByKey.

 On Sat, Feb 28, 2015 at 11:38 AM, Aaron Davidson ilike...@gmail.com
 wrote:

 All stated symptoms are consistent with GC pressure (other nodes timeout
 trying to connect because of a long stop-the-world), quite possibly due to
 groupByKey. groupByKey is a very expensive operation as it may bring all
 the data for a particular partition into memory (in particular, it cannot
 spill values for a single key, so if you have a single very skewed key you
 can get behavior like this).

 On Sat, Feb 28, 2015 at 11:33 AM, Paweł Szulc paul.sz...@gmail.com
 wrote:

 But groupbykey will repartition according to numer of keys as I
 understand how it works. How do you know that you haven't reached the
 groupbykey phase? Are you using a profiler or do yoi base that assumption
 only on logs?

 Sat, 28 Feb 2015, 8:12 PM Arun Luthra (user
 arun.lut...@gmail.com) wrote:

 A correction to my first post:

 There is also a repartition right before groupByKey to help avoid
 too-many-open-files error:


 rdd2.union(rdd1).map(...).filter(...).repartition(15000).groupByKey().map(...).flatMap(...).saveAsTextFile()

 On Sat, Feb 28, 2015 at 11:10 AM, Arun Luthra arun.lut...@gmail.com
 wrote:

 The job fails before getting to groupByKey.

 I see a lot of timeout errors in the yarn logs, like:

 15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1
 attempts
 akka.pattern.AskTimeoutException: Timed out

 and

 15/02/28 12:47:49 WARN util.AkkaUtils: Error sending message in 2
 attempts
 java.util.concurrent.TimeoutException: Futures timed out after [30
 seconds]

 and some of these are followed by:

 15/02/28 12:48:02 ERROR executor.CoarseGrainedExecutorBackend: Driver
 Disassociated [akka.tcp://sparkExecutor@...] -
 [akka.tcp://sparkDriver@...] disassociated! Shutting down.
 15/02/28 12:48:02 ERROR executor.Executor: Exception in task 421027.0
 in stage 1.0 (TID 336601)
 java.io.FileNotFoundException:
 /hadoop/yarn/local//spark-local-20150228123450-3a71/36/shuffle_0_421027_0
 (No such file or directory)




 On Sat, Feb 28, 2015 at 9:33 AM, Paweł Szulc paul.sz...@gmail.com
 wrote:

 I would first check whether  there is any possibility that after
 doing groupbykey one of the groups does not fit

Re: Workaround for spark 1.2.X roaringbitmap kryo problem?

2015-03-02 Thread Arun Luthra
I think this is a Java vs scala syntax issue. Will check.

On Thu, Feb 26, 2015 at 8:17 PM, Arun Luthra arun.lut...@gmail.com wrote:

 Problem is noted here: https://issues.apache.org/jira/browse/SPARK-5949

 I tried this as a workaround:

 import org.apache.spark.scheduler._
 import org.roaringbitmap._

 ...


 kryo.register(classOf[org.roaringbitmap.RoaringBitmap])
 kryo.register(classOf[org.roaringbitmap.RoaringArray])
 kryo.register(classOf[org.roaringbitmap.ArrayContainer])

 kryo.register(classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus])
 kryo.register(classOf[org.roaringbitmap.RoaringArray$Element])
 kryo.register(classOf[org.roaringbitmap.RoaringArray$Element[]])
 kryo.register(classOf[short[]])


 in build file:

 libraryDependencies += org.roaringbitmap % RoaringBitmap % 0.4.8


 This fails to compile:

 ...:53: identifier expected but ']' found.

 [error]
 kryo.register(classOf[org.roaringbitmap.RoaringArray$Element[]])

 also:

 :54: identifier expected but ']' found.

 [error] kryo.register(classOf[short[]])
 also:

 :51: class HighlyCompressedMapStatus in package scheduler cannot be
 accessed in package org.apache.spark.scheduler
 [error]
 kryo.register(classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus])


 Suggestions?

 Arun



Re: Problem getting program to run on 15TB input

2015-03-01 Thread Arun Luthra
I tried a shorter, simpler version of the program, with just 1 RDD,
 essentially it is:

sc.textFile(..., N).map().filter().map( blah => (id,
1L)).reduceByKey().saveAsTextFile(...)

Here is a typical GC log trace from one of the yarn container logs:

54.040: [GC [PSYoungGen: 9176064K->28206K(10704896K)]
9176064K->28278K(35171840K), 0.0234420 secs] [Times: user=0.15 sys=0.01,
real=0.02 secs]
77.864: [GC [PSYoungGen: 9204270K->150553K(10704896K)]
9204342K->150641K(35171840K), 0.0423020 secs] [Times: user=0.30 sys=0.26,
real=0.04 secs]
79.485: [GC [PSYoungGen: 9326617K->333519K(10704896K)]
9326705K->333615K(35171840K), 0.0774990 secs] [Times: user=0.35 sys=1.28,
real=0.08 secs]
92.974: [GC [PSYoungGen: 9509583K->193370K(10704896K)]
9509679K->193474K(35171840K), 0.0241590 secs] [Times: user=0.35 sys=0.11,
real=0.02 secs]
114.842: [GC [PSYoungGen: 9369434K->123577K(10704896K)]
9369538K->123689K(35171840K), 0.0201000 secs] [Times: user=0.31 sys=0.00,
real=0.02 secs]
117.277: [GC [PSYoungGen: 9299641K->135459K(11918336K)]
9299753K->135579K(36385280K), 0.0244820 secs] [Times: user=0.19 sys=0.25,
real=0.02 secs]

So ~9GB is getting GC'ed every few seconds, which seems like a lot.

Question: The filter() is removing 99% of the data. Does this 99% of the
data get GC'ed?

Now, I was able to finally get to reduceByKey() by reducing the number of
executor-cores (to 2), based on suggestions at
http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-OutOfMemoryError-java-lang-OutOfMemoryError-GC-overhead-limit-exceeded-td9036.html
. This makes everything before reduceByKey() run pretty smoothly.

I ran this with more executor-memory and less executors (most important
thing was fewer executor-cores):

--num-executors 150 \
--driver-memory 15g \
--executor-memory 110g \
--executor-cores 32 \

But then, reduceByKey() fails with:

java.lang.OutOfMemoryError: Java heap space




On Sat, Feb 28, 2015 at 12:09 PM, Arun Luthra arun.lut...@gmail.com wrote:

 The Spark UI names the line number and name of the operation (repartition
 in this case) that it is performing. Only if this information is wrong
 (just a possibility), could it have started groupByKey already.

 I will try to analyze the amount of skew in the data by using reduceByKey
 (or simply countByKey) which is relatively inexpensive. For the purposes of
 this algorithm I can simply log and remove keys with huge counts, before
 doing groupByKey.

 On Sat, Feb 28, 2015 at 11:38 AM, Aaron Davidson ilike...@gmail.com
 wrote:

 All stated symptoms are consistent with GC pressure (other nodes timeout
 trying to connect because of a long stop-the-world), quite possibly due to
 groupByKey. groupByKey is a very expensive operation as it may bring all
 the data for a particular partition into memory (in particular, it cannot
 spill values for a single key, so if you have a single very skewed key you
 can get behavior like this).

 On Sat, Feb 28, 2015 at 11:33 AM, Paweł Szulc paul.sz...@gmail.com
 wrote:

 But groupbykey will repartition according to numer of keys as I
 understand how it works. How do you know that you haven't reached the
 groupbykey phase? Are you using a profiler or do yoi base that assumption
 only on logs?

 Sat, 28 Feb 2015, 8:12 PM Arun Luthra (user arun.lut...@gmail.com)
 wrote:

 A correction to my first post:

 There is also a repartition right before groupByKey to help avoid
 too-many-open-files error:


 rdd2.union(rdd1).map(...).filter(...).repartition(15000).groupByKey().map(...).flatMap(...).saveAsTextFile()

 On Sat, Feb 28, 2015 at 11:10 AM, Arun Luthra arun.lut...@gmail.com
 wrote:

 The job fails before getting to groupByKey.

 I see a lot of timeout errors in the yarn logs, like:

 15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1
 attempts
 akka.pattern.AskTimeoutException: Timed out

 and

 15/02/28 12:47:49 WARN util.AkkaUtils: Error sending message in 2
 attempts
 java.util.concurrent.TimeoutException: Futures timed out after [30
 seconds]

 and some of these are followed by:

 15/02/28 12:48:02 ERROR executor.CoarseGrainedExecutorBackend: Driver
 Disassociated [akka.tcp://sparkExecutor@...] -
 [akka.tcp://sparkDriver@...] disassociated! Shutting down.
 15/02/28 12:48:02 ERROR executor.Executor: Exception in task 421027.0
 in stage 1.0 (TID 336601)
 java.io.FileNotFoundException:
 /hadoop/yarn/local//spark-local-20150228123450-3a71/36/shuffle_0_421027_0
 (No such file or directory)




 On Sat, Feb 28, 2015 at 9:33 AM, Paweł Szulc paul.sz...@gmail.com
 wrote:

 I would first check whether  there is any possibility that after
 doing groupbykey one of the groups does not fit in one of the executors'
 memory.

 To back up my theory, instead of doing groupbykey + map try
 reducebykey + mapvalues.

 Let me know if that helped.

 Pawel Szulc
 http://rabbitonweb.com

 Sat, 28 Feb 2015, 6:22 PM Arun Luthra (user
 arun.lut...@gmail.com) wrote:

 So, actually I am removing the persist for now, because

Re: Problem getting program to run on 15TB input

2015-02-28 Thread Arun Luthra
A correction to my first post:

There is also a repartition right before groupByKey to help avoid
too-many-open-files error:

rdd2.union(rdd1).map(...).filter(...).repartition(15000).groupByKey().map(...).flatMap(...).saveAsTextFile()
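
One of the suggestions quoted below is to replace groupByKey + map with
reduceByKey + mapValues where the downstream step only needs a per-key
aggregate. A minimal sketch of that pattern, with a made-up average-length
aggregate standing in for the real computation:

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

// Sketch only: reduceByKey combines map-side, so no single key's values
// ever need to sit in memory together the way they do with groupByKey.
def avgLengthPerKey(keyed: RDD[(String, String)]): RDD[(String, Double)] =
  keyed
    .mapValues(v => (v.length.toLong, 1L))
    .reduceByKey { case ((len0, n0), (len1, n1)) => (len0 + len1, n0 + n1) }
    .mapValues { case (sum, n) => sum.toDouble / n }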

On Sat, Feb 28, 2015 at 11:10 AM, Arun Luthra arun.lut...@gmail.com wrote:

 The job fails before getting to groupByKey.

 I see a lot of timeout errors in the yarn logs, like:

 15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1 attempts
 akka.pattern.AskTimeoutException: Timed out

 and

 15/02/28 12:47:49 WARN util.AkkaUtils: Error sending message in 2 attempts
 java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]

 and some of these are followed by:

 15/02/28 12:48:02 ERROR executor.CoarseGrainedExecutorBackend: Driver
 Disassociated [akka.tcp://sparkExecutor@...] - [akka.tcp://sparkDriver@...]
 disassociated! Shutting down.
 15/02/28 12:48:02 ERROR executor.Executor: Exception in task 421027.0 in
 stage 1.0 (TID 336601)
 java.io.FileNotFoundException:
 /hadoop/yarn/local//spark-local-20150228123450-3a71/36/shuffle_0_421027_0
 (No such file or directory)




 On Sat, Feb 28, 2015 at 9:33 AM, Paweł Szulc paul.sz...@gmail.com wrote:

 I would first check whether  there is any possibility that after doing
 groupbykey one of the groups does not fit in one of the executors' memory.

 To back up my theory, instead of doing groupbykey + map try reducebykey +
 mapvalues.

 Let me know if that helped.

 Pawel Szulc
 http://rabbitonweb.com

 Sat, 28 Feb 2015, 6:22 PM Arun Luthra (user arun.lut...@gmail.com)
 wrote:

 So, actually I am removing the persist for now, because there is
 significant filtering that happens after calling textFile()... but I will
 keep that option in mind.

 I just tried a few different combinations of number of executors,
 executor memory, and more importantly, number of tasks... *all three
 times it failed when approximately 75.1% of the tasks were completed (no
 matter how many tasks resulted from repartitioning the data in
 textfile(..., N))*. Surely this is a strong clue to something?



 On Fri, Feb 27, 2015 at 1:07 PM, Burak Yavuz brk...@gmail.com wrote:

 Hi,

 Not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER`
 generates many small objects that lead to very long GC time, causing the
 executor losts, heartbeat not received, and GC overhead limit exceeded
 messages.
 Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can
 also try `OFF_HEAP` (and use Tachyon).

 Burak

 On Fri, Feb 27, 2015 at 11:39 AM, Arun Luthra arun.lut...@gmail.com
 wrote:

 My program in pseudocode looks like this:

 val conf = new SparkConf().setAppName(Test)
   .set(spark.storage.memoryFraction,0.2) // default 0.6
   .set(spark.shuffle.memoryFraction,0.12) // default 0.2
   .set(spark.shuffle.manager,SORT) // preferred setting for
 optimized joins
   .set(spark.shuffle.consolidateFiles,true) // helpful for
 too many files open
   .set(spark.mesos.coarse, true) // helpful for
 MapOutputTracker errors?
   .set(spark.akka.frameSize,500) // helpful when using
 consildateFiles=true
   .set(spark.akka.askTimeout, 30)
   .set(spark.shuffle.compress,false) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set(spark.file.transferTo,false) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set(spark.core.connection.ack.wait.timeout,600) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set(spark.speculation,true)
   .set(spark.worker.timeout,600) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
   .set(spark.akka.timeout,300) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
   .set(spark.storage.blockManagerSlaveTimeoutMs,12)
   .set(spark.driver.maxResultSize,2048) // in response to
 error: Total size of serialized results of 39901 tasks (1024.0 MB) is
 bigger than spark.driver.maxResultSize (1024.0 MB)
   .set(spark.serializer,
 org.apache.spark.serializer.KryoSerializer)

 .set(spark.kryo.registrator,com.att.bdcoe.cip.ooh.MyRegistrator)
   .set(spark.kryo.registrationRequired, true)

 val rdd1 = sc.textFile(file1).persist(StorageLevel
 .MEMORY_AND_DISK_SER).map(_.split(\\|, -1)...filter(...)

 val rdd2 =
 sc.textFile(file2).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split(\\|,
 -1)...filter(...)


 rdd2.union(rdd1).map(...).filter(...).groupByKey().map(...).flatMap(...).saveAsTextFile()


 I run the code with:
   --num-executors 500 \
   --driver-memory 20g \
   --executor-memory 20g \
   --executor-cores 32 \


 I'm using kryo serialization on everything, including broadcast
 variables.

 Spark creates 145k tasks, and the first stage includes everything
 before groupByKey(). It fails before getting to groupByKey. I have

Re: Problem getting program to run on 15TB input

2015-02-28 Thread Arun Luthra
So, actually I am removing the persist for now, because there is
significant filtering that happens after calling textFile()... but I will
keep that option in mind.

I just tried a few different combinations of number of executors, executor
memory, and more importantly, number of tasks... *all three times it failed
when approximately 75.1% of the tasks were completed (no matter how many
tasks resulted from repartitioning the data in textfile(..., N))*. Surely
this is a strong clue to something?



On Fri, Feb 27, 2015 at 1:07 PM, Burak Yavuz brk...@gmail.com wrote:

 Hi,

 Not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER` generates
 many small objects that lead to very long GC time, causing the executor
 losts, heartbeat not received, and GC overhead limit exceeded messages.
 Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can also
 try `OFF_HEAP` (and use Tachyon).

 Burak

 On Fri, Feb 27, 2015 at 11:39 AM, Arun Luthra arun.lut...@gmail.com
 wrote:

 My program in pseudocode looks like this:

 val conf = new SparkConf().setAppName(Test)
   .set(spark.storage.memoryFraction,0.2) // default 0.6
   .set(spark.shuffle.memoryFraction,0.12) // default 0.2
   .set(spark.shuffle.manager,SORT) // preferred setting for
 optimized joins
   .set(spark.shuffle.consolidateFiles,true) // helpful for too
 many files open
   .set(spark.mesos.coarse, true) // helpful for MapOutputTracker
 errors?
   .set(spark.akka.frameSize,500) // helpful when using
 consildateFiles=true
   .set(spark.akka.askTimeout, 30)
   .set(spark.shuffle.compress,false) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set(spark.file.transferTo,false) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set(spark.core.connection.ack.wait.timeout,600) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set(spark.speculation,true)
   .set(spark.worker.timeout,600) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
   .set(spark.akka.timeout,300) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
   .set(spark.storage.blockManagerSlaveTimeoutMs,12)
   .set(spark.driver.maxResultSize,2048) // in response to error:
 Total size of serialized results of 39901 tasks (1024.0 MB) is bigger than
 spark.driver.maxResultSize (1024.0 MB)
   .set(spark.serializer,
 org.apache.spark.serializer.KryoSerializer)
   .set(spark.kryo.registrator,com.att.bdcoe.cip.ooh.MyRegistrator)
   .set(spark.kryo.registrationRequired, true)

 val rdd1 = 
 sc.textFile(file1).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split(\\|,
 -1)...filter(...)

 val rdd2 =
 sc.textFile(file2).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split(\\|,
 -1)...filter(...)


 rdd2.union(rdd1).map(...).filter(...).groupByKey().map(...).flatMap(...).saveAsTextFile()


 I run the code with:
   --num-executors 500 \
   --driver-memory 20g \
   --executor-memory 20g \
   --executor-cores 32 \


 I'm using kryo serialization on everything, including broadcast
 variables.

 Spark creates 145k tasks, and the first stage includes everything before
 groupByKey(). It fails before getting to groupByKey. I have tried doubling
 and tripling the number of partitions when calling textFile, with no
 success.

 Very similar code (trivial changes, to accomodate different input) worked
 on a smaller input (~8TB)... Not that it was easy to get that working.



 Errors vary, here is what I am getting right now:

 ERROR SendingConnection: Exception while reading SendingConnection
 ... java.nio.channels.ClosedChannelException
 (^ guessing that is symptom of something else)

 WARN BlockManagerMasterActor: Removing BlockManager
 BlockManagerId(...) with no recent heart beats: 120030ms exceeds 12ms
 (^ guessing that is symptom of something else)

 ERROR ActorSystemImpl: Uncaught fatal error from thread (...) shutting
 down ActorSystem [sparkDriver]
 *java.lang.OutOfMemoryError: GC overhead limit exceeded*



 Other times I will get messages about executor lost... about 1 message
 per second, after ~~50k tasks complete, until there are almost no executors
 left and progress slows to nothing.

 I ran with verbose GC info; I do see failing yarn containers that have
 multiple (like 30) Full GC messages but I don't know how to interpret if
 that is the problem. Typical Full GC time taken seems ok: [Times:
 user=23.30 sys=0.06, real=1.94 secs]



 Suggestions, please?

 Huge thanks for useful suggestions,
 Arun





Re: Problem getting program to run on 15TB input

2015-02-28 Thread Arun Luthra
The job fails before getting to groupByKey.

I see a lot of timeout errors in the yarn logs, like:

15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1 attempts
akka.pattern.AskTimeoutException: Timed out

and

15/02/28 12:47:49 WARN util.AkkaUtils: Error sending message in 2 attempts
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]

and some of these are followed by:

15/02/28 12:48:02 ERROR executor.CoarseGrainedExecutorBackend: Driver
Disassociated [akka.tcp://sparkExecutor@...] -> [akka.tcp://sparkDriver@...]
disassociated! Shutting down.
15/02/28 12:48:02 ERROR executor.Executor: Exception in task 421027.0 in
stage 1.0 (TID 336601)
java.io.FileNotFoundException:
/hadoop/yarn/local//spark-local-20150228123450-3a71/36/shuffle_0_421027_0
(No such file or directory)




On Sat, Feb 28, 2015 at 9:33 AM, Paweł Szulc paul.sz...@gmail.com wrote:

 I would first check whether  there is any possibility that after doing
 groupbykey one of the groups does not fit in one of the executors' memory.

 To back up my theory, instead of doing groupbykey + map try reducebykey +
 mapvalues.

 Let me know if that helped.

 Pawel Szulc
 http://rabbitonweb.com

 Sat, 28 Feb 2015, 6:22 PM Arun Luthra (user arun.lut...@gmail.com)
 wrote:

 So, actually I am removing the persist for now, because there is
 significant filtering that happens after calling textFile()... but I will
 keep that option in mind.

 I just tried a few different combinations of number of executors,
 executor memory, and more importantly, number of tasks... *all three
 times it failed when approximately 75.1% of the tasks were completed (no
 matter how many tasks resulted from repartitioning the data in
 textfile(..., N))*. Surely this is a strong clue to something?



 On Fri, Feb 27, 2015 at 1:07 PM, Burak Yavuz brk...@gmail.com wrote:

 Hi,

 Not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER`
 generates many small objects that lead to very long GC time, causing the
 executor losts, heartbeat not received, and GC overhead limit exceeded
 messages.
 Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can
 also try `OFF_HEAP` (and use Tachyon).

 Burak

 On Fri, Feb 27, 2015 at 11:39 AM, Arun Luthra arun.lut...@gmail.com
 wrote:

 My program in pseudocode looks like this:

 val conf = new SparkConf().setAppName(Test)
   .set(spark.storage.memoryFraction,0.2) // default 0.6
   .set(spark.shuffle.memoryFraction,0.12) // default 0.2
   .set(spark.shuffle.manager,SORT) // preferred setting for
 optimized joins
   .set(spark.shuffle.consolidateFiles,true) // helpful for too
 many files open
   .set(spark.mesos.coarse, true) // helpful for
 MapOutputTracker errors?
   .set(spark.akka.frameSize,500) // helpful when using
 consildateFiles=true
   .set(spark.akka.askTimeout, 30)
   .set(spark.shuffle.compress,false) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set(spark.file.transferTo,false) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set(spark.core.connection.ack.wait.timeout,600) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set(spark.speculation,true)
   .set(spark.worker.timeout,600) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
   .set(spark.akka.timeout,300) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
   .set(spark.storage.blockManagerSlaveTimeoutMs,12)
   .set(spark.driver.maxResultSize,2048) // in response to
 error: Total size of serialized results of 39901 tasks (1024.0 MB) is
 bigger than spark.driver.maxResultSize (1024.0 MB)
   .set(spark.serializer,
 org.apache.spark.serializer.KryoSerializer)

 .set(spark.kryo.registrator,com.att.bdcoe.cip.ooh.MyRegistrator)
   .set(spark.kryo.registrationRequired, true)

 val rdd1 = 
 sc.textFile(file1).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split(\\|,
 -1)...filter(...)

 val rdd2 =
 sc.textFile(file2).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split(\\|,
 -1)...filter(...)


 rdd2.union(rdd1).map(...).filter(...).groupByKey().map(...).flatMap(...).saveAsTextFile()


 I run the code with:
   --num-executors 500 \
   --driver-memory 20g \
   --executor-memory 20g \
   --executor-cores 32 \


 I'm using kryo serialization on everything, including broadcast
 variables.

 Spark creates 145k tasks, and the first stage includes everything
 before groupByKey(). It fails before getting to groupByKey. I have tried
 doubling and tripling the number of partitions when calling textFile, with
 no success.

 Very similar code (trivial changes, to accomodate different input)
 worked on a smaller input (~8TB)... Not that it was easy to get that
 working.



 Errors vary, here is what I am getting right now:

 ERROR SendingConnection

Re: Problem getting program to run on 15TB input

2015-02-28 Thread Arun Luthra
The Spark UI names the line number and name of the operation (repartition
in this case) that it is performing. Only if this information is wrong
(just a possibility), could it have started groupByKey already.

I will try to analyze the amount of skew in the data by using reduceByKey
(or simply countByKey) which is relatively inexpensive. For the purposes of
this algorithm I can simply log and remove keys with huge counts, before
doing groupByKey.
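
A sketch of that check; the key type and the 1,000,000 cutoff are
placeholders, and the real threshold would come from inspecting the counts:

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

// Sketch only: keyedRdd stands in for the real (key, record) RDD.
def dropSkewedKeys(keyedRdd: RDD[(String, String)]): RDD[(String, Iterable[String])] = {
  // countByKey ships one Long per distinct key back to the driver, which is
  // cheap compared to groupByKey as long as the number of distinct keys is modest.
  val keyCounts = keyedRdd.countByKey()
  val hotKeys = keyCounts.filter { case (_, n) => n > 1000000L }.keySet
  hotKeys.foreach(k => println(s"Dropping skewed key before groupByKey: $k"))
  keyedRdd.filter { case (k, _) => !hotKeys.contains(k) }.groupByKey()
}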

On Sat, Feb 28, 2015 at 11:38 AM, Aaron Davidson ilike...@gmail.com wrote:

 All stated symptoms are consistent with GC pressure (other nodes timeout
 trying to connect because of a long stop-the-world), quite possibly due to
 groupByKey. groupByKey is a very expensive operation as it may bring all
 the data for a particular partition into memory (in particular, it cannot
 spill values for a single key, so if you have a single very skewed key you
 can get behavior like this).

 On Sat, Feb 28, 2015 at 11:33 AM, Paweł Szulc paul.sz...@gmail.com
 wrote:

 But groupbykey will repartition according to numer of keys as I
 understand how it works. How do you know that you haven't reached the
 groupbykey phase? Are you using a profiler or do yoi base that assumption
 only on logs?

 Sat, 28 Feb 2015, 8:12 PM Arun Luthra (user arun.lut...@gmail.com)
 wrote:

 A correction to my first post:

 There is also a repartition right before groupByKey to help avoid
 too-many-open-files error:


 rdd2.union(rdd1).map(...).filter(...).repartition(15000).groupByKey().map(...).flatMap(...).saveAsTextFile()

 On Sat, Feb 28, 2015 at 11:10 AM, Arun Luthra arun.lut...@gmail.com
 wrote:

 The job fails before getting to groupByKey.

 I see a lot of timeout errors in the yarn logs, like:

 15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1
 attempts
 akka.pattern.AskTimeoutException: Timed out

 and

 15/02/28 12:47:49 WARN util.AkkaUtils: Error sending message in 2
 attempts
 java.util.concurrent.TimeoutException: Futures timed out after [30
 seconds]

 and some of these are followed by:

 15/02/28 12:48:02 ERROR executor.CoarseGrainedExecutorBackend: Driver
 Disassociated [akka.tcp://sparkExecutor@...] -
 [akka.tcp://sparkDriver@...] disassociated! Shutting down.
 15/02/28 12:48:02 ERROR executor.Executor: Exception in task 421027.0
 in stage 1.0 (TID 336601)
 java.io.FileNotFoundException:
 /hadoop/yarn/local//spark-local-20150228123450-3a71/36/shuffle_0_421027_0
 (No such file or directory)




 On Sat, Feb 28, 2015 at 9:33 AM, Paweł Szulc paul.sz...@gmail.com
 wrote:

 I would first check whether  there is any possibility that after doing
 groupbykey one of the groups does not fit in one of the executors' memory.

 To back up my theory, instead of doing groupbykey + map try
 reducebykey + mapvalues.

 Let me know if that helped.

 Pawel Szulc
 http://rabbitonweb.com

 Sat, 28 Feb 2015, 6:22 PM Arun Luthra (user
 arun.lut...@gmail.com) wrote:

 So, actually I am removing the persist for now, because there is
 significant filtering that happens after calling textFile()... but I will
 keep that option in mind.

 I just tried a few different combinations of number of executors,
 executor memory, and more importantly, number of tasks... *all three
 times it failed when approximately 75.1% of the tasks were completed (no
 matter how many tasks resulted from repartitioning the data in
 textfile(..., N))*. Surely this is a strong clue to something?



 On Fri, Feb 27, 2015 at 1:07 PM, Burak Yavuz brk...@gmail.com
 wrote:

 Hi,

 Not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER`
 generates many small objects that lead to very long GC time, causing the
 executor losts, heartbeat not received, and GC overhead limit exceeded
 messages.
 Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can
 also try `OFF_HEAP` (and use Tachyon).

 Burak

 On Fri, Feb 27, 2015 at 11:39 AM, Arun Luthra arun.lut...@gmail.com
  wrote:

 My program in pseudocode looks like this:

 val conf = new SparkConf().setAppName(Test)
   .set(spark.storage.memoryFraction,0.2) // default 0.6
   .set(spark.shuffle.memoryFraction,0.12) // default 0.2
   .set(spark.shuffle.manager,SORT) // preferred setting for
 optimized joins
   .set(spark.shuffle.consolidateFiles,true) // helpful for
 too many files open
   .set(spark.mesos.coarse, true) // helpful for
 MapOutputTracker errors?
   .set(spark.akka.frameSize,500) // helpful when using
 consildateFiles=true
   .set(spark.akka.askTimeout, 30)
   .set(spark.shuffle.compress,false) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set(spark.file.transferTo,false) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set(spark.core.connection.ack.wait.timeout,600) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set(spark.speculation,true

Problem getting program to run on 15TB input

2015-02-27 Thread Arun Luthra
My program in pseudocode looks like this:

val conf = new SparkConf().setAppName("Test")
  .set("spark.storage.memoryFraction", "0.2") // default 0.6
  .set("spark.shuffle.memoryFraction", "0.12") // default 0.2
  .set("spark.shuffle.manager", "SORT") // preferred setting for optimized joins
  .set("spark.shuffle.consolidateFiles", "true") // helpful for too many files open
  .set("spark.mesos.coarse", "true") // helpful for MapOutputTracker errors?
  .set("spark.akka.frameSize", "500") // helpful when using consolidateFiles=true
  .set("spark.akka.askTimeout", "30")
  .set("spark.shuffle.compress", "false") //
http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
  .set("spark.file.transferTo", "false") //
http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
  .set("spark.core.connection.ack.wait.timeout", "600") //
http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
  .set("spark.speculation", "true")
  .set("spark.worker.timeout", "600") //
http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
  .set("spark.akka.timeout", "300") //
http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
  .set("spark.storage.blockManagerSlaveTimeoutMs", "12")
  .set("spark.driver.maxResultSize", "2048") // in response to error: Total size of
serialized results of 39901 tasks (1024.0 MB) is bigger than
spark.driver.maxResultSize (1024.0 MB)
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "com.att.bdcoe.cip.ooh.MyRegistrator")
  .set("spark.kryo.registrationRequired", "true")

val rdd1 = sc.textFile(file1).persist(StorageLevel.MEMORY_AND_DISK_SER)
  .map(_.split("\\|", -1))...filter(...)

val rdd2 = sc.textFile(file2).persist(StorageLevel.MEMORY_AND_DISK_SER)
  .map(_.split("\\|", -1))...filter(...)

rdd2.union(rdd1).map(...).filter(...).groupByKey().map(...).flatMap(...).saveAsTextFile()


I run the code with:
  --num-executors 500 \
  --driver-memory 20g \
  --executor-memory 20g \
  --executor-cores 32 \


I'm using kryo serialization on everything, including broadcast variables.

Spark creates 145k tasks, and the first stage includes everything before
groupByKey(). It fails before getting to groupByKey. I have tried doubling
and tripling the number of partitions when calling textFile, with no
success.

Very similar code (trivial changes, to accomodate different input) worked
on a smaller input (~8TB)... Not that it was easy to get that working.



Errors vary, here is what I am getting right now:

ERROR SendingConnection: Exception while reading SendingConnection
... java.nio.channels.ClosedChannelException
(^ guessing that is symptom of something else)

WARN BlockManagerMasterActor: Removing BlockManager
BlockManagerId(...) with no recent heart beats: 120030ms exceeds 12ms
(^ guessing that is symptom of something else)

ERROR ActorSystemImpl: Uncaught fatal error from thread (...) shutting down
ActorSystem [sparkDriver]
*java.lang.OutOfMemoryError: GC overhead limit exceeded*



Other times I will get messages about executor lost... about 1 message
per second, after ~~50k tasks complete, until there are almost no executors
left and progress slows to nothing.

I ran with verbose GC info; I do see failing yarn containers that have
multiple (like 30) Full GC messages but I don't know how to interpret if
that is the problem. Typical Full GC time taken seems ok: [Times:
user=23.30 sys=0.06, real=1.94 secs]



Suggestions, please?

Huge thanks for useful suggestions,
Arun


Workaround for spark 1.2.X roaringbitmap kryo problem?

2015-02-26 Thread Arun Luthra
Problem is noted here: https://issues.apache.org/jira/browse/SPARK-5949

I tried this as a workaround:

import org.apache.spark.scheduler._
import org.roaringbitmap._

...


kryo.register(classOf[org.roaringbitmap.RoaringBitmap])
kryo.register(classOf[org.roaringbitmap.RoaringArray])
kryo.register(classOf[org.roaringbitmap.ArrayContainer])

kryo.register(classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus])
kryo.register(classOf[org.roaringbitmap.RoaringArray$Element])
kryo.register(classOf[org.roaringbitmap.RoaringArray$Element[]])
kryo.register(classOf[short[]])


in build file:

libraryDependencies += "org.roaringbitmap" % "RoaringBitmap" % "0.4.8"


This fails to compile:

...:53: identifier expected but ']' found.

[error] kryo.register(classOf[org.roaringbitmap.RoaringArray$Element[]])

also:

:54: identifier expected but ']' found.

[error] kryo.register(classOf[short[]])
also:

:51: class HighlyCompressedMapStatus in package scheduler cannot be
accessed in package org.apache.spark.scheduler
[error]
kryo.register(classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus])


Suggestions?

Arun


Open file limit settings for Spark on Yarn job

2015-02-10 Thread Arun Luthra
Hi,

I'm running Spark on Yarn from an edge node, and the tasks run on the Data
Nodes. My job fails with the "Too many open files" error once it gets to
groupByKey(). Alternatively, I can make it fail immediately if I repartition
the data when I create the RDD.

Where do I need to make sure that ulimit -n is high enough?

On the edge node it is small, 1024, but on the data nodes, the yarn user
has a high limit, 32k. But is the yarn user the relevant user? And, is the
1024 limit for myself on the edge node a problem or is that limit not
relevant?

Arun


Re: Spark job ends abruptly during setup without error message

2015-02-05 Thread Arun Luthra
I'm submitting this on a cluster, with my usual setting of export
YARN_CONF_DIR=/etc/hadoop/conf.

It is working again after a small change to the code so I will see if I can
reproduce the error (later today).

On Thu, Feb 5, 2015 at 9:17 AM, Arush Kharbanda ar...@sigmoidanalytics.com
wrote:

 Are you submitting the job from your local machine or on the driver
 machine.?

 Have you set YARN_CONF_DIR.

 On Thu, Feb 5, 2015 at 10:43 PM, Arun Luthra arun.lut...@gmail.com
 wrote:

 While a spark-submit job is setting up, the yarnAppState goes into
 Running mode, then I get a flurry of typical looking INFO-level messages
 such as

 INFO BlockManagerMasterActor: ...
 INFO YarnClientSchedulerBackend: Registered executor:  ...

 Then, spark-submit quits without any error message and I'm back at the
 command line. What could be causing this?

 Arun




 --

 [image: Sigmoid Analytics] http://htmlsig.com/www.sigmoidanalytics.com

 *Arush Kharbanda* || Technical Teamlead

 ar...@sigmoidanalytics.com || www.sigmoidanalytics.com



Spark job ends abruptly during setup without error message

2015-02-05 Thread Arun Luthra
While a spark-submit job is setting up, the yarnAppState goes into Running
mode, then I get a flurry of typical looking INFO-level messages such as

INFO BlockManagerMasterActor: ...
INFO YarnClientSchedulerBackend: Registered executor:  ...

Then, spark-submit quits without any error message and I'm back at the
command line. What could be causing this?

Arun


Re: SQL query in scala API

2014-12-06 Thread Arun Luthra
Thanks, I will try this.

On Fri, Dec 5, 2014 at 1:19 AM, Cheng Lian lian.cs@gmail.com wrote:

  Oh, sorry. So neither SQL nor Spark SQL is preferred. Then you may write
 you own aggregation with aggregateByKey:

 users.aggregateByKey((0, Set.empty[String]))({ case ((count, seen), user) =>
   (count + 1, seen + user)
 }, { case ((count0, seen0), (count1, seen1)) =>
   (count0 + count1, seen0 ++ seen1)
 }).mapValues { case (count, seen) =>
   (count, seen.size)
 }

 On 12/5/14 3:47 AM, Arun Luthra wrote:

   Is that Spark SQL? I'm wondering if it's possible without spark SQL.

 On Wed, Dec 3, 2014 at 8:08 PM, Cheng Lian lian.cs@gmail.com wrote:

  You may do this:

 table(users).groupBy('zip)('zip, count('user), countDistinct('user))

  On 12/4/14 8:47 AM, Arun Luthra wrote:

  I'm wondering how to do this kind of SQL query with PairRDDFunctions.

  SELECT zip, COUNT(user), COUNT(DISTINCT user)
 FROM users
 GROUP BY zip

  In the Spark scala API, I can make an RDD (called users) of key-value
 pairs where the keys are zip (as in ZIP code) and the values are user id's.
 Then I can compute the count and distinct count like this:

  val count = users.mapValues(_ => 1).reduceByKey(_ + _)
 val countDistinct = users.distinct().mapValues(_ => 1).reduceByKey(_ + _)

  Then, if I want count and countDistinct in the same table, I have to
 join them on the key.

  Is there a way to do this without doing a join (and without using SQL
 or spark SQL)?

  Arun






Re: SQL query in scala API

2014-12-04 Thread Arun Luthra
Is that Spark SQL? I'm wondering if it's possible without spark SQL.

On Wed, Dec 3, 2014 at 8:08 PM, Cheng Lian lian.cs@gmail.com wrote:

  You may do this:

 table(users).groupBy('zip)('zip, count('user), countDistinct('user))

 On 12/4/14 8:47 AM, Arun Luthra wrote:

   I'm wondering how to do this kind of SQL query with PairRDDFunctions.

  SELECT zip, COUNT(user), COUNT(DISTINCT user)
 FROM users
 GROUP BY zip

  In the Spark scala API, I can make an RDD (called users) of key-value
 pairs where the keys are zip (as in ZIP code) and the values are user id's.
 Then I can compute the count and distinct count like this:

 val count = users.mapValues(_ => 1).reduceByKey(_ + _)
 val countDistinct = users.distinct().mapValues(_ => 1).reduceByKey(_ + _)

  Then, if I want count and countDistinct in the same table, I have to
 join them on the key.

  Is there a way to do this without doing a join (and without using SQL or
 spark SQL)?

  Arun




SQL query in scala API

2014-12-03 Thread Arun Luthra
I'm wondering how to do this kind of SQL query with PairRDDFunctions.

SELECT zip, COUNT(user), COUNT(DISTINCT user)
FROM users
GROUP BY zip

In the Spark scala API, I can make an RDD (called users) of key-value
pairs where the keys are zip (as in ZIP code) and the values are user id's.
Then I can compute the count and distinct count like this:

val count = users.mapValues(_ => 1).reduceByKey(_ + _)
val countDistinct = users.distinct().mapValues(_ => 1).reduceByKey(_ + _)

Then, if I want count and countDistinct in the same table, I have to join
them on the key.

Is there a way to do this without doing a join (and without using SQL or
spark SQL)?

Arun


Re: rack-topology.sh no such file or directory

2014-11-25 Thread Arun Luthra
Problem was solved by having the admins put this file on the edge nodes.

Thanks,
Arun

On Wed, Nov 19, 2014 at 12:27 PM, Matei Zaharia matei.zaha...@gmail.com
wrote:

 Your Hadoop configuration is set to look for this file to determine racks.
 Is the file present on cluster nodes? If not, look at your hdfs-site.xml
 and remove the setting for a rack topology script there (or it might be in
 core-site.xml).

 Matei

 On Nov 19, 2014, at 12:13 PM, Arun Luthra arun.lut...@gmail.com wrote:

 I'm trying to run Spark on Yarn on a hortonworks 2.1.5 cluster. I'm
 getting this error:

 14/11/19 13:46:34 INFO cluster.YarnClientSchedulerBackend: Registered
 executor: Actor[akka.tcp://sparkExecutor@#/user/Executor#-
 2027837001] with ID 42
 14/11/19 13:46:34 WARN net.ScriptBasedMapping: Exception running
 /etc/hadoop/conf/rack-topology.sh 10.0.28.130
 java.io.IOException: Cannot run program
 /etc/hadoop/conf/rack-topology.sh (in directory ###): error=2,
 No such file or directory

 The rack-topology script is not on the system (find / 2>/dev/null -name
 rack-topology).

 Any possible solution?

 Arun Luthra





rack-topology.sh no such file or directory

2014-11-19 Thread Arun Luthra
I'm trying to run Spark on Yarn on a hortonworks 2.1.5 cluster. I'm getting
this error:

14/11/19 13:46:34 INFO cluster.YarnClientSchedulerBackend: Registered
executor: Actor[akka.tcp://sparkExecutor@#/user/Executor#-
2027837001] with ID 42
14/11/19 13:46:34 WARN net.ScriptBasedMapping: Exception running
/etc/hadoop/conf/rack-topology.sh 10.0.28.130
java.io.IOException: Cannot run program /etc/hadoop/conf/rack-topology.sh
(in directory ###): error=2, No such file or directory

The rack-topology script is not on the system (find / 2>/dev/null -name
rack-topology).

Any possible solution?

Arun Luthra


How to configure build.sbt for Spark 1.2.0

2014-10-08 Thread Arun Luthra
I built Spark 1.2.0 successfully, but was unable to build my Spark program
under 1.2.0 with sbt assembly and my build.sbt file. It contains:

I tried:
"org.apache.spark" %% "spark-sql" % "1.2.0",
"org.apache.spark" %% "spark-core" % "1.2.0",

and

"org.apache.spark" %% "spark-sql" % "1.2.0-SNAPSHOT",
"org.apache.spark" %% "spark-core" % "1.2.0-SNAPSHOT",

but I get errors like:
[warn] ::
[warn] ::  UNRESOLVED DEPENDENCIES ::
[warn] ::
[warn] :: org.apache.spark#spark-sql_2.10;1.2.0: not found
[warn] :: org.apache.spark#spark-core_2.10;1.2.0: not found
[warn] ::

sbt.ResolveException: unresolved dependency:
org.apache.spark#spark-sql_2.10;1.2.0: not found
unresolved dependency: org.apache.spark#spark-core_2.10;1.2.0: not found
...
[error] (*:update) sbt.ResolveException: unresolved dependency:
org.apache.spark#spark-sql_2.10;1.2.0: not found
[error] unresolved dependency: org.apache.spark#spark-core_2.10;1.2.0: not
found



Do I need to configure my build.sbt to point to my local spark 1.2.0
repository? How?

Thanks,
- Arun


Re: How to configure build.sbt for Spark 1.2.0

2014-10-08 Thread Arun Luthra
Hi Pat,

Couple of points:

1) I must have done something naive like:
git clone git://github.com/apache/spark.git -b branch-1.2.0

because git branch is telling me I'm on the master branch, and I see
that branch-1.2.0 doesn't exist (https://github.com/apache/spark).
Nevertheless, when I compiled this master branch, the spark shell tells me I
have 1.2.0. So I guess the master is currently 1.2.0...

2) The README on the master branch only has build instructions for maven. I
built Spark successfully with
mvn -Phadoop-2.2 -Dhadoop.version=2.2.0 -DskipTests clean package

and it looks like the maven equivalent of publish-local is:
mvn -Phadoop-2.2 -Dhadoop.version=2.2.0 -DskipTests clean install

I will report back with the result.
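
For reference, a hedged build.sbt sketch of how the pieces above could fit
together; the project name, Scala version, and the assumption that the
SNAPSHOT artifacts landed in ~/.m2 via mvn install are illustrative, not
taken from this thread:

name := "my-spark-app"   // illustrative project name

scalaVersion := "2.10.4"

// Only needed if Spark was installed with Maven (mvn ... install) rather than
// published with sbt publish-local.
resolvers += "Local Maven Repository" at "file://" + Path.userHome.absolutePath + "/.m2/repository"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.2.0-SNAPSHOT",
  "org.apache.spark" %% "spark-sql" % "1.2.0-SNAPSHOT"
)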

On Wed, Oct 8, 2014 at 5:50 PM, Pat McDonough pat.mcdono...@databricks.com
wrote:

 Hey Arun,

 Since this build depends on unpublished builds of spark (1.2.0-SNAPSHOT),
 you'll need to first build spark and publish-local so your application
 build can find those SNAPSHOTs in your local repo.

 Just append publish-local to your sbt command where you build Spark.

 -Pat



 On Wed, Oct 8, 2014 at 5:35 PM, Arun Luthra arun.lut...@gmail.com wrote:

 I built Spark 1.2.0 successfully, but was unable to build my Spark program
 under 1.2.0 with sbt assembly and my build.sbt file. It contains:

 I tried:
 "org.apache.spark" %% "spark-sql" % "1.2.0",
 "org.apache.spark" %% "spark-core" % "1.2.0",

 and

 "org.apache.spark" %% "spark-sql" % "1.2.0-SNAPSHOT",
 "org.apache.spark" %% "spark-core" % "1.2.0-SNAPSHOT",

 but I get errors like:
 [warn] ::
 [warn] ::  UNRESOLVED DEPENDENCIES ::
 [warn] ::
 [warn] :: org.apache.spark#spark-sql_2.10;1.2.0: not found
 [warn] :: org.apache.spark#spark-core_2.10;1.2.0: not found
 [warn] ::

 sbt.ResolveException: unresolved dependency:
 org.apache.spark#spark-sql_2.10;1.2.0: not found
 unresolved dependency: org.apache.spark#spark-core_2.10;1.2.0: not found
 ...
 [error] (*:update) sbt.ResolveException: unresolved dependency:
 org.apache.spark#spark-sql_2.10;1.2.0: not found
 [error] unresolved dependency: org.apache.spark#spark-core_2.10;1.2.0:
 not found



 Do I need to configure my build.sbt to point to my local spark 1.2.0
 repository? How?

 Thanks,
 - Arun





Re: PrintWriter error in foreach

2014-09-10 Thread Arun Luthra
Ok, so I don't think the workers on the data nodes will be able to see my
output directory on the edge node. I don't think stdout will work either,
so I'll write to HDFS via rdd.saveAsTextFile(...)
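
A minimal sketch of that workaround, with rdd, formatRecord, and the HDFS
path as illustrative placeholders: build each output line on the executors
and let Spark write the whole RDD to HDFS, instead of opening a local
PrintWriter per record.

// Sketch only: formatRecord and outputPath are hypothetical stand-ins.
val outputPath = "hdfs:///user/me/transoutput"

rdd
  .map(record => formatRecord(record))   // the line that was previously println'ed
  .saveAsTextFile(outputPath)            // writes part-* files under outputPath on HDFS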

On Wed, Sep 10, 2014 at 3:51 PM, Daniil Osipov daniil.osi...@shazam.com
wrote:

 Try providing full path to the file you want to write, and make sure the
 directory exists and is writable by the Spark process.

 On Wed, Sep 10, 2014 at 3:46 PM, Arun Luthra arun.lut...@gmail.com
 wrote:

 I have a spark program that worked in local mode, but throws an error in
 yarn-client mode on a cluster. On the edge node in my home directory, I
 have an output directory (called transout) which is ready to receive files.
 The spark job I'm running is supposed to write a few hundred files into
 that directory, once for each iteration of a foreach function. This works
 in local mode, and my only guess as to why this would fail in yarn-client
 mode is that the RDD is distributed across many nodes and the program is
 trying to use the PrintWriter on the datanodes, where the output directory
 doesn't exist. Is this what's happening? Any proposed solution?

 abbreviation of the code:

 import java.io.PrintWriter
 ...
 rdd.foreach { record =>   // abbreviated: "id" below comes from each record
   val outFile = new PrintWriter("transoutput/output.%s".format(id))
   outFile.println("test")
   outFile.close()
 }

 Error:

 14/09/10 16:57:09 WARN TaskSetManager: Lost TID 1826 (task 0.0:26)
 14/09/10 16:57:09 WARN TaskSetManager: Loss was due to
 java.io.FileNotFoundException
 java.io.FileNotFoundException: transoutput/input.598718 (No such file or
 directory)
 at java.io.FileOutputStream.open(Native Method)
 at java.io.FileOutputStream.<init>(FileOutputStream.java:194)
 at java.io.FileOutputStream.<init>(FileOutputStream.java:84)
 at java.io.PrintWriter.<init>(PrintWriter.java:146)
 at
 com.att.bdcoe.cip.ooh.TransformationLayer$$anonfun$main$3.apply(TransformLayer.scala:98)
 at
 com.att.bdcoe.cip.ooh.TransformationLayer$$anonfun$main$3.apply(TransformLayer.scala:95)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:703)
 at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:703)
 at
 org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083)
 at
 org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
 at org.apache.spark.scheduler.Task.run(Task.scala:51)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
 at java.lang.Thread.run(Thread.java:662)