[ https://issues.apache.org/jira/browse/SPARK-15287?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Davies Liu closed SPARK-15287.
------------------------------
    Resolution: Won't Fix

> Spark SQL partition filter clause with different literal type will scan all
> hive partitions
> -------------------------------------------------------------------------------------------
>
>                 Key: SPARK-15287
>                 URL: https://issues.apache.org/jira/browse/SPARK-15287
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.0, 1.6.1
>            Reporter: Tao Li
>
> I have a Hive table with a partition column of string type. When the query
> filters on a string literal, Spark calls getPartitionsByFilter(). But when
> the query filters on an integer literal, it calls getAllPartitions(), which
> causes a heavy query in the Hive metastore when the table has too many
> partitions.
> I find that Spark SQL casts both the partition column and the integer
> literal to double in the logical plan (I don't know why Spark SQL does this
> cast), and then decides dynamically whether to call getAllPartitions() or
> getPartitionsByFilter(). It seems that the method convertFilters() can't
> cover the double case:
> {code:java}
> def convertFilters(table: Table, filters: Seq[Expression]): String = {
>   // hive varchar is treated as catalyst string, but hive varchar can't be pushed down.
>   val varcharKeys = table.getPartitionKeys.asScala
>     .filter(col => col.getType.startsWith(serdeConstants.VARCHAR_TYPE_NAME) ||
>       col.getType.startsWith(serdeConstants.CHAR_TYPE_NAME))
>     .map(col => col.getName).toSet
>
>   filters.collect {
>     case op @ BinaryComparison(a: Attribute, Literal(v, _: IntegralType)) =>
>       s"${a.name} ${op.symbol} $v"
>     case op @ BinaryComparison(Literal(v, _: IntegralType), a: Attribute) =>
>       s"$v ${op.symbol} ${a.name}"
>     case op @ BinaryComparison(a: Attribute, Literal(v, _: StringType))
>         if !varcharKeys.contains(a.name) =>
>       s"""${a.name} ${op.symbol} "$v""""
>     case op @ BinaryComparison(Literal(v, _: StringType), a: Attribute)
>         if !varcharKeys.contains(a.name) =>
>       s""""$v" ${op.symbol} ${a.name}"""
>   }.mkString(" and ")
> }
>
> override def getPartitionsByFilter(
>     hive: Hive,
>     table: Table,
>     predicates: Seq[Expression]): Seq[Partition] = {
>   // Hive getPartitionsByFilter() takes a string that represents partition
>   // predicates like "str_key=\"value\" and int_key=1 ..."
>   val filter = convertFilters(table, predicates)
>   val partitions =
>     if (filter.isEmpty) {
>       getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
>     } else {
>       logDebug(s"Hive metastore filter is '$filter'.")
>       getPartitionsByFilterMethod.invoke(hive, table, filter)
>         .asInstanceOf[JArrayList[Partition]]
>     }
>   partitions.asScala.toSeq
> }
> {code}
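> For illustration, a hypothetical sketch (not the actual Spark code, only the
> shape such a pattern would have) of the extra case that convertFilters()
> would need in order to recognize the double-cast comparison shown in the
> optimized plan below:
> {code:java}
> // Hypothetical extra case inside the filters.collect { ... } block above.
> // It assumes the analyzer has wrapped the string partition column in a
> // Cast to double and the optimizer has folded the literal side to a
> // Double, as in the plans in this report.
> case op @ BinaryComparison(Cast(a: Attribute, DoubleType), Literal(v: Double, DoubleType))
>     if !varcharKeys.contains(a.name) && v == v.toLong =>
>   s"""${a.name} ${op.symbol} "${v.toLong}""""
> {code}
> Note that such a rewrite is not semantically safe in general: the metastore
> compares string partition values while the cast comparison is numeric, so a
> non-canonical value such as "0201605050000" would compare equal as a number
> but not as a string. The sketch only marks where the double case falls
> through today.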
> The query plan with the string literal filter, where logdate is the
> partition column of string type:
> {noformat}
> == Parsed Logical Plan ==
> Limit 21
> +- Limit 20
>    +- Project [ip#1,field1#2,field2#3,field3#4,manualtime#5,timezone#6,pbstr#7,retcode#8,pagesize#9,refer#10,useragent#11,field4#12,responsetime#13,usid#14,style#15,unid#16,pl#17,usid_src#18,resinip#19,upstreamtime#20,uuid#21,qua#22,q_ext#23,line#24,logdate#0]
>       +- Filter (logdate#0 = 201605050000)
>          +- MetastoreRelation default, wap_nginx_5min, None
>
> == Analyzed Logical Plan ==
> ip: string, field1: string, field2: string, field3: string, manualtime: string, timezone: string, pbstr: map<string,string>, retcode: string, pagesize: string, refer: string, useragent: string, field4: string, responsetime: string, usid: string, style: string, unid: string, pl: string, usid_src: string, resinip: string, upstreamtime: string, uuid: string, qua: string, q_ext: string, line: string, logdate: string
> Limit 21
> +- Limit 20
>    +- Project [ip#1,field1#2,field2#3,field3#4,manualtime#5,timezone#6,pbstr#7,retcode#8,pagesize#9,refer#10,useragent#11,field4#12,responsetime#13,usid#14,style#15,unid#16,pl#17,usid_src#18,resinip#19,upstreamtime#20,uuid#21,qua#22,q_ext#23,line#24,logdate#0]
>       +- Filter (logdate#0 = 201605050000)
>          +- MetastoreRelation default, wap_nginx_5min, None
>
> == Optimized Logical Plan ==
> Limit 20
> +- Filter (logdate#0 = 201605050000)
>    +- MetastoreRelation default, wap_nginx_5min, None
>
> == Physical Plan ==
> Limit 20
> +- HiveTableScan [ip#1,field1#2,field2#3,field3#4,manualtime#5,timezone#6,pbstr#7,retcode#8,pagesize#9,refer#10,useragent#11,field4#12,responsetime#13,usid#14,style#15,unid#16,pl#17,usid_src#18,resinip#19,upstreamtime#20,uuid#21,qua#22,q_ext#23,line#24,logdate#0], MetastoreRelation default, wap_nginx_5min, None, [(logdate#0 = 201605050000)]
> {noformat}
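> For comparison, a sketch of how the two variants can be issued from a 1.6
> spark-shell (assuming sqlContext is a HiveContext and the
> default.wap_nginx_5min table from this report); only the quoting of the
> literal differs:
> {code:java}
> // String literal: the comparison stays a string comparison, the
> // StringType case in convertFilters() matches, and the metastore is
> // queried via getPartitionsByFilter().
> sqlContext.sql("SELECT * FROM wap_nginx_5min WHERE logdate = '201605050000'").show()
>
> // Integer literal: both sides are cast to double (see the plan below),
> // no case in convertFilters() matches, and the shim falls back to the
> // expensive getAllPartitions().
> sqlContext.sql("SELECT * FROM wap_nginx_5min WHERE logdate = 201605050000").show()
> {code}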
> The query plan with the integer literal filter, which does the double cast
> and calls the inefficient getAllPartitions():
> {noformat}
> == Parsed Logical Plan ==
> Limit 21
> +- Limit 20
>    +- Project [ip#26,field1#27,field2#28,field3#29,manualtime#30,timezone#31,pbstr#32,retcode#33,pagesize#34,refer#35,useragent#36,field4#37,responsetime#38,usid#39,style#40,unid#41,pl#42,usid_src#43,resinip#44,upstreamtime#45,uuid#46,qua#47,q_ext#48,line#49,logdate#25]
>       +- Filter (cast(logdate#25 as double) = cast(201605050000 as double))
>          +- MetastoreRelation default, wap_nginx_5min, None
>
> == Analyzed Logical Plan ==
> ip: string, field1: string, field2: string, field3: string, manualtime: string, timezone: string, pbstr: map<string,string>, retcode: string, pagesize: string, refer: string, useragent: string, field4: string, responsetime: string, usid: string, style: string, unid: string, pl: string, usid_src: string, resinip: string, upstreamtime: string, uuid: string, qua: string, q_ext: string, line: string, logdate: string
> Limit 21
> +- Limit 20
>    +- Project [ip#26,field1#27,field2#28,field3#29,manualtime#30,timezone#31,pbstr#32,retcode#33,pagesize#34,refer#35,useragent#36,field4#37,responsetime#38,usid#39,style#40,unid#41,pl#42,usid_src#43,resinip#44,upstreamtime#45,uuid#46,qua#47,q_ext#48,line#49,logdate#25]
>       +- Filter (cast(logdate#25 as double) = cast(201605050000 as double))
>          +- MetastoreRelation default, wap_nginx_5min, None
>
> == Optimized Logical Plan ==
> Limit 20
> +- Filter (cast(logdate#25 as double) = 2.0160505E11)
>    +- MetastoreRelation default, wap_nginx_5min, None
>
> == Physical Plan ==
> Limit 20
> +- HiveTableScan [ip#26,field1#27,field2#28,field3#29,manualtime#30,timezone#31,pbstr#32,retcode#33,pagesize#34,refer#35,useragent#36,field4#37,responsetime#38,usid#39,style#40,unid#41,pl#42,usid_src#43,resinip#44,upstreamtime#45,uuid#46,qua#47,q_ext#48,line#49,logdate#25], MetastoreRelation default, wap_nginx_5min, None, [(cast(logdate#25 as double) = 2.0160505E11)]
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)