[ https://issues.apache.org/jira/browse/SPARK-11246?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yin Huai updated SPARK-11246:
-----------------------------
    Fix Version/s: 1.5.3

> [1.5] Table cache for Parquet broken in 1.5
> -------------------------------------------
>
>                 Key: SPARK-11246
>                 URL: https://issues.apache.org/jira/browse/SPARK-11246
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.5.1
>            Reporter: David Ross
>            Assignee: Xin Wu
>             Fix For: 1.5.3, 1.6.0
>
>
> Since upgrading to 1.5.1, {{CACHE TABLE}} works great for all tables except for parquet tables, likely related to the parquet native reader.
> Here are the steps for a parquet table:
> {code}
> create table test_parquet stored as parquet as select 1;
> explain select * from test_parquet;
> {code}
> With output:
> {code}
> == Physical Plan ==
> Scan ParquetRelation[hdfs://192.168.99.9/user/hive/warehouse/test_parquet][_c0#141]
> {code}
> And then caching:
> {code}
> cache table test_parquet;
> explain select * from test_parquet;
> {code}
> With output:
> {code}
> == Physical Plan ==
> Scan ParquetRelation[hdfs://192.168.99.9/user/hive/warehouse/test_parquet][_c0#174]
> {code}
> Note it isn't cached. I have included spark log output for the {{cache table}} and {{explain}} statements below.
> ---
> Here's the same for a non-parquet table:
> {code}
> cache table test_no_parquet;
> explain select * from test_no_parquet;
> {code}
> With output:
> {code}
> == Physical Plan ==
> HiveTableScan [_c0#210], (MetastoreRelation default, test_no_parquet, None)
> {code}
> And then caching:
> {code}
> cache table test_no_parquet;
> explain select * from test_no_parquet;
> {code}
> With output:
> {code}
> == Physical Plan ==
> InMemoryColumnarTableScan [_c0#229], (InMemoryRelation [_c0#229], true, 10000, StorageLevel(true, true, false, true, 1), (HiveTableScan [_c0#211], (MetastoreRelation default, test_no_parquet, None)), Some(test_no_parquet))
> {code}
> Note that the table seems to be cached.
> ---
> Note that if the flag {{spark.sql.hive.convertMetastoreParquet}} is set to {{false}}, parquet tables work the same as non-parquet tables with caching. This is a reasonable workaround for us, but ideally, we would like to benefit from the native reading.
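>
> For illustration, a minimal sketch of that workaround as a SQL session against the Thrift server; the expected plan noted in the comments is an assumption based on the non-parquet behaviour described above, not captured output:
> {code}
> -- workaround: disable the native parquet conversion before caching
> set spark.sql.hive.convertMetastoreParquet=false;
> cache table test_parquet;
> -- the plan should now show an InMemoryColumnarTableScan over a HiveTableScan,
> -- i.e. the cached data is actually used
> explain select * from test_parquet;
> {code}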
> ---
> Spark logs for {{cache table}} for {{test_parquet}}:
> {code}
> 15/10/21 21:22:05 INFO thriftserver.SparkExecuteStatementOperation: Running query 'cache table test_parquet' with 20ee2ab9-5242-4783-81cf-46115ed72610
> 15/10/21 21:22:05 INFO metastore.HiveMetaStore: 49: get_table : db=default tbl=test_parquet
> 15/10/21 21:22:05 INFO HiveMetaStore.audit: ugi=vagrant  ip=unknown-ip-addr  cmd=get_table : db=default tbl=test_parquet
> 15/10/21 21:22:05 INFO metastore.HiveMetaStore: 49: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
> 15/10/21 21:22:05 INFO metastore.ObjectStore: ObjectStore, initialize called
> 15/10/21 21:22:05 INFO DataNucleus.Query: Reading in results for query "org.datanucleus.store.rdbms.query.SQLQuery@0" since the connection used is closing
> 15/10/21 21:22:05 INFO metastore.MetaStoreDirectSql: Using direct SQL, underlying DB is MYSQL
> 15/10/21 21:22:05 INFO metastore.ObjectStore: Initialized ObjectStore
> 15/10/21 21:22:05 INFO storage.MemoryStore: ensureFreeSpace(215680) called with curMem=4196713, maxMem=139009720
> 15/10/21 21:22:05 INFO storage.MemoryStore: Block broadcast_59 stored as values in memory (estimated size 210.6 KB, free 128.4 MB)
> 15/10/21 21:22:05 INFO storage.MemoryStore: ensureFreeSpace(20265) called with curMem=4412393, maxMem=139009720
> 15/10/21 21:22:05 INFO storage.MemoryStore: Block broadcast_59_piece0 stored as bytes in memory (estimated size 19.8 KB, free 128.3 MB)
> 15/10/21 21:22:05 INFO storage.BlockManagerInfo: Added broadcast_59_piece0 in memory on 192.168.99.9:50262 (size: 19.8 KB, free: 132.2 MB)
> 15/10/21 21:22:05 INFO spark.SparkContext: Created broadcast 59 from run at AccessController.java:-2
> 15/10/21 21:22:05 INFO metastore.HiveMetaStore: 49: get_table : db=default tbl=test_parquet
> 15/10/21 21:22:05 INFO HiveMetaStore.audit: ugi=vagrant  ip=unknown-ip-addr  cmd=get_table : db=default tbl=test_parquet
> 15/10/21 21:22:05 INFO storage.MemoryStore: ensureFreeSpace(215680) called with curMem=4432658, maxMem=139009720
> 15/10/21 21:22:05 INFO storage.MemoryStore: Block broadcast_60 stored as values in memory (estimated size 210.6 KB, free 128.1 MB)
> 15/10/21 21:22:05 INFO storage.BlockManagerInfo: Removed broadcast_58_piece0 on 192.168.99.9:50262 in memory (size: 19.8 KB, free: 132.2 MB)
> 15/10/21 21:22:05 INFO storage.BlockManagerInfo: Removed broadcast_57_piece0 on 192.168.99.9:50262 in memory (size: 21.1 KB, free: 132.2 MB)
> 15/10/21 21:22:05 INFO storage.BlockManagerInfo: Removed broadcast_57_piece0 on slave2:46912 in memory (size: 21.1 KB, free: 534.5 MB)
> 15/10/21 21:22:05 INFO storage.BlockManagerInfo: Removed broadcast_57_piece0 on slave0:46599 in memory (size: 21.1 KB, free: 534.3 MB)
> 15/10/21 21:22:05 INFO spark.ContextCleaner: Cleaned accumulator 86
> 15/10/21 21:22:05 INFO spark.ContextCleaner: Cleaned accumulator 84
> 15/10/21 21:22:05 INFO storage.MemoryStore: ensureFreeSpace(20265) called with curMem=4327620, maxMem=139009720
> 15/10/21 21:22:05 INFO storage.MemoryStore: Block broadcast_60_piece0 stored as bytes in memory (estimated size 19.8 KB, free 128.4 MB)
> 15/10/21 21:22:05 INFO storage.BlockManagerInfo: Added broadcast_60_piece0 in memory on 192.168.99.9:50262 (size: 19.8 KB, free: 132.2 MB)
> 15/10/21 21:22:05 INFO spark.SparkContext: Created broadcast 60 from run at AccessController.java:-2
> 15/10/21 21:22:05 INFO spark.SparkContext: Starting job: run at AccessController.java:-2
> 15/10/21 21:22:05 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://192.168.99.9/user/hive/warehouse/test_parquet/part-r-00000-7cf64eb9-76ca-47c7-92aa-eb5ba879faae.gz.parquet
> 15/10/21 21:22:05 INFO scheduler.DAGScheduler: Registering RDD 171 (run at AccessController.java:-2)
> 15/10/21 21:22:05 INFO scheduler.DAGScheduler: Got job 24 (run at AccessController.java:-2) with 1 output partitions
> 15/10/21 21:22:05 INFO scheduler.DAGScheduler: Final stage: ResultStage 34(run at AccessController.java:-2)
> 15/10/21 21:22:05 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 33)
> 15/10/21 21:22:05 INFO scheduler.DAGScheduler: Missing parents: List(ShuffleMapStage 33)
> 15/10/21 21:22:05 INFO scheduler.DAGScheduler: Submitting ShuffleMapStage 33 (MapPartitionsRDD[171] at run at AccessController.java:-2), which has no missing parents
> 15/10/21 21:22:05 INFO storage.MemoryStore: ensureFreeSpace(9472) called with curMem=4347885, maxMem=139009720
> 15/10/21 21:22:05 INFO storage.MemoryStore: Block broadcast_61 stored as values in memory (estimated size 9.3 KB, free 128.4 MB)
> 15/10/21 21:22:05 INFO storage.MemoryStore: ensureFreeSpace(4838) called with curMem=4357357, maxMem=139009720
> 15/10/21 21:22:05 INFO storage.MemoryStore: Block broadcast_61_piece0 stored as bytes in memory (estimated size 4.7 KB, free 128.4 MB)
> 15/10/21 21:22:05 INFO storage.BlockManagerInfo: Added broadcast_61_piece0 in memory on 192.168.99.9:50262 (size: 4.7 KB, free: 132.2 MB)
> 15/10/21 21:22:05 INFO spark.SparkContext: Created broadcast 61 from broadcast at DAGScheduler.scala:861
> 15/10/21 21:22:05 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 33 (MapPartitionsRDD[171] at run at AccessController.java:-2)
> 15/10/21 21:22:05 INFO cluster.YarnScheduler: Adding task set 33.0 with 1 tasks
> 15/10/21 21:22:05 INFO scheduler.FairSchedulableBuilder: Added task set TaskSet_33 tasks to pool default
> 15/10/21 21:22:05 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 33.0 (TID 45, slave2, NODE_LOCAL, 2234 bytes)
> 15/10/21 21:22:05 INFO storage.BlockManagerInfo: Added broadcast_61_piece0 in memory on slave2:46912 (size: 4.7 KB, free: 534.5 MB)
> 15/10/21 21:22:05 INFO storage.BlockManagerInfo: Added broadcast_60_piece0 in memory on slave2:46912 (size: 19.8 KB, free: 534.4 MB)
> 15/10/21 21:22:05 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 33.0 (TID 45) in 105 ms on slave2 (1/1)
> 15/10/21 21:22:05 INFO cluster.YarnScheduler: Removed TaskSet 33.0, whose tasks have all completed, from pool default
> 15/10/21 21:22:05 INFO scheduler.DAGScheduler: ShuffleMapStage 33 (run at AccessController.java:-2) finished in 0.105 s
> 15/10/21 21:22:05 INFO scheduler.DAGScheduler: looking for newly runnable stages
> 15/10/21 21:22:05 INFO scheduler.DAGScheduler: running: Set()
> 15/10/21 21:22:05 INFO scheduler.DAGScheduler: waiting: Set(ResultStage 34)
> 15/10/21 21:22:05 INFO scheduler.DAGScheduler: failed: Set()
> 15/10/21 21:22:05 INFO scheduler.DAGScheduler: Missing parents for ResultStage 34: List()
> 15/10/21 21:22:05 INFO scheduler.StatsReportListener: Finished stage: org.apache.spark.scheduler.StageInfo@532f49c8
> 15/10/21 21:22:05 INFO scheduler.DAGScheduler: Submitting ResultStage 34 (MapPartitionsRDD[174] at run at AccessController.java:-2), which is now runnable
> 15/10/21 21:22:05 INFO scheduler.StatsReportListener: task runtime:(count: 1, mean: 105.000000, stdev: 0.000000, max: 105.000000, min: 105.000000)
> 15/10/21 21:22:05 INFO scheduler.StatsReportListener:     0%      5%      10%     25%     50%     75%     90%     95%     100%
> 15/10/21 21:22:05 INFO scheduler.StatsReportListener:     105.0 ms    105.0 ms    105.0 ms    105.0 ms    105.0 ms    105.0 ms    105.0 ms    105.0 ms    105.0 ms
> 15/10/21 21:22:05 INFO scheduler.StatsReportListener: shuffle bytes written:(count: 1, mean: 49.000000, stdev: 0.000000, max: 49.000000, min: 49.000000)
> 15/10/21 21:22:05 INFO scheduler.StatsReportListener:     0%      5%      10%     25%     50%     75%     90%     95%     100%
> 15/10/21 21:22:05 INFO scheduler.StatsReportListener:     49.0 B  49.0 B  49.0 B  49.0 B  49.0 B  49.0 B  49.0 B  49.0 B  49.0 B
> 15/10/21 21:22:05 INFO storage.MemoryStore: ensureFreeSpace(10440) called with curMem=4362195, maxMem=139009720
> 15/10/21 21:22:05 INFO scheduler.StatsReportListener: task result size:(count: 1, mean: 2381.000000, stdev: 0.000000, max: 2381.000000, min: 2381.000000)
> 15/10/21 21:22:05 INFO scheduler.StatsReportListener:     0%      5%      10%     25%     50%     75%     90%     95%     100%
> 15/10/21 21:22:05 INFO scheduler.StatsReportListener:     2.3 KB  2.3 KB  2.3 KB  2.3 KB  2.3 KB  2.3 KB  2.3 KB  2.3 KB  2.3 KB
> 15/10/21 21:22:05 INFO storage.MemoryStore: Block broadcast_62 stored as values in memory (estimated size 10.2 KB, free 128.4 MB)
> 15/10/21 21:22:05 INFO scheduler.StatsReportListener: executor (non-fetch) time pct: (count: 1, mean: 68.571429, stdev: 0.000000, max: 68.571429, min: 68.571429)
> 15/10/21 21:22:05 INFO scheduler.StatsReportListener:     0%      5%      10%     25%     50%     75%     90%     95%     100%
> 15/10/21 21:22:05 INFO scheduler.StatsReportListener:     69 %    69 %    69 %    69 %    69 %    69 %    69 %    69 %    69 %
> 15/10/21 21:22:05 INFO scheduler.StatsReportListener: other time pct: (count: 1, mean: 31.428571, stdev: 0.000000, max: 31.428571, min: 31.428571)
> 15/10/21 21:22:05 INFO scheduler.StatsReportListener:     0%      5%      10%     25%     50%     75%     90%     95%     100%
> 15/10/21 21:22:05 INFO scheduler.StatsReportListener:     31 %    31 %    31 %    31 %    31 %    31 %    31 %    31 %    31 %
> 15/10/21 21:22:05 INFO storage.MemoryStore: ensureFreeSpace(5358) called with curMem=4372635, maxMem=139009720
> 15/10/21 21:22:05 INFO storage.MemoryStore: Block broadcast_62_piece0 stored as bytes in memory (estimated size 5.2 KB, free 128.4 MB)
> 15/10/21 21:22:05 INFO storage.BlockManagerInfo: Added broadcast_62_piece0 in memory on 192.168.99.9:50262 (size: 5.2 KB, free: 132.2 MB)
> 15/10/21 21:22:05 INFO spark.SparkContext: Created broadcast 62 from broadcast at DAGScheduler.scala:861
> 15/10/21 21:22:05 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 34 (MapPartitionsRDD[174] at run at AccessController.java:-2)
> 15/10/21 21:22:05 INFO cluster.YarnScheduler: Adding task set 34.0 with 1 tasks
> 15/10/21 21:22:05 INFO scheduler.FairSchedulableBuilder: Added task set TaskSet_34 tasks to pool default
> 15/10/21 21:22:05 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 34.0 (TID 46, slave2, PROCESS_LOCAL, 1914 bytes)
> 15/10/21 21:22:05 INFO storage.BlockManagerInfo: Added broadcast_62_piece0 in memory on slave2:46912 (size: 5.2 KB, free: 534.4 MB)
> 15/10/21 21:22:05 INFO spark.MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 9 to slave2:43867
> 15/10/21 21:22:05 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 9 is 135 bytes
> 15/10/21 21:22:05 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 34.0 (TID 46) in 48 ms on slave2 (1/1)
> 15/10/21 21:22:05 INFO cluster.YarnScheduler: Removed TaskSet 34.0, whose tasks have all completed, from pool default
> 15/10/21 21:22:05 INFO scheduler.DAGScheduler: ResultStage 34 (run at AccessController.java:-2) finished in 0.047 s
> 15/10/21 21:22:05 INFO scheduler.StatsReportListener: Finished stage: org.apache.spark.scheduler.StageInfo@37a20848
> 15/10/21 21:22:05 INFO scheduler.StatsReportListener: task runtime:(count: 1, mean: 48.000000, stdev: 0.000000, max: 48.000000, min: 48.000000)
> 15/10/21 21:22:05 INFO scheduler.StatsReportListener:     0%      5%      10%     25%     50%     75%     90%     95%     100%
> 15/10/21 21:22:05 INFO scheduler.StatsReportListener:     48.0 ms 48.0 ms 48.0 ms 48.0 ms 48.0 ms 48.0 ms 48.0 ms 48.0 ms 48.0 ms
> 15/10/21 21:22:05 INFO scheduler.StatsReportListener: fetch wait time:(count: 1, mean: 0.000000, stdev: 0.000000, max: 0.000000, min: 0.000000)
> 15/10/21 21:22:05 INFO scheduler.StatsReportListener:     0%      5%      10%     25%     50%     75%     90%     95%     100%
> 15/10/21 21:22:05 INFO scheduler.StatsReportListener:     0.0 ms  0.0 ms  0.0 ms  0.0 ms  0.0 ms  0.0 ms  0.0 ms  0.0 ms  0.0 ms
> 15/10/21 21:22:05 INFO scheduler.StatsReportListener: remote bytes read:(count: 1, mean: 0.000000, stdev: 0.000000, max: 0.000000, min: 0.000000)
> 15/10/21 21:22:05 INFO scheduler.StatsReportListener:     0%      5%      10%     25%     50%     75%     90%     95%     100%
> 15/10/21 21:22:05 INFO scheduler.StatsReportListener:     0.0 B   0.0 B   0.0 B   0.0 B   0.0 B   0.0 B   0.0 B   0.0 B   0.0 B
> 15/10/21 21:22:05 INFO scheduler.StatsReportListener: task result size:(count: 1, mean: 1737.000000, stdev: 0.000000, max: 1737.000000, min: 1737.000000)
> 15/10/21 21:22:05 INFO scheduler.StatsReportListener:     0%      5%      10%     25%     50%     75%     90%     95%     100%
> 15/10/21 21:22:05 INFO scheduler.StatsReportListener:     1737.0 B    1737.0 B    1737.0 B    1737.0 B    1737.0 B    1737.0 B    1737.0 B    1737.0 B    1737.0 B
> 15/10/21 21:22:05 INFO scheduler.StatsReportListener: executor (non-fetch) time pct: (count: 1, mean: 29.166667, stdev: 0.000000, max: 29.166667, min: 29.166667)
> 15/10/21 21:22:05 INFO scheduler.StatsReportListener:     0%      5%      10%     25%     50%     75%     90%     95%     100%
> 15/10/21 21:22:05 INFO scheduler.StatsReportListener:     29 %    29 %    29 %    29 %    29 %    29 %    29 %    29 %    29 %
> 15/10/21 21:22:05 INFO scheduler.StatsReportListener: fetch wait time pct: (count: 1, mean: 0.000000, stdev: 0.000000, max: 0.000000, min: 0.000000)
> 15/10/21 21:22:05 INFO scheduler.StatsReportListener:     0%      5%      10%     25%     50%     75%     90%     95%     100%
> 15/10/21 21:22:05 INFO scheduler.StatsReportListener:     0 %     0 %     0 %     0 %     0 %     0 %     0 %     0 %     0 %
> 15/10/21 21:22:05 INFO scheduler.StatsReportListener: other time pct: (count: 1, mean: 70.833333, stdev: 0.000000, max: 70.833333, min: 70.833333)
> 15/10/21 21:22:05 INFO scheduler.StatsReportListener:     0%      5%      10%     25%     50%     75%     90%     95%     100%
> 15/10/21 21:22:05 INFO scheduler.StatsReportListener:     71 %    71 %    71 %    71 %    71 %    71 %    71 %    71 %    71 %
> 15/10/21 21:22:05 INFO scheduler.DAGScheduler: Job 24 finished: run at AccessController.java:-2, took 0.175295 s
> {code}
> Spark logs for {{explain}} for {{test_parquet}}:
> {code}
> 15/10/21 21:23:19 INFO thriftserver.SparkExecuteStatementOperation: Running query 'explain select * from test_parquet' with bae9c0bf-57f9-4c80-b745-3f0202469f3f
> 15/10/21 21:23:19 INFO parse.ParseDriver: Parsing command: explain select * from test_parquet
> 15/10/21 21:23:19 INFO parse.ParseDriver: Parse Completed
> 15/10/21 21:23:19 INFO metastore.HiveMetaStore: 50: get_table : db=default tbl=test_parquet
> 15/10/21 21:23:19 INFO HiveMetaStore.audit: ugi=vagrant  ip=unknown-ip-addr  cmd=get_table : db=default tbl=test_parquet
> 15/10/21 21:23:19 INFO metastore.HiveMetaStore: 50: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
> 15/10/21 21:23:19 INFO metastore.ObjectStore: ObjectStore, initialize called
> 15/10/21 21:23:19 INFO DataNucleus.Query: Reading in results for query "org.datanucleus.store.rdbms.query.SQLQuery@0" since the connection used is closing
> 15/10/21 21:23:19 INFO metastore.MetaStoreDirectSql: Using direct SQL, underlying DB is MYSQL
> 15/10/21 21:23:19 INFO metastore.ObjectStore: Initialized ObjectStore
> 15/10/21 21:23:19 INFO storage.MemoryStore: ensureFreeSpace(215680) called with curMem=4377993, maxMem=139009720
> 15/10/21 21:23:19 INFO storage.MemoryStore: Block broadcast_63 stored as values in memory (estimated size 210.6 KB, free 128.2 MB)
> 15/10/21 21:23:19 INFO storage.MemoryStore: ensureFreeSpace(20265) called with curMem=4593673, maxMem=139009720
> 15/10/21 21:23:19 INFO storage.MemoryStore: Block broadcast_63_piece0 stored as bytes in memory (estimated size 19.8 KB, free 128.2 MB)
> 15/10/21 21:23:19 INFO storage.BlockManagerInfo: Added broadcast_63_piece0 in memory on 192.168.99.9:50262 (size: 19.8 KB, free: 132.2 MB)
> 15/10/21 21:23:19 INFO spark.SparkContext: Created broadcast 63 from run at AccessController.java:-2
> 15/10/21 21:23:19 INFO thriftserver.SparkExecuteStatementOperation: Result Schema: List(plan#262)
> 15/10/21 21:23:19 INFO thriftserver.SparkExecuteStatementOperation: Result Schema: List(plan#262)
> 15/10/21 21:23:19 INFO thriftserver.SparkExecuteStatementOperation: Result Schema: List(plan#262)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org