Repository: spark Updated Branches: refs/heads/branch-2.1 f0d50fd54 -> 5a2ad4312
[SPARK-19893][SQL] should not run DataFrame set operations with map type In Spark SQL, map type can't be used in equality test/comparison, and `Intersect`/`Except`/`Distinct` do need equality test for all columns, so we should not allow map type in `Intersect`/`Except`/`Distinct`. new regression test Author: Wenchen Fan <wenc...@databricks.com> Closes #17236 from cloud-fan/map. (cherry picked from commit fb9beda54622e0c3190c6504fc468fa4e50eeb45) Signed-off-by: Wenchen Fan <wenc...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5a2ad431 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5a2ad431 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5a2ad431 Branch: refs/heads/branch-2.1 Commit: 5a2ad4312dd00a450eac49ce53d70d9541e9e4cb Parents: f0d50fd Author: Wenchen Fan <wenc...@databricks.com> Authored: Fri Mar 10 16:14:22 2017 -0800 Committer: Wenchen Fan <wenc...@databricks.com> Committed: Fri Mar 10 16:30:42 2017 -0800 ---------------------------------------------------------------------- .../sql/catalyst/analysis/CheckAnalysis.scala | 24 ++++++++++++++++---- .../org/apache/spark/sql/DataFrameSuite.scala | 16 +++++++++++++ .../columnar/InMemoryColumnarQuerySuite.scala | 14 +++++------- 3 files changed, 42 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/5a2ad431/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 65a2a7b..f7109f4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -21,7 +21,6 @@ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.catalog.SimpleCatalogRelation import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression -import org.apache.spark.sql.catalyst.plans.UsingJoin import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.types._ @@ -46,6 +45,16 @@ trait CheckAnalysis extends PredicateHelper { }).length > 1 } + protected def hasMapType(dt: DataType): Boolean = { + dt.existsRecursively(_.isInstanceOf[MapType]) + } + + protected def mapColumnInSetOperation(plan: LogicalPlan): Option[Attribute] = plan match { + case _: Intersect | _: Except | _: Distinct => + plan.output.find(a => hasMapType(a.dataType)) + case _ => None + } + private def checkLimitClause(limitExpr: Expression): Unit = { limitExpr match { case e if !e.foldable => failAnalysis( @@ -123,8 +132,7 @@ trait CheckAnalysis extends PredicateHelper { if (conditions.isEmpty && query.output.size != 1) { failAnalysis( s"Scalar subquery must return only one column, but got ${query.output.size}") - } - else if (conditions.nonEmpty) { + } else if (conditions.nonEmpty) { // Collect the columns from the subquery for further checking. 
var subqueryColumns = conditions.flatMap(_.references).filter(query.output.contains) @@ -202,7 +210,7 @@ trait CheckAnalysis extends PredicateHelper { s"filter expression '${f.condition.sql}' " + s"of type ${f.condition.dataType.simpleString} is not a boolean.") - case f @ Filter(condition, child) => + case Filter(condition, _) => splitConjunctivePredicates(condition).foreach { case _: PredicateSubquery | Not(_: PredicateSubquery) => case e if PredicateSubquery.hasNullAwarePredicateWithinNot(e) => @@ -376,6 +384,14 @@ trait CheckAnalysis extends PredicateHelper { |Conflicting attributes: ${conflictingAttributes.mkString(",")} """.stripMargin) + // TODO: although map type is not orderable, technically map type should be able to be + // used in equality comparison, remove this type check once we support it. + case o if mapColumnInSetOperation(o).isDefined => + val mapCol = mapColumnInSetOperation(o).get + failAnalysis("Cannot have map type columns in DataFrame which calls " + + s"set operations(intersect, except, etc.), but the type of column ${mapCol.name} " + + "is " + mapCol.dataType.simpleString) + case s: SimpleCatalogRelation => failAnalysis( s""" http://git-wip-us.apache.org/repos/asf/spark/blob/5a2ad431/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 22dfc46..ec201f3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -1739,4 +1739,20 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { val df = spark.range(1).selectExpr("CAST(id as DECIMAL) as x").selectExpr("percentile(x, 0.5)") checkAnswer(df, Row(BigDecimal(0.0)) :: Nil) } + + test("SPARK-19893: cannot run set operations with map type") { + val df = 
spark.range(1).select(map(lit("key"), $"id").as("m")) + val e = intercept[AnalysisException](df.intersect(df)) + assert(e.message.contains( + "Cannot have map type columns in DataFrame which calls set operations")) + val e2 = intercept[AnalysisException](df.except(df)) + assert(e2.message.contains( + "Cannot have map type columns in DataFrame which calls set operations")) + withTempView("v") { + df.createOrReplaceTempView("v") + val e3 = intercept[AnalysisException](sql("SELECT DISTINCT m FROM v")) + assert(e3.message.contains( + "Cannot have map type columns in DataFrame which calls set operations")) + } + } } http://git-wip-us.apache.org/repos/asf/spark/blob/5a2ad431/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala index afeb478..8592a29 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala @@ -234,8 +234,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { Seq(StringType, BinaryType, NullType, BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5), - DateType, TimestampType, - ArrayType(IntegerType), MapType(StringType, LongType), struct) + DateType, TimestampType, ArrayType(IntegerType), struct) val fields = dataTypes.zipWithIndex.map { case (dataType, index) => StructField(s"col$index", dataType, true) } @@ -244,10 +243,10 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { // Create an RDD for the schema val rdd = - sparkContext.parallelize((1 to 10000), 10).map { i 
=> + sparkContext.parallelize(1 to 10000, 10).map { i => Row( - s"str${i}: test cache.", - s"binary${i}: test cache.".getBytes(StandardCharsets.UTF_8), + s"str$i: test cache.", + s"binary$i: test cache.".getBytes(StandardCharsets.UTF_8), null, i % 2 == 0, i.toByte, @@ -255,13 +254,12 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { i, Long.MaxValue - i.toLong, (i + 0.25).toFloat, - (i + 0.75), + i + 0.75, BigDecimal(Long.MaxValue.toString + ".12345"), new java.math.BigDecimal(s"${i % 9 + 1}" + ".23456"), new Date(i), new Timestamp(i * 1000000L), - (i to i + 10).toSeq, - (i to i + 10).map(j => s"map_key_$j" -> (Long.MaxValue - j)).toMap, + i to i + 10, Row((i - 0.25).toFloat, Seq(true, false, null))) } spark.createDataFrame(rdd, schema).createOrReplaceTempView("InMemoryCache_different_data_types") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org