Hi all,
(This email was easier to write in markdown, so I’ve created a gist with its
contents here: https://gist.github.com/ottomata/f91ea76cece97444e269. I’ll
paste the markdown content in the email body here too.)
---
We’ve recently upgraded to CDH 5.4.0 which comes with Spark 1.3.0 and Hive
1.1.0. Previously we were on CDH 5.3.x, running Spark 1.2.0 and Hive 0.13.0.
Since upgrading, we can no longer query our large webrequest dataset using
HiveContext. HiveContext still works fine with small Parquet external tables,
and with other file formats even at this scale: we have a similarly large JSON
external table that queries just fine via HiveContext.
Our webrequest dataset is stored in hourly partitioned Parquet files. We
mainly interact with this dataset via a Hive external table, but also have been
using Spark's HiveContext.
```
# This single hourly directory is only 5.3M
$ hdfs dfs -du -s -h /wmf/data/wmf/webrequest/webrequest_source=misc/year=2015/month=5/day=20/hour=0
5.3 M  15.8 M  /wmf/data/wmf/webrequest/webrequest_source=misc/year=2015/month=5/day=20/hour=0

# This monthly directory is 1.8T. (There are subdirectories down to the
# hourly level here too.)
$ hdfs dfs -du -s -h /wmf/data/wmf/webrequest/webrequest_source=bits/year=2015/month=5
1.8 T  5.3 T  /wmf/data/wmf/webrequest/webrequest_source=bits/year=2015/month=5
```
If I create a Hive table on top of this data and add only the single hourly
partition, querying works via both Hive and Spark HiveContext:
```sql
hive (otto)> CREATE EXTERNAL TABLE IF NOT EXISTS
`otto.webrequest_few_partitions_big_data`(
`hostname` string COMMENT 'Source node hostname',
`sequence` bigint COMMENT 'Per host sequence number',
`dt` string COMMENT 'Timestamp at cache in ISO 8601',
`time_firstbyte` double COMMENT 'Time to first byte',
`ip` string COMMENT 'IP of packet at cache',
`cache_status` string COMMENT 'Cache status',
`http_status` string COMMENT 'HTTP status of response',
`response_size` bigint COMMENT 'Response size',
`http_method` string COMMENT 'HTTP method of request',
`uri_host` string COMMENT 'Host of request',
`uri_path` string COMMENT 'Path of request',
`uri_query` string COMMENT 'Query of request',
`content_type` string COMMENT 'Content-Type header of response',
`referer` string COMMENT 'Referer header of request',
`x_forwarded_for` string COMMENT 'X-Forwarded-For header of request',
`user_agent` string COMMENT 'User-Agent header of request',
`accept_language` string COMMENT 'Accept-Language header of request',
`x_analytics` string COMMENT 'X-Analytics header of response',
`range` string COMMENT 'Range header of response',
`is_pageview` boolean COMMENT 'Indicates if this record was marked as a pageview during refinement',
`record_version` string COMMENT 'Keeps track of changes in the table content definition - https://wikitech.wikimedia.org/wiki/Analytics/Data/Webrequest',
`client_ip` string COMMENT 'Client IP computed during refinement using ip and x_forwarded_for',
`geocoded_data` map<string, string> COMMENT 'Geocoded map with continent, country_code, country, city, subdivision, postal_code, latitude, longitude, timezone keys and associated values.',
`x_cache` string COMMENT 'X-Cache header of response',
`user_agent_map` map<string, string> COMMENT 'User-agent map with browser_name, browser_major, device, os_name, os_minor, os_major keys and associated values',
`x_analytics_map` map<string, string> COMMENT 'X_analytics map view of the x_analytics field',
`ts` timestamp COMMENT 'Unix timestamp in milliseconds extracted from dt',
`access_method` string COMMENT 'Method used to access the site (mobile app|mobile web|desktop)',
`agent_type` string COMMENT 'Categorises the agent making the webrequest as either user or spider (automatas to be added).',
`is_zero` boolean COMMENT 'Indicates if the webrequest is accessed through a zero provider',
`referer_class` string COMMENT 'Indicates if a referer is internal, external or unknown.'
)
PARTITIONED BY (
`webrequest_source` string COMMENT 'Source cluster',
`year` int COMMENT 'Unpadded year of request',
`month` int COMMENT 'Unpadded month of request',
`day` int COMMENT 'Unpadded day of request',
`hour` int COMMENT 'Unpadded hour of request'
)
CLUSTERED BY(hostname, sequence) INTO 64 BUCKETS
STORED AS PARQUET
LOCATION '/wmf/data/wmf/webrequest'
;
hive (otto)> alter table otto.webrequest_few_partitions_big_data add partition
(webrequest_source='misc', year=2015, month=5, day=20, hour=0) location
'/wmf/data/wmf/webrequest/webrequest_source=misc/year=2015/month=5/day=20/hour=0';
hive (otto)> SELECT uri_host, uri_path from
otto.webrequest_few_partitions_big_data where webrequest_source='misc' and
year=2015 and month=5 and day=20 and hour=0 limit 4;
OK
uri_host uri_path
graphite.wikimedia.org /render
graphite.wikimedia.org /render
stats.wikimedia.org /EN/PlotTotalArticlesBS.png
etherpad.wikimedia.org /socket.io/1/xhr-polling/nuoaggODBKYWJ4sCvfuZ
```
```scala
$ spark-shell
scala> val hc = new org.apache.spark.sql.hive.HiveContext(sc)
scala> val query = "SELECT uri_host, uri_path from
otto.webrequest_few_partitions_big_data where webrequest_source='misc' and
year=2015 and month=5 and day=20 and hour=0 limit 4"
scala> hc.sql(query).collect().foreach(println)
[graphite.wikimedia.org,/render]
[graphite.wikimedia.org,/render]
[stats.wikimedia.org,/EN/PlotTotalArticlesBS.png]
[etherpad.wikimedia.org,/socket.io/1/xhr-polling/nuoaggODBKYWJ4sCvfuZ]
```
But when I add more data to the table, Spark either OOMs, or hits a Premature
EOF (if I bump up driver memory). I've tried this both with many hourly
partitions and with just two partitions, one of which holds lots of data;
either way the table contains the same total amount of data, and the failure
is the same. For this example I'll just add one large partition to the table.
```sql
-- Of course day=0, hour=25 doesn't make sense, but I'm including all
-- of May in this one partition, so I just chose dummy partition keys.
hive (otto)> alter table otto.webrequest_few_partitions_big_data add partition
(webrequest_source='bits', year=2015, month=5, day=0, hour=25) location
'/wmf/data/wmf/webrequest/webrequest_source=bits/year=2015/month=5';
hive (otto)> show partitions otto.webrequest_few_partitions_big_data;
OK
partition
webrequest_source=bits/year=2015/month=5/day=0/hour=25
webrequest_source=misc/year=2015/month=5/day=20/hour=0
hive (otto)> SELECT uri_host, uri_path from
otto.webrequest_few_partitions_big_data where webrequest_source='misc' and
year=2015 and month=5 and day=20 and hour=0 limit 4;
OK
uri_host uri_path
graphite.wikimedia.org /render
graphite.wikimedia.org /render
stats.wikimedia.org /EN/PlotTotalArticlesBS.png
etherpad.wikimedia.org /socket.io/1/xhr-polling/nuoaggODBKYWJ4sCvfuZ
```
Now, let's try the same query as before in spark-shell:
```scala
$ spark-shell
Spark context available as sc.
15/05/22 19:04:25 INFO SparkILoop: Created sql context (with Hive support)..
SQL context available as sqlContext.
scala> val hc = new org.apache.spark.sql.hive.HiveContext(sc)
hc: org.apache.spark.sql.hive.HiveContext =
org.apache.spark.sql.hive.HiveContext@38fe4d15
scala> val query = "SELECT uri_host, uri_path from
otto.webrequest_few_partitions_big_data where webrequest_source='misc' and
year=2015 and month=5 and day=20 and hour=0 limit 4"
query: String = SELECT uri_host, uri_path from
otto.webrequest_few_partitions_big_data where webrequest_source='misc' and
year=2015 and month=5 and day=20 and hour=0 limit 4
scala> hc.sql(query).collect().foreach(println)
15/05/22 19:04:42 INFO metastore: Trying to connect to metastore with URI
thrift://analytics1027.eqiad.wmnet:9083
15/05/22 19:04:42 INFO metastore: Connected to metastore.
15/05/22 19:04:43 INFO SessionState: Created local directory:
/tmp/fcb5ac90-78c7-4368-adac-085d7935fd6b_resources
15/05/22 19:04:43 INFO SessionState: Created HDFS directory:
/tmp/hive/otto/fcb5ac90-78c7-4368-adac-085d7935fd6b
15/05/22 19:04:43 INFO SessionState: Created local directory:
/tmp/otto/fcb5ac90-78c7-4368-adac-085d7935fd6b
15/05/22 19:04:43 INFO SessionState: Created HDFS directory:
/tmp/hive/otto/fcb5ac90-78c7-4368-adac-085d7935fd6b/_tmp_space.db
15/05/22 19:04:43 INFO SessionState: No Tez session required at this point.
hive.execution.engine=mr.
15/05/22 19:04:44 INFO ParseDriver: Parsing command: SELECT uri_host, uri_path
from otto.webrequest_few_partitions_big_data where webrequest_source='misc' and
year=2015 and month=5 and day=20 and hour=0 limit 4
15/05/22 19:04:44 INFO ParseDriver: Parse Completed
### Many minutes pass... ###
Exception in thread "qtp454555988-129" Exception in thread "IPC Client
(1610773823) connection to analytics1001.eqiad.wmnet/10.64.36.118:8020 from
otto" Exception in thread "qtp454555988-52"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler
in thread "sparkDriver-scheduler-1"
java.lang.OutOfMemoryError: GC overhead limit exceeded
java.lang.OutOfMemoryError: GC overhead limit exceeded
java.lang.OutOfMemoryError: GC overhead limit exceeded
15/05/22 18:49:15 INFO RetryInvocationHandler: Exception while invoking
getBlockLocations of class ClientNamenodeProtocolTranslatorPB over
analytics1001.eqiad.wmnet/10.64.36.118:8020. Trying to fail over immediately.
java.io.IOException: com.google.protobuf.ServiceException: java.lang.OutOfMemoryError: GC overhead limit exceeded
  at org.apache.hadoop.ipc.ProtobufHelper.getRemoteException(ProtobufHelper.java:47)
  at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:259)
  at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
  at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
  at com.sun.proxy.$Proxy17.getBlockLocations(Unknown Source)
  at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1211)
  at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1201)
  at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1191)
  at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:299)
  at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:265)
  at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:257)
  at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1490)
  at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:302)
  at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:298)
  at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
  at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:298)
  at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:766)
  at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:411)
  at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:398)
  at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:276)
  at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:275)
  at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
  at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
  at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
  at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
  at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
  at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
  at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
  at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
  at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.popAndExecAll(ForkJoinPool.java:1243)
  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1344)
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: com.google.protobuf.ServiceException: java.lang.OutOfMemoryError: GC overhead limit exceeded
  at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:274)
  at com.sun.proxy.$Proxy16.getBlockLocations(Unknown Source)
  at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:254)
  ... 36 more
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
java.lang.OutOfMemoryError: GC overhead limit exceeded
```
Note: I have also seen slightly different OOM errors on the CLI. Sometimes I
see:
```
15/05/22 19:09:22 ERROR ActorSystemImpl: exception on LARS’ timer thread
java.lang.OutOfMemoryError: GC overhead limit exceeded
15/05/22 19:09:29 INFO ActorSystemImpl: starting new LARS thread
15/05/22 19:09:29 ERROR ActorSystemImpl: Uncaught fatal error from thread
[sparkDriver-scheduler-1] shutting down ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: GC overhead limit exceeded
Exception in thread "Driver Heartbeater" java.lang.OutOfMemoryError: GC
overhead limit exceeded
...
```
Ok, so let's increase the driver memory:
```scala
$ spark-shell --driver-memory 1500M
Spark context available as sc.
15/05/22 19:09:07 INFO SparkILoop: Created sql context (with Hive support)..
SQL context available as sqlContext.
scala> val hc = new org.apache.spark.sql.hive.HiveContext(sc)
scala> val query = "SELECT uri_host, uri_path from
otto.webrequest_few_partitions_big_data where webrequest_source='misc' and
year=2015 and month=5 and day=20 and hour=0 limit 4"
scala> hc.sql(query).collect().foreach(println)
15/05/22 19:11:41 INFO metastore: Trying to connect to metastore with URI
thrift://analytics1027.eqiad.wmnet:9083
15/05/22 19:11:41 INFO metastore: Connected to metastore.
15/05/22 19:11:42 INFO SessionState: Created local directory:
/tmp/5f8996fc-ce1a-4954-8c6c-94c291aa8c70_resources
15/05/22 19:11:42 INFO SessionState: Created HDFS directory:
/tmp/hive/otto/5f8996fc-ce1a-4954-8c6c-94c291aa8c70
15/05/22 19:11:42 INFO SessionState: Created local directory:
/tmp/otto/5f8996fc-ce1a-4954-8c6c-94c291aa8c70
15/05/22 19:11:42 INFO SessionState: Created HDFS directory:
/tmp/hive/otto/5f8996fc-ce1a-4954-8c6c-94c291aa8c70/_tmp_space.db
15/05/22 19:11:42 INFO SessionState: No Tez session required at this point.
hive.execution.engine=mr.
15/05/22 19:11:43 INFO ParseDriver: Parsing command: SELECT uri_host, uri_path
from otto.webrequest_few_partitions_big_data where webrequest_source='misc' and
year=2015 and month=5 and day=20 and hour=0 limit 4
15/05/22 19:11:44 INFO ParseDriver: Parse Completed
### Almost 30 minutes pass... ###
java.io.EOFException: Premature EOF: no length prefix available
  at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2212)
  at org.apache.hadoop.hdfs.RemoteBlockReader2.newBlockReader(RemoteBlockReader2.java:408)
  at org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReader(BlockReaderFactory.java:796)
  at org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:674)
  at org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:337)
  at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:621)
  at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:847)
  at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:897)
  at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:700)
  at java.io.FilterInputStream.read(FilterInputStream.java:83)
  at parquet.bytes.BytesUtils.readIntLittleEndian(BytesUtils.java:66)
  at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:423)
  at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:398)
  at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:276)
  at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:275)
  at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
  at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
  at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
  at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
  at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
  at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
  at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:172)
  at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:514)
  at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:162)
  at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
  at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runSubtask(ForkJoinPool.java:1357)
  at scala.concurrent.forkjoin.ForkJoinPool.tryHelpStealer(ForkJoinPool.java:2253)
  at scala.concurrent.forkjoin.ForkJoinPool.awaitJoin(ForkJoinPool.java:2377)
  at scala.concurrent.forkjoin.ForkJoinTask.doJoin(ForkJoinTask.java:341)
  at scala.concurrent.forkjoin.ForkJoinTask.join(ForkJoinTask.java:673)
  at scala.collection.parallel.ForkJoinTasks$WrappedTask$class.sync(Tasks.scala:444)
  at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.sync(Tasks.scala:514)
  at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:187)
  at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:514)
  at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:162)
  at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
  at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
```
I've tested this both in local mode and in YARN client mode, and both behave
similarly. What's worrisome is that the behavior changes after adding more
data to the table, even though I am querying the same very small partition.
The whole point of Hive partitions is to let jobs touch only the data they
need. I'm not sure what Spark HiveContext is doing here, but it seems to
couple the full size of a Hive table to the performance of a query that only
needs a very small amount of data.
I poked around the Spark source, and for a minute thought this might be
related: https://github.com/apache/spark/commit/42389b17. But that commit was
already included in Spark 1.2.0, where this worked fine for us.
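One thing I plan to try (just a guess, not verified) is disabling Spark's
native Parquet conversion for metastore tables, since both stack traces fail
inside ParquetRelation2's MetadataCache refresh rather than in Hive's own
reader path:
```scala
// Assumption: spark.sql.hive.convertMetastoreParquet controls whether
// HiveContext replaces metastore Parquet tables with its native
// ParquetRelation2. Setting it false should fall back to Hive's SerDe path.
val hc = new org.apache.spark.sql.hive.HiveContext(sc)
hc.setConf("spark.sql.hive.convertMetastoreParquet", "false")
```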
Is HiveContext somehow trying to scan the whole table in the driver? Has
anyone else had this problem?
Thanks!
-Andrew Otto