[jira] [Comment Edited] (SPARK-10474) TungstenAggregation cannot acquire memory for pointer array after switching to sort-based

2015-10-06 Thread Hans van den Bogert (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14944925#comment-14944925
 ] 

Hans van den Bogert edited comment on SPARK-10474 at 10/6/15 1:46 PM:
--

One more debug println for the calculated cores (in contrast to numCores):
https://gist.github.com/hansbogert/cc2baf3995d4e37270a2

Relevant output (output is the same for fine-grained as well as coarse-grained 
mesos):
{noformat}
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.5.1
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0)
Type in expressions to have them evaluated.
Type :help for more information.
15/10/06 10:25:04 WARN SparkConf: In Spark 1.0 and later spark.local.dir will 
be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in 
mesos/standalone and LOCAL_DIRS in YARN).
numCores:0
cores:12
1048576
15/10/06 10:25:05 WARN MetricsSystem: Using default name DAGScheduler for 
source because spark.app.id is not set.
...
{noformat}

The calculated 'cores' is 12, which is the number of cores of the local driver 
node; the total Mesos cluster, however, has more than 40 cores. Either way, there 
is no difference between fine-grained and coarse-grained mode, at least for this 
method.
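
For context on why the core count matters here: in 1.5.x the default Tungsten page 
size is derived from the execution-memory pool divided over the cores the JVM ends 
up using. The following is only a simplified sketch of that heuristic, not the 
actual Spark source; the memory figures and the clamp/safety constants are 
assumptions picked so that the 1048576 above and the 67108864 / 16777216 in the 
slave logs further below fall out of the arithmetic:
{code}
object PageSizeSketch {
  // Round up to the next power of two (pages come in power-of-two sizes).
  private def nextPowerOf2(n: Long): Long =
    if (n <= 1L) 1L else java.lang.Long.highestOneBit(n - 1) << 1

  // Sketch: executionMemory / cores / safetyFactor, rounded up, clamped to [1 MB, 64 MB].
  def defaultPageSize(executionMemory: Long, numCores: Int): Long = {
    val minPageSize = 1L << 20                      // 1 MB
    val maxPageSize = 64L << 20                     // 64 MB
    val cores =
      if (numCores > 0) numCores                    // what the executor was started with
      else Runtime.getRuntime.availableProcessors   // driver fallback: all local cores
    val safetyFactor = 16                           // assumed constant
    val size = nextPowerOf2(executionMemory / cores / safetyFactor)
    math.min(maxPageSize, math.max(minPageSize, size))
  }

  def main(args: Array[String]): Unit = {
    // Driver REPL: numCores is 0 and falls back to the 12 local cores; with a small
    // driver-side execution pool (~85 MB assumed) the result clamps to the 1 MB floor.
    println(defaultPageSize(85L << 20, 12))   // 1048576
    // Fine-grained Mesos executor reporting 1 core: clamps to the 64 MB ceiling.
    println(defaultPageSize(2L << 30, 1))     // 67108864
    // Coarse-grained executor started with --cores 8: lands on 16 MB.
    println(defaultPageSize(2L << 30, 8))     // 16777216
  }
}
{code}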

/update 
I should've read the logs on the Mesos slaves as well; there is indeed a 
discrepancy between fine-grained mode and coarse-grained mode:
In fine-grained mode:
{noformat}
head 
/local/vdbogert/var/lib/mesos/slaves/20151006-105432-84120842-5050-17066-S3/frameworks/20151006-105432-84120842-5050-17066-0009/executors/20151006-105432-84120842-5050-17066-S3/runs/latest/stdout
2numCores:1
cores:1
67108864
{noformat}


And in coarse-grained mode:
{noformat}
head 
/local/vdbogert/var/lib/mesos/slaves/20151006-105432-84120842-5050-17066-S3/frameworks/20151006-105432-84120842-5050-17066-0010/executors/3/runs/latest/stdout
Registered executor on node326.ib.cluster
Starting task 3
sh -c ' "/var/scratch/vdbogert/src/spark-1.5.1/bin/spark-class" 
org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url 
akka.tcp://sparkDriver@10.141.3.254:56069/user/CoarseGrainedScheduler 
--executor-id 20151006-105432-84120842-5050-17066-S3 --hostname 
node326.ib.cluster --cores 8 --app-id 20151006-105432-84120842-5050-17066-0010'
Forked command at 4378
numCores:8
cores:8
16777216
{noformat}

This is probably a different bug, specific to Mesos fine-grained mode. My 
current workaround is setting `spark.buffer.pageSize` to 16M, the value that 
would otherwise have been used automatically in coarse-grained mode.
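
For reference, a minimal sketch of applying that workaround programmatically 
instead of on the command line (equivalent to passing --conf 
spark.buffer.pageSize=16m to spark-shell/spark-submit; the app name is just a 
placeholder):
{code}
import org.apache.spark.{SparkConf, SparkContext}

// Pin the Tungsten page size explicitly instead of letting it be derived
// from the (mis-detected) core count. 16m mirrors the coarse-grained value above.
val conf = new SparkConf()
  .setAppName("pageSize-workaround")     // placeholder name
  .set("spark.buffer.pageSize", "16m")
val sc = new SparkContext(conf)
{code}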

/update2
Even when allocating only 16MB, just like in coarse-grained mode (and going even 
lower, to 8MB), I'm *still* seeing this error pop up.


was (Author: hbogert):
One more debug println for the calculated cores (in contrast to numCores):
https://gist.github.com/hansbogert/cc2baf3995d4e37270a2

Relevant output (output is the same for fine-grained as well as coarse-grained 
mesos):
{noformat}
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.5.1
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0)
Type in expressions to have them evaluated.
Type :help for more information.
15/10/06 10:25:04 WARN SparkConf: In Spark 1.0 and later spark.local.dir will 
be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in 
mesos/standalone and LOCAL_DIRS in YARN).
numCores:0
cores:12
1048576
15/10/06 10:25:05 WARN MetricsSystem: Using default name DAGScheduler for 
source because spark.app.id is not set.
...
{noformat}

The calculated 'cores' is 12, which is the number of cores of the local driver 
node; the total Mesos cluster, however, has more than 40 cores. Either way, there 
is no difference between fine-grained and coarse-grained mode, at least for this 
method.

/update 
I should've read the logs on the Mesos slaves as well; there is indeed a 
discrepancy between fine-grained mode and coarse-grained mode:
In fine-grained mode:
{noformat}
head 
/local/vdbogert/var/lib/mesos/slaves/20151006-105432-84120842-5050-17066-S3/frameworks/20151006-105432-84120842-5050-17066-0009/executors/20151006-105432-84120842-5050-17066-S3/runs/latest/stdout
2numCores:1
cores:1
67108864
{noformat}


And in coarse-grained mode:
{noformat}
head 
/local/vdbogert/var/lib/mesos/slaves/20151006-105432-84120842-5050-17066-S3/frameworks/20151006-105432-84120842-5050-17066-0010/executors/3/runs/latest/stdout
Registered executor on node326.ib.cluster
Starting task 3
sh -c ' "/var/scratch/vdbogert/src/spark-1.5.1/bin/spark-class" 
org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url 
akka.tcp://sparkDriver@10.141.3.254:56069/user/CoarseGrainedScheduler 
--executor-id 20151006-1054

[jira] [Comment Edited] (SPARK-10474) TungstenAggregation cannot acquire memory for pointer array after switching to sort-based

2015-10-06 Thread Hans van den Bogert (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14944925#comment-14944925
 ] 

Hans van den Bogert edited comment on SPARK-10474 at 10/6/15 12:13 PM:
---

One more debug println for the calculated cores (in contrast to numCores):
https://gist.github.com/hansbogert/cc2baf3995d4e37270a2

Relevant output (output is the same for fine-grained as well as coarse-grained 
mesos):
{noformat}
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.5.1
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0)
Type in expressions to have them evaluated.
Type :help for more information.
15/10/06 10:25:04 WARN SparkConf: In Spark 1.0 and later spark.local.dir will 
be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in 
mesos/standalone and LOCAL_DIRS in YARN).
numCores:0
cores:12
1048576
15/10/06 10:25:05 WARN MetricsSystem: Using default name DAGScheduler for 
source because spark.app.id is not set.
...
{noformat}

The calculated 'cores' is 12, which is the number of cores of the local driver 
node; the total Mesos cluster, however, has more than 40 cores. Either way, there 
is no difference between fine-grained and coarse-grained mode, at least for this 
method.

/update 
I should've read the logs on the Mesos slaves as well; there is indeed a 
discrepancy between fine-grained mode and coarse-grained mode:
In fine-grained mode:
{noformat}
head 
/local/vdbogert/var/lib/mesos/slaves/20151006-105432-84120842-5050-17066-S3/frameworks/20151006-105432-84120842-5050-17066-0009/executors/20151006-105432-84120842-5050-17066-S3/runs/latest/stdout
2numCores:1
cores:1
67108864
{noformat}


And in coarse-grained mode:
{noformat}
head 
/local/vdbogert/var/lib/mesos/slaves/20151006-105432-84120842-5050-17066-S3/frameworks/20151006-105432-84120842-5050-17066-0010/executors/3/runs/latest/stdout
Registered executor on node326.ib.cluster
Starting task 3
sh -c ' "/var/scratch/vdbogert/src/spark-1.5.1/bin/spark-class" 
org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url 
akka.tcp://sparkDriver@10.141.3.254:56069/user/CoarseGrainedScheduler 
--executor-id 20151006-105432-84120842-5050-17066-S3 --hostname 
node326.ib.cluster --cores 8 --app-id 20151006-105432-84120842-5050-17066-0010'
Forked command at 4378
numCores:8
cores:8
16777216
{noformat}

This is probably a different bug, specific to Mesos fine-grained mode. My 
current workaround is setting `spark.buffer.pageSize` to 16M, the value that 
would otherwise have been used automatically in coarse-grained mode.


was (Author: hbogert):
One more debug println for the calculated cores (in contrast to numCores):
https://gist.github.com/hansbogert/cc2baf3995d4e37270a2

Relevant output (output is the same for fine-grained as well as coarse-grained 
mesos):
{noformat}
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.5.1
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0)
Type in expressions to have them evaluated.
Type :help for more information.
15/10/06 10:25:04 WARN SparkConf: In Spark 1.0 and later spark.local.dir will 
be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in 
mesos/standalone and LOCAL_DIRS in YARN).
numCores:0
cores:12
1048576
15/10/06 10:25:05 WARN MetricsSystem: Using default name DAGScheduler for 
source because spark.app.id is not set.
...
{noformat}

The calculated 'cores' is 12, which is the number of cores of the local driver 
node; the total Mesos cluster, however, has more than 40 cores. Either way, there 
is no difference between fine-grained and coarse-grained mode, at least for this 
method.

/update 
I should've read the logs on the Mesos slaves as well; there is indeed a 
discrepancy between fine-grained mode and coarse-grained mode:
In fine-grained mode:
{noformat}
head 
/local/vdbogert/var/lib/mesos/slaves/20151006-105432-84120842-5050-17066-S3/frameworks/20151006-105432-84120842-5050-17066-0009/executors/20151006-105432-84120842-5050-17066-S3/runs/latest/stdout
2numCores:1
cores:1
67108864
{noformat}


And in coarse-grained mode:
{noformat}
head 
/local/vdbogert/var/lib/mesos/slaves/20151006-105432-84120842-5050-17066-S3/frameworks/20151006-105432-84120842-5050-17066-0010/executors/3/runs/latest/stdout
Registered executor on node326.ib.cluster
Starting task 3
sh -c ' "/var/scratch/vdbogert/src/spark-1.5.1/bin/spark-class" 
org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url 
akka.tcp://sparkDriver@10.141.3.254:56069/user/CoarseGrainedScheduler 
--executor-id 20151006-105432-84120842-5050-17066-S3 --hostname 
node326.ib.cluster --cores 8 --app-id 20151006-105432-84120842-5050-17066-0010'
Forked command at 4378
nu

[jira] [Comment Edited] (SPARK-10474) TungstenAggregation cannot acquire memory for pointer array after switching to sort-based

2015-10-05 Thread Hans van den Bogert (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14944166#comment-14944166
 ] 

Hans van den Bogert edited comment on SPARK-10474 at 10/5/15 10:39 PM:
---

Had this patch against tag 1.5.1:
https://gist.github.com/f110f64887f4739b7dd8

Output of the added println()'s is near the beginning of:

{noformat}
Using Spark's repl log4j profile: 
org/apache/spark/log4j-defaults-repl.properties
To adjust logging level use sc.setLogLevel("INFO")
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.5.1
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0)
Type in expressions to have them evaluated.
Type :help for more information.
15/10/06 00:16:00 WARN SparkConf: In Spark 1.0 and later spark.local.dir will 
be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in 
mesos/standalone and LOCAL_DIRS in YARN).
numCores:0
1048576
15/10/06 00:16:02 WARN MetricsSystem: Using default name DAGScheduler for 
source because spark.app.id is not set.
I1006 00:16:02.414851 25123 sched.cpp:137] Version: 0.21.0
I1006 00:16:02.423246 25115 sched.cpp:234] New master detected at 
master@10.149.3.5:5050
I1006 00:16:02.423482 25115 sched.cpp:242] No credentials provided. Attempting 
to register without authentication
I1006 00:16:02.427250 25121 sched.cpp:408] Framework registered with 
20151006-001500-84120842-5050-6847-
15/10/06 00:16:02 WARN NativeCodeLoader: Unable to load native-hadoop library 
for your platform... using builtin-java classes where applicable
Spark context available as sc.
15/10/06 00:16:04 WARN Connection: BoneCP specified but not present in 
CLASSPATH (or one of dependencies)
15/10/06 00:16:04 WARN Connection: BoneCP specified but not present in 
CLASSPATH (or one of dependencies)
15/10/06 00:16:09 WARN ObjectStore: Version information not found in metastore. 
hive.metastore.schema.verification is not enabled so recording the schema 
version 1.2.0
15/10/06 00:16:09 WARN ObjectStore: Failed to get database default, returning 
NoSuchObjectException
15/10/06 00:16:11 WARN Connection: BoneCP specified but not present in 
CLASSPATH (or one of dependencies)
15/10/06 00:16:11 WARN Connection: BoneCP specified but not present in 
CLASSPATH (or one of dependencies)
15/10/06 00:16:14 WARN NativeCodeLoader: Unable to load native-hadoop library 
for your platform... using builtin-java classes where applicable
SQL context available as sqlContext.
{noformat}


was (Author: hbogert):
Had this patch against tag 1.5.1:
https://gist.github.com/f110f64887f4739b7dd8.git

Output of the added println()'s is near the beginning of:

{noformat}
Using Spark's repl log4j profile: 
org/apache/spark/log4j-defaults-repl.properties
To adjust logging level use sc.setLogLevel("INFO")
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.5.1
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0)
Type in expressions to have them evaluated.
Type :help for more information.
15/10/06 00:16:00 WARN SparkConf: In Spark 1.0 and later spark.local.dir will 
be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in 
mesos/standalone and LOCAL_DIRS in YARN).
numCores:0
1048576
15/10/06 00:16:02 WARN MetricsSystem: Using default name DAGScheduler for 
source because spark.app.id is not set.
I1006 00:16:02.414851 25123 sched.cpp:137] Version: 0.21.0
I1006 00:16:02.423246 25115 sched.cpp:234] New master detected at 
master@10.149.3.5:5050
I1006 00:16:02.423482 25115 sched.cpp:242] No credentials provided. Attempting 
to register without authentication
I1006 00:16:02.427250 25121 sched.cpp:408] Framework registered with 
20151006-001500-84120842-5050-6847-
15/10/06 00:16:02 WARN NativeCodeLoader: Unable to load native-hadoop library 
for your platform... using builtin-java classes where applicable
Spark context available as sc.
15/10/06 00:16:04 WARN Connection: BoneCP specified but not present in 
CLASSPATH (or one of dependencies)
15/10/06 00:16:04 WARN Connection: BoneCP specified but not present in 
CLASSPATH (or one of dependencies)
15/10/06 00:16:09 WARN ObjectStore: Version information not found in metastore. 
hive.metastore.schema.verification is not enabled so recording the schema 
version 1.2.0
15/10/06 00:16:09 WARN ObjectStore: Failed to get database default, returning 
NoSuchObjectException
15/10/06 00:16:11 WARN Connection: BoneCP specified but not present in 
CLASSPATH (or one of dependencies)
15/10/06 00:16:11 WARN Connection: BoneCP specified but not present in 
CLASSPATH (or one of dependencies)
15/10/06 00:16:14 WARN NativeCodeLoader: Unable to load native-hadoop library 
for your platform... using bu

[jira] [Comment Edited] (SPARK-10474) TungstenAggregation cannot acquire memory for pointer array after switching to sort-based

2015-10-05 Thread Hans van den Bogert (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14944166#comment-14944166
 ] 

Hans van den Bogert edited comment on SPARK-10474 at 10/5/15 10:36 PM:
---

Had this patch against tag 1.5.1:
https://gist.github.com/f110f64887f4739b7dd8.git

Output of the added println()'s is near the beginning of:

{noformat}
Using Spark's repl log4j profile: 
org/apache/spark/log4j-defaults-repl.properties
To adjust logging level use sc.setLogLevel("INFO")
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.5.1
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0)
Type in expressions to have them evaluated.
Type :help for more information.
15/10/06 00:16:00 WARN SparkConf: In Spark 1.0 and later spark.local.dir will 
be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in 
mesos/standalone and LOCAL_DIRS in YARN).
numCores:0
1048576
15/10/06 00:16:02 WARN MetricsSystem: Using default name DAGScheduler for 
source because spark.app.id is not set.
I1006 00:16:02.414851 25123 sched.cpp:137] Version: 0.21.0
I1006 00:16:02.423246 25115 sched.cpp:234] New master detected at 
master@10.149.3.5:5050
I1006 00:16:02.423482 25115 sched.cpp:242] No credentials provided. Attempting 
to register without authentication
I1006 00:16:02.427250 25121 sched.cpp:408] Framework registered with 
20151006-001500-84120842-5050-6847-
15/10/06 00:16:02 WARN NativeCodeLoader: Unable to load native-hadoop library 
for your platform... using builtin-java classes where applicable
Spark context available as sc.
15/10/06 00:16:04 WARN Connection: BoneCP specified but not present in 
CLASSPATH (or one of dependencies)
15/10/06 00:16:04 WARN Connection: BoneCP specified but not present in 
CLASSPATH (or one of dependencies)
15/10/06 00:16:09 WARN ObjectStore: Version information not found in metastore. 
hive.metastore.schema.verification is not enabled so recording the schema 
version 1.2.0
15/10/06 00:16:09 WARN ObjectStore: Failed to get database default, returning 
NoSuchObjectException
15/10/06 00:16:11 WARN Connection: BoneCP specified but not present in 
CLASSPATH (or one of dependencies)
15/10/06 00:16:11 WARN Connection: BoneCP specified but not present in 
CLASSPATH (or one of dependencies)
15/10/06 00:16:14 WARN NativeCodeLoader: Unable to load native-hadoop library 
for your platform... using builtin-java classes where applicable
SQL context available as sqlContext.
{noformat}


was (Author: hbogert):
Had this patch against tag 1.5.1:
https://gist.github.com/f110f64887f4739b7dd8.git

Output of the added println()'s is near the beginning of:

Using Spark's repl log4j profile: 
org/apache/spark/log4j-defaults-repl.properties
To adjust logging level use sc.setLogLevel("INFO")
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.5.1
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0)
Type in expressions to have them evaluated.
Type :help for more information.
15/10/06 00:16:00 WARN SparkConf: In Spark 1.0 and later spark.local.dir will 
be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in 
mesos/standalone and LOCAL_DIRS in YARN).
numCores:0
1048576
15/10/06 00:16:02 WARN MetricsSystem: Using default name DAGScheduler for 
source because spark.app.id is not set.
I1006 00:16:02.414851 25123 sched.cpp:137] Version: 0.21.0
I1006 00:16:02.423246 25115 sched.cpp:234] New master detected at 
master@10.149.3.5:5050
I1006 00:16:02.423482 25115 sched.cpp:242] No credentials provided. Attempting 
to register without authentication
I1006 00:16:02.427250 25121 sched.cpp:408] Framework registered with 
20151006-001500-84120842-5050-6847-
15/10/06 00:16:02 WARN NativeCodeLoader: Unable to load native-hadoop library 
for your platform... using builtin-java classes where applicable
Spark context available as sc.
15/10/06 00:16:04 WARN Connection: BoneCP specified but not present in 
CLASSPATH (or one of dependencies)
15/10/06 00:16:04 WARN Connection: BoneCP specified but not present in 
CLASSPATH (or one of dependencies)
15/10/06 00:16:09 WARN ObjectStore: Version information not found in metastore. 
hive.metastore.schema.verification is not enabled so recording the schema 
version 1.2.0
15/10/06 00:16:09 WARN ObjectStore: Failed to get database default, returning 
NoSuchObjectException
15/10/06 00:16:11 WARN Connection: BoneCP specified but not present in 
CLASSPATH (or one of dependencies)
15/10/06 00:16:11 WARN Connection: BoneCP specified but not present in 
CLASSPATH (or one of dependencies)
15/10/06 00:16:14 WARN NativeCodeLoader: Unable to load native-hadoop library 
for your platform... using built

[jira] [Comment Edited] (SPARK-10474) TungstenAggregation cannot acquire memory for pointer array after switching to sort-based

2015-10-05 Thread Reynold Xin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14943868#comment-14943868
 ] 

Reynold Xin edited comment on SPARK-10474 at 10/5/15 7:21 PM:
--

Also please attach the "explain" of the query plan. Thanks.

Also explain how you set spark.buffer.pageSize.
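
A minimal way to capture such a plan from the 1.5.x shell, with a placeholder 
query standing in for the actual aggregation from this ticket:
{code}
// Placeholder query; substitute the real INSERT/SELECT from the ticket description.
val df = sqlContext.sql("SELECT ss_customer_sk, count(*) FROM store_sales GROUP BY ss_customer_sk")
df.explain(true)   // prints the parsed, analyzed, optimized and physical plans
{code}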


was (Author: rxin):
Also please attach the "explain" of the query plan. Thanks.


> TungstenAggregation cannot acquire memory for pointer array after switching 
> to sort-based
> -
>
> Key: SPARK-10474
> URL: https://issues.apache.org/jira/browse/SPARK-10474
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.5.0
>Reporter: Yi Zhou
>Assignee: Andrew Or
>Priority: Blocker
> Fix For: 1.5.1, 1.6.0
>
>
> In aggregation case, a  Lost task happened with below error.
> {code}
>  java.io.IOException: Could not acquire 65536 bytes of memory
> at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.initializeForWriting(UnsafeExternalSorter.java:169)
> at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:220)
> at 
> org.apache.spark.sql.execution.UnsafeKVExternalSorter.(UnsafeKVExternalSorter.java:126)
> at 
> org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.destructAndCreateExternalSorter(UnsafeFixedWidthAggregationMap.java:257)
> at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.switchToSortBasedAggregation(TungstenAggregationIterator.scala:435)
> at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:379)
> at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622)
> at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.org$apache$spark$sql$execution$aggregate$TungstenAggregate$$anonfun$$executePartition$1(TungstenAggregate.scala:110)
> at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
> at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
> at 
> org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:64)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:88)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> {code}
> Key SQL Query
> {code:sql}
> INSERT INTO TABLE test_table
> SELECT
>   ss.ss_customer_sk AS cid,
>   count(CASE WHEN i.i_class_id=1  THEN 1 ELSE NULL END) AS id1,
>   count(CASE WHEN i.i_class_id=3  THEN 1 ELSE NULL END) AS id3,
>   count(CASE WHEN i.i_class_id=5  THEN 1 ELSE NULL END) AS id5,
>   count(CASE WHEN i.i_class_id=7  THEN 1 ELSE NULL END) AS id7,
>   count(CASE WHEN i.i_class_id=9  THEN 1 ELSE NULL END) AS id9,
>   count(CASE WHEN i.i_class_id=11 THEN 1 ELSE NULL END) AS id11,
>   count(CASE WHEN i.i_class_id=13 THEN 1 ELSE NULL END) AS id13,
>   count(CASE WHEN i.i_class_id=15 THEN 1 ELSE NULL END) AS id15,
>   count(CASE WHEN i.i_class_id=2  THEN 1 ELSE NULL END) AS id2,
>   count(CASE WHEN i.i_class_id=4  THEN 1 ELSE NULL END) AS id4,
>   count(CASE WHEN i.i_class_id=6  THEN 1 ELSE NULL END) AS id6,
>   count(CASE WHEN i.i_class_id=8  THEN 1 ELSE NULL END) AS id8,
>   count(CASE WHEN i.i_class_id=10 THEN 1 ELSE NULL END) AS id10,
>   count(CASE WHEN i.i_class_id=14 THEN 1 ELSE NULL END) AS id14,
>   count(CASE WHEN i.i_class_id=16 THEN 1 ELSE NULL END) AS id16
> FROM store_sales ss
> INNER JOIN item i ON ss.ss_item_sk = i.i_item_sk
> WHERE i.i_category IN ('Books')
> AND ss.ss_customer_sk IS NOT NULL
> GROUP BY ss.ss_customer_sk
> HAVING count(ss.ss_item_sk) > 5
> {code}
> Note:
> the store

[jira] [Comment Edited] (SPARK-10474) TungstenAggregation cannot acquire memory for pointer array after switching to sort-based

2015-10-05 Thread Reynold Xin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14939808#comment-14939808
 ] 

Reynold Xin edited comment on SPARK-10474 at 10/5/15 7:05 PM:
--

We are executing 20 Spark SQL jobs in parallel using Spark Job Server and 
hitting this issue pretty routinely.

40GB heaps x 6 nodes. Have tried adjusting shuffle.memoryFraction from 0.2 -> 
0.1 with no difference. I am using the HEAD of the Spark 1.5 branch so it 
definitely includes all the commits above.
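
For clarity, a sketch of how the settings mentioned above are typically applied 
(the values simply mirror this report and are illustrative, not a recommendation):
{code}
import org.apache.spark.SparkConf

// Illustrative only: the knobs referenced above, as they would be set on a SparkConf
// (or equivalently via --executor-memory / --conf flags on spark-submit).
val conf = new SparkConf()
  .set("spark.executor.memory", "40g")          // 40GB heaps x 6 nodes
  .set("spark.shuffle.memoryFraction", "0.1")   // tried 0.2 -> 0.1 with no difference
{code}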

{code}
.16): org.apache.spark.SparkException: Task failed while writing rows.
at 
org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:250)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Unable to acquire 16777216 bytes of memory
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:351)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.(UnsafeExternalSorter.java:138)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:106)
at 
org.apache.spark.sql.execution.UnsafeExternalRowSorter.(UnsafeExternalRowSorter.java:68)
at 
org.apache.spark.sql.execution.TungstenSort.org$apache$spark$sql$execution$TungstenSort$$preparePartition$1(sort.scala:146)
at 
org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:169)
at 
org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:169)
at 
org.apache.spark.rdd.MapPartitionsWithPreparationRDD.prepare(MapPartitionsWithPreparationRDD.scala:50)
at 
org.apache.spark.rdd.ZippedPartitionsBaseRDD$$anonfun$tryPrepareParents$1.applyOrElse(ZippedPartitionsRDD.scala:83)
at 
org.apache.spark.rdd.ZippedPartitionsBaseRDD$$anonfun$tryPrepareParents$1.applyOrElse(ZippedPartitionsRDD.scala:82)
at 
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
at 
scala.collection.TraversableLike$$anonfun$collect$1.apply(TraversableLike.scala:278)
at scala.collection.immutable.List.foreach(List.scala:318)
at 
scala.collection.TraversableLike$class.collect(TraversableLike.scala:278)
at scala.collection.AbstractTraversable.collect(Traversable.scala:105)
at 
org.apache.spark.rdd.ZippedPartitionsBaseRDD.tryPrepareParents(ZippedPartitionsRDD.scala:82)
at 
org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:97)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)

{code}


was (Author: nadenf):
We are executing 20 Spark SQL jobs in parallel using Spark Job Server and 
hitting this issue pretty routinely.

40GB heaps x 6 nodes. Have tried adjusting shuffle.memoryFraction from 0.2 -> 
0.1 with no difference. I am using the HEAD of the Spark 1.5 branch so it 
definitely includes all the commits above.

.16): org.apache.spark.SparkException: Task failed while writing rows.
at 
org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:250)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Unable to acquire 16777216 bytes of memory
at 
org.apache.spark.util.colle

[jira] [Comment Edited] (SPARK-10474) TungstenAggregation cannot acquire memory for pointer array after switching to sort-based

2015-10-01 Thread Naden Franciscus (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14940487#comment-14940487
 ] 

Naden Franciscus edited comment on SPARK-10474 at 10/1/15 9:58 PM:
---

I can't provide the explain plan, since we are executing 1000s of SQL 
statements and it is hard to tell which is which.

Have increased heap to 50GB + shuffle.memoryFraction to 0.6 and 0.8. No change.

Will file this in another ticket.


was (Author: nadenf):
I can't provide the explain plan, since we are executing 1000s of SQL 
statements and it is hard to tell which is which.

Have increased heap to 50GB + shuffle.memoryFraction to 0.6 and 0.8. No change.

@Andrew: is there a ticket for this?


[jira] [Comment Edited] (SPARK-10474) TungstenAggregation cannot acquire memory for pointer array after switching to sort-based

2015-10-01 Thread Naden Franciscus (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14939808#comment-14939808
 ] 

Naden Franciscus edited comment on SPARK-10474 at 10/1/15 1:33 PM:
---

We are executing 20 Spark SQL jobs in parallel using Spark Job Server and 
hitting this issue pretty routinely.

40GB heaps x 6 nodes. Have tried adjusting shuffle.memoryFraction from 0.2 -> 
0.1 with no difference. I am using the HEAD of the Spark 1.5 branch so it 
definitely includes all the commits above.

.16): org.apache.spark.SparkException: Task failed while writing rows.
at 
org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:250)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Unable to acquire 16777216 bytes of memory
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:351)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.(UnsafeExternalSorter.java:138)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:106)
at 
org.apache.spark.sql.execution.UnsafeExternalRowSorter.(UnsafeExternalRowSorter.java:68)
at 
org.apache.spark.sql.execution.TungstenSort.org$apache$spark$sql$execution$TungstenSort$$preparePartition$1(sort.scala:146)
at 
org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:169)
at 
org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:169)
at 
org.apache.spark.rdd.MapPartitionsWithPreparationRDD.prepare(MapPartitionsWithPreparationRDD.scala:50)
at 
org.apache.spark.rdd.ZippedPartitionsBaseRDD$$anonfun$tryPrepareParents$1.applyOrElse(ZippedPartitionsRDD.scala:83)
at 
org.apache.spark.rdd.ZippedPartitionsBaseRDD$$anonfun$tryPrepareParents$1.applyOrElse(ZippedPartitionsRDD.scala:82)
at 
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
at 
scala.collection.TraversableLike$$anonfun$collect$1.apply(TraversableLike.scala:278)
at scala.collection.immutable.List.foreach(List.scala:318)
at 
scala.collection.TraversableLike$class.collect(TraversableLike.scala:278)
at scala.collection.AbstractTraversable.collect(Traversable.scala:105)
at 
org.apache.spark.rdd.ZippedPartitionsBaseRDD.tryPrepareParents(ZippedPartitionsRDD.scala:82)
at 
org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:97)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)



was (Author: nadenf):
We are executing 20 Spark SQL jobs in parallel using Spark Job Server and 
hitting this issue pretty routinely.

40GB heaps x 6 nodes. Have tried adjusting shuffle.memoryFraction from 0.2 -> 
0.1 with no difference. I am using the HEAD of the Spark 1.5 branch so it 
definitely includes all the commits above.

Caused by: java.io.IOException: Unable to acquire 16777216 bytes of memory
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:351)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.(UnsafeExternalSorter.java:138)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:106)
at 
org.apache.spark.sql.execution.UnsafeExternalRowSorter.(UnsafeExternalRowSorter.java:68)
at 
org.apache.spark.sql.execution.TungstenSort.org$apache$spark$sql$execution$TungstenSort$$preparePartition$1(sort.scala:146)
at 
org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:169)
at 
org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:169)
at 
org.apache.spark.rdd.MapPartitionsWithPreparationRDD.prepare(MapPartitionsWithPreparationRDD.scala:50)
at 
org.apache.spark.rdd.ZippedPartitionsBaseRDD$$anonfun$tryPrepareParents$1.applyOrElse(ZippedPartition

[jira] [Comment Edited] (SPARK-10474) TungstenAggregation cannot acquire memory for pointer array after switching to sort-based

2015-10-01 Thread Naden Franciscus (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14939808#comment-14939808
 ] 

Naden Franciscus edited comment on SPARK-10474 at 10/1/15 1:30 PM:
---

We are executing 20 Spark SQL jobs in parallel using Spark Job Server and 
hitting this issue pretty routinely.

40GB heaps x 6 nodes. Have tried adjusting shuffle.memoryFraction from 0.2 -> 
0.1 with no difference. I am using the HEAD of the Spark 1.5 branch so it 
definitely includes all the commits above.

Caused by: java.io.IOException: Unable to acquire 16777216 bytes of memory
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:351)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.(UnsafeExternalSorter.java:138)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:106)
at 
org.apache.spark.sql.execution.UnsafeExternalRowSorter.(UnsafeExternalRowSorter.java:68)
at 
org.apache.spark.sql.execution.TungstenSort.org$apache$spark$sql$execution$TungstenSort$$preparePartition$1(sort.scala:146)
at 
org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:169)
at 
org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:169)
at 
org.apache.spark.rdd.MapPartitionsWithPreparationRDD.prepare(MapPartitionsWithPreparationRDD.scala:50)
at 
org.apache.spark.rdd.ZippedPartitionsBaseRDD$$anonfun$tryPrepareParents$1.applyOrElse(ZippedPartitionsRDD.scala:83)
at 
org.apache.spark.rdd.ZippedPartitionsBaseRDD$$anonfun$tryPrepareParents$1.applyOrElse(ZippedPartitionsRDD.scala:82)
at 
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
at 
scala.collection.TraversableLike$$anonfun$collect$1.apply(TraversableLike.scala:278)
at scala.collection.immutable.List.foreach(List.scala:318)
at 
scala.collection.TraversableLike$class.collect(TraversableLike.scala:278)
at scala.collection.AbstractTraversable.collect(Traversable.scala:105)
at 
org.apache.spark.rdd.ZippedPartitionsBaseRDD.tryPrepareParents(ZippedPartitionsRDD.scala:82)



was (Author: nadenf):
We are executing 20 Spark SQL jobs in parallel using Spark Job Server and 
hitting this issue pretty routinely.

40GB heaps x 6 nodes. Have tried adjusting shuffle.memoryFraction from 0.2 -> 
0.1 with no difference.

Caused by: java.io.IOException: Unable to acquire 16777216 bytes of memory
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:351)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.(UnsafeExternalSorter.java:138)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:106)
at 
org.apache.spark.sql.execution.UnsafeExternalRowSorter.(UnsafeExternalRowSorter.java:68)
at 
org.apache.spark.sql.execution.TungstenSort.org$apache$spark$sql$execution$TungstenSort$$preparePartition$1(sort.scala:146)
at 
org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:169)
at 
org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:169)
at 
org.apache.spark.rdd.MapPartitionsWithPreparationRDD.prepare(MapPartitionsWithPreparationRDD.scala:50)
at 
org.apache.spark.rdd.ZippedPartitionsBaseRDD$$anonfun$tryPrepareParents$1.applyOrElse(ZippedPartitionsRDD.scala:83)
at 
org.apache.spark.rdd.ZippedPartitionsBaseRDD$$anonfun$tryPrepareParents$1.applyOrElse(ZippedPartitionsRDD.scala:82)
at 
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
at 
scala.collection.TraversableLike$$anonfun$collect$1.apply(TraversableLike.scala:278)
at scala.collection.immutable.List.foreach(List.scala:318)
at 
scala.collection.TraversableLike$class.collect(TraversableLike.scala:278)
at scala.collection.AbstractTraversable.collect(Traversable.scala:105)
at 
org.apache.spark.rdd.ZippedPartitionsBaseRDD.tryPrepareParents(ZippedPartitionsRDD.scala:82)



[jira] [Comment Edited] (SPARK-10474) TungstenAggregation cannot acquire memory for pointer array after switching to sort-based

2015-09-28 Thread Hans van den Bogert (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14933966#comment-14933966
 ] 

Hans van den Bogert edited comment on SPARK-10474 at 9/28/15 8:56 PM:
--

Checked using CHANGES.txt file:

{noformat}
Spark Change Log


Release 1.5.1

  ...

  [SPARK-10474] [SQL] Aggregation fails to allocate memory for pointer array 
(round 2)
  Andrew Or 
  2015-09-23 19:34:31 -0700
  Commit: 1f47e68, github.com/apache/spark/pull/

{noformat}


was (Author: hbogert):
Checked using CHANGES.txt file:

{noformat}
Spark Change Log


Release 1.5.1

  [SPARK-10692] [STREAMING] Expose failureReasons in BatchInfo for streaming UI 
to clear failed batches
  zsxwing , Tathagata Das 
  2015-09-23 19:52:02 -0700
  Commit: 4c48593, github.com/apache/spark/pull/8892

  Update branch-1.5 for 1.5.1 release.
  Reynold Xin 
  2015-09-23 19:46:13 -0700
  Commit: 1000b5d, github.com/apache/spark/pull/8890

  [SPARK-10474] [SQL] Aggregation fails to allocate memory for pointer array 
(round 2)
  Andrew Or 
  2015-09-23 19:34:31 -0700
  Commit: 1f47e68, github.com/apache/spark/pull/

{noformat}


[jira] [Comment Edited] (SPARK-10474) TungstenAggregation cannot acquire memory for pointer array after switching to sort-based

2015-09-28 Thread Hans van den Bogert (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14933966#comment-14933966
 ] 

Hans van den Bogert edited comment on SPARK-10474 at 9/28/15 8:55 PM:
--

Checked using CHANGES.txt file:

{noformat}
Spark Change Log


Release 1.5.1

  [SPARK-10692] [STREAMING] Expose failureReasons in BatchInfo for streaming UI 
to clear failed batches
  zsxwing , Tathagata Das 
  2015-09-23 19:52:02 -0700
  Commit: 4c48593, github.com/apache/spark/pull/8892

  Update branch-1.5 for 1.5.1 release.
  Reynold Xin 
  2015-09-23 19:46:13 -0700
  Commit: 1000b5d, github.com/apache/spark/pull/8890

  [SPARK-10474] [SQL] Aggregation fails to allocate memory for pointer array 
(round 2)
  Andrew Or 
  2015-09-23 19:34:31 -0700
  Commit: 1f47e68, github.com/apache/spark/pull/

{noformat}


was (Author: hbogert):
Checked using CHANGES file:

{noformat}
Spark Change Log


Release 1.5.1

  [SPARK-10692] [STREAMING] Expose failureReasons in BatchInfo for streaming UI 
to clear failed batches
  zsxwing , Tathagata Das 
  2015-09-23 19:52:02 -0700
  Commit: 4c48593, github.com/apache/spark/pull/8892

  Update branch-1.5 for 1.5.1 release.
  Reynold Xin 
  2015-09-23 19:46:13 -0700
  Commit: 1000b5d, github.com/apache/spark/pull/8890

  [SPARK-10474] [SQL] Aggregation fails to allocate memory for pointer array 
(round 2)
  Andrew Or 
  2015-09-23 19:34:31 -0700
  Commit: 1f47e68, github.com/apache/spark/pull/

{noformat}


[jira] [Comment Edited] (SPARK-10474) TungstenAggregation cannot acquire memory for pointer array after switching to sort-based

2015-09-22 Thread Jiri Syrovy (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14902223#comment-14902223
 ] 

Jiri Syrovy edited comment on SPARK-10474 at 9/22/15 8:30 AM:
--

This seems to be the cause for me, at least with the default pageSize. I was 
doing some other tests and left the settings there, although the data set itself 
is very, very small (~10k rows).
{code}
sparkConf.setMaster("local[64]");
{code}
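
A sketch of why this setting bites (assumed values, not a verified fix): 
local[64] gives 64 task slots inside a single JVM, so the default page size is 
derived from the memory pool divided over 64 cores, and the 64 concurrent tasks 
then compete for that same pool. Two mitigations consistent with this thread are 
fewer local slots or an explicitly pinned page size:
{code}
import org.apache.spark.SparkConf

// Sketch only: either reduce the number of concurrent local task slots, or pin
// spark.buffer.pageSize instead of relying on the default derived from 64 cores.
val conf = new SparkConf()
  .setMaster("local[8]")                // fewer slots sharing one JVM's memory, and/or:
  .set("spark.buffer.pageSize", "2m")   // assumed example value
{code}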


was (Author: xjrk):
This seems to be the cause for me, at least with the default pageSize. I was 
doing some other tests and left the settings there, although the data set itself 
is very, very small (~10k rows).

sparkConf.setMaster("local[64]");



[jira] [Comment Edited] (SPARK-10474) TungstenAggregation cannot acquire memory for pointer array after switching to sort-based

2015-09-21 Thread Andrew Or (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14901440#comment-14901440
 ] 

Andrew Or edited comment on SPARK-10474 at 9/21/15 9:23 PM:


[~jameszhouyi] [~jrk] did you set `spark.task.cpus` by any chance?


was (Author: andrewor14):
[~Yi Zhou] [~jrk] did you set `spark.task.cpus` by any chance?
