IGNITE-9593: Fix IgniteOptimization bugs with UNION and null literals. - Fixes #4757.

Signed-off-by: Nikolay Izhikov <nizhi...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/93010800
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/93010800
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/93010800

Branch: refs/heads/ignite-7251
Commit: 93010800a99d12f5f3c11f4369fb457791a74766
Parents: 5e0455e
Author: Nikolay Izhikov <nizhi...@apache.org>
Authored: Fri Sep 14 13:35:27 2018 +0300
Committer: Nikolay Izhikov <nizhi...@apache.org>
Committed: Fri Sep 14 13:35:27 2018 +0300

----------------------------------------------------------------------
 .../ignite/spark/impl/IgniteSQLRelation.scala   |  2 +-
 .../impl/optimization/SimpleExpressions.scala   | 73 +++++++++++---------
 .../accumulator/JoinSQLAccumulator.scala        |  8 ++-
 .../accumulator/QueryAccumulator.scala          | 14 +++-
 .../accumulator/SingleTableSQLAccumulator.scala |  8 ++-
 .../accumulator/UnionSQLAccumulator.scala       | 25 +++++--
 .../spark/sql/ignite/IgniteOptimization.scala   | 10 ++-
 .../spark/IgniteDataFrameSchemaSpec.scala       | 34 ++++++++-
 .../ignite/spark/IgniteOptimizationSpec.scala   | 56 +++++++++++++++
 9 files changed, 180 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/93010800/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala
index 485ddf6..1b4f277 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala
@@ -119,7 +119,7 @@ object IgniteSQLRelation {
 
         StructType(columns.map { case (name, dataType) ⇒
             StructField(
-                name = name,
+                name = table.getAliases.getOrDefault(name, name),
                 dataType = IgniteRDD.dataType(dataType, name),
                 nullable = !isKeyColumn(table, name),
                 metadata = Metadata.empty)

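Context for the change above: the DataFrame schema built for an Ignite table now reports QueryEntity column aliases (when configured) instead of raw column names. A minimal usage sketch, based on the JPersonWithAlias test case added below (everything else is assumed boilerplate):

    // Reading a cache whose QueryEntity aliases "name" to "person_name".
    val df = spark.read
        .format(FORMAT_IGNITE)
        .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
        .option(OPTION_TABLE, "JPersonWithAlias")
        .load()

    df.schema.fieldNames // now Array("ID", "PERSON_NAME"), not the raw field names
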
http://git-wip-us.apache.org/repos/asf/ignite/blob/93010800/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/SimpleExpressions.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/SimpleExpressions.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/SimpleExpressions.scala
index 10d021a..4e54ffc 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/SimpleExpressions.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/SimpleExpressions.scala
@@ -48,41 +48,43 @@ private[optimization] object SimpleExpressions extends SupportedExpressions {
     /** @inheritdoc */
     override def toString(expr: Expression, childToString: Expression ⇒ String, useQualifier: Boolean,
         useAlias: Boolean): Option[String] = expr match {
-        case l: Literal ⇒ l.dataType match {
-            case StringType ⇒
-                Some("'" + l.value.toString + "'")
-
-            case TimestampType ⇒
-                l.value match {
-                    //Internal representation of TimestampType is Long.
-                    //So we converting from internal spark representation to CAST call.
-                    case date: Long ⇒
-                        Some(s"CAST('${timestampFormat.get.format(DateTimeUtils.toJavaTimestamp(date))}' AS TIMESTAMP)")
+        case l: Literal ⇒
+            if (l.value == null)
+                Some("null")
+            else {
+                l.dataType match {
+                    case StringType ⇒
+                        Some("'" + l.value.toString + "'")
+
+                    case TimestampType ⇒
+                        l.value match {
+                            //Internal representation of TimestampType is Long.
+                            //So we convert from the internal Spark representation to a CAST call.
+                            case date: Long ⇒
+                                Some(s"CAST('${timestampFormat.get.format(DateTimeUtils.toJavaTimestamp(date))}' " +
+                                    s"AS TIMESTAMP)")
+
+                            case _ ⇒
+                                Some(l.value.toString)
+                        }
+
+                    case DateType ⇒
+                        l.value match {
+                            //Internal representation of DateType is Int.
+                            //So we convert from the internal Spark representation to a CAST call.
+                            case days: Integer ⇒
+                                val date = new java.util.Date(DateTimeUtils.daysToMillis(days))
+
+                                Some(s"CAST('${dateFormat.get.format(date)}' AS DATE)")
+
+                            case _ ⇒
+                                Some(l.value.toString)
+                        }
 
                     case _ ⇒
                         Some(l.value.toString)
                 }
-
-            case DateType ⇒
-                l.value match {
-                    //Internal representation of DateType is Int.
-                    //So we converting from internal spark representation to CAST call.
-                    case days: Integer ⇒
-                        val date = new java.util.Date(DateTimeUtils.daysToMillis(days))
-
-                        Some(s"CAST('${dateFormat.get.format(date)}' AS DATE)")
-
-                    case _ ⇒
-                        Some(l.value.toString)
-                }
-
-            case _ ⇒
-                if (l.value == null)
-                    Some("null")
-                else
-                    Some(l.value.toString)
-        }
-
+            }
         case ar: AttributeReference ⇒
             val name =
                 if (useQualifier)
@@ -90,9 +92,11 @@ private[optimization] object SimpleExpressions extends SupportedExpressions {
                 else
                     ar.name
 
-            if (ar.metadata.contains(ALIAS) && !isAliasEqualColumnName(ar.metadata.getString(ALIAS), ar.name) && useAlias)
+            if (ar.metadata.contains(ALIAS) &&
+                !isAliasEqualColumnName(ar.metadata.getString(ALIAS), ar.name) &&
+                useAlias) {
                 Some(aliasToString(name, ar.metadata.getString(ALIAS)))
-            else
+            } else
                 Some(name)
 
         case Alias(child, name) ⇒
@@ -142,7 +146,8 @@ private[optimization] object SimpleExpressions extends SupportedExpressions {
             Set[DataType](BooleanType, StringType)(to)
 
         case ByteType ⇒
-            Set(ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, StringType, DecimalType(_, _), StringType)(to)
+            Set(ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, StringType, DecimalType(_, _),
+                StringType)(to)
 
         case ShortType ⇒
             Set(ShortType, IntegerType, LongType, FloatType, DoubleType, StringType, DecimalType(_, _))(to)

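Why the Literal rewrite matters: the null check used to live only in the fall-through case, so a typed null literal (for example lit(null).cast(StringType)) entered the StringType branch and dereferenced l.value. Hoisting the check ahead of the dataType match makes every null literal compile to SQL null. A sketch, reusing the readTable helper added in the spec below:

    // A typed null column no longer breaks SQL generation:
    val p = readTable("JPerson").withColumn("nullColumn", lit(null).cast(StringType))
    // The generated Ignite SQL renders the literal simply as: ..., null AS nullColumn
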
http://git-wip-us.apache.org/repos/asf/ignite/blob/93010800/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/JoinSQLAccumulator.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/JoinSQLAccumulator.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/JoinSQLAccumulator.scala
index 7ae5e70..baf5a8b 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/JoinSQLAccumulator.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/JoinSQLAccumulator.scala
@@ -44,7 +44,7 @@ private[apache] case class JoinSQLAccumulator(
     orderBy: Option[Seq[SortOrder]] = None
 ) extends BinaryNode with SelectAccumulator {
     /** @inheritdoc */
-    override def compileQuery(prettyPrint: Boolean = false): String = {
+    override def compileQuery(prettyPrint: Boolean = false, nestedQuery: Boolean = false): String = {
         val delim = if (prettyPrint) "\n" else " "
         val tab = if (prettyPrint) "  " else ""
 
@@ -68,9 +68,13 @@ private[apache] case class JoinSQLAccumulator(
             sql += s"${delim}ORDER BY " +
                 s"${fixQualifier(orderBy.get).map(exprToString(_, useQualifier 
= true)).mkString(s",$delim$tab")}"
 
-        if (limit.isDefined)
+        if (limit.isDefined) {
             sql += s" LIMIT ${exprToString(fixQualifier0(limit.get), 
useQualifier = true)}"
 
+            if (nestedQuery)
+                sql = s"SELECT * FROM ($sql)"
+        }
+
         sql
     }
 

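A note on the new nestedQuery flag: a trailing LIMIT on a UNION operand would otherwise bind to the whole UNION (or be rejected), so a limited child now wraps itself in a sub-select. A rough sketch of the two output shapes (identifiers illustrative):

    // child.compileQuery()                   // SELECT ID, NAME FROM JPERSON LIMIT 1
    // child.compileQuery(nestedQuery = true) // SELECT * FROM (SELECT ID, NAME FROM JPERSON LIMIT 1)
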
http://git-wip-us.apache.org/repos/asf/ignite/blob/93010800/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/QueryAccumulator.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/QueryAccumulator.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/QueryAccumulator.scala
index 133d355..9570a66 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/QueryAccumulator.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/QueryAccumulator.scala
@@ -18,7 +18,7 @@
 package org.apache.ignite.spark.impl.optimization.accumulator
 
 import org.apache.ignite.spark.impl.optimization.IgniteQueryContext
-import org.apache.spark.sql.catalyst.expressions.{NamedExpression, SortOrder}
+import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression, SortOrder}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 
 /**
@@ -53,10 +53,20 @@ private[apache] trait QueryAccumulator extends LogicalPlan {
     def withOrderBy(orderBy: Seq[SortOrder]): QueryAccumulator
 
     /**
+      * @return Copy of this accumulator with `limit` expression.
+      */
+    def withLimit(limit: Expression): QueryAccumulator
+
+    /**
+      * @return Copy of this accumulator with `localLimit` expression.
+      */
+    def withLocalLimit(localLimit: Expression): QueryAccumulator
+
+    /**
       * @param prettyPrint If true human readable query will be generated.
       * @return SQL query.
       */
-    def compileQuery(prettyPrint: Boolean = false): String
+    def compileQuery(prettyPrint: Boolean = false, nestedQuery: Boolean = false): String
 
     /**
       * @return Qualifier that should be use to select data from this accumulator.

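These two methods let the optimizer push a LIMIT into any accumulator, not only single-table selects. A hypothetical caller-side sketch (the Literal(5) expression is illustrative, not from this patch):

    // Push LIMIT 5 into an accumulator, then compile it:
    // val limited = acc.withLimit(Literal(5))
    // limited.compileQuery() // "... LIMIT 5"
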
http://git-wip-us.apache.org/repos/asf/ignite/blob/93010800/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/SingleTableSQLAccumulator.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/SingleTableSQLAccumulator.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/SingleTableSQLAccumulator.scala
index 47035b9..735740f 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/SingleTableSQLAccumulator.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/SingleTableSQLAccumulator.scala
@@ -42,7 +42,7 @@ private[apache] case class SingleTableSQLAccumulator(
     orderBy: Option[Seq[SortOrder]] = None
 ) extends SelectAccumulator {
     /** @inheritdoc */
-    override def compileQuery(prettyPrint: Boolean = false): String = {
+    override def compileQuery(prettyPrint: Boolean = false, nestedQuery: Boolean = false): String = {
         val delim = if (prettyPrint) "\n" else " "
         val tab = if (prettyPrint) "  " else ""
 
@@ -61,9 +61,13 @@ private[apache] case class SingleTableSQLAccumulator(
         if (orderBy.exists(_.nonEmpty))
             sql += s"${delim}ORDER BY 
${orderBy.get.map(exprToString(_)).mkString(s",$delim$tab")}"
 
-        if (limit.isDefined)
+        if (limit.isDefined) {
             sql += s" LIMIT ${limit.map(exprToString(_)).get}"
 
+            if (nestedQuery)
+                sql = s"SELECT * FROM ($sql)"
+        }
+
         sql
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/93010800/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/UnionSQLAccumulator.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/UnionSQLAccumulator.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/UnionSQLAccumulator.scala
index 723e17a..5f870e3 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/UnionSQLAccumulator.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/UnionSQLAccumulator.scala
@@ -18,7 +18,7 @@
 package org.apache.ignite.spark.impl.optimization.accumulator
 
 import org.apache.ignite.spark.impl.optimization.{IgniteQueryContext, exprToString, toAttributeReference}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, NamedExpression, SortOrder}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, NamedExpression, SortOrder}
 
 /**
   * Accumulator to store info about UNION query.
@@ -27,21 +27,32 @@ private[apache] case class UnionSQLAccumulator(
     igniteQueryContext: IgniteQueryContext,
     children: Seq[QueryAccumulator],
     outputExpressions: Seq[NamedExpression],
+    limit: Option[Expression] = None,
+    localLimit: Option[Expression] = None,
     orderBy: Option[Seq[SortOrder]] = None
 ) extends QueryAccumulator {
     /** @inheritdoc */
-    override def compileQuery(prettyPrint: Boolean = false): String = {
+    override def compileQuery(prettyPrint: Boolean = false, nestedQuery: Boolean = false): String = {
         val delim = if (prettyPrint) "\n" else " "
         val tab = if (prettyPrint) "  " else ""
 
-        val query = children.map(_.compileQuery(prettyPrint)).mkString(s"${delim}UNION$delim")
+        var query = children.map(_.compileQuery(prettyPrint, nestedQuery = true)).mkString(s"${delim}UNION$delim")
 
-        orderBy match {
+        query = orderBy match {
             case Some(sortOrders) ⇒
                 query + s"${delim}ORDER BY 
${sortOrders.map(exprToString(_)).mkString(s",$delim$tab")}"
 
             case None ⇒ query
         }
+
+        if (limit.isDefined) {
+            query += s" LIMIT ${exprToString(limit.get)}"
+
+            if (nestedQuery)
+                query = s"SELECT * FROM ($query)"
+        }
+
+        query
     }
 
     /** @inheritdoc */
@@ -60,4 +71,10 @@ private[apache] case class UnionSQLAccumulator(
 
     /** @inheritdoc */
     override lazy val qualifier: String = igniteQueryContext.uniqueTableAlias
+
+    /** @inheritdoc */
+    override def withLimit(limit: Expression): QueryAccumulator = copy(limit = Some(limit))
+
+    /** @inheritdoc */
+    override def withLocalLimit(localLimit: Expression): QueryAccumulator = copy(localLimit = Some(localLimit))
 }

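Put together, a UNION with ORDER BY and LIMIT now compiles into a single query. A rough sketch of compileQuery(prettyPrint = true) output for two children (identifiers illustrative):

    // SELECT ID, NAME FROM JPERSON
    // UNION
    // SELECT ID, NAME FROM JPERSON2
    // ORDER BY NAME
    // LIMIT 5
    //
    // Children are compiled with nestedQuery = true, so a child carrying its
    // own LIMIT is wrapped as SELECT * FROM (...).
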
http://git-wip-us.apache.org/repos/asf/ignite/blob/93010800/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteOptimization.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteOptimization.scala b/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteOptimization.scala
index 4a0f791..2d97792 100644
--- a/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteOptimization.scala
+++ b/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteOptimization.scala
@@ -126,7 +126,7 @@ object IgniteOptimization extends Rule[LogicalPlan] with Logging {
                         if (acc.groupBy.isDefined) {
                             val tableAlias = acc.igniteQueryContext.uniqueTableAlias
 
-                            accumulator.SingleTableSQLAccumulator(
+                            SingleTableSQLAccumulator(
                                 igniteQueryContext = acc.igniteQueryContext,
                                 table = None,
                                 tableExpression = Some((acc, tableAlias)),
@@ -141,7 +141,7 @@ object IgniteOptimization extends Rule[LogicalPlan] with Logging {
                     case acc: QueryAccumulator ⇒
                         val tableAlias = acc.igniteQueryContext.uniqueTableAlias
 
-                        accumulator.SingleTableSQLAccumulator(
+                        SingleTableSQLAccumulator(
                             igniteQueryContext = acc.igniteQueryContext,
                             table = None,
                             tableExpression = Some((acc, tableAlias)),
@@ -156,6 +156,9 @@ object IgniteOptimization extends Rule[LogicalPlan] with Logging {
                     case acc: SelectAccumulator ⇒
                         acc.withLocalLimit(limit.limitExpr)
 
+                    case acc: QueryAccumulator ⇒
+                        acc.withLocalLimit(limit.limitExpr)
+
                     case _ ⇒
                         throw new IgniteException("stepSkipped == true but 
child is not SelectAccumulator")
                 }
@@ -165,6 +168,9 @@ object IgniteOptimization extends Rule[LogicalPlan] with Logging {
                     case acc: SelectAccumulator ⇒
                         acc.withLimit(limit.limitExpr)
 
+                    case acc: QueryAccumulator ⇒
+                        acc.withLimit(limit.limitExpr)
+
                     case _ ⇒
                         throw new IgniteException("stepSkipped == true but 
child is not SelectAccumulator")
                 }

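With the two new QueryAccumulator cases, a Spark limit above a UNION of Ignite relations is folded into the accumulator instead of hitting the IgniteException fall-through. A sketch, again assuming the readTable helper from the spec below:

    // Previously a LocalLimit/GlobalLimit over a UnionSQLAccumulator threw;
    // now it becomes a UnionSQLAccumulator with limit = Some(...):
    val limited = readTable("JPerson").union(readTable("JPerson2")).limit(5)
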
http://git-wip-us.apache.org/repos/asf/ignite/blob/93010800/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSchemaSpec.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSchemaSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSchemaSpec.scala
index c5df901..b071008 100644
--- a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSchemaSpec.scala
+++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSchemaSpec.scala
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.spark
 
+import org.apache.ignite.cache.query.annotations.QuerySqlField
+import org.apache.ignite.configuration.CacheConfiguration
 import org.apache.ignite.spark.AbstractDataFrameSpec._
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.types._
@@ -24,6 +26,8 @@ import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.apache.ignite.spark.IgniteDataFrameSettings._
 
+import scala.annotation.meta.field
+
 /**
   * Tests to check loading schema for Ignite data sources.
   */
@@ -33,6 +37,8 @@ class IgniteDataFrameSchemaSpec extends AbstractDataFrameSpec {
 
     var employeeDataFrame: DataFrame = _
 
+    var personWithAliasesDataFrame: DataFrame = _
+
     describe("Loading DataFrame schema for Ignite tables") {
         it("should successfully load DataFrame schema for a Ignite SQL Table") 
{
             personDataFrame.schema.fields.map(f ⇒ (f.name, f.dataType, 
f.nullable)) should equal (
@@ -52,9 +58,17 @@ class IgniteDataFrameSchemaSpec extends AbstractDataFrameSpec {
         it("should successfully load DataFrame data for a Ignite table configured throw java annotation") {
             employeeDataFrame.schema.fields.map(f ⇒ (f.name, f.dataType, f.nullable)) should equal (
                 Array(
-                    ("id", LongType, true),
-                    ("name", StringType, true),
-                    ("salary", FloatType, true))
+                    ("ID", LongType, true),
+                    ("NAME", StringType, true),
+                    ("SALARY", FloatType, true))
+            )
+        }
+
+        it("should use QueryEntity column aliases") {
+            personWithAliasesDataFrame.schema.fields.map(f ⇒ (f.name, f.dataType, f.nullable)) should equal (
+                Array(
+                    ("ID", LongType, true),
+                    ("PERSON_NAME", StringType, true))
             )
         }
     }
@@ -62,6 +76,16 @@ class IgniteDataFrameSchemaSpec extends AbstractDataFrameSpec {
     override protected def beforeAll(): Unit = {
         super.beforeAll()
 
+        client.getOrCreateCache(new CacheConfiguration[Long, JPersonWithAlias]()
+            .setName("P3")
+            .setIndexedTypes(classOf[Long], classOf[JPersonWithAlias]))
+
+        personWithAliasesDataFrame = spark.read
+            .format(FORMAT_IGNITE)
+            .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
+            .option(OPTION_TABLE, classOf[JPersonWithAlias].getSimpleName)
+            .load()
+
         createPersonTable(client, DEFAULT_CACHE)
 
         createEmployeeCache(client, EMPLOYEE_CACHE_NAME)
@@ -82,4 +106,8 @@ class IgniteDataFrameSchemaSpec extends AbstractDataFrameSpec {
 
         employeeDataFrame.createOrReplaceTempView("employee")
     }
+
+    case class JPersonWithAlias(
+        @(QuerySqlField @field) id: Long,
+        @(QuerySqlField @field)(name = "person_name", index = true) name: String)
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/93010800/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationSpec.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationSpec.scala
index ff367af..c2b5973 100644
--- a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationSpec.scala
+++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationSpec.scala
@@ -17,12 +17,20 @@
 
 package org.apache.ignite.spark
 
+import org.apache.ignite.cache.query.annotations.QuerySqlField
+import org.apache.ignite.configuration.CacheConfiguration
 import org.apache.spark.sql.ignite.IgniteSparkSession
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.apache.ignite.internal.IgnitionEx
 import org.apache.ignite.internal.util.IgniteUtils.resolveIgnitePath
 import org.apache.ignite.spark.AbstractDataFrameSpec.{DEFAULT_CACHE, TEST_CONFIG_FILE, checkOptimizationResult, enclose}
+import org.apache.ignite.spark.IgniteDataFrameSettings.{FORMAT_IGNITE, OPTION_TABLE}
+import org.apache.spark.sql.functions.lit
+import org.apache.spark.sql.types.DataTypes.StringType
+import org.apache.spark.sql.{Dataset, Row}
+
+import scala.annotation.meta.field
 
 /**
   */
@@ -232,6 +240,25 @@ class IgniteOptimizationSpec extends AbstractDataFrameSpec {
 
             checkQueryData(df, data)
         }
+
+        it("Should optimize union") {
+            val union = readTable("JPerson").union(readTable("JPerson2"))
+
+            val data = (
+                (1, "JPerson-1"),
+                (2, "JPerson-2"))
+
+            checkQueryData(union, data)
+        }
+
+        it("Should optimize null column") {
+            val p = readTable("JPerson").withColumn("nullColumn", lit(null).cast(StringType))
+
+            val data = Tuple1(
+                (1, "JPerson-1", null))
+
+            checkQueryData(p, data)
+        }
     }
 
     describe("Not Optimized Queries") {
@@ -278,6 +305,13 @@ class IgniteOptimizationSpec extends AbstractDataFrameSpec {
         }
     }
 
+    def readTable(tblName: String): Dataset[Row] =
+        igniteSession.read
+            .format(FORMAT_IGNITE)
+            .option(OPTION_TABLE, tblName)
+            .option(IgniteDataFrameSettings.OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
+            .load
+
     override protected def beforeAll(): Unit = {
         super.beforeAll()
 
@@ -285,6 +319,20 @@ class IgniteOptimizationSpec extends AbstractDataFrameSpec {
 
         createCityTable(client, DEFAULT_CACHE)
 
+        val p = client.getOrCreateCache(new CacheConfiguration[Long, JPerson]()
+            .setName("P")
+            .setSqlSchema("SQL_PUBLIC")
+            .setIndexedTypes(classOf[Long], classOf[JPerson]))
+
+        p.put(1L, new JPerson(1L, "JPerson-1"))
+
+        val p2 = client.getOrCreateCache(new CacheConfiguration[Long, JPerson2]()
+            .setName("P2")
+            .setSqlSchema("SQL_PUBLIC")
+            .setIndexedTypes(classOf[Long], classOf[JPerson2]))
+
+        p2.put(1L, new JPerson2(2L, "JPerson-2"))
+
         val configProvider = enclose(null) (x ⇒ () ⇒ {
             val cfg = IgnitionEx.loadConfiguration(TEST_CONFIG_FILE).get1()
 
@@ -302,4 +350,12 @@ class IgniteOptimizationSpec extends AbstractDataFrameSpec {
 
         igniteSession.udf.register("test_reverse", (str: String) ⇒ str.reverse)
     }
+
+    case class JPerson(
+        @(QuerySqlField @field) id: Long,
+        @(QuerySqlField @field)(index = true) name: String)
+
+    case class JPerson2(
+        @(QuerySqlField @field) id: Long,
+        @(QuerySqlField @field)(index = true) name: String)
 }
