Repository: spark
Updated Branches:
  refs/heads/master 3e2864e40 -> e1139dd60


[SPARK-3237][SQL] Fix parquet filters with UDFs

Author: Michael Armbrust <mich...@databricks.com>

Closes #2153 from marmbrus/parquetFilters and squashes the following commits:

712731a [Michael Armbrust] Use closure serializer for sending filters.
1e83f80 [Michael Armbrust] Clean udf functions.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e1139dd6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e1139dd6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e1139dd6

Branch: refs/heads/master
Commit: e1139dd60e0692e8adb1337c1f605165ce4b8895
Parents: 3e2864e
Author: Michael Armbrust <mich...@databricks.com>
Authored: Wed Aug 27 00:59:23 2014 -0700
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Wed Aug 27 00:59:23 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala | 4 ++++
 .../scala/org/apache/spark/sql/parquet/ParquetFilters.scala  | 8 ++++++--
 2 files changed, 10 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e1139dd6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
index 63ac2a6..0b3c1df 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
@@ -18,10 +18,14 @@
 package org.apache.spark.sql.catalyst.expressions
 
 import org.apache.spark.sql.catalyst.types.DataType
+import org.apache.spark.util.ClosureCleaner
 
 case class ScalaUdf(function: AnyRef, dataType: DataType, children: 
Seq[Expression])
   extends Expression {
 
+  // Clean function when not called with default no-arg constructor.
+  if (function != null) { ClosureCleaner.clean(function) }
+
   type EvaluatedType = Any
 
   def nullable = true

http://git-wip-us.apache.org/repos/asf/spark/blob/e1139dd6/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
index 2298a9b..fe28e0d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.parquet
 
+import java.nio.ByteBuffer
+
 import org.apache.hadoop.conf.Configuration
 
 import parquet.filter._
@@ -25,6 +27,7 @@ import parquet.column.ColumnReader
 
 import com.google.common.io.BaseEncoding
 
+import org.apache.spark.SparkEnv
 import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.catalyst.expressions.{Predicate => 
CatalystPredicate}
 import org.apache.spark.sql.catalyst.expressions._
@@ -237,7 +240,8 @@ object ParquetFilters {
    */
   def serializeFilterExpressions(filters: Seq[Expression], conf: 
Configuration): Unit = {
     if (filters.length > 0) {
-      val serialized: Array[Byte] = SparkSqlSerializer.serialize(filters)
+      val serialized: Array[Byte] =
+        SparkEnv.get.closureSerializer.newInstance().serialize(filters).array()
       val encoded: String = BaseEncoding.base64().encode(serialized)
       conf.set(PARQUET_FILTER_DATA, encoded)
     }
@@ -252,7 +256,7 @@ object ParquetFilters {
     val data = conf.get(PARQUET_FILTER_DATA)
     if (data != null) {
       val decoded: Array[Byte] = BaseEncoding.base64().decode(data)
-      SparkSqlSerializer.deserialize(decoded)
+      
SparkEnv.get.closureSerializer.newInstance().deserialize(ByteBuffer.wrap(decoded))
     } else {
       Seq()
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to