[ 
https://issues.apache.org/jira/browse/SPARK-15287?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15282247#comment-15282247
 ] 

Davies Liu commented on SPARK-15287:
------------------------------------

By design, we can't directly compare two expressions of different types
(IntegerType and StringType), so both of them are cast to DoubleType. But
HiveMetaStore does not support such complicated predicates on partition
columns.

So, I think we can't fix this easily. You should use string literals instead of 
integer literals, or use IntegerType for the partition column.
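
To illustrate why the cast defeats the pushdown, here is a minimal,
self-contained sketch. It does not use Spark: the types below (DataType,
Attribute, Literal, Cast, EqualTo) and the object PartitionFilterSketch are
simplified stand-ins invented for this example, and convertFilters only
imitates the shape of the method quoted in the issue below (equality only, no
varchar handling).

```scala
// Simplified stand-ins for Catalyst classes; these are NOT Spark's real types.
sealed trait DataType
case object IntegralType extends DataType
case object StringType extends DataType
case object DoubleType extends DataType

sealed trait Expression
final case class Attribute(name: String, dataType: DataType) extends Expression
final case class Literal(value: Any, dataType: DataType) extends Expression
final case class Cast(child: Expression, to: DataType) extends Expression
final case class EqualTo(left: Expression, right: Expression) extends Expression

object PartitionFilterSketch {
  // Only a bare attribute compared with an integral or string literal is
  // turned into a metastore predicate; every other expression is skipped.
  def convertFilters(filters: Seq[Expression]): String =
    filters.collect {
      case EqualTo(Attribute(n, _), Literal(v, IntegralType)) => s"$n = $v"
      case EqualTo(Attribute(n, _), Literal(v, StringType)) => s"""$n = "$v""""
    }.mkString(" and ")

  // An empty filter string is what leads to the getAllPartitions path.
  def metastoreCall(filters: Seq[Expression]): String = {
    val filter = convertFilters(filters)
    if (filter.isEmpty) "getAllPartitions" else s"getPartitionsByFilter($filter)"
  }

  val logdate: Attribute = Attribute("logdate", StringType)

  // WHERE logdate = '201605050000': attribute and literal are both strings.
  val stringLiteralFilter: Expression =
    EqualTo(logdate, Literal("201605050000", StringType))

  // WHERE logdate = 201605050000: both sides end up as doubles, so the
  // attribute is wrapped in a Cast and no pattern above matches.
  val integerLiteralFilter: Expression =
    EqualTo(Cast(logdate, DoubleType), Literal(2.0160505e11, DoubleType))

  def main(args: Array[String]): Unit = {
    println(metastoreCall(Seq(stringLiteralFilter)))
    // prints: getPartitionsByFilter(logdate = "201605050000")
    println(metastoreCall(Seq(integerLiteralFilter)))
    // prints: getAllPartitions
  }
}
```

In this model the string literal yields a non-empty filter, while the casted
comparison yields an empty one, which matches the two query plans in the issue.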

> Spark SQL partition filter clause with different literal type will scan all 
> hive partitions
> -------------------------------------------------------------------------------------------
>
>                 Key: SPARK-15287
>                 URL: https://issues.apache.org/jira/browse/SPARK-15287
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.0, 1.6.1
>            Reporter: Tao Li
>
> I have a Hive table with a partition column of string type. When the query 
> filters on a string literal, Spark SQL calls getPartitionsByFilter. But when 
> the query filters on an integer literal, it calls getAllPartitions, which 
> puts a heavy load on the Hive metastore when the table has too many 
> partitions.
> I found that Spark SQL casts the integer literal to double in the logical 
> plan (I don't know why Spark SQL does this cast). Spark SQL decides 
> dynamically whether to call getAllPartitions() or getPartitionsByFilter(). It 
> seems that the method convertFilters() doesn't cover the double case.
> {code:java}
> def convertFilters(table: Table, filters: Seq[Expression]): String = {
>     // hive varchar is treated as catalyst string, but hive varchar can't be pushed down.
>     val varcharKeys = table.getPartitionKeys.asScala
>       .filter(col => col.getType.startsWith(serdeConstants.VARCHAR_TYPE_NAME) ||
>         col.getType.startsWith(serdeConstants.CHAR_TYPE_NAME))
>       .map(col => col.getName).toSet
>     filters.collect {
>       case op @ BinaryComparison(a: Attribute, Literal(v, _: IntegralType)) =>
>         s"${a.name} ${op.symbol} $v"
>       case op @ BinaryComparison(Literal(v, _: IntegralType), a: Attribute) =>
>         s"$v ${op.symbol} ${a.name}"
>       case op @ BinaryComparison(a: Attribute, Literal(v, _: StringType))
>           if !varcharKeys.contains(a.name) =>
>         s"""${a.name} ${op.symbol} "$v""""
>       case op @ BinaryComparison(Literal(v, _: StringType), a: Attribute)
>           if !varcharKeys.contains(a.name) =>
>         s""""$v" ${op.symbol} ${a.name}"""
>     }.mkString(" and ")
>   }
>   override def getPartitionsByFilter(
>       hive: Hive,
>       table: Table,
>       predicates: Seq[Expression]): Seq[Partition] = {
>     // Hive getPartitionsByFilter() takes a string that represents partition
>     // predicates like "str_key=\"value\" and int_key=1 ..."
>     val filter = convertFilters(table, predicates)
>     val partitions =
>       if (filter.isEmpty) {
>         getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
>       } else {
>         logDebug(s"Hive metastore filter is '$filter'.")
>         getPartitionsByFilterMethod.invoke(hive, table, filter).asInstanceOf[JArrayList[Partition]]
>       }
>     partitions.asScala.toSeq
>   }
> {code}
> The query plan with the string literal filter, where logdate is the partition 
> column of string type:
> {noformat}
> == Parsed Logical Plan ==
> Limit 21
> +- Limit 20
>    +- Project [ip#1,field1#2,field2#3,field3#4,manualtime#5,timezone#6,pbstr#7,retcode#8,pagesize#9,refer#10,useragent#11,field4#12,responsetime#13,usid#14,style#15,unid#16,pl#17,usid_src#18,resinip#19,upstreamtime#20,uuid#21,qua#22,q_ext#23,line#24,logdate#0]
>       +- Filter (logdate#0 = 201605050000)
>          +- MetastoreRelation default, wap_nginx_5min, None
> == Analyzed Logical Plan ==
> ip: string, field1: string, field2: string, field3: string, manualtime: 
> string, timezone: string, pbstr: map<string,string>, retcode: string, 
> pagesize: string, refer: string, useragent: string, field4: string, 
> responsetime: string, usid: string, style: string, unid: string, pl: string, 
> usid_src: string, resinip: string, upstreamtime: string, uuid: string, qua: 
> string, q_ext: string, line: string, logdate: string
> Limit 21
> +- Limit 20
>    +- Project [ip#1,field1#2,field2#3,field3#4,manualtime#5,timezone#6,pbstr#7,retcode#8,pagesize#9,refer#10,useragent#11,field4#12,responsetime#13,usid#14,style#15,unid#16,pl#17,usid_src#18,resinip#19,upstreamtime#20,uuid#21,qua#22,q_ext#23,line#24,logdate#0]
>       +- Filter (logdate#0 = 201605050000)
>          +- MetastoreRelation default, wap_nginx_5min, None
> == Optimized Logical Plan ==
> Limit 20
> +- Filter (logdate#0 = 201605050000)
>    +- MetastoreRelation default, wap_nginx_5min, None
> == Physical Plan ==
> Limit 20
> +- HiveTableScan [ip#1,field1#2,field2#3,field3#4,manualtime#5,timezone#6,pbstr#7,retcode#8,pagesize#9,refer#10,useragent#11,field4#12,responsetime#13,usid#14,style#15,unid#16,pl#17,usid_src#18,resinip#19,upstreamtime#20,uuid#21,qua#22,q_ext#23,line#24,logdate#0], MetastoreRelation default, wap_nginx_5min, None, [(logdate#0 = 201605050000)]
> {noformat}
> The query plan with the integer literal filter, which will do the double cast 
> and call the inefficient getAllPartitions():
> {noformat}
> == Parsed Logical Plan ==
> Limit 21
> +- Limit 20
>    +- Project [ip#26,field1#27,field2#28,field3#29,manualtime#30,timezone#31,pbstr#32,retcode#33,pagesize#34,refer#35,useragent#36,field4#37,responsetime#38,usid#39,style#40,unid#41,pl#42,usid_src#43,resinip#44,upstreamtime#45,uuid#46,qua#47,q_ext#48,line#49,logdate#25]
>       +- Filter (cast(logdate#25 as double) = cast(201605050000 as double))
>          +- MetastoreRelation default, wap_nginx_5min, None
> == Analyzed Logical Plan ==
> ip: string, field1: string, field2: string, field3: string, manualtime: 
> string, timezone: string, pbstr: map<string,string>, retcode: string, 
> pagesize: string, refer: string, useragent: string, field4: string, 
> responsetime: string, usid: string, style: string, unid: string, pl: string, 
> usid_src: string, resinip: string, upstreamtime: string, uuid: string, qua: 
> string, q_ext: string, line: string, logdate: string
> Limit 21
> +- Limit 20
>    +- Project [ip#26,field1#27,field2#28,field3#29,manualtime#30,timezone#31,pbstr#32,retcode#33,pagesize#34,refer#35,useragent#36,field4#37,responsetime#38,usid#39,style#40,unid#41,pl#42,usid_src#43,resinip#44,upstreamtime#45,uuid#46,qua#47,q_ext#48,line#49,logdate#25]
>       +- Filter (cast(logdate#25 as double) = cast(201605050000 as double))
>          +- MetastoreRelation default, wap_nginx_5min, None
> == Optimized Logical Plan ==
> Limit 20
> +- Filter (cast(logdate#25 as double) = 2.0160505E11)
>    +- MetastoreRelation default, wap_nginx_5min, None
> == Physical Plan ==
> Limit 20
> +- HiveTableScan [ip#26,field1#27,field2#28,field3#29,manualtime#30,timezone#31,pbstr#32,retcode#33,pagesize#34,refer#35,useragent#36,field4#37,responsetime#38,usid#39,style#40,unid#41,pl#42,usid_src#43,resinip#44,upstreamtime#45,uuid#46,qua#47,q_ext#48,line#49,logdate#25], MetastoreRelation default, wap_nginx_5min, None, [(cast(logdate#25 as double) = 2.0160505E11)]
> {noformat}


