Repository: spark
Updated Branches:
  refs/heads/master 25b5b867d -> b734ed0c2


[SPARK-3395] [SQL] DSL sometimes incorrectly reuses attribute ids, breaking queries

This resolves https://issues.apache.org/jira/browse/SPARK-3395

Author: Eric Liang <e...@google.com>

Closes #2266 from ericl/spark-3395 and squashes the following commits:

7f2b6f0 [Eric Liang] add regression test
05bd1e4 [Eric Liang] in the dsl, create a new schema instance in each applySchema


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b734ed0c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b734ed0c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b734ed0c

Branch: refs/heads/master
Commit: b734ed0c229373dbc589b9eca7327537ca458138
Parents: 25b5b86
Author: Eric Liang <e...@google.com>
Authored: Tue Sep 9 23:47:12 2014 -0700
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Tue Sep 9 23:47:12 2014 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/sql/SchemaRDD.scala   |  3 ++-
 .../scala/org/apache/spark/sql/DslQuerySuite.scala    | 14 ++++++++++++++
 2 files changed, 16 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b734ed0c/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index 33b2ed1..d2ceb4a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -428,7 +428,8 @@ class SchemaRDD(
    */
   private def applySchema(rdd: RDD[Row]): SchemaRDD = {
     new SchemaRDD(sqlContext,
-      SparkLogicalPlan(ExistingRdd(queryExecution.analyzed.output, rdd))(sqlContext))
+      SparkLogicalPlan(
+        ExistingRdd(queryExecution.analyzed.output.map(_.newInstance), rdd))(sqlContext))
   }
 
   // =======================================================================

http://git-wip-us.apache.org/repos/asf/spark/blob/b734ed0c/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala
index 1a6a6c1..d001abb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala
@@ -18,6 +18,8 @@
 package org.apache.spark.sql
 
 import org.apache.spark.sql.catalyst.analysis._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.test._
 
 /* Implicits */
@@ -133,6 +135,18 @@ class DslQuerySuite extends QueryTest {
       mapData.take(1).toSeq)
   }
 
+  test("SPARK-3395 limit distinct") {
+    val filtered = TestData.testData2
+      .distinct()
+      .orderBy(SortOrder('a, Ascending), SortOrder('b, Ascending))
+      .limit(1)
+      .registerTempTable("onerow")
+    checkAnswer(
+      sql("select * from onerow inner join testData2 on onerow.a = testData2.a"),
+      (1, 1, 1, 1) ::
+      (1, 1, 1, 2) :: Nil)
+  }
+
   test("average") {
     checkAnswer(
       testData2.groupBy()(avg('a)),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to