IGNITE-9593: Fix IgniteOptimization bugs with UNION queries and null literals. - Fixes #4757.
Signed-off-by: Nikolay Izhikov <nizhi...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/93010800 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/93010800 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/93010800 Branch: refs/heads/ignite-7251 Commit: 93010800a99d12f5f3c11f4369fb457791a74766 Parents: 5e0455e Author: Nikolay Izhikov <nizhi...@apache.org> Authored: Fri Sep 14 13:35:27 2018 +0300 Committer: Nikolay Izhikov <nizhi...@apache.org> Committed: Fri Sep 14 13:35:27 2018 +0300 ---------------------------------------------------------------------- .../ignite/spark/impl/IgniteSQLRelation.scala | 2 +- .../impl/optimization/SimpleExpressions.scala | 73 +++++++++++--------- .../accumulator/JoinSQLAccumulator.scala | 8 ++- .../accumulator/QueryAccumulator.scala | 14 +++- .../accumulator/SingleTableSQLAccumulator.scala | 8 ++- .../accumulator/UnionSQLAccumulator.scala | 25 +++++-- .../spark/sql/ignite/IgniteOptimization.scala | 10 ++- .../spark/IgniteDataFrameSchemaSpec.scala | 34 ++++++++- .../ignite/spark/IgniteOptimizationSpec.scala | 56 +++++++++++++++ 9 files changed, 180 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/93010800/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala index 485ddf6..1b4f277 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala @@ -119,7 +119,7 @@ object IgniteSQLRelation { StructType(columns.map { case (name, dataType) â 
StructField( - name = name, + name = table.getAliases.getOrDefault(name, name), dataType = IgniteRDD.dataType(dataType, name), nullable = !isKeyColumn(table, name), metadata = Metadata.empty) http://git-wip-us.apache.org/repos/asf/ignite/blob/93010800/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/SimpleExpressions.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/SimpleExpressions.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/SimpleExpressions.scala index 10d021a..4e54ffc 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/SimpleExpressions.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/SimpleExpressions.scala @@ -48,41 +48,43 @@ private[optimization] object SimpleExpressions extends SupportedExpressions { /** @inheritdoc */ override def toString(expr: Expression, childToString: Expression â String, useQualifier: Boolean, useAlias: Boolean): Option[String] = expr match { - case l: Literal â l.dataType match { - case StringType â - Some("'" + l.value.toString + "'") - - case TimestampType â - l.value match { - //Internal representation of TimestampType is Long. - //So we converting from internal spark representation to CAST call. - case date: Long â - Some(s"CAST('${timestampFormat.get.format(DateTimeUtils.toJavaTimestamp(date))}' AS TIMESTAMP)") + case l: Literal â + if (l.value == null) + Some("null") + else { + l.dataType match { + case StringType â + Some("'" + l.value.toString + "'") + + case TimestampType â + l.value match { + //Internal representation of TimestampType is Long. + //So we converting from internal spark representation to CAST call. 
+ case date: Long â + Some(s"CAST('${timestampFormat.get.format(DateTimeUtils.toJavaTimestamp(date))}' " + + s"AS TIMESTAMP)") + + case _ â + Some(l.value.toString) + } + + case DateType â + l.value match { + //Internal representation of DateType is Int. + //So we converting from internal spark representation to CAST call. + case days: Integer â + val date = new java.util.Date(DateTimeUtils.daysToMillis(days)) + + Some(s"CAST('${dateFormat.get.format(date)}' AS DATE)") + + case _ â + Some(l.value.toString) + } case _ â Some(l.value.toString) } - - case DateType â - l.value match { - //Internal representation of DateType is Int. - //So we converting from internal spark representation to CAST call. - case days: Integer â - val date = new java.util.Date(DateTimeUtils.daysToMillis(days)) - - Some(s"CAST('${dateFormat.get.format(date)}' AS DATE)") - - case _ â - Some(l.value.toString) - } - - case _ â - if (l.value == null) - Some("null") - else - Some(l.value.toString) - } - + } case ar: AttributeReference â val name = if (useQualifier) @@ -90,9 +92,11 @@ private[optimization] object SimpleExpressions extends SupportedExpressions { else ar.name - if (ar.metadata.contains(ALIAS) && !isAliasEqualColumnName(ar.metadata.getString(ALIAS), ar.name) && useAlias) + if (ar.metadata.contains(ALIAS) && + !isAliasEqualColumnName(ar.metadata.getString(ALIAS), ar.name) && + useAlias) { Some(aliasToString(name, ar.metadata.getString(ALIAS))) - else + } else Some(name) case Alias(child, name) â @@ -142,7 +146,8 @@ private[optimization] object SimpleExpressions extends SupportedExpressions { Set[DataType](BooleanType, StringType)(to) case ByteType â - Set(ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, StringType, DecimalType(_, _), StringType)(to) + Set(ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, StringType, DecimalType(_, _), + StringType)(to) case ShortType â Set(ShortType, IntegerType, LongType, FloatType, DoubleType, StringType, 
DecimalType(_, _))(to) http://git-wip-us.apache.org/repos/asf/ignite/blob/93010800/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/JoinSQLAccumulator.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/JoinSQLAccumulator.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/JoinSQLAccumulator.scala index 7ae5e70..baf5a8b 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/JoinSQLAccumulator.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/JoinSQLAccumulator.scala @@ -44,7 +44,7 @@ private[apache] case class JoinSQLAccumulator( orderBy: Option[Seq[SortOrder]] = None ) extends BinaryNode with SelectAccumulator { /** @inheritdoc */ - override def compileQuery(prettyPrint: Boolean = false): String = { + override def compileQuery(prettyPrint: Boolean = false, nestedQuery: Boolean = false): String = { val delim = if (prettyPrint) "\n" else " " val tab = if (prettyPrint) " " else "" @@ -68,9 +68,13 @@ private[apache] case class JoinSQLAccumulator( sql += s"${delim}ORDER BY " + s"${fixQualifier(orderBy.get).map(exprToString(_, useQualifier = true)).mkString(s",$delim$tab")}" - if (limit.isDefined) + if (limit.isDefined) { sql += s" LIMIT ${exprToString(fixQualifier0(limit.get), useQualifier = true)}" + if (nestedQuery) + sql = s"SELECT * FROM ($sql)" + } + sql } http://git-wip-us.apache.org/repos/asf/ignite/blob/93010800/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/QueryAccumulator.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/QueryAccumulator.scala 
b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/QueryAccumulator.scala index 133d355..9570a66 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/QueryAccumulator.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/QueryAccumulator.scala @@ -18,7 +18,7 @@ package org.apache.ignite.spark.impl.optimization.accumulator import org.apache.ignite.spark.impl.optimization.IgniteQueryContext -import org.apache.spark.sql.catalyst.expressions.{NamedExpression, SortOrder} +import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression, SortOrder} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan /** @@ -53,10 +53,20 @@ private[apache] trait QueryAccumulator extends LogicalPlan { def withOrderBy(orderBy: Seq[SortOrder]): QueryAccumulator /** + * @return Copy of this accumulator with `limit` expression. + */ + def withLimit(limit: Expression): QueryAccumulator + + /** + * @return Copy of this accumulator with `localLimit` expression. + */ + def withLocalLimit(localLimit: Expression): QueryAccumulator + + /** * @param prettyPrint If true human readable query will be generated. * @return SQL query. */ - def compileQuery(prettyPrint: Boolean = false): String + def compileQuery(prettyPrint: Boolean = false, nestedQuery: Boolean = false): String /** * @return Qualifier that should be use to select data from this accumulator. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/93010800/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/SingleTableSQLAccumulator.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/SingleTableSQLAccumulator.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/SingleTableSQLAccumulator.scala index 47035b9..735740f 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/SingleTableSQLAccumulator.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/SingleTableSQLAccumulator.scala @@ -42,7 +42,7 @@ private[apache] case class SingleTableSQLAccumulator( orderBy: Option[Seq[SortOrder]] = None ) extends SelectAccumulator { /** @inheritdoc */ - override def compileQuery(prettyPrint: Boolean = false): String = { + override def compileQuery(prettyPrint: Boolean = false, nestedQuery: Boolean = false): String = { val delim = if (prettyPrint) "\n" else " " val tab = if (prettyPrint) " " else "" @@ -61,9 +61,13 @@ private[apache] case class SingleTableSQLAccumulator( if (orderBy.exists(_.nonEmpty)) sql += s"${delim}ORDER BY ${orderBy.get.map(exprToString(_)).mkString(s",$delim$tab")}" - if (limit.isDefined) + if (limit.isDefined) { sql += s" LIMIT ${limit.map(exprToString(_)).get}" + if (nestedQuery) + sql = s"SELECT * FROM ($sql)" + } + sql } http://git-wip-us.apache.org/repos/asf/ignite/blob/93010800/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/UnionSQLAccumulator.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/UnionSQLAccumulator.scala 
b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/UnionSQLAccumulator.scala index 723e17a..5f870e3 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/UnionSQLAccumulator.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/UnionSQLAccumulator.scala @@ -18,7 +18,7 @@ package org.apache.ignite.spark.impl.optimization.accumulator import org.apache.ignite.spark.impl.optimization.{IgniteQueryContext, exprToString, toAttributeReference} -import org.apache.spark.sql.catalyst.expressions.{Attribute, NamedExpression, SortOrder} +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, NamedExpression, SortOrder} /** * Accumulator to store info about UNION query. @@ -27,21 +27,32 @@ private[apache] case class UnionSQLAccumulator( igniteQueryContext: IgniteQueryContext, children: Seq[QueryAccumulator], outputExpressions: Seq[NamedExpression], + limit: Option[Expression] = None, + localLimit: Option[Expression] = None, orderBy: Option[Seq[SortOrder]] = None ) extends QueryAccumulator { /** @inheritdoc */ - override def compileQuery(prettyPrint: Boolean = false): String = { + override def compileQuery(prettyPrint: Boolean = false, nestedQuery: Boolean = false): String = { val delim = if (prettyPrint) "\n" else " " val tab = if (prettyPrint) " " else "" - val query = children.map(_.compileQuery(prettyPrint)).mkString(s"${delim}UNION$delim") + var query = children.map(_.compileQuery(prettyPrint, nestedQuery = true)).mkString(s"${delim}UNION$delim") - orderBy match { + query = orderBy match { case Some(sortOrders) â query + s"${delim}ORDER BY ${sortOrders.map(exprToString(_)).mkString(s",$delim$tab")}" case None â query } + + if (limit.isDefined) { + query += s" LIMIT ${exprToString(limit.get)}" + + if (nestedQuery) + query = s"SELECT * FROM ($query)" + } + + query } /** @inheritdoc */ @@ -60,4 +71,10 @@ private[apache] case class 
UnionSQLAccumulator( /** @inheritdoc */ override lazy val qualifier: String = igniteQueryContext.uniqueTableAlias + + /** @inheritdoc */ + override def withLimit(limit: Expression): QueryAccumulator = copy(limit = Some(limit)) + + /** @inheritdoc */ + override def withLocalLimit(localLimit: Expression): QueryAccumulator = copy(localLimit = Some(localLimit)) } http://git-wip-us.apache.org/repos/asf/ignite/blob/93010800/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteOptimization.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteOptimization.scala b/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteOptimization.scala index 4a0f791..2d97792 100644 --- a/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteOptimization.scala +++ b/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteOptimization.scala @@ -126,7 +126,7 @@ object IgniteOptimization extends Rule[LogicalPlan] with Logging { if (acc.groupBy.isDefined) { val tableAlias = acc.igniteQueryContext.uniqueTableAlias - accumulator.SingleTableSQLAccumulator( + SingleTableSQLAccumulator( igniteQueryContext = acc.igniteQueryContext, table = None, tableExpression = Some((acc, tableAlias)), @@ -141,7 +141,7 @@ object IgniteOptimization extends Rule[LogicalPlan] with Logging { case acc: QueryAccumulator â val tableAlias = acc.igniteQueryContext.uniqueTableAlias - accumulator.SingleTableSQLAccumulator( + SingleTableSQLAccumulator( igniteQueryContext = acc.igniteQueryContext, table = None, tableExpression = Some((acc, tableAlias)), @@ -156,6 +156,9 @@ object IgniteOptimization extends Rule[LogicalPlan] with Logging { case acc: SelectAccumulator â acc.withLocalLimit(limit.limitExpr) + case acc: QueryAccumulator â + acc.withLocalLimit(limit.limitExpr) + case _ â throw new IgniteException("stepSkipped == true but child is not SelectAccumulator") } @@ -165,6 +168,9 @@ object 
IgniteOptimization extends Rule[LogicalPlan] with Logging { case acc: SelectAccumulator â acc.withLimit(limit.limitExpr) + case acc: QueryAccumulator â + acc.withLimit(limit.limitExpr) + case _ â throw new IgniteException("stepSkipped == true but child is not SelectAccumulator") } http://git-wip-us.apache.org/repos/asf/ignite/blob/93010800/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSchemaSpec.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSchemaSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSchemaSpec.scala index c5df901..b071008 100644 --- a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSchemaSpec.scala +++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSchemaSpec.scala @@ -17,6 +17,8 @@ package org.apache.ignite.spark +import org.apache.ignite.cache.query.annotations.QuerySqlField +import org.apache.ignite.configuration.CacheConfiguration import org.apache.ignite.spark.AbstractDataFrameSpec._ import org.apache.spark.sql.DataFrame import org.apache.spark.sql.types._ @@ -24,6 +26,8 @@ import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner import org.apache.ignite.spark.IgniteDataFrameSettings._ +import scala.annotation.meta.field + /** * Tests to check loading schema for Ignite data sources. 
*/ @@ -33,6 +37,8 @@ class IgniteDataFrameSchemaSpec extends AbstractDataFrameSpec { var employeeDataFrame: DataFrame = _ + var personWithAliasesDataFrame: DataFrame = _ + describe("Loading DataFrame schema for Ignite tables") { it("should successfully load DataFrame schema for a Ignite SQL Table") { personDataFrame.schema.fields.map(f â (f.name, f.dataType, f.nullable)) should equal ( @@ -52,9 +58,17 @@ class IgniteDataFrameSchemaSpec extends AbstractDataFrameSpec { it("should successfully load DataFrame data for a Ignite table configured throw java annotation") { employeeDataFrame.schema.fields.map(f â (f.name, f.dataType, f.nullable)) should equal ( Array( - ("id", LongType, true), - ("name", StringType, true), - ("salary", FloatType, true)) + ("ID", LongType, true), + ("NAME", StringType, true), + ("SALARY", FloatType, true)) + ) + } + + it("should use QueryEntity column aliases") { + personWithAliasesDataFrame.schema.fields.map(f â (f.name, f.dataType, f.nullable)) should equal ( + Array( + ("ID", LongType, true), + ("PERSON_NAME", StringType, true)) ) } } @@ -62,6 +76,16 @@ class IgniteDataFrameSchemaSpec extends AbstractDataFrameSpec { override protected def beforeAll(): Unit = { super.beforeAll() + client.getOrCreateCache(new CacheConfiguration[Long, JPersonWithAlias]() + .setName("P3") + .setIndexedTypes(classOf[Long], classOf[JPersonWithAlias])) + + personWithAliasesDataFrame = spark.read + .format(FORMAT_IGNITE) + .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE) + .option(OPTION_TABLE, classOf[JPersonWithAlias].getSimpleName) + .load() + createPersonTable(client, DEFAULT_CACHE) createEmployeeCache(client, EMPLOYEE_CACHE_NAME) @@ -82,4 +106,8 @@ class IgniteDataFrameSchemaSpec extends AbstractDataFrameSpec { employeeDataFrame.createOrReplaceTempView("employee") } + + case class JPersonWithAlias( + @(QuerySqlField @field) id: Long, + @(QuerySqlField @field)(name = "person_name", index = true) name: String) } 
http://git-wip-us.apache.org/repos/asf/ignite/blob/93010800/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationSpec.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationSpec.scala index ff367af..c2b5973 100644 --- a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationSpec.scala +++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationSpec.scala @@ -17,12 +17,20 @@ package org.apache.ignite.spark +import org.apache.ignite.cache.query.annotations.QuerySqlField +import org.apache.ignite.configuration.CacheConfiguration import org.apache.spark.sql.ignite.IgniteSparkSession import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner import org.apache.ignite.internal.IgnitionEx import org.apache.ignite.internal.util.IgniteUtils.resolveIgnitePath import org.apache.ignite.spark.AbstractDataFrameSpec.{DEFAULT_CACHE, TEST_CONFIG_FILE, checkOptimizationResult, enclose} +import org.apache.ignite.spark.IgniteDataFrameSettings.{FORMAT_IGNITE, OPTION_TABLE} +import org.apache.spark.sql.functions.lit +import org.apache.spark.sql.types.DataTypes.StringType +import org.apache.spark.sql.{Dataset, Row} + +import scala.annotation.meta.field /** */ @@ -232,6 +240,25 @@ class IgniteOptimizationSpec extends AbstractDataFrameSpec { checkQueryData(df, data) } + + it("Should optimize union") { + val union = readTable("JPerson").union(readTable("JPerson2")) + + val data = ( + (1, "JPerson-1"), + (2, "JPerson-2")) + + checkQueryData(union, data) + } + + it("Should optimize null column") { + val p = readTable("JPerson").withColumn("nullColumn", lit(null).cast(StringType)) + + val data = Tuple1( + (1, "JPerson-1", null)) + + checkQueryData(p, data) + } } describe("Not Optimized Queries") { @@ -278,6 +305,13 @@ class IgniteOptimizationSpec 
extends AbstractDataFrameSpec { } } + def readTable(tblName: String): Dataset[Row] = + igniteSession.read + .format(FORMAT_IGNITE) + .option(OPTION_TABLE, tblName) + .option(IgniteDataFrameSettings.OPTION_CONFIG_FILE, TEST_CONFIG_FILE) + .load + override protected def beforeAll(): Unit = { super.beforeAll() @@ -285,6 +319,20 @@ class IgniteOptimizationSpec extends AbstractDataFrameSpec { createCityTable(client, DEFAULT_CACHE) + val p = client.getOrCreateCache(new CacheConfiguration[Long, JPerson]() + .setName("P") + .setSqlSchema("SQL_PUBLIC") + .setIndexedTypes(classOf[Long], classOf[JPerson])) + + p.put(1L, new JPerson(1L, "JPerson-1")) + + val p2 = client.getOrCreateCache(new CacheConfiguration[Long, JPerson2]() + .setName("P2") + .setSqlSchema("SQL_PUBLIC") + .setIndexedTypes(classOf[Long], classOf[JPerson2])) + + p2.put(1L, new JPerson2(2L, "JPerson-2")) + val configProvider = enclose(null) (x â () â { val cfg = IgnitionEx.loadConfiguration(TEST_CONFIG_FILE).get1() @@ -302,4 +350,12 @@ class IgniteOptimizationSpec extends AbstractDataFrameSpec { igniteSession.udf.register("test_reverse", (str: String) â str.reverse) } + + case class JPerson( + @(QuerySqlField @field) id: Long, + @(QuerySqlField @field)(index = true) name: String) + + case class JPerson2( + @(QuerySqlField @field) id: Long, + @(QuerySqlField @field)(index = true) name: String) }