[ 
https://issues.apache.org/jira/browse/SPARK-10474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14905279#comment-14905279
 ] 

Andrew Or commented on SPARK-10474:
-----------------------------------

Re-opening this because I found the real cause for this issue (which turns out 
to be the same as SPARK-10733).

In TungstenAggregate's prepare method, we only reserve a page. However, when we 
switch to sort-based aggregation, we try to acquire a page AND a pointer array. 
This means even if we spill the page we currently have, there's a reasonable 
chance that we still can't allocate for the pointer array.

The temporary solution for 1.5.1 would be to simply not track the pointer array 
(we already don't track it in some other places), which should be safe because 
of the generous shuffle safety fraction of 20%.

I'll submit another patch shortly.

> TungstenAggregation cannot acquire memory for pointer array after switching 
> to sort-based
> -----------------------------------------------------------------------------------------
>
>                 Key: SPARK-10474
>                 URL: https://issues.apache.org/jira/browse/SPARK-10474
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.5.0
>            Reporter: Yi Zhou
>            Assignee: Andrew Or
>            Priority: Blocker
>             Fix For: 1.6.0, 1.5.1
>
>
> In aggregation case, a  Lost task happened with below error.
> {code}
>  java.io.IOException: Could not acquire 65536 bytes of memory
>         at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.initializeForWriting(UnsafeExternalSorter.java:169)
>         at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:220)
>         at 
> org.apache.spark.sql.execution.UnsafeKVExternalSorter.<init>(UnsafeKVExternalSorter.java:126)
>         at 
> org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.destructAndCreateExternalSorter(UnsafeFixedWidthAggregationMap.java:257)
>         at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.switchToSortBasedAggregation(TungstenAggregationIterator.scala:435)
>         at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:379)
>         at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622)
>         at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.org$apache$spark$sql$execution$aggregate$TungstenAggregate$$anonfun$$executePartition$1(TungstenAggregate.scala:110)
>         at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
>         at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
>         at 
> org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:64)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>         at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>         at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>         at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>         at org.apache.spark.scheduler.Task.run(Task.scala:88)
>         at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
> {code}
> Key SQL Query
> {code:sql}
> INSERT INTO TABLE test_table
> SELECT
>   ss.ss_customer_sk AS cid,
>   count(CASE WHEN i.i_class_id=1  THEN 1 ELSE NULL END) AS id1,
>   count(CASE WHEN i.i_class_id=3  THEN 1 ELSE NULL END) AS id3,
>   count(CASE WHEN i.i_class_id=5  THEN 1 ELSE NULL END) AS id5,
>   count(CASE WHEN i.i_class_id=7  THEN 1 ELSE NULL END) AS id7,
>   count(CASE WHEN i.i_class_id=9  THEN 1 ELSE NULL END) AS id9,
>   count(CASE WHEN i.i_class_id=11 THEN 1 ELSE NULL END) AS id11,
>   count(CASE WHEN i.i_class_id=13 THEN 1 ELSE NULL END) AS id13,
>   count(CASE WHEN i.i_class_id=15 THEN 1 ELSE NULL END) AS id15,
>   count(CASE WHEN i.i_class_id=2  THEN 1 ELSE NULL END) AS id2,
>   count(CASE WHEN i.i_class_id=4  THEN 1 ELSE NULL END) AS id4,
>   count(CASE WHEN i.i_class_id=6  THEN 1 ELSE NULL END) AS id6,
>   count(CASE WHEN i.i_class_id=8  THEN 1 ELSE NULL END) AS id8,
>   count(CASE WHEN i.i_class_id=10 THEN 1 ELSE NULL END) AS id10,
>   count(CASE WHEN i.i_class_id=14 THEN 1 ELSE NULL END) AS id14,
>   count(CASE WHEN i.i_class_id=16 THEN 1 ELSE NULL END) AS id16
> FROM store_sales ss
> INNER JOIN item i ON ss.ss_item_sk = i.i_item_sk
> WHERE i.i_category IN ('Books')
> AND ss.ss_customer_sk IS NOT NULL
> GROUP BY ss.ss_customer_sk
> HAVING count(ss.ss_item_sk) > 5
> {code}
> Note:
> the store_sales is a big fact table and item is a small dimension table.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to