There is an open Jira on Spark not pushing predicates to the metastore. I have a 
large dataset with many partitions, but doing anything with it is very 
slow... But I am surprised Spark 1.2 worked for you: it has this problem...

-------- Original message --------
From: Andrew Otto <ao...@wikimedia.org>
Date: 05/22/2015 3:51 PM (GMT-05:00)
To: user@spark.apache.org
Cc: Joseph Allemandou <jalleman...@wikimedia.org>, Madhumitha Viswanathan <mviswanat...@wikimedia.org>
Subject: HiveContext fails when querying large external Parquet tables

Hi all,

(This email was easier to write in markdown, so I’ve created a gist with its 
contents here: https://gist.github.com/ottomata/f91ea76cece97444e269.  I’ll 
paste the markdown content in the email body here too.)

---
We’ve recently upgraded to CDH 5.4.0 which comes with Spark 1.3.0 and Hive 
1.1.0.  Previously we were on CDH 5.3.x, running Spark 1.2.0 and Hive 0.13.0.  
Since upgrading, we can no longer query our large webrequest dataset using 
HiveContext.  HiveContext + Parquet and other file types work fine with 
external tables.  (We have a similarly large JSON external table that works just 
fine with HiveContext.)

Our webrequest dataset is stored in hourly partitioned Parquet files.  We 
mainly interact with this dataset via a Hive external table, but also have been 
using Spark's HiveContext.

```
# This single hourly directory is only 5.3M
$ hdfs dfs -du -s -h 
/wmf/data/wmf/webrequest/webrequest_source=misc/year=2015/month=5/day=20/hour=0
5.3 M  15.8 M  
/wmf/data/wmf/webrequest/webrequest_source=misc/year=2015/month=5/day=20/hour=0

# This monthly directory is 1.8T.  (There are subdirectories down to hourly 
level here too.)
$ hdfs dfs -du -s -h 
/wmf/data/wmf/webrequest/webrequest_source=bits/year=2015/month=5
1.8 T  5.3 T  /wmf/data/wmf/webrequest/webrequest_source=bits/year=2015/month=5
```

If I create a Hive table on top of this data and add the single hourly 
partition, querying works via both Hive and Spark HiveContext:

```sql
hive (otto)> CREATE EXTERNAL TABLE IF NOT EXISTS 
`otto.webrequest_few_partitions_big_data`(
    `hostname`          string  COMMENT 'Source node hostname',
    `sequence`          bigint  COMMENT 'Per host sequence number',
    `dt`                string  COMMENT 'Timestame at cache in ISO 8601',
    `time_firstbyte`    double  COMMENT 'Time to first byte',
    `ip`                string  COMMENT 'IP of packet at cache',
    `cache_status`      string  COMMENT 'Cache status',
    `http_status`       string  COMMENT 'HTTP status of response',
    `response_size`     bigint  COMMENT 'Response size',
    `http_method`       string  COMMENT 'HTTP method of request',
    `uri_host`          string  COMMENT 'Host of request',
    `uri_path`          string  COMMENT 'Path of request',
    `uri_query`         string  COMMENT 'Query of request',
    `content_type`      string  COMMENT 'Content-Type header of response',
    `referer`           string  COMMENT 'Referer header of request',
    `x_forwarded_for`   string  COMMENT 'X-Forwarded-For header of request',
    `user_agent`        string  COMMENT 'User-Agent header of request',
    `accept_language`   string  COMMENT 'Accept-Language header of request',
    `x_analytics`       string  COMMENT 'X-Analytics header of response',
    `range`             string  COMMENT 'Range header of response',
    `is_pageview`       boolean COMMENT 'Indicates if this record was marked as 
a pageview during refinement',
    `record_version`    string  COMMENT 'Keeps track of changes in the table 
content definition - 
https://wikitech.wikimedia.org/wiki/Analytics/Data/Webrequest',
    `client_ip`         string  COMMENT 'Client IP computed during refinement 
using ip and x_forwarded_for',
    `geocoded_data`     map<string, string>  COMMENT 'Geocoded map with 
continent, country_code, country, city, subdivision, postal_code, latitude, 
longitude, timezone keys  and associated values.',
    `x_cache`           string  COMMENT 'X-Cache header of response',
    `user_agent_map`    map<string, string>  COMMENT 'User-agent map with 
browser_name, browser_major, device, os_name, os_minor, os_major keys and 
associated values',
    `x_analytics_map`   map<string, string>  COMMENT 'X_analytics map view of 
the x_analytics field',
    `ts`                timestamp            COMMENT 'Unix timestamp in 
milliseconds extracted from dt',
    `access_method`     string  COMMENT 'Method used to accessing the site 
(mobile app|mobile web|desktop)',
    `agent_type`        string  COMMENT 'Categorise the agent making the 
webrequest as either user or spider (automatas to be added).',
    `is_zero`           boolean COMMENT 'Indicates if the webrequest is 
accessed through a zero provider',
    `referer_class`     string  COMMENT 'Indicates if a referer is internal, 
external or unknown.'
)
PARTITIONED BY (
    `webrequest_source` string  COMMENT 'Source cluster',
    `year`              int     COMMENT 'Unpadded year of request',
    `month`             int     COMMENT 'Unpadded month of request',
    `day`               int     COMMENT 'Unpadded day of request',
    `hour`              int     COMMENT 'Unpadded hour of request'
)
CLUSTERED BY(hostname, sequence) INTO 64 BUCKETS
STORED AS PARQUET
LOCATION '/wmf/data/wmf/webrequest'
;

hive (otto)> alter table otto.webrequest_few_partitions_big_data add partition 
(webrequest_source='misc', year=2015, month=5, day=20, hour=0) location 
'/wmf/data/wmf/webrequest/webrequest_source=misc/year=2015/month=5/day=20/hour=0';

hive (otto)> SELECT uri_host, uri_path from 
otto.webrequest_few_partitions_big_data where webrequest_source='misc' and 
year=2015 and month=5 and day=20 and hour=0 limit 4;
OK
uri_host        uri_path
graphite.wikimedia.org  /render
graphite.wikimedia.org  /render
stats.wikimedia.org     /EN/PlotTotalArticlesBS.png
etherpad.wikimedia.org  /socket.io/1/xhr-polling/nuoaggODBKYWJ4sCvfuZ
```

```scala
$ spark-shell

scala> val hc = new org.apache.spark.sql.hive.HiveContext(sc)

scala> val query = "SELECT uri_host, uri_path from 
otto.webrequest_few_partitions_big_data where webrequest_source='misc' and 
year=2015 and month=5 and day=20 and hour=0 limit 4"

scala> hc.sql(query).collect().foreach(println)
[graphite.wikimedia.org,/render]
[graphite.wikimedia.org,/render]
[stats.wikimedia.org,/EN/PlotTotalArticlesBS.png]
[etherpad.wikimedia.org,/socket.io/1/xhr-polling/nuoaggODBKYWJ4sCvfuZ]

```

But when I add more data, Spark either OOMs or (if I bump up memory) fails with 
a Premature EOF error.  I've tried this with many hourly partitions, and with 
just two partitions, one of which holds lots of data.  In either case the same 
amount of data was present.  For this example I'll just add one large partition 
to the table.

```sql
-- Of course day=0, hour=25 doesn't make sense, but I'm including all
-- of May in this one partition, so I just chose dummy partition keys
hive (otto)> alter table otto.webrequest_few_partitions_big_data add partition 
(webrequest_source='bits', year=2015, month=5, day=0, hour=25) location 
'/wmf/data/wmf/webrequest/webrequest_source=bits/year=2015/month=5';

hive (otto)> show partitions otto.webrequest_few_partitions_big_data;
OK
partition
webrequest_source=bits/year=2015/month=5/day=0/hour=25
webrequest_source=misc/year=2015/month=5/day=20/hour=0

hive (otto)> SELECT uri_host, uri_path from 
otto.webrequest_few_partitions_big_data where webrequest_source='misc' and 
year=2015 and month=5 and day=20 and hour=0 limit 4;
OK
uri_host        uri_path
graphite.wikimedia.org  /render
graphite.wikimedia.org  /render
stats.wikimedia.org     /EN/PlotTotalArticlesBS.png
etherpad.wikimedia.org  /socket.io/1/xhr-polling/nuoaggODBKYWJ4sCvfuZ
```

Now, let's try the same query we did before in spark-shell.

```scala
$ spark-shell

Spark context available as sc.
15/05/22 19:04:25 INFO SparkILoop: Created sql context (with Hive support)..
SQL context available as sqlContext.

scala> val hc = new org.apache.spark.sql.hive.HiveContext(sc)
hc: org.apache.spark.sql.hive.HiveContext = 
org.apache.spark.sql.hive.HiveContext@38fe4d15

scala> val query = "SELECT uri_host, uri_path from 
otto.webrequest_few_partitions_big_data where webrequest_source='misc' and 
year=2015 and month=5 and day=20 and hour=0 limit 4"
query: String = SELECT uri_host, uri_path from 
otto.webrequest_few_partitions_big_data where webrequest_source='misc' and 
year=2015 and month=5 and day=20 and hour=0 limit 4

scala> hc.sql(query).collect().foreach(println)
15/05/22 19:04:42 INFO metastore: Trying to connect to metastore with URI 
thrift://analytics1027.eqiad.wmnet:9083
15/05/22 19:04:42 INFO metastore: Connected to metastore.
15/05/22 19:04:43 INFO SessionState: Created local directory: 
/tmp/fcb5ac90-78c7-4368-adac-085d7935fd6b_resources
15/05/22 19:04:43 INFO SessionState: Created HDFS directory: 
/tmp/hive/otto/fcb5ac90-78c7-4368-adac-085d7935fd6b
15/05/22 19:04:43 INFO SessionState: Created local directory: 
/tmp/otto/fcb5ac90-78c7-4368-adac-085d7935fd6b
15/05/22 19:04:43 INFO SessionState: Created HDFS directory: 
/tmp/hive/otto/fcb5ac90-78c7-4368-adac-085d7935fd6b/_tmp_space.db
15/05/22 19:04:43 INFO SessionState: No Tez session required at this point. 
hive.execution.engine=mr.
15/05/22 19:04:44 INFO ParseDriver: Parsing command: SELECT uri_host, uri_path 
from otto.webrequest_few_partitions_big_data where webrequest_source='misc' and 
year=2015 and month=5 and day=20 and hour=0 limit 4
15/05/22 19:04:44 INFO ParseDriver: Parse Completed

### Many minutes pass... ###

Exception in thread "qtp454555988-129" Exception in thread "IPC Client 
(1610773823) connection to analytics1001.eqiad.wmnet/10.64.36.118:8020 from 
otto" Exception in thread "qtp454555988-52"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler 
in thread "sparkDriver-scheduler-1"
java.lang.OutOfMemoryError: GC overhead limit exceeded
java.lang.OutOfMemoryError: GC overhead limit exceeded
java.lang.OutOfMemoryError: GC overhead limit exceeded
15/05/22 18:49:15 INFO RetryInvocationHandler: Exception while invoking 
getBlockLocations of class ClientNamenodeProtocolTranslatorPB over 
analytics1001.eqiad.wmnet/10.64.36.118:8020. Trying to fail over immediately.
java.io.IOException: com.google.protobuf.ServiceException: 
java.lang.OutOfMemoryError: GC overhead limit exceeded
        at 
org.apache.hadoop.ipc.ProtobufHelper.getRemoteException(ProtobufHelper.java:47)
        at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:259)
        at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
        at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
        at com.sun.proxy.$Proxy17.getBlockLocations(Unknown Source)
        at 
org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1211)
        at 
org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1201)
        at 
org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1191)
        at 
org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:299)
        at 
org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:265)
        at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:257)
        at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1490)
        at 
org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:302)
        at 
org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:298)
        at 
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
        at 
org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:298)
        at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:766)
        at 
parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:411)
        at 
parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:398)
        at 
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:276)
        at 
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:275)
        at 
scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
        at 
scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
        at 
scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
        at 
scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
        at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
        at 
scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
        at 
scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
        at 
scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
        at 
scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.popAndExecAll(ForkJoinPool.java:1243)
        at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1344)
        at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: com.google.protobuf.ServiceException: java.lang.OutOfMemoryError: GC 
overhead limit exceeded
        at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:274)
        at com.sun.proxy.$Proxy16.getBlockLocations(Unknown Source)
        at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:254)
        ... 36 more
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
java.lang.OutOfMemoryError: GC overhead limit exceeded
```

Note that I have also seen slightly different OOM errors on the CLI.  Sometimes 
I see:
```
15/05/22 19:09:22 ERROR ActorSystemImpl: exception on LARS’ timer thread
java.lang.OutOfMemoryError: GC overhead limit exceeded
15/05/22 19:09:29 INFO ActorSystemImpl: starting new LARS thread
15/05/22 19:09:29 ERROR ActorSystemImpl: Uncaught fatal error from thread 
[sparkDriver-scheduler-1] shutting down ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: GC overhead limit exceeded
Exception in thread "Driver Heartbeater" java.lang.OutOfMemoryError: GC 
overhead limit exceeded

...
```


Ok, so let's increase the driver memory:

```scala
$ spark-shell --driver-memory 1500M
Spark context available as sc.
15/05/22 19:09:07 INFO SparkILoop: Created sql context (with Hive support)..
SQL context available as sqlContext.

scala> val hc = new org.apache.spark.sql.hive.HiveContext(sc)

scala> val query = "SELECT uri_host, uri_path from 
otto.webrequest_few_partitions_big_data where webrequest_source='misc' and 
year=2015 and month=5 and day=20 and hour=0 limit 4"

scala> hc.sql(query).collect().foreach(println)
15/05/22 19:11:41 INFO metastore: Trying to connect to metastore with URI 
thrift://analytics1027.eqiad.wmnet:9083
15/05/22 19:11:41 INFO metastore: Connected to metastore.
15/05/22 19:11:42 INFO SessionState: Created local directory: 
/tmp/5f8996fc-ce1a-4954-8c6c-94c291aa8c70_resources
15/05/22 19:11:42 INFO SessionState: Created HDFS directory: 
/tmp/hive/otto/5f8996fc-ce1a-4954-8c6c-94c291aa8c70
15/05/22 19:11:42 INFO SessionState: Created local directory: 
/tmp/otto/5f8996fc-ce1a-4954-8c6c-94c291aa8c70
15/05/22 19:11:42 INFO SessionState: Created HDFS directory: 
/tmp/hive/otto/5f8996fc-ce1a-4954-8c6c-94c291aa8c70/_tmp_space.db
15/05/22 19:11:42 INFO SessionState: No Tez session required at this point. 
hive.execution.engine=mr.
15/05/22 19:11:43 INFO ParseDriver: Parsing command: SELECT uri_host, uri_path 
from otto.webrequest_few_partitions_big_data where webrequest_source='misc' and 
year=2015 and month=5 and day=20 and hour=0 limit 4
15/05/22 19:11:44 INFO ParseDriver: Parse Completed

### Almost 30 minutes pass... ###

java.io.EOFException: Premature EOF: no length prefix available
        at 
org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2212)
        at 
org.apache.hadoop.hdfs.RemoteBlockReader2.newBlockReader(RemoteBlockReader2.java:408)
        at 
org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReader(BlockReaderFactory.java:796)
        at 
org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:674)
        at 
org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:337)
        at 
org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:621)
        at 
org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:847)
        at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:897)
        at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:700)
        at java.io.FilterInputStream.read(FilterInputStream.java:83)
        at parquet.bytes.BytesUtils.readIntLittleEndian(BytesUtils.java:66)
        at 
parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:423)
        at 
parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:398)
        at 
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:276)
        at 
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:275)
        at 
scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
        at 
scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
        at 
scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
        at 
scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
        at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
        at 
scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
        at 
scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:172)
        at 
scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:514)
        at 
scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:162)
        at 
scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
        at 
scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runSubtask(ForkJoinPool.java:1357)
        at 
scala.concurrent.forkjoin.ForkJoinPool.tryHelpStealer(ForkJoinPool.java:2253)
        at 
scala.concurrent.forkjoin.ForkJoinPool.awaitJoin(ForkJoinPool.java:2377)
        at scala.concurrent.forkjoin.ForkJoinTask.doJoin(ForkJoinTask.java:341)
        at scala.concurrent.forkjoin.ForkJoinTask.join(ForkJoinTask.java:673)
        at 
scala.collection.parallel.ForkJoinTasks$WrappedTask$class.sync(Tasks.scala:444)
        at 
scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.sync(Tasks.scala:514)
        at 
scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:187)
        at 
scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:514)
        at 
scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:162)
        at 
scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
        at 
scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
```

I've tested this both in local mode and in YARN client mode, and both behave 
similarly.  What's worrisome is that the behavior changes after adding more 
data to the table, even though I am querying the same very small partition.  
The whole point of Hive partitions is to allow jobs to work with only the data 
that is needed.  I'm not sure what Spark HiveContext is doing here, but it 
seems to couple the full size of a Hive table to the performance of a query 
that only needs a very small amount of data.
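
To make that expectation concrete, here is a minimal sketch in plain Scala (no 
Spark involved) of the pruning I would expect to happen before any Parquet 
file is opened.  The object and method names are hypothetical, and this is not 
how Spark or Hive actually implement it; it only shows that the equality 
predicates in the query are enough to pick out the one small partition from 
the `show partitions` output above.

```scala
// Illustrative only: hypothetical names, not Spark's or Hive's implementation.
object PartitionPruningSketch {
  // Parse one line of `show partitions` output, e.g.
  // "webrequest_source=misc/year=2015/month=5/day=20/hour=0",
  // into a map of partition key -> value.
  def parseSpec(spec: String): Map[String, String] =
    spec.split("/").map { kv =>
      val Array(k, v) = kv.split("=", 2)
      k -> v
    }.toMap

  // Keep only the partitions that satisfy every equality predicate.
  def prune(specs: Seq[String], predicate: Map[String, String]): Seq[String] =
    specs.filter { spec =>
      val keys = parseSpec(spec)
      predicate.forall { case (k, v) => keys.get(k).exists(_ == v) }
    }
}

// The two partitions registered on the test table.
val partitions = Seq(
  "webrequest_source=bits/year=2015/month=5/day=0/hour=25",
  "webrequest_source=misc/year=2015/month=5/day=20/hour=0")

// The WHERE clause of the query, expressed as key -> value pairs.
val wanted = Map(
  "webrequest_source" -> "misc",
  "year" -> "2015", "month" -> "5", "day" -> "20", "hour" -> "0")

PartitionPruningSketch.prune(partitions, wanted).foreach(println)
```

With the two partitions above, only the `webrequest_source=misc` entry 
survives, so only the files under that one 5.3M directory should need to be 
touched.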

I poked around the Spark source, and for a minute thought this might be 
related: https://github.com/apache/spark/commit/42389b17, but that change was 
already included in Spark 1.2.0, which was working fine for us.

Is HiveContext somehow trying to scan the whole table in the driver?  Has 
anyone else had this problem?
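
One experiment that might help narrow this down: both stack traces above go 
through `ParquetRelation2`, Spark SQL's built-in Parquet support.  According to 
the Spark SQL documentation, setting `spark.sql.hive.convertMetastoreParquet` 
to `false` makes HiveContext use the Hive SerDe for metastore Parquet tables 
instead.  Whether that avoids the footer reads seen in the traces is an 
assumption, not something verified here; the fragment below is only a sketch 
of the test, to be run inside spark-shell where `sc` is already defined.

```scala
// Run inside spark-shell; `sc` is provided by the shell.
val hc = new org.apache.spark.sql.hive.HiveContext(sc)

// Hypothesis only: use the Hive SerDe instead of the built-in Parquet
// support (ParquetRelation2) for Parquet tables defined in the metastore.
hc.setConf("spark.sql.hive.convertMetastoreParquet", "false")

val query = "SELECT uri_host, uri_path from " +
  "otto.webrequest_few_partitions_big_data where webrequest_source='misc' and " +
  "year=2015 and month=5 and day=20 and hour=0 limit 4"

hc.sql(query).collect().foreach(println)
```

If the query returns quickly with this setting, that would point at the 
built-in Parquet path rather than at the metastore or the table definition.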

Thanks!

-Andrew Otto
