Repository: spark
Updated Branches:
  refs/heads/master 42f83d7c4 -> c1217565e
[SPARK-22592][SQL] cleanup filter converting for hive

## What changes were proposed in this pull request?

We have two different methods to convert filters for Hive, selected by a config flag. This introduces duplicated and inconsistent code (e.g., one uses helper objects for pattern matching and the other doesn't).

## How was this patch tested?

existing tests

Author: Wenchen Fan <wenc...@databricks.com>

Closes #19801 from cloud-fan/cleanup.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c1217565
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c1217565
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c1217565

Branch: refs/heads/master
Commit: c1217565e20bd3297f3b1bc8f18f5dea933211c0
Parents: 42f83d7
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Thu Nov 23 15:33:26 2017 -0800
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Thu Nov 23 15:33:26 2017 -0800

----------------------------------------------------------------------
 .../apache/spark/sql/hive/client/HiveShim.scala | 144 +++++++++----------
 1 file changed, 69 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
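The net effect of the cleanup is easiest to see outside the diff. Below is a minimal, runnable sketch of the unified shape this patch arrives at: a single recursive converter that returns an `Option[String]` per predicate, with the advanced-only cases guarded by the config flag. Everything in the sketch is an illustrative stand-in, not Spark's API: the toy `Expr` ADT replaces Catalyst's `Expression` tree, and the `useAdvanced` parameter replaces `SQLConf.get.advancedPartitionPredicatePushdownEnabled`.

```scala
// Self-contained sketch of the unified filter conversion. Expr, Attr, Lit, Eq,
// And, Or, and useAdvanced are illustrative stand-ins, not Spark classes.
object FilterConversionSketch {
  sealed trait Expr
  final case class Attr(name: String) extends Expr
  final case class Lit(value: String) extends Expr
  final case class Eq(left: Expr, right: Expr) extends Expr
  final case class And(left: Expr, right: Expr) extends Expr
  final case class Or(left: Expr, right: Expr) extends Expr

  def convertFilters(filters: Seq[Expr], useAdvanced: Boolean): String = {
    // One recursive converter: unsupported predicates yield None, and the
    // advanced-only cases are guarded by the config flag instead of living
    // in a second, near-duplicate method.
    def convert(expr: Expr): Option[String] = expr match {
      case Eq(Attr(name), Lit(value)) => Some(s"$name = $value")
      case Eq(Lit(value), Attr(name)) => Some(s"$value = $name")
      case And(e1, e2) if useAdvanced =>
        // "and" survives if at least one side converts: dropping the other
        // side only makes the pushed-down filter less selective, never wrong.
        val converted = convert(e1) ++ convert(e2)
        if (converted.isEmpty) None else Some(converted.mkString("(", " and ", ")"))
      case Or(e1, e2) if useAdvanced =>
        // "or" needs both sides: dropping one would wrongly narrow the result.
        for { l <- convert(e1); r <- convert(e2) } yield s"($l or $r)"
      case _ => None
    }
    filters.flatMap(convert).mkString(" and ")
  }

  def main(args: Array[String]): Unit = {
    val filter = Or(Eq(Attr("p"), Lit("1")), Eq(Attr("p"), Lit("2")))
    println(convertFilters(Seq(filter), useAdvanced = true))  // (p = 1 or p = 2)
    println(convertFilters(Seq(filter), useAdvanced = false)) // "" (Or is advanced-only)
  }
}
```

Note the asymmetry the real code keeps as well: a partially convertible `And` still yields a weaker but safe filter, while `Or` is converted only when both sides convert.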
http://git-wip-us.apache.org/repos/asf/spark/blob/c1217565/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index bd1b300..1eac70d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -585,53 +585,17 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
    * Unsupported predicates are skipped.
    */
   def convertFilters(table: Table, filters: Seq[Expression]): String = {
-    if (SQLConf.get.advancedPartitionPredicatePushdownEnabled) {
-      convertComplexFilters(table, filters)
-    } else {
-      convertBasicFilters(table, filters)
-    }
-  }
-
-
-  /**
-   * An extractor that matches all binary comparison operators except null-safe equality.
-   *
-   * Null-safe equality is not supported by Hive metastore partition predicate pushdown
-   */
-  object SpecialBinaryComparison {
-    def unapply(e: BinaryComparison): Option[(Expression, Expression)] = e match {
-      case _: EqualNullSafe => None
-      case _ => Some((e.left, e.right))
+    /**
+     * An extractor that matches all binary comparison operators except null-safe equality.
+     *
+     * Null-safe equality is not supported by Hive metastore partition predicate pushdown
+     */
+    object SpecialBinaryComparison {
+      def unapply(e: BinaryComparison): Option[(Expression, Expression)] = e match {
+        case _: EqualNullSafe => None
+        case _ => Some((e.left, e.right))
+      }
     }
-  }
-
-  private def convertBasicFilters(table: Table, filters: Seq[Expression]): String = {
-    // hive varchar is treated as catalyst string, but hive varchar can't be pushed down.
-    lazy val varcharKeys = table.getPartitionKeys.asScala
-      .filter(col => col.getType.startsWith(serdeConstants.VARCHAR_TYPE_NAME) ||
-        col.getType.startsWith(serdeConstants.CHAR_TYPE_NAME))
-      .map(col => col.getName).toSet
-
-    filters.collect {
-      case op @ SpecialBinaryComparison(a: Attribute, Literal(v, _: IntegralType)) =>
-        s"${a.name} ${op.symbol} $v"
-      case op @ SpecialBinaryComparison(Literal(v, _: IntegralType), a: Attribute) =>
-        s"$v ${op.symbol} ${a.name}"
-      case op @ SpecialBinaryComparison(a: Attribute, Literal(v, _: StringType))
-          if !varcharKeys.contains(a.name) =>
-        s"""${a.name} ${op.symbol} ${quoteStringLiteral(v.toString)}"""
-      case op @ SpecialBinaryComparison(Literal(v, _: StringType), a: Attribute)
-          if !varcharKeys.contains(a.name) =>
-        s"""${quoteStringLiteral(v.toString)} ${op.symbol} ${a.name}"""
-    }.mkString(" and ")
-  }
-
-  private def convertComplexFilters(table: Table, filters: Seq[Expression]): String = {
-    // hive varchar is treated as catalyst string, but hive varchar can't be pushed down.
-    lazy val varcharKeys = table.getPartitionKeys.asScala
-      .filter(col => col.getType.startsWith(serdeConstants.VARCHAR_TYPE_NAME) ||
-        col.getType.startsWith(serdeConstants.CHAR_TYPE_NAME))
-      .map(col => col.getName).toSet
 
     object ExtractableLiteral {
       def unapply(expr: Expression): Option[String] = expr match {
@@ -643,9 +607,11 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
 
     object ExtractableLiterals {
       def unapply(exprs: Seq[Expression]): Option[Seq[String]] = {
-        exprs.map(ExtractableLiteral.unapply).foldLeft(Option(Seq.empty[String])) {
-          case (Some(accum), Some(value)) => Some(accum :+ value)
-          case _ => None
+        val extractables = exprs.map(ExtractableLiteral.unapply)
+        if (extractables.nonEmpty && extractables.forall(_.isDefined)) {
+          Some(extractables.map(_.get))
+        } else {
+          None
         }
       }
     }
@@ -660,40 +626,68 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
       }
 
       def unapply(values: Set[Any]): Option[Seq[String]] = {
-        values.toSeq.foldLeft(Option(Seq.empty[String])) {
-          case (Some(accum), value) if valueToLiteralString.isDefinedAt(value) =>
-            Some(accum :+ valueToLiteralString(value))
-          case _ => None
+        val extractables = values.toSeq.map(valueToLiteralString.lift)
+        if (extractables.nonEmpty && extractables.forall(_.isDefined)) {
+          Some(extractables.map(_.get))
+        } else {
+          None
         }
       }
     }
 
-    def convertInToOr(a: Attribute, values: Seq[String]): String = {
-      values.map(value => s"${a.name} = $value").mkString("(", " or ", ")")
+    object NonVarcharAttribute {
+      // hive varchar is treated as catalyst string, but hive varchar can't be pushed down.
+      private val varcharKeys = table.getPartitionKeys.asScala
+        .filter(col => col.getType.startsWith(serdeConstants.VARCHAR_TYPE_NAME) ||
+          col.getType.startsWith(serdeConstants.CHAR_TYPE_NAME))
+        .map(col => col.getName).toSet
+
+      def unapply(attr: Attribute): Option[String] = {
+        if (varcharKeys.contains(attr.name)) {
+          None
+        } else {
+          Some(attr.name)
+        }
+      }
+    }
+
+    def convertInToOr(name: String, values: Seq[String]): String = {
+      values.map(value => s"$name = $value").mkString("(", " or ", ")")
     }
 
-    lazy val convert: PartialFunction[Expression, String] = {
-      case In(a: Attribute, ExtractableLiterals(values))
-          if !varcharKeys.contains(a.name) && values.nonEmpty =>
-        convertInToOr(a, values)
-      case InSet(a: Attribute, ExtractableValues(values))
-          if !varcharKeys.contains(a.name) && values.nonEmpty =>
-        convertInToOr(a, values)
-      case op @ SpecialBinaryComparison(a: Attribute, ExtractableLiteral(value))
-          if !varcharKeys.contains(a.name) =>
-        s"${a.name} ${op.symbol} $value"
-      case op @ SpecialBinaryComparison(ExtractableLiteral(value), a: Attribute)
-          if !varcharKeys.contains(a.name) =>
-        s"$value ${op.symbol} ${a.name}"
-      case And(expr1, expr2)
-          if convert.isDefinedAt(expr1) || convert.isDefinedAt(expr2) =>
-        (convert.lift(expr1) ++ convert.lift(expr2)).mkString("(", " and ", ")")
-      case Or(expr1, expr2)
-          if convert.isDefinedAt(expr1) && convert.isDefinedAt(expr2) =>
-        s"(${convert(expr1)} or ${convert(expr2)})"
+    val useAdvanced = SQLConf.get.advancedPartitionPredicatePushdownEnabled
+
+    def convert(expr: Expression): Option[String] = expr match {
+      case In(NonVarcharAttribute(name), ExtractableLiterals(values)) if useAdvanced =>
+        Some(convertInToOr(name, values))
+
+      case InSet(NonVarcharAttribute(name), ExtractableValues(values)) if useAdvanced =>
+        Some(convertInToOr(name, values))
+
+      case op @ SpecialBinaryComparison(NonVarcharAttribute(name), ExtractableLiteral(value)) =>
+        Some(s"$name ${op.symbol} $value")
+
+      case op @ SpecialBinaryComparison(ExtractableLiteral(value), NonVarcharAttribute(name)) =>
+        Some(s"$value ${op.symbol} $name")
+
+      case And(expr1, expr2) if useAdvanced =>
+        val converted = convert(expr1) ++ convert(expr2)
+        if (converted.isEmpty) {
+          None
+        } else {
+          Some(converted.mkString("(", " and ", ")"))
+        }
+
+      case Or(expr1, expr2) if useAdvanced =>
+        for {
+          left <- convert(expr1)
+          right <- convert(expr2)
+        } yield s"($left or $right)"
+
+      case _ => None
     }
 
-    filters.map(convert.lift).collect { case Some(filterString) => filterString }.mkString(" and ")
+    filters.flatMap(convert).mkString(" and ")
   }
 
   private def quoteStringLiteral(str: String): String = {
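A closing note on the ExtractableLiterals and ExtractableValues hunks above: both swap a foldLeft accumulator for map plus forall. The standalone comparison below is only a sketch (sequenceOld/sequenceNew are hypothetical names, not from the patch); the practical difference is that the new form also rejects an empty input sequence, which is why the explicit `values.nonEmpty` guards could disappear from the In/InSet cases.

```scala
// Both helpers turn Seq[Option[A]] into Option[Seq[A]], yielding None as soon
// as any element is None. sequenceOld mirrors the removed foldLeft version;
// sequenceNew mirrors the map/forall version this patch adopts.
object OptionSequenceSketch {
  def sequenceOld[A](opts: Seq[Option[A]]): Option[Seq[A]] =
    opts.foldLeft(Option(Seq.empty[A])) {
      case (Some(acc), Some(v)) => Some(acc :+ v)
      case _ => None
    }

  def sequenceNew[A](opts: Seq[Option[A]]): Option[Seq[A]] =
    // Extra nonEmpty check: an empty predicate list is no longer accepted.
    if (opts.nonEmpty && opts.forall(_.isDefined)) Some(opts.map(_.get)) else None

  def main(args: Array[String]): Unit = {
    println(sequenceOld(Seq(Some(1), Some(2))))  // Some(List(1, 2))
    println(sequenceNew(Seq(Some(1), Some(2))))  // Some(List(1, 2))
    println(sequenceNew(Seq(Some(1), None)))     // None
    println(sequenceOld(Seq.empty[Option[Int]])) // Some(List()) -- old accepted empty
    println(sequenceNew(Seq.empty[Option[Int]])) // None -- new rejects empty
  }
}
```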