Repository: spark
Updated Branches:
  refs/heads/branch-2.1 d995dac1c -> 3ecef2491


Revert "[SPARK-21258][SQL] Fix WindowExec complex object aggregation with 
spilling"

This reverts commit d995dac1cdeec940364453675f59ce5cf2b53684.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3ecef249
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3ecef249
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3ecef249

Branch: refs/heads/branch-2.1
Commit: 3ecef249184cdcf74765070d3ea39cf180214976
Parents: d995dac
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Fri Jun 30 14:45:55 2017 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Fri Jun 30 14:45:55 2017 +0800

----------------------------------------------------------------------
 .../execution/window/AggregateProcessor.scala   |  7 +--
 .../sql/DataFrameWindowFunctionsSuite.scala     | 47 +-------------------
 2 files changed, 3 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3ecef249/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala
index dfa1100..c9f5d3b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala
@@ -145,13 +145,10 @@ private[window] final class AggregateProcessor(
 
   /** Update the buffer. */
   def update(input: InternalRow): Unit = {
-    // TODO(hvanhovell) this sacrifices performance for correctness. We should make sure that
-    // MutableProjection makes copies of the complex input objects it buffers.
-    val copy = input.copy()
-    updateProjection(join(buffer, copy))
+    updateProjection(join(buffer, input))
     var i = 0
     while (i < numImperatives) {
-      imperatives(i).update(buffer, copy)
+      imperatives(i).update(buffer, input)
       i += 1
     }
   }
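
For context on what the restored code above gives up: the window operator feeds update() the same mutable UnsafeRow instance on every call, so buffering the bare input reference and later spilling it can observe data from subsequent rows; the reverted fix pinned a stable snapshot with input.copy(). Below is a minimal sketch of that reuse hazard, assuming Spark's catalyst module on the classpath (RowReuseDemo and its values are illustrative names, not part of this commit):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
    import org.apache.spark.sql.types.{DataType, LongType}

    object RowReuseDemo {
      def main(args: Array[String]): Unit = {
        // An UnsafeProjection writes every result into one reused row buffer,
        // mimicking how operators recycle the row they hand downstream.
        val proj = UnsafeProjection.create(Array[DataType](LongType))
        val buffered = proj(InternalRow(1L)) // bare reference, like the restored code
        val snapshot = buffered.copy()       // pinned copy, like the reverted fix
        proj(InternalRow(2L))                // the next input overwrites the shared buffer
        println(buffered.getLong(0))         // prints 2: the reference saw the new row
        println(snapshot.getLong(0))         // prints 1: the copy stayed stable
      }
    }

This is the hazard the removed TODO pointed at: it goes away if MutableProjection copies the complex objects it buffers, rather than each caller copying defensively.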

http://git-wip-us.apache.org/repos/asf/spark/blob/3ecef249/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
index 204858f..1255c49 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
@@ -19,9 +19,8 @@ package org.apache.spark.sql
 
 import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction, Window}
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types._
+import org.apache.spark.sql.types.{DataType, LongType, StructType}
 
 /**
  * Window function testing for DataFrame API.
@@ -424,48 +423,4 @@ class DataFrameWindowFunctionsSuite extends QueryTest with SharedSQLContext {
       df.select(selectList: _*).where($"value" < 2),
       Seq(Row(3, "1", null, 3.0, 4.0, 3.0), Row(5, "1", false, 4.0, 5.0, 5.0)))
   }
-
-  test("SPARK-21258: complex object in combination with spilling") {
-    // Make sure we trigger the spilling path.
-    withSQLConf(SQLConf.WINDOW_EXEC_BUFFER_SPILL_THRESHOLD.key -> "17") {
-      val sampleSchema = new StructType().
-        add("f0", StringType).
-        add("f1", LongType).
-        add("f2", ArrayType(new StructType().
-          add("f20", StringType))).
-        add("f3", ArrayType(new StructType().
-          add("f30", StringType)))
-
-      val w0 = Window.partitionBy("f0").orderBy("f1")
-      val w1 = w0.rowsBetween(Long.MinValue, Long.MaxValue)
-
-      val c0 = first(struct($"f2", $"f3")).over(w0) as "c0"
-      val c1 = last(struct($"f2", $"f3")).over(w1) as "c1"
-
-      val input =
-        """{"f1":1497820153720,"f2":[{"f20":"x","f21":0}],"f3":[{"f30":"x","f31":0}]}
-          |{"f1":1497802179638}
-          |{"f1":1497802189347}
-          |{"f1":1497802189593}
-          |{"f1":1497802189597}
-          |{"f1":1497802189599}
-          |{"f1":1497802192103}
-          |{"f1":1497802193414}
-          |{"f1":1497802193577}
-          |{"f1":1497802193709}
-          |{"f1":1497802202883}
-          |{"f1":1497802203006}
-          |{"f1":1497802203743}
-          |{"f1":1497802203834}
-          |{"f1":1497802203887}
-          |{"f1":1497802203893}
-          |{"f1":1497802203976}
-          |{"f1":1497820168098}
-          |""".stripMargin.split("\n").toSeq
-
-      import testImplicits._
-
-      spark.read.schema(sampleSchema).json(input.toDS()).select(c0, c1).foreach { _ => () }
-    }
-  }
 }
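
If the spilling scenario the deleted test covered needs to be exercised again, the same path can be driven from a standalone job. A rough sketch, assuming SQLConf.WINDOW_EXEC_BUFFER_SPILL_THRESHOLD is available on this branch as the removed test implies (WindowSpillRepro and the column names are illustrative):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.internal.SQLConf

    object WindowSpillRepro {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[2]").appName("window-spill").getOrCreate()
        import spark.implicits._
        // Lower the threshold so WindowExec spills after a handful of rows,
        // the same trick the removed test used to reach the spill path.
        spark.conf.set(SQLConf.WINDOW_EXEC_BUFFER_SPILL_THRESHOLD.key, "17")
        val w = Window.partitionBy($"grp").orderBy($"ord")
        val df = (1L to 100L).map(i => ("g", i)).toDF("grp", "ord")
        // Aggregate a complex (struct) value over the window and force evaluation.
        df.select(first(struct($"grp", $"ord")).over(w) as "c0").foreach { _ => () }
        spark.stop()
      }
    }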

