describe dummy;
OK
sample                  string
year                    string
month                   string

# Partition Information
# col_name              data_type       comment
year                    string
month                   string

val df = sqlContext.sql("select count(1) from rajub.dummy where year='2017'")
df: org.apache.spark.sql.DataFrame = [_c0: bigint]

scala> df.explain
== Physical Plan ==
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[_c0#3070L])
+- TungstenExchange SinglePartition, None
   +- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#3076L])
      +- Scan ParquetRelation: rajub.dummy[] InputPaths: maprfs:/user/rajub/dummy/sample/year=2016/month=10, maprfs:/user/rajub/dummy/sample/year=2016/month=11, maprfs:/user/rajub/dummy/sample/year=2016/month=9, maprfs:/user/rajub/dummy/sample/year=2017/month=10, maprfs:/user/rajub/dummy/sample/year=2017/month=11, maprfs:/user/rajub/dummy/sample/year=2017/month=9

On Wed, Jan 18, 2017 at 12:25 PM, Michael Allman <mich...@videoamp.com> wrote:

> Can you paste the actual query plan here, please?
>
> On Jan 17, 2017, at 7:38 PM, Raju Bairishetti <r...@apache.org> wrote:
>
> On Wed, Jan 18, 2017 at 11:13 AM, Michael Allman <mich...@videoamp.com> wrote:
>
>> What is the physical query plan after you set
>> spark.sql.hive.convertMetastoreParquet to true?
>>
> The physical plan contains all the partition locations.
>
>> Michael
>>
>> On Jan 17, 2017, at 6:51 PM, Raju Bairishetti <r...@apache.org> wrote:
>>
>> Thanks Michael for the response.
>>
>> On Wed, Jan 18, 2017 at 2:45 AM, Michael Allman <mich...@videoamp.com> wrote:
>>
>>> Hi Raju,
>>>
>>> I'm sorry this isn't working for you. I helped author this functionality
>>> and will try my best to help.
>>>
>>> First, I'm curious why you set spark.sql.hive.convertMetastoreParquet
>>> to false?
>>>
>> I set it as suggested in SPARK-6910 and the corresponding pull requests. It
>> did not work for me without setting the
>> spark.sql.hive.convertMetastoreParquet property.
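[Editor's note: the explain output above lists all six partition directories under InputPaths even though the query filters on year='2017'; a pruned scan would keep only the year=2017 paths. A minimal, self-contained Scala sketch of the expected filtering — the `prune` helper is illustrative only, not a Spark API:]

```scala
object PruneSketch {
  // The InputPaths reported by the unpruned plan above.
  val inputPaths = Seq(
    "maprfs:/user/rajub/dummy/sample/year=2016/month=10",
    "maprfs:/user/rajub/dummy/sample/year=2016/month=11",
    "maprfs:/user/rajub/dummy/sample/year=2016/month=9",
    "maprfs:/user/rajub/dummy/sample/year=2017/month=10",
    "maprfs:/user/rajub/dummy/sample/year=2017/month=11",
    "maprfs:/user/rajub/dummy/sample/year=2017/month=9"
  )

  // A partition-pruned scan should only touch directories matching the predicate.
  def prune(paths: Seq[String], year: Int): Seq[String] =
    paths.filter(_.contains(s"year=$year"))

  def main(args: Array[String]): Unit =
    prune(inputPaths, 2017).foreach(println) // only the three year=2017 paths
}
```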
>>> Can you link specifically to the jira issue or spark pr you referred to?
>>> The first thing I would try is setting
>>> spark.sql.hive.convertMetastoreParquet to true. Setting that to false
>>> might also explain why you're getting parquet decode errors. If you're
>>> writing your table data with Spark's parquet file writer and reading with
>>> Hive's parquet file reader, there may be an incompatibility accounting for
>>> the decode errors you're seeing.
>>>
>> https://issues.apache.org/jira/browse/SPARK-6910 . My main motivation
>> is to avoid fetching all the partitions. We reverted
>> spark.sql.hive.convertMetastoreParquet to true because of the decoding
>> errors. After reverting it, Spark is fetching all partitions from the table.
>>
>>> Can you reply with your table's Hive metastore schema, including
>>> partition schema?
>>>
>> col1 string
>> col2 string
>> year int
>> month int
>> day int
>> hour int
>>
>> # Partition Information
>> # col_name data_type comment
>> year int
>> month int
>> day int
>> hour int
>> venture string
>>
>>> Where are the table's files located?
>>>
>> In Hadoop, under a user directory.
>>
>>> If you do a "show partitions <dbname>.<tablename>" in the spark-sql
>>> shell, does it show the partitions you expect to see? If not, run "msck
>>> repair table <dbname>.<tablename>".
>>>
>> Yes, it is listing the partitions.
>>
>>> Cheers,
>>>
>>> Michael
>>>
>>> On Jan 17, 2017, at 12:02 AM, Raju Bairishetti <r...@apache.org> wrote:
>>>
>>> Had a high-level look into the code. It seems the getHiveQlPartitions
>>> method of HiveMetastoreCatalog is getting called irrespective of the
>>> metastorePartitionPruning conf value.
>>> It should not fetch all partitions if we set metastorePartitionPruning
>>> to true (the default value is false):
>>>
>>> def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
>>>   val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
>>>     table.getPartitions(predicates)
>>>   } else {
>>>     allPartitions
>>>   }
>>>   ...
>>>
>>> def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] =
>>>   client.getPartitionsByFilter(this, predicates)
>>>
>>> lazy val allPartitions = table.getAllPartitions
>>>
>>> But somehow getAllPartitions is getting called even after setting
>>> metastorePartitionPruning to true.
>>>
>>> Am I missing something, or am I looking in the wrong place?
>>>
>>> On Tue, Jan 17, 2017 at 4:01 PM, Raju Bairishetti <r...@apache.org> wrote:
>>>
>>>> Hello,
>>>>
>>>> Spark SQL is generating the query plan with all partition information
>>>> even when we apply partition filters in the query. Because of this, the
>>>> Spark driver/Hive metastore is hitting OOM, as each table has lots of
>>>> partitions.
>>>>
>>>> We can confirm from the Hive audit logs that it tries to
>>>> fetch all partitions from the Hive metastore.
>>>> 2016-12-28 07:18:33,749 INFO [pool-4-thread-184]: HiveMetaStore.audit
>>>> (HiveMetaStore.java:logAuditEvent(371)) - ugi=rajub ip=/x.x.x.x
>>>> cmd=get_partitions : db=xxxx tbl=xxxxx
>>>>
>>>> We configured the following parameters in the Spark conf to fix the
>>>> above issue (source: the Spark JIRA and a GitHub pull request):
>>>>
>>>> spark.sql.hive.convertMetastoreParquet false
>>>> spark.sql.hive.metastorePartitionPruning true
>>>>
>>>> plan: rdf.explain
>>>> == Physical Plan ==
>>>> HiveTableScan [rejection_reason#626], MetastoreRelation dbname,
>>>> tablename, None, [(year#314 = 2016),(month#315 = 12),(day#316 =
>>>> 28),(hour#317 = 2),(venture#318 = DEFAULT)]
>>>>
>>>> The get_partitions_by_filter method is called and fetches only the
>>>> required partitions.
>>>>
>>>> But we are seeing parquet decode errors in our applications
>>>> frequently after this. It looks like these decoding errors are because of
>>>> changing the serde from Spark's built-in serde to the Hive serde.
>>>>
>>>> I feel that fixing query plan generation in Spark SQL is the
>>>> right approach instead of forcing users to use the Hive serde.
>>>>
>>>> Is there any workaround/way to fix this issue? I would like to hear
>>>> more thoughts on this :)
>>>>
>>>> On Tue, Jan 17, 2017 at 4:00 PM, Raju Bairishetti <r...@apache.org> wrote:
>>>>
>>>>> Had a high-level look into the code. It seems the getHiveQlPartitions
>>>>> method of HiveMetastoreCatalog is getting called irrespective of the
>>>>> metastorePartitionPruning conf value.
>>>>>
>>>>> It should not fetch all partitions if we set
>>>>> metastorePartitionPruning to true (the default value is false):
>>>>>
>>>>> def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
>>>>>   val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
>>>>>     table.getPartitions(predicates)
>>>>>   } else {
>>>>>     allPartitions
>>>>>   }
>>>>>   ...
>>>>> def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] =
>>>>>   client.getPartitionsByFilter(this, predicates)
>>>>>
>>>>> lazy val allPartitions = table.getAllPartitions
>>>>>
>>>>> But somehow getAllPartitions is getting called even after setting
>>>>> metastorePartitionPruning to true.
>>>>>
>>>>> Am I missing something, or am I looking in the wrong place?
>>>>>
>>>>> On Mon, Jan 16, 2017 at 12:53 PM, Raju Bairishetti <r...@apache.org> wrote:
>>>>>
>>>>>> Waiting for suggestions/help on this...
>>>>>>
>>>>>> On Wed, Jan 11, 2017 at 12:14 PM, Raju Bairishetti <r...@apache.org> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> Spark SQL is generating the query plan with all partition
>>>>>>> information even when we apply partition filters in the query.
>>>>>>> Because of this, the Spark driver/Hive metastore is hitting OOM, as
>>>>>>> each table has lots of partitions.
>>>>>>>
>>>>>>> We can confirm from the Hive audit logs that it tries to fetch all
>>>>>>> partitions from the Hive metastore.
>>>>>>>
>>>>>>> 2016-12-28 07:18:33,749 INFO [pool-4-thread-184]:
>>>>>>> HiveMetaStore.audit (HiveMetaStore.java:logAuditEvent(371)) -
>>>>>>> ugi=rajub ip=/x.x.x.x cmd=get_partitions : db=xxxx tbl=xxxxx
>>>>>>>
>>>>>>> We configured the following parameters in the Spark conf to fix the
>>>>>>> above issue (source: the Spark JIRA and a GitHub pull request):
>>>>>>>
>>>>>>> spark.sql.hive.convertMetastoreParquet false
>>>>>>> spark.sql.hive.metastorePartitionPruning true
>>>>>>>
>>>>>>> plan: rdf.explain
>>>>>>> == Physical Plan ==
>>>>>>> HiveTableScan [rejection_reason#626], MetastoreRelation
>>>>>>> dbname, tablename, None, [(year#314 = 2016),(month#315 = 12),(day#316 =
>>>>>>> 28),(hour#317 = 2),(venture#318 = DEFAULT)]
>>>>>>>
>>>>>>> The get_partitions_by_filter method is called and fetches only the
>>>>>>> required partitions.
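[Editor's note: the pruning gate quoted in this thread can be modeled in isolation. A simplified, self-contained Scala sketch — the types and partition data are stand-ins, not the real HiveMetastoreCatalog:]

```scala
// Simplified model of the gate in getHiveQlPartitions: when
// metastorePartitionPruning is true, only partitions matching the
// predicate are fetched; otherwise every partition is loaded.
case class HivePartition(year: Int, month: Int)

object PruningGateSketch {
  // Stand-in for the table's full partition list (2 years x 3 months).
  val allPartitions: Seq[HivePartition] =
    for (y <- 2016 to 2017; m <- Seq(9, 10, 11)) yield HivePartition(y, m)

  // Stand-in for client.getPartitionsByFilter(this, predicates).
  def getPartitionsByFilter(pred: HivePartition => Boolean): Seq[HivePartition] =
    allPartitions.filter(pred)

  def getHiveQlPartitions(metastorePartitionPruning: Boolean)
                         (pred: HivePartition => Boolean): Seq[HivePartition] =
    if (metastorePartitionPruning) getPartitionsByFilter(pred)
    else allPartitions // the problematic full fetch described in the thread

  def main(args: Array[String]): Unit = {
    val pruned = getHiveQlPartitions(metastorePartitionPruning = true)(_.year == 2017)
    val full   = getHiveQlPartitions(metastorePartitionPruning = false)(_.year == 2017)
    println(s"pruned=${pruned.size}, full=${full.size}") // pruned=3, full=6
  }
}
```

The symptom reported above corresponds to the `else` branch being taken even when the flag is set.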
>>>>>>> But we are seeing parquet decode errors in our applications
>>>>>>> frequently after this. It looks like these decoding errors are
>>>>>>> because of changing the serde from Spark's built-in serde to the
>>>>>>> Hive serde.
>>>>>>>
>>>>>>> I feel that fixing query plan generation in Spark SQL is the
>>>>>>> right approach instead of forcing users to use the Hive serde.
>>>>>>>
>>>>>>> Is there any workaround/way to fix this issue? I would like to hear
>>>>>>> more thoughts on this :)
>>>>>>>
>>>>>>> ------
>>>>>>> Thanks,
>>>>>>> Raju Bairishetti,
>>>>>>> www.lazada.com

--
------
Thanks,
Raju Bairishetti,
www.lazada.com
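[Editor's note: for reference, the two settings discussed in this thread can also be passed at launch time instead of via spark-defaults.conf. A minimal sketch of a Spark 1.6-era spark-shell invocation, assuming the flag values quoted above:]

```shell
# Push partition predicates down to the Hive metastore, and keep the
# Hive serde path for metastore parquet tables (as the thread notes,
# the latter may trade partition pruning for parquet decode issues).
spark-shell \
  --conf spark.sql.hive.metastorePartitionPruning=true \
  --conf spark.sql.hive.convertMetastoreParquet=false
```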