There is an open Jira on Spark not pushing predicates down to the metastore. I have a large dataset with many partitions, and doing anything with it is very slow... But I am surprised Spark 1.2 worked for you: it has this problem...
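To see whether the partition predicates even reach the scan, you can print the plan instead of collecting. This is only a minimal sketch against the `otto.webrequest_few_partitions_big_data` table described below, assuming the standard Spark 1.3 `DataFrame.explain(extended)` call; note that building the relation may itself still trigger the expensive metadata refresh:

```scala
// Print the query plans rather than executing the query, to check whether the
// webrequest_source/year/month/day/hour predicates show up as partition-pruning
// filters or are only applied after scanning the whole table's metadata.
val hc = new org.apache.spark.sql.hive.HiveContext(sc)

val query = """SELECT uri_host, uri_path
               FROM otto.webrequest_few_partitions_big_data
               WHERE webrequest_source='misc' AND year=2015 AND month=5 AND day=20 AND hour=0
               LIMIT 4"""

// extended=true prints the parsed, analyzed, optimized and physical plans
hc.sql(query).explain(true)
```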
-------- Original message --------
From: Andrew Otto <ao...@wikimedia.org>
Date: 05/22/2015 3:51 PM (GMT-05:00)
To: user@spark.apache.org
Cc: Joseph Allemandou <jalleman...@wikimedia.org>, Madhumitha Viswanathan <mviswanat...@wikimedia.org>
Subject: HiveContext fails when querying large external Parquet tables

Hi all,

(This email was easier to write in markdown, so I’ve created a gist with its contents here: https://gist.github.com/ottomata/f91ea76cece97444e269. I’ll paste the markdown content in the email body here too.)

---

We’ve recently upgraded to CDH 5.4.0, which comes with Spark 1.3.0 and Hive 1.1.0. Previously we were on CDH 5.3.x, running Spark 1.2.0 and Hive 0.13.0.

Since upgrading, we can no longer query our large webrequest dataset using HiveContext. HiveContext with Parquet and other file types works fine for other external tables (we have a similarly large JSON external table that works just fine with HiveContext).

Our webrequest dataset is stored in hourly partitioned Parquet files. We mainly interact with this dataset via a Hive external table, but we have also been using Spark's HiveContext.

```
# This single hourly directory is only 5.3M
$ hdfs dfs -du -s -h /wmf/data/wmf/webrequest/webrequest_source=misc/year=2015/month=5/day=20/hour=0
5.3 M  15.8 M  /wmf/data/wmf/webrequest/webrequest_source=misc/year=2015/month=5/day=20/hour=0

# This monthly directory is 1.8T. (There are subdirectories down to the hourly level here too.)
$ hdfs dfs -du -s -h /wmf/data/wmf/webrequest/webrequest_source=bits/year=2015/month=5
1.8 T  5.3 T  /wmf/data/wmf/webrequest/webrequest_source=bits/year=2015/month=5
```

If I create a Hive table on top of this data and add the single hourly partition, querying works via both Hive and Spark HiveContext:

```sql
hive (otto)> CREATE EXTERNAL TABLE IF NOT EXISTS `otto.webrequest_few_partitions_big_data`(
    `hostname`          string      COMMENT 'Source node hostname',
    `sequence`          bigint      COMMENT 'Per host sequence number',
    `dt`                string      COMMENT 'Timestamp at cache in ISO 8601',
    `time_firstbyte`    double      COMMENT 'Time to first byte',
    `ip`                string      COMMENT 'IP of packet at cache',
    `cache_status`      string      COMMENT 'Cache status',
    `http_status`       string      COMMENT 'HTTP status of response',
    `response_size`     bigint      COMMENT 'Response size',
    `http_method`       string      COMMENT 'HTTP method of request',
    `uri_host`          string      COMMENT 'Host of request',
    `uri_path`          string      COMMENT 'Path of request',
    `uri_query`         string      COMMENT 'Query of request',
    `content_type`      string      COMMENT 'Content-Type header of response',
    `referer`           string      COMMENT 'Referer header of request',
    `x_forwarded_for`   string      COMMENT 'X-Forwarded-For header of request',
    `user_agent`        string      COMMENT 'User-Agent header of request',
    `accept_language`   string      COMMENT 'Accept-Language header of request',
    `x_analytics`       string      COMMENT 'X-Analytics header of response',
    `range`             string      COMMENT 'Range header of response',
    `is_pageview`       boolean     COMMENT 'Indicates if this record was marked as a pageview during refinement',
    `record_version`    string      COMMENT 'Keeps track of changes in the table content definition - https://wikitech.wikimedia.org/wiki/Analytics/Data/Webrequest',
    `client_ip`         string      COMMENT 'Client IP computed during refinement using ip and x_forwarded_for',
    `geocoded_data`     map<string, string>  COMMENT 'Geocoded map with continent, country_code, country, city, subdivision, postal_code, latitude, longitude, timezone keys and associated values.',
    `x_cache`           string      COMMENT 'X-Cache header of response',
    `user_agent_map`    map<string, string>  COMMENT 'User-agent map with browser_name, browser_major, device, os_name, os_minor, os_major keys and associated values',
    `x_analytics_map`   map<string, string>  COMMENT 'X_analytics map view of the x_analytics field',
    `ts`                timestamp   COMMENT 'Unix timestamp in milliseconds extracted from dt',
    `access_method`     string      COMMENT 'Method used to access the site (mobile app|mobile web|desktop)',
    `agent_type`        string      COMMENT 'Categorise the agent making the webrequest as either user or spider (automatas to be added).',
    `is_zero`           boolean     COMMENT 'Indicates if the webrequest is accessed through a zero provider',
    `referer_class`     string      COMMENT 'Indicates if a referer is internal, external or unknown.'
)
PARTITIONED BY (
    `webrequest_source` string      COMMENT 'Source cluster',
    `year`              int         COMMENT 'Unpadded year of request',
    `month`             int         COMMENT 'Unpadded month of request',
    `day`               int         COMMENT 'Unpadded day of request',
    `hour`              int         COMMENT 'Unpadded hour of request'
)
CLUSTERED BY(hostname, sequence) INTO 64 BUCKETS
STORED AS PARQUET
LOCATION '/wmf/data/wmf/webrequest'
;

hive (otto)> alter table otto.webrequest_few_partitions_big_data add partition (webrequest_source='misc', year=2015, month=5, day=20, hour=0) location '/wmf/data/wmf/webrequest/webrequest_source=misc/year=2015/month=5/day=20/hour=0';

hive (otto)> SELECT uri_host, uri_path from otto.webrequest_few_partitions_big_data where webrequest_source='misc' and year=2015 and month=5 and day=20 and hour=0 limit 4;
OK
uri_host                uri_path
graphite.wikimedia.org  /render
graphite.wikimedia.org  /render
stats.wikimedia.org     /EN/PlotTotalArticlesBS.png
etherpad.wikimedia.org  /socket.io/1/xhr-polling/nuoaggODBKYWJ4sCvfuZ
```

```scala
$ spark-shell

scala> val hc = new org.apache.spark.sql.hive.HiveContext(sc)

scala> val query = "SELECT uri_host, uri_path from otto.webrequest_few_partitions_big_data where webrequest_source='misc' and year=2015 and month=5 and day=20 and hour=0 limit 4"

scala> hc.sql(query).collect().foreach(println)
[graphite.wikimedia.org,/render]
[graphite.wikimedia.org,/render]
[stats.wikimedia.org,/EN/PlotTotalArticlesBS.png]
[etherpad.wikimedia.org,/socket.io/1/xhr-polling/nuoaggODBKYWJ4sCvfuZ]
```

But when I add more data, Spark either OOMs, or fails with a premature EOF (if I bump up memory). I've tried this with many hourly partitions, and with just two partitions where one holds most of the data; in either case the same total amount of data was present. For this example I'll just add one large partition to the table.

```sql
-- Of course day=0, hour=25 doesn't make sense, but I'm including all
-- of May in this one partition, so I just chose dummy partition keys.
hive (otto)> alter table otto.webrequest_few_partitions_big_data add partition (webrequest_source='bits', year=2015, month=5, day=0, hour=25) location '/wmf/data/wmf/webrequest/webrequest_source=bits/year=2015/month=5';

hive (otto)> show partitions otto.webrequest_few_partitions_big_data;
OK
partition
webrequest_source=bits/year=2015/month=5/day=0/hour=25
webrequest_source=misc/year=2015/month=5/day=20/hour=0

hive (otto)> SELECT uri_host, uri_path from otto.webrequest_few_partitions_big_data where webrequest_source='misc' and year=2015 and month=5 and day=20 and hour=0 limit 4;
OK
uri_host                uri_path
graphite.wikimedia.org  /render
graphite.wikimedia.org  /render
stats.wikimedia.org     /EN/PlotTotalArticlesBS.png
etherpad.wikimedia.org  /socket.io/1/xhr-polling/nuoaggODBKYWJ4sCvfuZ
```

Now, let's try the same query we did before in spark-shell.
```scala
$ spark-shell
Spark context available as sc.
15/05/22 19:04:25 INFO SparkILoop: Created sql context (with Hive support)..
SQL context available as sqlContext.

scala> val hc = new org.apache.spark.sql.hive.HiveContext(sc)
hc: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@38fe4d15

scala> val query = "SELECT uri_host, uri_path from otto.webrequest_few_partitions_big_data where webrequest_source='misc' and year=2015 and month=5 and day=20 and hour=0 limit 4"
query: String = SELECT uri_host, uri_path from otto.webrequest_few_partitions_big_data where webrequest_source='misc' and year=2015 and month=5 and day=20 and hour=0 limit 4

scala> hc.sql(query).collect().foreach(println)
15/05/22 19:04:42 INFO metastore: Trying to connect to metastore with URI thrift://analytics1027.eqiad.wmnet:9083
15/05/22 19:04:42 INFO metastore: Connected to metastore.
15/05/22 19:04:43 INFO SessionState: Created local directory: /tmp/fcb5ac90-78c7-4368-adac-085d7935fd6b_resources
15/05/22 19:04:43 INFO SessionState: Created HDFS directory: /tmp/hive/otto/fcb5ac90-78c7-4368-adac-085d7935fd6b
15/05/22 19:04:43 INFO SessionState: Created local directory: /tmp/otto/fcb5ac90-78c7-4368-adac-085d7935fd6b
15/05/22 19:04:43 INFO SessionState: Created HDFS directory: /tmp/hive/otto/fcb5ac90-78c7-4368-adac-085d7935fd6b/_tmp_space.db
15/05/22 19:04:43 INFO SessionState: No Tez session required at this point. hive.execution.engine=mr.
15/05/22 19:04:44 INFO ParseDriver: Parsing command: SELECT uri_host, uri_path from otto.webrequest_few_partitions_big_data where webrequest_source='misc' and year=2015 and month=5 and day=20 and hour=0 limit 4
15/05/22 19:04:44 INFO ParseDriver: Parse Completed

### Many minutes pass... ###

Exception in thread "qtp454555988-129"
Exception in thread "IPC Client (1610773823) connection to analytics1001.eqiad.wmnet/10.64.36.118:8020 from otto"
Exception in thread "qtp454555988-52"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "sparkDriver-scheduler-1"
java.lang.OutOfMemoryError: GC overhead limit exceeded
java.lang.OutOfMemoryError: GC overhead limit exceeded
java.lang.OutOfMemoryError: GC overhead limit exceeded
15/05/22 18:49:15 INFO RetryInvocationHandler: Exception while invoking getBlockLocations of class ClientNamenodeProtocolTranslatorPB over analytics1001.eqiad.wmnet/10.64.36.118:8020. Trying to fail over immediately.
java.io.IOException: com.google.protobuf.ServiceException: java.lang.OutOfMemoryError: GC overhead limit exceeded
    at org.apache.hadoop.ipc.ProtobufHelper.getRemoteException(ProtobufHelper.java:47)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:259)
    at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy17.getBlockLocations(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1211)
    at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1201)
    at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1191)
    at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:299)
    at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:265)
    at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:257)
    at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1490)
    at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:302)
    at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:298)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:298)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:766)
    at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:411)
    at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:398)
    at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:276)
    at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:275)
    at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
    at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.popAndExecAll(ForkJoinPool.java:1243)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1344)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: com.google.protobuf.ServiceException: java.lang.OutOfMemoryError: GC overhead limit exceeded
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:274)
    at com.sun.proxy.$Proxy16.getBlockLocations(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:254)
    ... 36 more
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
java.lang.OutOfMemoryError: GC overhead limit exceeded
```

Note, I have also seen slightly different OOM errors on the CLI. Sometimes I see:

```
15/05/22 19:09:22 ERROR ActorSystemImpl: exception on LARS’ timer thread
java.lang.OutOfMemoryError: GC overhead limit exceeded
15/05/22 19:09:29 INFO ActorSystemImpl: starting new LARS thread
15/05/22 19:09:29 ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-scheduler-1] shutting down ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: GC overhead limit exceeded
Exception in thread "Driver Heartbeater" java.lang.OutOfMemoryError: GC overhead limit exceeded
...
```

Ok, so let's increase the driver memory:

```scala
$ spark-shell --driver-memory 1500M
Spark context available as sc.
15/05/22 19:09:07 INFO SparkILoop: Created sql context (with Hive support)..
SQL context available as sqlContext.

scala> val hc = new org.apache.spark.sql.hive.HiveContext(sc)

scala> val query = "SELECT uri_host, uri_path from otto.webrequest_few_partitions_big_data where webrequest_source='misc' and year=2015 and month=5 and day=20 and hour=0 limit 4"

scala> hc.sql(query).collect().foreach(println)
15/05/22 19:11:41 INFO metastore: Trying to connect to metastore with URI thrift://analytics1027.eqiad.wmnet:9083
15/05/22 19:11:41 INFO metastore: Connected to metastore.
15/05/22 19:11:42 INFO SessionState: Created local directory: /tmp/5f8996fc-ce1a-4954-8c6c-94c291aa8c70_resources
15/05/22 19:11:42 INFO SessionState: Created HDFS directory: /tmp/hive/otto/5f8996fc-ce1a-4954-8c6c-94c291aa8c70
15/05/22 19:11:42 INFO SessionState: Created local directory: /tmp/otto/5f8996fc-ce1a-4954-8c6c-94c291aa8c70
15/05/22 19:11:42 INFO SessionState: Created HDFS directory: /tmp/hive/otto/5f8996fc-ce1a-4954-8c6c-94c291aa8c70/_tmp_space.db
15/05/22 19:11:42 INFO SessionState: No Tez session required at this point. hive.execution.engine=mr.
15/05/22 19:11:43 INFO ParseDriver: Parsing command: SELECT uri_host, uri_path from otto.webrequest_few_partitions_big_data where webrequest_source='misc' and year=2015 and month=5 and day=20 and hour=0 limit 4
15/05/22 19:11:44 INFO ParseDriver: Parse Completed

### Almost 30 minutes pass... ###
java.io.EOFException: Premature EOF: no length prefix available
    at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2212)
    at org.apache.hadoop.hdfs.RemoteBlockReader2.newBlockReader(RemoteBlockReader2.java:408)
    at org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReader(BlockReaderFactory.java:796)
    at org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:674)
    at org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:337)
    at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:621)
    at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:847)
    at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:897)
    at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:700)
    at java.io.FilterInputStream.read(FilterInputStream.java:83)
    at parquet.bytes.BytesUtils.readIntLittleEndian(BytesUtils.java:66)
    at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:423)
    at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:398)
    at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:276)
    at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:275)
    at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
    at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:172)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:514)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:162)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runSubtask(ForkJoinPool.java:1357)
    at scala.concurrent.forkjoin.ForkJoinPool.tryHelpStealer(ForkJoinPool.java:2253)
    at scala.concurrent.forkjoin.ForkJoinPool.awaitJoin(ForkJoinPool.java:2377)
    at scala.concurrent.forkjoin.ForkJoinTask.doJoin(ForkJoinTask.java:341)
    at scala.concurrent.forkjoin.ForkJoinTask.join(ForkJoinTask.java:673)
    at scala.collection.parallel.ForkJoinTasks$WrappedTask$class.sync(Tasks.scala:444)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.sync(Tasks.scala:514)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:187)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:514)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:162)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
```

I've tested this both in local mode and in YARN client mode, and both behave similarly.

What's worrisome is that the behavior changes after adding more data to the table, even though I am querying the same very small partition. The whole point of Hive partitions is to let jobs work with only the data they need. I'm not sure what Spark HiveContext is doing here, but it seems to couple the full size of a Hive table to the performance of a query that only needs a very small amount of data.

I poked around the Spark source, and for a minute thought this might be related: https://github.com/apache/spark/commit/42389b17, but that was included in Spark 1.2.0, and this was working fine for us there.

Is HiveContext somehow trying to scan the whole table in the driver? Has anyone else had this problem?

Thanks!
-Andrew Otto
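If the driver is indeed choking on reading every Parquet footer in `ParquetRelation2$MetadataCache.refresh` (which is what both stack traces above point at), one thing worth trying is to disable Spark's native Parquet conversion so HiveContext reads the table through the Hive SerDe path instead. This is only a minimal sketch, assuming the documented `spark.sql.hive.convertMetastoreParquet` flag behaves this way in 1.3.0; it has not been verified against this table:

```scala
// Sketch only: turn off HiveContext's conversion of metastore Parquet tables
// into the native ParquetRelation2, which is where readFooter() shows up in
// the stack traces. With this set, the query should go through the plain
// Hive SerDe / MapReduce input path, at the cost of native Parquet speedups.
val hc = new org.apache.spark.sql.hive.HiveContext(sc)
hc.setConf("spark.sql.hive.convertMetastoreParquet", "false")

hc.sql("""SELECT uri_host, uri_path
          FROM otto.webrequest_few_partitions_big_data
          WHERE webrequest_source='misc' AND year=2015 AND month=5 AND day=20 AND hour=0
          LIMIT 4""").collect().foreach(println)
```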