Repository: spark
Updated Branches:
  refs/heads/master 3e2864e40 -> e1139dd60
[SPARK-3237][SQL] Fix parquet filters with UDFs

Author: Michael Armbrust <mich...@databricks.com>

Closes #2153 from marmbrus/parquetFilters and squashes the following commits:

712731a [Michael Armbrust] Use closure serializer for sending filters.
1e83f80 [Michael Armbrust] Clean udf functions.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e1139dd6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e1139dd6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e1139dd6

Branch: refs/heads/master
Commit: e1139dd60e0692e8adb1337c1f605165ce4b8895
Parents: 3e2864e
Author: Michael Armbrust <mich...@databricks.com>
Authored: Wed Aug 27 00:59:23 2014 -0700
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Wed Aug 27 00:59:23 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala | 4 ++++
 .../scala/org/apache/spark/sql/parquet/ParquetFilters.scala  | 8 ++++++--
 2 files changed, 10 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e1139dd6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
index 63ac2a6..0b3c1df 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
@@ -18,10 +18,14 @@ package org.apache.spark.sql.catalyst.expressions
 
 import org.apache.spark.sql.catalyst.types.DataType
+import org.apache.spark.util.ClosureCleaner
 
 case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expression])
   extends Expression {
 
+  // Clean function when not called with default no-arg constructor.
+  if (function != null) { ClosureCleaner.clean(function) }
+
   type EvaluatedType = Any
 
   def nullable = true
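[Editor's note] The ScalaUdf change above runs ClosureCleaner.clean over the UDF's function at construction time. The usual failure mode it guards against: a lambda that only reads a field nonetheless captures its whole enclosing instance, and serializing the filter expression then fails if that instance is not serializable. Below is a minimal, self-contained sketch of that capture problem; FilterBuilder and all names in it are hypothetical, plain Java serialization stands in for Spark's serializers, and the manual local-copy fix stands in for the reference pruning that ClosureCleaner automates.

import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical enclosing class; note that it is NOT Serializable.
class FilterBuilder {
  val threshold = 10

  // Reads `threshold`, which is really `this.threshold`, so the lambda
  // captures the whole unserializable FilterBuilder instance.
  def dirtyFilter: Int => Boolean = (x: Int) => x > threshold

  // Copying the field into a local first means only an Int is captured;
  // this is the kind of cleanup ClosureCleaner performs automatically.
  def cleanFilter: Int => Boolean = {
    val t = threshold
    (x: Int) => x > t
  }
}

object CaptureDemo extends App {
  def trySerialize(label: String, f: Int => Boolean): Unit =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(f)
      println(s"$label: serialized OK")
    } catch {
      case e: NotSerializableException => println(s"$label: failed ($e)")
    }

  val b = new FilterBuilder
  trySerialize("dirty", b.dirtyFilter) // fails: FilterBuilder is not serializable
  trySerialize("clean", b.cleanFilter) // serialized OK
}

The null guard in the patch follows from its own comment: when the expression is instantiated through a no-arg constructor path (e.g. during deserialization), function is null and there is nothing to clean.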
http://git-wip-us.apache.org/repos/asf/spark/blob/e1139dd6/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
index 2298a9b..fe28e0d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
@@ -17,6 +17,8 @@ package org.apache.spark.sql.parquet
 
+import java.nio.ByteBuffer
+
 import org.apache.hadoop.conf.Configuration
 
 import parquet.filter._
@@ -25,6 +27,7 @@ import parquet.column.ColumnReader
 
 import com.google.common.io.BaseEncoding
 
+import org.apache.spark.SparkEnv
 import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.catalyst.expressions.{Predicate => CatalystPredicate}
 import org.apache.spark.sql.catalyst.expressions._
@@ -237,7 +240,8 @@ object ParquetFilters {
    */
   def serializeFilterExpressions(filters: Seq[Expression], conf: Configuration): Unit = {
     if (filters.length > 0) {
-      val serialized: Array[Byte] = SparkSqlSerializer.serialize(filters)
+      val serialized: Array[Byte] =
+        SparkEnv.get.closureSerializer.newInstance().serialize(filters).array()
       val encoded: String = BaseEncoding.base64().encode(serialized)
       conf.set(PARQUET_FILTER_DATA, encoded)
     }
@@ -252,7 +256,7 @@
     val data = conf.get(PARQUET_FILTER_DATA)
     if (data != null) {
       val decoded: Array[Byte] = BaseEncoding.base64().decode(data)
-      SparkSqlSerializer.deserialize(decoded)
+      SparkEnv.get.closureSerializer.newInstance().deserialize(ByteBuffer.wrap(decoded))
     } else {
       Seq()
     }
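[Editor's note] The ParquetFilters change swaps the Kryo-based SparkSqlSerializer for SparkEnv.get.closureSerializer (Java serialization by default), which is what lets filter expressions containing UDF closures survive the trip through the Hadoop Configuration. Below is a minimal sketch of that round trip, under stated assumptions: JavaSerializer is constructed directly because SparkEnv.get is only available inside a running Spark application, and the configuration key and Seq[String] payload are stand-ins for PARQUET_FILTER_DATA and the real Seq[Expression].

import java.nio.ByteBuffer

import com.google.common.io.BaseEncoding
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf
import org.apache.spark.serializer.JavaSerializer

object FilterRoundTrip extends App {
  // Stand-in for SparkEnv.get.closureSerializer.newInstance(); JavaSerializer
  // is Spark's default closure serializer, so the behavior is comparable.
  val ser = new JavaSerializer(new SparkConf()).newInstance()

  // Stand-in payload and key; the real code ships Seq[Expression] filter
  // trees under the PARQUET_FILTER_DATA key.
  val filters: Seq[String] = Seq("a > 1", "b = 'x'")
  val key = "example.parquet.filter.data"

  // Driver side: serialize, then base64-encode the bytes.
  val conf = new Configuration()
  val bytes: Array[Byte] = ser.serialize(filters).array()
  conf.set(key, BaseEncoding.base64().encode(bytes))

  // Executor side: decode and deserialize.
  val decoded = BaseEncoding.base64().decode(conf.get(key))
  val restored = ser.deserialize[Seq[String]](ByteBuffer.wrap(decoded))
  println(restored) // List(a > 1, b = 'x')
}

The base64 step exists because Hadoop Configuration values must be strings, while the serializer produces raw bytes; that part of the code is unchanged by this patch.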