Re: java.lang.OutOfMemoryError: Unable to acquire bytes of memory

2016-04-14 Thread Nezih Yigitbasi
Thanks Imran. I will give it a shot when I have some time.
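As a side note for anyone else following this thread: the debug logging
Imran mentions below is just a log4j override. A minimal sketch (the
category name is assumed from the class's package, and it needs to be
applied on the executors as well, e.g. via the log4j.properties shipped
with the job):

import org.apache.log4j.{Level, Logger}

// Sketch only: enable DEBUG for Spark's task memory accounting so that
// per-consumer usage and spill attempts show up in the logs.
Logger.getLogger("org.apache.spark.memory.TaskMemoryManager").setLevel(Level.DEBUG)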

Nezih

On Thu, Apr 14, 2016 at 9:25 AM Imran Rashid <iras...@cloudera.com> wrote:

> Hi Nezih,
>
> I just reported a somewhat similar issue, and I have a potential fix --
> SPARK-14560; it looks like you are already watching it :). You can try out
> that patch; you have to explicitly enable the change in behavior with
> "spark.shuffle.spillAfterRead=true". Honestly, I don't think these issues
> are the same, as I've always seen that case lead to acquiring 0 bytes,
> while in your case you are requesting GBs and getting something pretty
> close, so my hunch is that it is different ... but might be worth a shot to
> see if it is the issue.
>
> Turning on debug logging for TaskMemoryManager might help track the root
> cause -- you'll get information on which consumers are using memory and
> when there are spill attempts.  (Note that even if the patch I have for
> SPARK-14560 doesn't fix your issue, it might still make those debug logs a
> bit more clear, since it'll report memory used by Spillables.)
>
> Imran
>
> On Mon, Apr 4, 2016 at 10:52 PM, Nezih Yigitbasi <
> nyigitb...@netflix.com.invalid> wrote:
>
>> Nope, I didn't have a chance to track the root cause, and IIRC we didn't
>> observe it when dyn. alloc. is off.
>>
>> On Mon, Apr 4, 2016 at 6:16 PM Reynold Xin <r...@databricks.com> wrote:
>>
>>> BTW do you still see this when dynamic allocation is off?
>>>
>>> On Mon, Apr 4, 2016 at 6:16 PM, Reynold Xin <r...@databricks.com> wrote:
>>>
>>>> Nezih,
>>>>
>>>> Have you had a chance to figure out why this is happening?
>>>>
>>>>
>>>> On Tue, Mar 22, 2016 at 1:32 AM, james <yiaz...@gmail.com> wrote:
>>>>
>>>>> I guess a different workload causes a different result?
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>
>


Re: java.lang.OutOfMemoryError: Unable to acquire bytes of memory

2016-04-04 Thread Nezih Yigitbasi
Nope, I didn't have a chance to track the root cause, and IIRC we didn't
observe it when dyn. alloc. is off.

On Mon, Apr 4, 2016 at 6:16 PM Reynold Xin  wrote:

> BTW do you still see this when dynamic allocation is off?
>
> On Mon, Apr 4, 2016 at 6:16 PM, Reynold Xin  wrote:
>
>> Nezih,
>>
>> Have you had a chance to figure out why this is happening?
>>
>>
>> On Tue, Mar 22, 2016 at 1:32 AM, james  wrote:
>>
>>> I guess a different workload causes a different result?
>>>
>>>
>>>
>>>
>>>
>>
>


Re: how about a custom coalesce() policy?

2016-04-03 Thread Nezih Yigitbasi
Sure, here <https://issues.apache.org/jira/browse/SPARK-14042> is the jira
and this <https://github.com/apache/spark/pull/11865> is the PR.

Nezih

On Sat, Apr 2, 2016 at 10:40 PM Hemant Bhanawat <hemant9...@gmail.com>
wrote:

> correcting email id for Nezih
>
> Hemant Bhanawat <https://www.linkedin.com/in/hemant-bhanawat-92a3811>
> www.snappydata.io
>
> On Sun, Apr 3, 2016 at 11:09 AM, Hemant Bhanawat <hemant9...@gmail.com>
> wrote:
>
>> Hi Nezih,
>>
>> Can you share JIRA and PR numbers?
>>
>> This partial decoupling of data partitioning strategy and Spark
>> parallelism would be a useful feature for any data store.
>>
>> Hemant
>>
>> Hemant Bhanawat <https://www.linkedin.com/in/hemant-bhanawat-92a3811>
>> www.snappydata.io
>>
>> On Fri, Apr 1, 2016 at 10:33 PM, Nezih Yigitbasi <
>> nyigitb...@netflix.com.invalid> wrote:
>>
>>> Hey Reynold,
>>> Created an issue (and a PR) for this change to get discussions started.
>>>
>>> Thanks,
>>> Nezih
>>>
>>> On Fri, Feb 26, 2016 at 12:03 AM Reynold Xin <r...@databricks.com>
>>> wrote:
>>>
>>>> Using the right email for Nezih
>>>>
>>>>
>>>> On Fri, Feb 26, 2016 at 12:01 AM, Reynold Xin <r...@databricks.com>
>>>> wrote:
>>>>
>>>>> I think this can be useful.
>>>>>
>>>>> The only thing is that we are slowly migrating to the
>>>>> Dataset/DataFrame API, leaving RDD mostly as-is as a lower-level API.
>>>>> Maybe we should do both? In either case it would be great to discuss the
>>>>> API on a pull request. Cheers.
>>>>>
>>>>> On Wed, Feb 24, 2016 at 2:08 PM, Nezih Yigitbasi <
>>>>> nyigitb...@netflix.com.invalid> wrote:
>>>>>
>>>>>> Hi Spark devs,
>>>>>>
>>>>>> I have sent an email about my problem some time ago where I want to
>>>>>> merge a large number of small files with Spark. Currently I am using Hive
>>>>>> with the CombineHiveInputFormat and I can control the size of the
>>>>>> output files with the max split size parameter (which is used for
>>>>>> coalescing the input splits by the CombineHiveInputFormat). My first
>>>>>> attempt was to use coalesce(), but since coalesce only considers the
>>>>>> target number of partitions the output file sizes were varying wildly.
>>>>>>
>>>>>> What I think can be useful is to have an optional PartitionCoalescer
>>>>>> parameter (a new interface) in the coalesce() method (or maybe we
>>>>>> can add a new method ?) that the callers can implement for custom
>>>>>> coalescing strategies — for my use case I have already implemented a
>>>>>> SizeBasedPartitionCoalescer that coalesces partitions by looking at
>>>>>> their sizes and by using a max split size parameter, similar to the
>>>>>> CombineHiveInputFormat (I also had to expose HadoopRDD to get access
>>>>>> to the individual split sizes etc.).
>>>>>>
>>>>>> What do you guys think about such a change, can it be useful to other
>>>>>> users as well? Or do you think that there is an easier way to accomplish
>>>>>> the same merge logic? If you think it may be useful, I already have
>>>>>> an implementation and I will be happy to work with the community to
>>>>>> contribute it.
>>>>>>
>>>>>> Thanks,
>>>>>> Nezih
>>>>>>
>>>>>
>>>>>
>>>>
>>
>


Re: how about a custom coalesce() policy?

2016-04-01 Thread Nezih Yigitbasi
Hey Reynold,
Created an issue (and a PR) for this change to get discussions started.

Thanks,
Nezih

On Fri, Feb 26, 2016 at 12:03 AM Reynold Xin <r...@databricks.com> wrote:

> Using the right email for Nezih
>
>
> On Fri, Feb 26, 2016 at 12:01 AM, Reynold Xin <r...@databricks.com> wrote:
>
>> I think this can be useful.
>>
>> The only thing is that we are slowly migrating to the Dataset/DataFrame
>> API, leaving RDD mostly as-is as a lower-level API. Maybe we should do
>> both? In either case it would be great to discuss the API on a pull
>> request. Cheers.
>>
>> On Wed, Feb 24, 2016 at 2:08 PM, Nezih Yigitbasi <
>> nyigitb...@netflix.com.invalid> wrote:
>>
>>> Hi Spark devs,
>>>
>>> I have sent an email about my problem some time ago where I want to
>>> merge a large number of small files with Spark. Currently I am using Hive
>>> with the CombineHiveInputFormat and I can control the size of the
>>> output files with the max split size parameter (which is used for
>>> coalescing the input splits by the CombineHiveInputFormat). My first
>>> attempt was to use coalesce(), but since coalesce only considers the
>>> target number of partitions the output file sizes were varying wildly.
>>>
>>> What I think can be useful is to have an optional PartitionCoalescer
>>> parameter (a new interface) in the coalesce() method (or maybe we can
>>> add a new method ?) that the callers can implement for custom coalescing
>>> strategies — for my use case I have already implemented a
>>> SizeBasedPartitionCoalescer that coalesces partitions by looking at
>>> their sizes and by using a max split size parameter, similar to the
>>> CombineHiveInputFormat (I also had to expose HadoopRDD to get access to
>>> the individual split sizes etc.).
>>>
>>> What do you guys think about such a change, can it be useful to other
>>> users as well? Or do you think that there is an easier way to accomplish
>>> the same merge logic? If you think it may be useful, I already have an
>>> implementation and I will be happy to work with the community to contribute
>>> it.
>>>
>>> Thanks,
>>> Nezih
>>>
>>
>>
>


Re: java.lang.OutOfMemoryError: Unable to acquire bytes of memory

2016-03-22 Thread Nezih Yigitbasi
Interesting. After experimenting with various parameters, increasing
spark.sql.shuffle.partitions and decreasing spark.buffer.pageSize helped my
job go through. BTW, I will be happy to help get this issue fixed.
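
In case it helps others hitting the same error, this is roughly what that
change looks like. The values below are only illustrative and very
workload-dependent; the same properties can also be passed with --conf on
spark-submit:

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative values only; tune for your own workload.
val conf = new SparkConf()
  .set("spark.sql.shuffle.partitions", "2000") // default is 200; more partitions mean less shuffle data per task
  .set("spark.buffer.pageSize", "2m")          // smaller pages mean smaller individual memory requests
val sc = new SparkContext(conf)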

Nezih

On Tue, Mar 22, 2016 at 1:07 AM james  wrote:

> Hi,
> I also found the 'Unable to acquire memory' issue using Spark 1.6.1 with
> dynamic allocation on YARN. My case happened with setting
> spark.sql.shuffle.partitions larger than 200. From the error stack, it
> looks different from the issue reported by Nezih, and I am not sure if
> they have the same root cause.
>
> Thanks
> James
>
> 16/03/17 16:02:11 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 1912805 bytes
> 16/03/17 16:02:12 INFO spark.MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 1 to hw-node3:55062
> 16/03/17 16:02:12 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 1912805 bytes
> 16/03/17 16:02:16 INFO scheduler.TaskSetManager: Starting task 280.0 in stage 153.0 (TID 9390, hw-node5, partition 280,PROCESS_LOCAL, 2432 bytes)
> 16/03/17 16:02:16 WARN scheduler.TaskSetManager: Lost task 170.0 in stage 153.0 (TID 9280, hw-node5): java.lang.OutOfMemoryError: Unable to acquire 1073741824 bytes of memory, got 1060110796
> at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:91)
> at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.growPointerArrayIfNecessary(UnsafeExternalSorter.java:295)
> at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:330)
> at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:91)
> at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:168)
> at org.apache.spark.sql.execution.Sort$anonfun$1.apply(Sort.scala:90)
> at org.apache.spark.sql.execution.Sort$anonfun$1.apply(Sort.scala:64)
> at org.apache.spark.rdd.RDD$anonfun$mapPartitionsInternal$1$anonfun$apply$21.apply(RDD.scala:728)
> at org.apache.spark.rdd.RDD$anonfun$mapPartitionsInternal$1$anonfun$apply$21.apply(RDD.scala:728)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
>
>
>


Re: java.lang.OutOfMemoryError: Unable to acquire bytes of memory

2016-03-21 Thread Nezih Yigitbasi
Andrew, thanks for the suggestion, but unfortunately it didn't work --
still getting the same exception.
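
For reference, the attempt was just a configuration flip, roughly along
these lines (a sketch of the relevant setting only; everything else in the
job was left unchanged):

// Sketch: re-run the same job with the pre-1.6 static memory manager,
// per Andrew's suggestion below.
val conf = new org.apache.spark.SparkConf()
  .set("spark.memory.useLegacyMode", "true")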
On Mon, Mar 21, 2016 at 10:32 AM Andrew Or <and...@databricks.com> wrote:

> @Nezih, can you try again after setting `spark.memory.useLegacyMode` to
> true? Can you still reproduce the OOM that way?
>
> 2016-03-21 10:29 GMT-07:00 Nezih Yigitbasi <nyigitb...@netflix.com.invalid
> >:
>
>> Hi Spark devs,
>> I am using 1.6.0 with dynamic allocation on yarn. I am trying to run a
>> relatively big application with 10s of jobs and 100K+ tasks and my app
>> fails with the exception below. The closest jira issue I could find is
>> SPARK-11293 <https://issues.apache.org/jira/browse/SPARK-11293>, which
>> is a critical bug that has been open for a long time. There are other
>> similar jira issues (all fixed): SPARK-10474
>> <https://issues.apache.org/jira/browse/SPARK-10474>, SPARK-10733
>> <https://issues.apache.org/jira/browse/SPARK-10733>, SPARK-10309
>> <https://issues.apache.org/jira/browse/SPARK-10309>, SPARK-10379
>> <https://issues.apache.org/jira/browse/SPARK-10379>.
>>
>> Any workarounds to this issue or any plans to fix it?
>>
>> Thanks a lot,
>> Nezih
>>
>> 16/03/19 05:12:09 INFO memory.TaskMemoryManager: Memory used in task 46870
>> 16/03/19 05:12:09 INFO memory.TaskMemoryManager: Acquired by org.apache.spark.shuffle.sort.ShuffleExternalSorter@1c36f801: 32.0 KB
>> 16/03/19 05:12:09 INFO memory.TaskMemoryManager: 1512915599 bytes of memory were used by task 46870 but are not associated with specific consumers
>> 16/03/19 05:12:09 INFO memory.TaskMemoryManager: 1512948367 bytes of memory are used for execution and 156978343 bytes of memory are used for storage
>> 16/03/19 05:12:09 ERROR executor.Executor: Managed memory leak detected; size = 1512915599 bytes, TID = 46870
>> 16/03/19 05:12:09 ERROR executor.Executor: Exception in task 77.0 in stage 273.0 (TID 46870)
>> java.lang.OutOfMemoryError: Unable to acquire 128 bytes of memory, got 0
>> at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:120)
>> at org.apache.spark.shuffle.sort.ShuffleExternalSorter.acquireNewPageIfNecessary(ShuffleExternalSorter.java:354)
>> at org.apache.spark.shuffle.sort.ShuffleExternalSorter.insertRecord(ShuffleExternalSorter.java:375)
>> at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.insertRecordIntoSorter(UnsafeShuffleWriter.java:237)
>> at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:164)
>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>> 16/03/19 05:12:09 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-8,5,main]
>> java.lang.OutOfMemoryError: Unable to acquire 128 bytes of memory, got 0
>> at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:120)
>> at org.apache.spark.shuffle.sort.ShuffleExternalSorter.acquireNewPageIfNecessary(ShuffleExternalSorter.java:354)
>> at org.apache.spark.shuffle.sort.ShuffleExternalSorter.insertRecord(ShuffleExternalSorter.java:375)
>> at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.insertRecordIntoSorter(UnsafeShuffleWriter.java:237)
>> at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:164)
>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>> 16/03/19 05:12:10 INFO storage.DiskBlockManager: Shutdown hook called
>> 16/03/19 05:12:10 INFO util.ShutdownHookManager: Shutdown hook called
>>
>>
>
>


java.lang.OutOfMemoryError: Unable to acquire bytes of memory

2016-03-21 Thread Nezih Yigitbasi
Hi Spark devs,
I am using 1.6.0 with dynamic allocation on yarn. I am trying to run a
relatively big application with 10s of jobs and 100K+ tasks and my app
fails with the exception below. The closest JIRA issue I could find is
SPARK-11293, which is a critical bug that has been open for a long time.
There are other similar JIRA issues (all fixed): SPARK-10474, SPARK-10733,
SPARK-10309, SPARK-10379.

Any workarounds to this issue or any plans to fix it?

Thanks a lot,
Nezih

16/03/19 05:12:09 INFO memory.TaskMemoryManager: Memory used in task 46870
16/03/19 05:12:09 INFO memory.TaskMemoryManager: Acquired by org.apache.spark.shuffle.sort.ShuffleExternalSorter@1c36f801: 32.0 KB
16/03/19 05:12:09 INFO memory.TaskMemoryManager: 1512915599 bytes of memory were used by task 46870 but are not associated with specific consumers
16/03/19 05:12:09 INFO memory.TaskMemoryManager: 1512948367 bytes of memory are used for execution and 156978343 bytes of memory are used for storage
16/03/19 05:12:09 ERROR executor.Executor: Managed memory leak detected; size = 1512915599 bytes, TID = 46870
16/03/19 05:12:09 ERROR executor.Executor: Exception in task 77.0 in stage 273.0 (TID 46870)
java.lang.OutOfMemoryError: Unable to acquire 128 bytes of memory, got 0
at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:120)
at org.apache.spark.shuffle.sort.ShuffleExternalSorter.acquireNewPageIfNecessary(ShuffleExternalSorter.java:354)
at org.apache.spark.shuffle.sort.ShuffleExternalSorter.insertRecord(ShuffleExternalSorter.java:375)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.insertRecordIntoSorter(UnsafeShuffleWriter.java:237)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:164)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/03/19 05:12:09 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-8,5,main]
java.lang.OutOfMemoryError: Unable to acquire 128 bytes of memory, got 0
at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:120)
at org.apache.spark.shuffle.sort.ShuffleExternalSorter.acquireNewPageIfNecessary(ShuffleExternalSorter.java:354)
at org.apache.spark.shuffle.sort.ShuffleExternalSorter.insertRecord(ShuffleExternalSorter.java:375)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.insertRecordIntoSorter(UnsafeShuffleWriter.java:237)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:164)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/03/19 05:12:10 INFO storage.DiskBlockManager: Shutdown hook called
16/03/19 05:12:10 INFO util.ShutdownHookManager: Shutdown hook called



SparkContext.stop() takes too long to complete

2016-03-19 Thread Nezih Yigitbasi
Hi Spark experts,
I am using Spark 1.5.2 on YARN with dynamic allocation enabled. I see in
the driver/application master logs that the app is marked as SUCCEEDED and
then SparkContext.stop() is called. However, this stop sequence takes more
than 10 minutes to complete, and the YARN resource manager then kills the
application master because it didn't receive a heartbeat within the last 10
minutes. Any ideas about what may be going on?

Here are the relevant logs:

16/03/18 21:26:58 INFO yarn.ApplicationMaster: Final app status: SUCCEEDED, exitCode: 0
16/03/18 21:26:58 INFO spark.SparkContext: Invoking stop() from shutdown hook
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/static/sql,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/SQL/execution/json,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/SQL/execution,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/SQL/json,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/SQL,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/static/sql,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/SQL/execution/json,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/SQL/execution,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/SQL/json,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/SQL,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/metrics/json,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/api,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/static,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/json,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment/json,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd/json,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/json,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool/json,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/json,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/json,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job/json,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/json,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs,null}
16/03/18 21:26:58 INFO ui.SparkUI: Stopped Spark web UI at http://10.143.240.240:52706
16/03/18 21:27:58 INFO cluster.YarnClusterSchedulerBackend: Requesting to kill executor(s) 1135
16/03/18 21:27:58 INFO yarn.YarnAllocator: Driver requested a total number of 208 executor(s).
16/03/18 21:27:58 INFO yarn.ApplicationMaster$AMEndpoint: Driver requested to kill executor(s) 1135.
16/03/18 21:27:58 INFO spark.ExecutorAllocationManager: Removing executor 1135 because it has been idle for 60 seconds (new desired total will be 208)
16/03/18 21:27:58 INFO cluster.YarnClusterSchedulerBackend: Requesting to kill executor(s) 1123
16/03/18 21:27:58 INFO yarn.YarnAllocator: Driver requested a total number of 207 executor(s).
16/03/18

how about a custom coalesce() policy?

2016-02-24 Thread Nezih Yigitbasi
Hi Spark devs,

I sent an email about my problem some time ago: I want to merge a large
number of small files with Spark. Currently I am using Hive with the
CombineHiveInputFormat, and I can control the size of the output files with
the max split size parameter (which is used for coalescing the input splits
by the CombineHiveInputFormat). My first attempt was to use coalesce(), but
since coalesce only considers the target number of partitions, the output
file sizes varied wildly.

What I think can be useful is an optional PartitionCoalescer parameter (a
new interface) in the coalesce() method (or maybe we can add a new method?)
that callers can implement for custom coalescing strategies. For my use
case I have already implemented a SizeBasedPartitionCoalescer that
coalesces partitions by looking at their sizes and by using a max split
size parameter, similar to the CombineHiveInputFormat (I also had to expose
HadoopRDD to get access to the individual split sizes, etc.).

What do you guys think about such a change: could it be useful to other
users as well? Or do you think there is an easier way to accomplish the
same merge logic? If you think it may be useful, I already have an
implementation (sketched below) and I will be happy to work with the
community to contribute it.
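
To make the idea a bit more concrete, here is a rough sketch of what the
pluggable policy and a size-based implementation could look like. The names
and signatures below are purely illustrative and are not the actual
interface proposed in the PR:

import scala.collection.mutable.ArrayBuffer

// Illustrative only: a possible shape for a pluggable coalescing policy.
trait PartitionCoalescer {
  // Given the byte size of each parent partition, return the groups of
  // parent partition indices that should become the coalesced partitions.
  def coalesce(parentPartitionSizes: Array[Long]): Array[Array[Int]]
}

// Greedily packs consecutive parent partitions into groups of at most maxGroupBytes.
class SizeBasedPartitionCoalescer(maxGroupBytes: Long) extends PartitionCoalescer {
  override def coalesce(parentPartitionSizes: Array[Long]): Array[Array[Int]] = {
    val groups = ArrayBuffer[Array[Int]]()
    val current = ArrayBuffer[Int]()
    var currentBytes = 0L
    parentPartitionSizes.zipWithIndex.foreach { case (size, idx) =>
      if (current.nonEmpty && currentBytes + size > maxGroupBytes) {
        groups += current.toArray
        current.clear()
        currentBytes = 0L
      }
      current += idx
      currentBytes += size
    }
    if (current.nonEmpty) groups += current.toArray
    groups.toArray
  }
}

The coalesce() overload would then accept such a policy, e.g. something
along the lines of rdd.coalesce(n, coalescer = Some(new
SizeBasedPartitionCoalescer(128L * 1024 * 1024))), with the exact signature
to be settled on the PR.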

Thanks,
Nezih


Re: question about combining small parquet files

2015-11-30 Thread Nezih Yigitbasi
This looks interesting, thanks Ruslan. But compaction with Hive is as
simple as an INSERT OVERWRITE statement, since Hive supports
CombineFileInputFormat; is it possible to do the same with Spark?

On Thu, Nov 26, 2015 at 9:47 AM, Ruslan Dautkhanov <dautkha...@gmail.com>
wrote:

> An interesting compaction approach for small files was discussed recently:
>
> http://blog.cloudera.com/blog/2015/11/how-to-ingest-and-query-fast-data-with-impala-without-kudu/
>
>
> AFAIK Spark supports views too.
>
>
> --
> Ruslan Dautkhanov
>
> On Thu, Nov 26, 2015 at 10:43 AM, Nezih Yigitbasi <
> nyigitb...@netflix.com.invalid> wrote:
>
>> Hi Spark people,
>> I have a Hive table that has a lot of small parquet files and I am
>> creating a data frame out of it to do some processing, but since I have a
>> large number of splits/files my job creates a lot of tasks, which I don't
>> want. Basically what I want is the same functionality that Hive provides,
>> that is, to combine these small input splits into larger ones by specifying
>> a max split size setting. Is this currently possible with Spark?
>>
>> I look at coalesce() but with coalesce I can only control the number
>> of output files not their sizes. And since the total input dataset size
>> can vary significantly in my case, I cannot just use a fixed partition
>> count as the size of each output file can get very large. I then looked for
>> getting the total input size from an rdd to come up with some heuristic to
>> set the partition count, but I couldn't find any ways to do it (without
>> modifying the spark source).
>>
>> Any help is appreciated.
>>
>> Thanks,
>> Nezih
>>
>> PS: this email is the same as my previous email as I learned that my
>> previous email ended up as spam for many people since I sent it through
>> nabble, sorry for the double post.
>>
>
>


question about combining small parquet files

2015-11-26 Thread Nezih Yigitbasi
Hi Spark people,
I have a Hive table that has a lot of small Parquet files, and I am
creating a DataFrame out of it to do some processing. Since I have a large
number of splits/files, my job creates a lot of tasks, which I don't want.
Basically, what I want is the same functionality that Hive provides, that
is, combining these small input splits into larger ones by specifying a max
split size setting. Is this currently possible with Spark?

I looked at coalesce(), but with coalesce I can only control the number of
output files, not their sizes. And since the total input dataset size can
vary significantly in my case, I cannot just use a fixed partition count,
as the size of each output file can get very large. I then looked for a way
to get the total input size from an RDD to come up with some heuristic to
set the partition count, but I couldn't find any way to do it (without
modifying the Spark source). The closest workaround I can think of is
sketched below.
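
This is roughly that workaround, assuming the input is a plain directory of
files whose path is known up front (which is not always the case for a Hive
table); the paths and target size below are made up, and sc/sqlContext are
the usual spark-shell ones:

import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch: pick a partition count so that each output file is roughly targetBytes.
val targetBytes = 256L * 1024 * 1024
val fs = FileSystem.get(sc.hadoopConfiguration)
val totalBytes = fs.listStatus(new Path("/warehouse/my_table"))
  .filter(_.isFile)
  .map(_.getLen)
  .sum
val numPartitions = math.max(1, math.ceil(totalBytes.toDouble / targetBytes).toInt)

sqlContext.read.parquet("/warehouse/my_table")
  .coalesce(numPartitions)
  .write.parquet("/warehouse/my_table_compacted")

This still rewrites the data rather than combining splits at read time,
which is why I am asking whether something better already exists.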

Any help is appreciated.

Thanks,
Nezih

PS: this email is the same as my previous email as I learned that my
previous email ended up as spam for many people since I sent it through
nabble, sorry for the double post.