Repository: spark
Updated Branches:
  refs/heads/master 7f7eb3934 -> 07d9c5327


[SPARK-9876][SQL][FOLLOWUP] Enable string and binary tests for Parquet predicate pushdown and replace deprecated fromByteArray.

## What changes were proposed in this pull request?

Parquet was upgraded to 1.8.1 by
https://github.com/apache/spark/pull/13280. This PR therefore re-enables the
string and binary predicate pushdown that had been disabled because of
[SPARK-11153](https://issues.apache.org/jira/browse/SPARK-11153) and
[PARQUET-251](https://issues.apache.org/jira/browse/PARQUET-251), and removes
some leftover comments that appear to have been kept by mistake.
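
As a rough sketch (not the exact code in this PR), enabling pushdown for
`StringType` lets `ParquetFilters` turn an equality filter on a string column
into a Parquet-level predicate along these lines; the column name and literal
below are made up for illustration:

```scala
import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.filter2.predicate.FilterApi.binaryColumn
import org.apache.parquet.io.api.Binary

// Illustrative only: a name = 'alice' filter on a string column. Parquet
// stores strings as binary, so the literal is converted with
// Binary.fromString before being passed to FilterApi.eq.
val pushed = FilterApi.eq(binaryColumn("name"), Binary.fromString("alice"))
```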

This PR also replaces the `fromByteArray()` API, which was deprecated in
[PARQUET-251](https://issues.apache.org/jira/browse/PARQUET-251), with
`fromReusedByteArray()` and `fromConstantByteArray()`.
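
A minimal sketch of the API change itself (the byte array below is just an
example value): the deprecated `fromByteArray` is replaced by two variants
that make the buffer's lifetime explicit.

```scala
import org.apache.parquet.io.api.Binary

val bytes = "example".getBytes("UTF-8")

// Before (deprecated by PARQUET-251):
//   Binary.fromByteArray(bytes)

// After: pick the variant matching how the buffer is used.
// fromReusedByteArray   - the caller may overwrite the buffer later
//                         (e.g. the reused timestamp/decimal buffers in CatalystWriteSupport).
// fromConstantByteArray - the buffer is never modified after this call.
val reused = Binary.fromReusedByteArray(bytes)
val constant = Binary.fromConstantByteArray(bytes)
```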

## How was this patch tested?

Unit tests in `ParquetFilterSuite`; the previously ignored string and binary
filter pushdown tests are re-enabled.

Author: hyukjinkwon <gurwls...@gmail.com>

Closes #13389 from HyukjinKwon/parquet-1.8-followup.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/07d9c532
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/07d9c532
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/07d9c532

Branch: refs/heads/master
Commit: 07d9c5327f050f9da611d5239f61ed73b36ce4e6
Parents: 7f7eb39
Author: hyukjinkwon <gurwls...@gmail.com>
Authored: Tue Jul 5 16:59:40 2016 +0800
Committer: Cheng Lian <l...@databricks.com>
Committed: Tue Jul 5 16:59:40 2016 +0800

----------------------------------------------------------------------
 .../datasources/parquet/VectorizedPlainValuesReader.java |  2 +-
 .../datasources/parquet/CatalystWriteSupport.scala       | 11 ++++++-----
 .../execution/datasources/parquet/ParquetFilters.scala   |  8 --------
 .../datasources/parquet/ParquetFilterSuite.scala         |  6 ++----
 4 files changed, 9 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/07d9c532/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
index 9def455..98018b7 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
@@ -170,7 +170,7 @@ public class VectorizedPlainValuesReader extends ValuesReader implements Vectori
 
   @Override
   public final Binary readBinary(int len) {
-    Binary result = Binary.fromByteArray(buffer, offset - Platform.BYTE_ARRAY_OFFSET, len);
+    Binary result = Binary.fromConstantByteArray(buffer, offset - Platform.BYTE_ARRAY_OFFSET, len);
     offset += len;
     return result;
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/07d9c532/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
index cf974af..00e1bca 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
@@ -150,7 +150,8 @@ private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] wi
 
       case StringType =>
         (row: SpecializedGetters, ordinal: Int) =>
-          recordConsumer.addBinary(Binary.fromByteArray(row.getUTF8String(ordinal).getBytes))
+          recordConsumer.addBinary(
+            Binary.fromReusedByteArray(row.getUTF8String(ordinal).getBytes))
 
       case TimestampType =>
         (row: SpecializedGetters, ordinal: Int) => {
@@ -165,12 +166,12 @@ private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] wi
           val (julianDay, timeOfDayNanos) = 
DateTimeUtils.toJulianDay(row.getLong(ordinal))
           val buf = ByteBuffer.wrap(timestampBuffer)
           buf.order(ByteOrder.LITTLE_ENDIAN).putLong(timeOfDayNanos).putInt(julianDay)
-          recordConsumer.addBinary(Binary.fromByteArray(timestampBuffer))
+          recordConsumer.addBinary(Binary.fromReusedByteArray(timestampBuffer))
         }
 
       case BinaryType =>
         (row: SpecializedGetters, ordinal: Int) =>
-          recordConsumer.addBinary(Binary.fromByteArray(row.getBinary(ordinal)))
+          recordConsumer.addBinary(Binary.fromReusedByteArray(row.getBinary(ordinal)))
 
       case DecimalType.Fixed(precision, scale) =>
         makeDecimalWriter(precision, scale)
@@ -227,7 +228,7 @@ private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] wi
           shift -= 8
         }
 
-        recordConsumer.addBinary(Binary.fromByteArray(decimalBuffer, 0, numBytes))
+        recordConsumer.addBinary(Binary.fromReusedByteArray(decimalBuffer, 0, numBytes))
       }
 
     val binaryWriterUsingUnscaledBytes =
@@ -248,7 +249,7 @@ private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] wi
           decimalBuffer
         }
 
-        recordConsumer.addBinary(Binary.fromByteArray(fixedLengthBytes, 0, numBytes))
+        recordConsumer.addBinary(Binary.fromReusedByteArray(fixedLengthBytes, 0, numBytes))
       }
 
     writeLegacyParquetFormat match {

http://git-wip-us.apache.org/repos/asf/spark/blob/07d9c532/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 6240812..7213a38 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -50,7 +50,6 @@ private[sql] object ParquetFilters {
       (n: String, v: Any) => FilterApi.eq(floatColumn(n), v.asInstanceOf[java.lang.Float])
     case DoubleType =>
       (n: String, v: Any) => FilterApi.eq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
-
     // Binary.fromString and Binary.fromByteArray don't accept null values
     case StringType =>
       (n: String, v: Any) => FilterApi.eq(
@@ -73,7 +72,6 @@ private[sql] object ParquetFilters {
       (n: String, v: Any) => FilterApi.notEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
     case DoubleType =>
       (n: String, v: Any) => FilterApi.notEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
-
     case StringType =>
       (n: String, v: Any) => FilterApi.notEq(
         binaryColumn(n),
@@ -93,7 +91,6 @@ private[sql] object ParquetFilters {
       (n: String, v: Any) => FilterApi.lt(floatColumn(n), v.asInstanceOf[java.lang.Float])
     case DoubleType =>
       (n: String, v: Any) => FilterApi.lt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
-
     case StringType =>
       (n: String, v: Any) =>
         FilterApi.lt(binaryColumn(n),
@@ -112,7 +109,6 @@ private[sql] object ParquetFilters {
       (n: String, v: Any) => FilterApi.ltEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
     case DoubleType =>
       (n: String, v: Any) => FilterApi.ltEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
-
     case StringType =>
       (n: String, v: Any) =>
         FilterApi.ltEq(binaryColumn(n),
@@ -131,8 +127,6 @@ private[sql] object ParquetFilters {
       (n: String, v: Any) => FilterApi.gt(floatColumn(n), v.asInstanceOf[java.lang.Float])
     case DoubleType =>
       (n: String, v: Any) => FilterApi.gt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
-
-    // See https://issues.apache.org/jira/browse/SPARK-11153
     case StringType =>
       (n: String, v: Any) =>
         FilterApi.gt(binaryColumn(n),
@@ -151,7 +145,6 @@ private[sql] object ParquetFilters {
       (n: String, v: Any) => FilterApi.gtEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
     case DoubleType =>
       (n: String, v: Any) => FilterApi.gtEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
-
     case StringType =>
       (n: String, v: Any) =>
         FilterApi.gtEq(binaryColumn(n),
@@ -174,7 +167,6 @@ private[sql] object ParquetFilters {
     case DoubleType =>
       (n: String, v: Set[Any]) =>
         FilterApi.userDefined(doubleColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Double]]))
-
     case StringType =>
       (n: String, v: Set[Any]) =>
         FilterApi.userDefined(binaryColumn(n),

http://git-wip-us.apache.org/repos/asf/spark/blob/07d9c532/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 45fd6a5..2a5666e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -229,8 +229,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     }
   }
 
-  // See https://issues.apache.org/jira/browse/SPARK-11153
-  ignore("filter pushdown - string") {
+  test("filter pushdown - string") {
     withParquetDataFrame((1 to 4).map(i => Tuple1(i.toString))) { implicit df =>
       checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
       checkFilterPredicate(
@@ -258,8 +257,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     }
   }
 
-  // See https://issues.apache.org/jira/browse/SPARK-11153
-  ignore("filter pushdown - binary") {
+  test("filter pushdown - binary") {
     implicit class IntToBinary(int: Int) {
       def b: Array[Byte] = int.toString.getBytes(StandardCharsets.UTF_8)
     }

