[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-07-16 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21556


---




[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-07-13 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r202448032
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 ---
@@ -37,41 +39,64 @@ import org.apache.spark.unsafe.types.UTF8String
 /**
  * Some utility function to convert Spark data source filters to Parquet 
filters.
  */
-private[parquet] class ParquetFilters(pushDownDate: Boolean, 
pushDownStartWith: Boolean) {
+private[parquet] class ParquetFilters(
+pushDownDate: Boolean,
+pushDownDecimal: Boolean,
+pushDownStartWith: Boolean) {
 
   private case class ParquetSchemaType(
   originalType: OriginalType,
   primitiveTypeName: PrimitiveTypeName,
-  decimalMetadata: DecimalMetadata)
-
-  private val ParquetBooleanType = ParquetSchemaType(null, BOOLEAN, null)
-  private val ParquetByteType = ParquetSchemaType(INT_8, INT32, null)
-  private val ParquetShortType = ParquetSchemaType(INT_16, INT32, null)
-  private val ParquetIntegerType = ParquetSchemaType(null, INT32, null)
-  private val ParquetLongType = ParquetSchemaType(null, INT64, null)
-  private val ParquetFloatType = ParquetSchemaType(null, FLOAT, null)
-  private val ParquetDoubleType = ParquetSchemaType(null, DOUBLE, null)
-  private val ParquetStringType = ParquetSchemaType(UTF8, BINARY, null)
-  private val ParquetBinaryType = ParquetSchemaType(null, BINARY, null)
-  private val ParquetDateType = ParquetSchemaType(DATE, INT32, null)
+  length: Int,
+  decimalMeta: DecimalMetadata)
+
+  private val ParquetBooleanType = ParquetSchemaType(null, BOOLEAN, 0, 
null)
+  private val ParquetByteType = ParquetSchemaType(INT_8, INT32, 0, null)
+  private val ParquetShortType = ParquetSchemaType(INT_16, INT32, 0, null)
+  private val ParquetIntegerType = ParquetSchemaType(null, INT32, 0, null)
+  private val ParquetLongType = ParquetSchemaType(null, INT64, 0, null)
+  private val ParquetFloatType = ParquetSchemaType(null, FLOAT, 0, null)
+  private val ParquetDoubleType = ParquetSchemaType(null, DOUBLE, 0, null)
+  private val ParquetStringType = ParquetSchemaType(UTF8, BINARY, 0, null)
+  private val ParquetBinaryType = ParquetSchemaType(null, BINARY, 0, null)
+  private val ParquetDateType = ParquetSchemaType(DATE, INT32, 0, null)
 
   private def dateToDays(date: Date): SQLDate = {
 DateTimeUtils.fromJavaDate(date)
   }
 
+  private def decimalToInt32(decimal: JBigDecimal): Integer = 
decimal.unscaledValue().intValue()
+
+  private def decimalToInt64(decimal: JBigDecimal): JLong = 
decimal.unscaledValue().longValue()
+
+  private def decimalToByteArray(decimal: JBigDecimal, numBytes: Int): 
Binary = {
+val decimalBuffer = new Array[Byte](numBytes)
+val bytes = decimal.unscaledValue().toByteArray
+
+val fixedLengthBytes = if (bytes.length == numBytes) {
+  bytes
+} else {
+  val signByte = if (bytes.head < 0) -1: Byte else 0: Byte
+  java.util.Arrays.fill(decimalBuffer, 0, numBytes - bytes.length, 
signByte)
+  System.arraycopy(bytes, 0, decimalBuffer, numBytes - bytes.length, 
bytes.length)
+  decimalBuffer
+}
+Binary.fromReusedByteArray(fixedLengthBytes, 0, numBytes)
+  }
+
   private val makeEq: PartialFunction[ParquetSchemaType, (String, Any) => 
FilterPredicate] = {
--- End diff --

Sounds good. Thanks!


---




[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-07-13 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r202447544
  
--- Diff: sql/core/benchmarks/FilterPushdownBenchmark-results.txt ---
@@ -292,120 +292,120 @@ Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
 
Select 1 decimal(9, 2) row (value = 7864320): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative

-Parquet Vectorized                    3785 / 3867          4.2         240.6       1.0X
-Parquet Vectorized (Pushdown)         3820 / 3928          4.1         242.9       1.0X
-Native ORC Vectorized                 3981 / 4049          4.0         253.1       1.0X
-Native ORC Vectorized (Pushdown)       702 /  735         22.4          44.6       5.4X
+Parquet Vectorized                    4407 / 4852          3.6         280.2       1.0X
+Parquet Vectorized (Pushdown)         1602 / 1634          9.8         101.8       2.8X
--- End diff --

Okay, I see. The tenths and hundredths are always 0, which makes the 
precision-8 numbers actually precision-10. It is still odd that this is causing 
Parquet to have no stats, but I'm happy with the fix. Thanks for explaining.


---




[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-07-13 Thread wangyum
Github user wangyum commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r202327362
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 ---
@@ -37,41 +39,64 @@ import org.apache.spark.unsafe.types.UTF8String
 /**
  * Some utility function to convert Spark data source filters to Parquet 
filters.
  */
-private[parquet] class ParquetFilters(pushDownDate: Boolean, 
pushDownStartWith: Boolean) {
+private[parquet] class ParquetFilters(
+pushDownDate: Boolean,
+pushDownDecimal: Boolean,
+pushDownStartWith: Boolean) {
 
   private case class ParquetSchemaType(
   originalType: OriginalType,
   primitiveTypeName: PrimitiveTypeName,
-  decimalMetadata: DecimalMetadata)
-
-  private val ParquetBooleanType = ParquetSchemaType(null, BOOLEAN, null)
-  private val ParquetByteType = ParquetSchemaType(INT_8, INT32, null)
-  private val ParquetShortType = ParquetSchemaType(INT_16, INT32, null)
-  private val ParquetIntegerType = ParquetSchemaType(null, INT32, null)
-  private val ParquetLongType = ParquetSchemaType(null, INT64, null)
-  private val ParquetFloatType = ParquetSchemaType(null, FLOAT, null)
-  private val ParquetDoubleType = ParquetSchemaType(null, DOUBLE, null)
-  private val ParquetStringType = ParquetSchemaType(UTF8, BINARY, null)
-  private val ParquetBinaryType = ParquetSchemaType(null, BINARY, null)
-  private val ParquetDateType = ParquetSchemaType(DATE, INT32, null)
+  length: Int,
+  decimalMeta: DecimalMetadata)
+
+  private val ParquetBooleanType = ParquetSchemaType(null, BOOLEAN, 0, 
null)
+  private val ParquetByteType = ParquetSchemaType(INT_8, INT32, 0, null)
+  private val ParquetShortType = ParquetSchemaType(INT_16, INT32, 0, null)
+  private val ParquetIntegerType = ParquetSchemaType(null, INT32, 0, null)
+  private val ParquetLongType = ParquetSchemaType(null, INT64, 0, null)
+  private val ParquetFloatType = ParquetSchemaType(null, FLOAT, 0, null)
+  private val ParquetDoubleType = ParquetSchemaType(null, DOUBLE, 0, null)
+  private val ParquetStringType = ParquetSchemaType(UTF8, BINARY, 0, null)
+  private val ParquetBinaryType = ParquetSchemaType(null, BINARY, 0, null)
+  private val ParquetDateType = ParquetSchemaType(DATE, INT32, 0, null)
 
   private def dateToDays(date: Date): SQLDate = {
 DateTimeUtils.fromJavaDate(date)
   }
 
+  private def decimalToInt32(decimal: JBigDecimal): Integer = 
decimal.unscaledValue().intValue()
+
+  private def decimalToInt64(decimal: JBigDecimal): JLong = 
decimal.unscaledValue().longValue()
+
+  private def decimalToByteArray(decimal: JBigDecimal, numBytes: Int): 
Binary = {
+val decimalBuffer = new Array[Byte](numBytes)
+val bytes = decimal.unscaledValue().toByteArray
+
+val fixedLengthBytes = if (bytes.length == numBytes) {
+  bytes
+} else {
+  val signByte = if (bytes.head < 0) -1: Byte else 0: Byte
+  java.util.Arrays.fill(decimalBuffer, 0, numBytes - bytes.length, 
signByte)
+  System.arraycopy(bytes, 0, decimalBuffer, numBytes - bytes.length, 
bytes.length)
+  decimalBuffer
+}
+Binary.fromReusedByteArray(fixedLengthBytes, 0, numBytes)
+  }
+
   private val makeEq: PartialFunction[ParquetSchemaType, (String, Any) => 
FilterPredicate] = {
--- End diff --

`ParquetBooleanType`, `ParquetLongType`, `ParquetFloatType` and `ParquetDoubleType` do not need `Option`. Here is an example:
```scala
scala> import org.apache.parquet.io.api.Binary
import org.apache.parquet.io.api.Binary

scala> Option(null).map(s => Binary.fromString(s.asInstanceOf[String])).orNull
res7: org.apache.parquet.io.api.Binary = null

scala> Binary.fromString(null.asInstanceOf[String])
java.lang.NullPointerException
  at org.apache.parquet.io.api.Binary$FromStringBinary.encodeUTF8(Binary.java:224)
  at org.apache.parquet.io.api.Binary$FromStringBinary.<init>(Binary.java:214)
  at org.apache.parquet.io.api.Binary.fromString(Binary.java:554)
  ... 52 elided

scala> null.asInstanceOf[java.lang.Long]
res9: Long = null

scala> null.asInstanceOf[java.lang.Boolean]
res10: Boolean = null

scala> Option(null).map(_.asInstanceOf[Number].intValue.asInstanceOf[Integer]).orNull
res11: Integer = null

scala> null.asInstanceOf[Number].intValue.asInstanceOf[Integer]
java.lang.NullPointerException
  ... 52 elided
```



---




[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-07-12 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r202240358
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 ---
@@ -225,12 +316,44 @@ private[parquet] class ParquetFilters(pushDownDate: 
Boolean, pushDownStartWith:
   def createFilter(schema: MessageType, predicate: sources.Filter): 
Option[FilterPredicate] = {
 val nameToType = getFieldMap(schema)
 
+def isDecimalMatched(value: Any, decimalMeta: DecimalMetadata): 
Boolean = value match {
+  case decimal: JBigDecimal =>
+decimal.scale == decimalMeta.getScale
+  case _ => false
+}
+
+// Decimal type must make sure that filter value's scale matched the 
file.
+// If doesn't matched, which would cause data corruption.
+// Other types must make sure that filter value's type matched the 
file.
--- End diff --

I would say something like: Parquet's type in the given file should match the value's type in the pushed filter in order to push down the filter to Parquet.


---




[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-07-12 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r202239380
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 ---
@@ -225,12 +316,44 @@ private[parquet] class ParquetFilters(pushDownDate: 
Boolean, pushDownStartWith:
   def createFilter(schema: MessageType, predicate: sources.Filter): 
Option[FilterPredicate] = {
 val nameToType = getFieldMap(schema)
 
+def isDecimalMatched(value: Any, decimalMeta: DecimalMetadata): 
Boolean = value match {
+  case decimal: JBigDecimal =>
+decimal.scale == decimalMeta.getScale
+  case _ => false
+}
+
+// Decimal type must make sure that filter value's scale matched the 
file.
--- End diff --

Shall we leave this comment around the decimal `case`s below or around 
`isDecimalMatched`?


---




[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-07-12 Thread wangyum
Github user wangyum commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r202214356
  
--- Diff: sql/core/benchmarks/FilterPushdownBenchmark-results.txt ---
@@ -292,120 +292,120 @@ Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
 
Select 1 decimal(9, 2) row (value = 7864320): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative

-Parquet Vectorized                    3785 / 3867          4.2         240.6       1.0X
-Parquet Vectorized (Pushdown)         3820 / 3928          4.1         242.9       1.0X
-Native ORC Vectorized                 3981 / 4049          4.0         253.1       1.0X
-Native ORC Vectorized (Pushdown)       702 /  735         22.4          44.6       5.4X
+Parquet Vectorized                    4407 / 4852          3.6         280.2       1.0X
+Parquet Vectorized (Pushdown)         1602 / 1634          9.8         101.8       2.8X
--- End diff --

Here is a test:
```scala
// decimal(9, 2) max value is 9999999.99
// 1024 * 1024 * 15 = 15728640
val path = "/tmp/spark/parquet"
spark.range(1024 * 1024 * 15).selectExpr("cast((id) as decimal(9, 2)) as id").orderBy("id").write.mode("overwrite").parquet(path)
```
The generated parquet metadata:
```shell
$ java -jar ./parquet-tools/target/parquet-tools-1.10.1-SNAPSHOT.jar meta /tmp/spark/parquet
file:        file:/tmp/spark/parquet/part-0-26b38556-494a-4b89-923e-69ea73365488-c000.snappy.parquet
creator:     parquet-mr version 1.10.0 (build 031a6654009e3b82020012a18434c582bd74c73a)
extra:       org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"id","type":"decimal(9,2)","nullable":true,"metadata":{}}]}

file schema: spark_schema

id:          OPTIONAL INT32 O:DECIMAL R:0 D:1

row group 1: RC:5728640 TS:36 OFFSET:4

id:           INT32 SNAPPY DO:0 FPO:4 SZ:38/36/0.95 VC:5728640 ENC:PLAIN,BIT_PACKED,RLE ST:[no stats for this column]

file:        file:/tmp/spark/parquet/part-1-26b38556-494a-4b89-923e-69ea73365488-c000.snappy.parquet
creator:     parquet-mr version 1.10.0 (build 031a6654009e3b82020012a18434c582bd74c73a)
extra:       org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"id","type":"decimal(9,2)","nullable":true,"metadata":{}}]}

file schema: spark_schema

id:          OPTIONAL INT32 O:DECIMAL R:0 D:1

row group 1: RC:651016 TS:2604209 OFFSET:4

id:           INT32 SNAPPY DO:0 FPO:4 SZ:2604325/2604209/1.00 VC:651016 ENC:PLAIN,BIT_PACKED,RLE ST:[min: 0.00, max: 651015.00, num_nulls: 0]

file:        file:/tmp/spark/parquet/part-2-26b38556-494a-4b89-923e-69ea73365488-c000.snappy.parquet
creator:     parquet-mr version 1.10.0 (build 031a6654009e3b82020012a18434c582bd74c73a)
extra:       org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"id","type":"decimal(9,2)","nullable":true,"metadata":{}}]}

file schema: spark_schema

id:          OPTIONAL INT32 O:DECIMAL R:0 D:1

row group 1: RC:3231146 TS:12925219 OFFSET:4

id:           INT32 SNAPPY DO:0 FPO:4 SZ:12925864/12925219/1.00 VC:3231146 ENC:PLAIN,BIT_PACKED,RLE ST:[min: 651016.00, max: 3882161.00, num_nulls: 0]

file:        file:/tmp/spark/parquet/part-3-26b38556-494a-4b89-923e-69ea73365488-c000.snappy.parquet
creator:     parquet-mr version 1.10.0 (build 031a6654009e3b82020012a18434c582bd74c73a)
extra:       org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"id","type":"decimal(9,2)","nullable":true,"metadata":{}}]}

file schema: spark_schema

id:          OPTIONAL INT32 O:DECIMAL R:0 D:1

row group 1: RC:2887956 TS:11552408 OFFSET:4

id:           INT32 SNAPPY DO:0 FPO:4 SZ:11552986/11552408/1.00 VC:2887956 ENC:PLAIN,BIT_PACKED,RLE ST:[min: 3882162.00, max: 6770117.00, num_nulls: 0]

file:        file:/tmp/spark/parquet/part-4-26b38556-494a-4b89-923e-69ea73365488-c000.snappy.parquet
creator:
```

[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-07-12 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r202093955
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 ---
@@ -225,12 +316,44 @@ private[parquet] class ParquetFilters(pushDownDate: 
Boolean, pushDownStartWith:
   def createFilter(schema: MessageType, predicate: sources.Filter): 
Option[FilterPredicate] = {
 val nameToType = getFieldMap(schema)
 
+def isDecimalMatched(value: Any, decimalMeta: DecimalMetadata): 
Boolean = value match {
+  case decimal: JBigDecimal =>
+decimal.scale == decimalMeta.getScale
+  case _ => false
+}
+
+// Since SPARK-24716, ParquetFilter accepts parquet file schema to 
convert to
+// data source Filter. This must make sure that filter value matched 
the Filter.
+// If doesn't matched, then the schema used to read the file is 
incorrect,
+// which would cause data corruption.
+def valueCanMakeFilterOn(name: String, value: Any): Boolean = {
+  value == null || (nameToType(name) match {
+case ParquetBooleanType => value.isInstanceOf[JBoolean]
+case ParquetByteType | ParquetShortType | ParquetIntegerType => 
value.isInstanceOf[Number]
+case ParquetLongType => value.isInstanceOf[JLong]
+case ParquetFloatType => value.isInstanceOf[JFloat]
+case ParquetDoubleType => value.isInstanceOf[JDouble]
+case ParquetStringType => value.isInstanceOf[String]
+case ParquetBinaryType => value.isInstanceOf[Array[Byte]]
+case ParquetDateType => value.isInstanceOf[Date]
+case ParquetSchemaType(DECIMAL, INT32, 0, decimalMeta) =>
--- End diff --

Have you tried not using `|` and ignoring the physical type with `_`?


---




[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-07-12 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r202093665
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 ---
@@ -225,12 +316,44 @@ private[parquet] class ParquetFilters(pushDownDate: 
Boolean, pushDownStartWith:
   def createFilter(schema: MessageType, predicate: sources.Filter): 
Option[FilterPredicate] = {
 val nameToType = getFieldMap(schema)
 
+def isDecimalMatched(value: Any, decimalMeta: DecimalMetadata): 
Boolean = value match {
+  case decimal: JBigDecimal =>
+decimal.scale == decimalMeta.getScale
+  case _ => false
+}
+
+// Since SPARK-24716, ParquetFilter accepts parquet file schema to 
convert to
+// data source Filter. This must make sure that filter value matched 
the Filter.
+// If doesn't matched, then the schema used to read the file is 
incorrect,
+// which would cause data corruption.
+def valueCanMakeFilterOn(name: String, value: Any): Boolean = {
+  value == null || (nameToType(name) match {
+case ParquetBooleanType => value.isInstanceOf[JBoolean]
+case ParquetByteType | ParquetShortType | ParquetIntegerType => 
value.isInstanceOf[Number]
+case ParquetLongType => value.isInstanceOf[JLong]
+case ParquetFloatType => value.isInstanceOf[JFloat]
+case ParquetDoubleType => value.isInstanceOf[JDouble]
+case ParquetStringType => value.isInstanceOf[String]
+case ParquetBinaryType => value.isInstanceOf[Array[Byte]]
+case ParquetDateType => value.isInstanceOf[Date]
--- End diff --

Not in this PR that adds Decimal support. We should consider it in the 
future, though.


---




[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-07-12 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r202093508
  
--- Diff: sql/core/benchmarks/FilterPushdownBenchmark-results.txt ---
@@ -292,120 +292,120 @@ Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
 
Select 1 decimal(9, 2) row (value = 7864320): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative

-Parquet Vectorized                    3785 / 3867          4.2         240.6       1.0X
-Parquet Vectorized (Pushdown)         3820 / 3928          4.1         242.9       1.0X
-Native ORC Vectorized                 3981 / 4049          4.0         253.1       1.0X
-Native ORC Vectorized (Pushdown)       702 /  735         22.4          44.6       5.4X
+Parquet Vectorized                    4407 / 4852          3.6         280.2       1.0X
+Parquet Vectorized (Pushdown)         1602 / 1634          9.8         101.8       2.8X
--- End diff --

I'm not sure I understand. That's less than 2^24, so it should fit in an 
int. It should also fit in 8 base-ten digits so decimal(9,2) should work. And 
last, if the values don't fit in an int, I'm not sure how we would be able to 
store them in the first place, regardless of how stats are handled.

Did you verify that there are no stats for the file produced here? If 
that's the case, it would make sense with these numbers. I think we just need 
to look for a different reason why stats are missing.


---




[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-07-12 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r202090024
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 ---
@@ -248,29 +371,29 @@ private[parquet] class ParquetFilters(pushDownDate: 
Boolean, pushDownStartWith:
 // Probably I missed something and obviously this should be changed.
 
 predicate match {
-  case sources.IsNull(name) if canMakeFilterOn(name) =>
+  case sources.IsNull(name) if canMakeFilterOn(name, null) =>
 makeEq.lift(nameToType(name)).map(_(name, null))
-  case sources.IsNotNull(name) if canMakeFilterOn(name) =>
+  case sources.IsNotNull(name) if canMakeFilterOn(name, null) =>
 makeNotEq.lift(nameToType(name)).map(_(name, null))
 
-  case sources.EqualTo(name, value) if canMakeFilterOn(name) =>
+  case sources.EqualTo(name, value) if canMakeFilterOn(name, value) =>
 makeEq.lift(nameToType(name)).map(_(name, value))
-  case sources.Not(sources.EqualTo(name, value)) if 
canMakeFilterOn(name) =>
+  case sources.Not(sources.EqualTo(name, value)) if 
canMakeFilterOn(name, value) =>
 makeNotEq.lift(nameToType(name)).map(_(name, value))
 
-  case sources.EqualNullSafe(name, value) if canMakeFilterOn(name) =>
+  case sources.EqualNullSafe(name, value) if canMakeFilterOn(name, 
value) =>
 makeEq.lift(nameToType(name)).map(_(name, value))
-  case sources.Not(sources.EqualNullSafe(name, value)) if 
canMakeFilterOn(name) =>
+  case sources.Not(sources.EqualNullSafe(name, value)) if 
canMakeFilterOn(name, value) =>
 makeNotEq.lift(nameToType(name)).map(_(name, value))
--- End diff --

Maybe I'm missing something, but that returns true for all null values.


---




[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-07-12 Thread wangyum
Github user wangyum commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r202075842
  
--- Diff: sql/core/benchmarks/FilterPushdownBenchmark-results.txt ---
@@ -292,120 +292,120 @@ Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
 
Select 1 decimal(9, 2) row (value = 7864320): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative

-Parquet Vectorized                    3785 / 3867          4.2         240.6       1.0X
-Parquet Vectorized (Pushdown)         3820 / 3928          4.1         242.9       1.0X
-Native ORC Vectorized                 3981 / 4049          4.0         253.1       1.0X
-Native ORC Vectorized (Pushdown)       702 /  735         22.4          44.6       5.4X
+Parquet Vectorized                    4407 / 4852          3.6         280.2       1.0X
+Parquet Vectorized (Pushdown)         1602 / 1634          9.8         101.8       2.8X
--- End diff --

Because `1024 * 1024 * 15` is out of the `decimal(9, 2)` range. I will update the benchmark.
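For reference, one way to keep the generated values inside the `decimal(9, 2)` range is simply to generate fewer than 10^7 rows (a sketch only, not necessarily how the benchmark was actually updated):

```scala
// decimal(9, 2) leaves 7 integer digits, so ids below 10,000,000 fit.
val path = "/tmp/spark/parquet"
spark.range(1024 * 1024 * 7)  // 7340032 rows, max value 7340031.00
  .selectExpr("cast(id as decimal(9, 2)) as id")
  .orderBy("id")
  .write.mode("overwrite").parquet(path)
```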


---




[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-07-12 Thread wangyum
Github user wangyum commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r201928990
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 ---
@@ -248,29 +371,29 @@ private[parquet] class ParquetFilters(pushDownDate: 
Boolean, pushDownStartWith:
 // Probably I missed something and obviously this should be changed.
 
 predicate match {
-  case sources.IsNull(name) if canMakeFilterOn(name) =>
+  case sources.IsNull(name) if canMakeFilterOn(name, null) =>
 makeEq.lift(nameToType(name)).map(_(name, null))
-  case sources.IsNotNull(name) if canMakeFilterOn(name) =>
+  case sources.IsNotNull(name) if canMakeFilterOn(name, null) =>
 makeNotEq.lift(nameToType(name)).map(_(name, null))
 
-  case sources.EqualTo(name, value) if canMakeFilterOn(name) =>
+  case sources.EqualTo(name, value) if canMakeFilterOn(name, value) =>
 makeEq.lift(nameToType(name)).map(_(name, value))
-  case sources.Not(sources.EqualTo(name, value)) if 
canMakeFilterOn(name) =>
+  case sources.Not(sources.EqualTo(name, value)) if 
canMakeFilterOn(name, value) =>
 makeNotEq.lift(nameToType(name)).map(_(name, value))
 
-  case sources.EqualNullSafe(name, value) if canMakeFilterOn(name) =>
+  case sources.EqualNullSafe(name, value) if canMakeFilterOn(name, 
value) =>
 makeEq.lift(nameToType(name)).map(_(name, value))
-  case sources.Not(sources.EqualNullSafe(name, value)) if 
canMakeFilterOn(name) =>
+  case sources.Not(sources.EqualNullSafe(name, value)) if 
canMakeFilterOn(name, value) =>
 makeNotEq.lift(nameToType(name)).map(_(name, value))
--- End diff --

I handled null values in `valueCanMakeFilterOn`:
```scala
def valueCanMakeFilterOn(name: String, value: Any): Boolean = {
  value == null || (nameToType(name) match {
    case ParquetBooleanType => value.isInstanceOf[JBoolean]
    case ParquetByteType | ParquetShortType | ParquetIntegerType => value.isInstanceOf[Number]
    case ParquetLongType => value.isInstanceOf[JLong]
    case ParquetFloatType => value.isInstanceOf[JFloat]
    case ParquetDoubleType => value.isInstanceOf[JDouble]
    case ParquetStringType => value.isInstanceOf[String]
    case ParquetBinaryType => value.isInstanceOf[Array[Byte]]
    case ParquetDateType => value.isInstanceOf[Date]
    case ParquetSchemaType(DECIMAL, INT32, _, decimalMeta) =>
      isDecimalMatched(value, decimalMeta)
    case ParquetSchemaType(DECIMAL, INT64, _, decimalMeta) =>
      isDecimalMatched(value, decimalMeta)
    case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, _, decimalMeta) =>
      isDecimalMatched(value, decimalMeta)
    case _ => false
  })
}
```


---




[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-07-12 Thread wangyum
Github user wangyum commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r201927646
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 ---
@@ -202,6 +283,16 @@ private[parquet] class ParquetFilters(pushDownDate: 
Boolean, pushDownStartWith:
 case ParquetDateType if pushDownDate =>
   (n: String, v: Any) =>
 FilterApi.gtEq(intColumn(n), 
dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer])
+
+case ParquetSchemaType(DECIMAL, INT32, 0, _) if pushDownDecimal =>
--- End diff --

In fact, the length is always 0, so I replaced it with `_`.


---




[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-07-12 Thread wangyum
Github user wangyum commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r201925142
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 ---
@@ -225,12 +316,44 @@ private[parquet] class ParquetFilters(pushDownDate: 
Boolean, pushDownStartWith:
   def createFilter(schema: MessageType, predicate: sources.Filter): 
Option[FilterPredicate] = {
 val nameToType = getFieldMap(schema)
 
+def isDecimalMatched(value: Any, decimalMeta: DecimalMetadata): 
Boolean = value match {
+  case decimal: JBigDecimal =>
+decimal.scale == decimalMeta.getScale
+  case _ => false
+}
+
+// Since SPARK-24716, ParquetFilter accepts parquet file schema to 
convert to
+// data source Filter. This must make sure that filter value matched 
the Filter.
+// If doesn't matched, then the schema used to read the file is 
incorrect,
+// which would cause data corruption.
+def valueCanMakeFilterOn(name: String, value: Any): Boolean = {
+  value == null || (nameToType(name) match {
+case ParquetBooleanType => value.isInstanceOf[JBoolean]
+case ParquetByteType | ParquetShortType | ParquetIntegerType => 
value.isInstanceOf[Number]
+case ParquetLongType => value.isInstanceOf[JLong]
+case ParquetFloatType => value.isInstanceOf[JFloat]
+case ParquetDoubleType => value.isInstanceOf[JDouble]
+case ParquetStringType => value.isInstanceOf[String]
+case ParquetBinaryType => value.isInstanceOf[Array[Byte]]
+case ParquetDateType => value.isInstanceOf[Date]
--- End diff --

Originally it is not supported. Do we need to support it?


---




[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-07-12 Thread wangyum
Github user wangyum commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r201924695
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 ---
@@ -225,12 +316,44 @@ private[parquet] class ParquetFilters(pushDownDate: 
Boolean, pushDownStartWith:
   def createFilter(schema: MessageType, predicate: sources.Filter): 
Option[FilterPredicate] = {
 val nameToType = getFieldMap(schema)
 
+def isDecimalMatched(value: Any, decimalMeta: DecimalMetadata): 
Boolean = value match {
+  case decimal: JBigDecimal =>
+decimal.scale == decimalMeta.getScale
+  case _ => false
+}
+
+// Since SPARK-24716, ParquetFilter accepts parquet file schema to 
convert to
+// data source Filter. This must make sure that filter value matched 
the Filter.
+// If doesn't matched, then the schema used to read the file is 
incorrect,
+// which would cause data corruption.
+def valueCanMakeFilterOn(name: String, value: Any): Boolean = {
+  value == null || (nameToType(name) match {
+case ParquetBooleanType => value.isInstanceOf[JBoolean]
+case ParquetByteType | ParquetShortType | ParquetIntegerType => 
value.isInstanceOf[Number]
+case ParquetLongType => value.isInstanceOf[JLong]
+case ParquetFloatType => value.isInstanceOf[JFloat]
+case ParquetDoubleType => value.isInstanceOf[JDouble]
+case ParquetStringType => value.isInstanceOf[String]
+case ParquetBinaryType => value.isInstanceOf[Array[Byte]]
+case ParquetDateType => value.isInstanceOf[Date]
+case ParquetSchemaType(DECIMAL, INT32, 0, decimalMeta) =>
--- End diff --

No.

![image](https://user-images.githubusercontent.com/5399861/42616795-0c40b9e8-85e2-11e8-9697-ab5572b66bab.png)



---




[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-07-11 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r201763831
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 ---
@@ -248,29 +371,29 @@ private[parquet] class ParquetFilters(pushDownDate: 
Boolean, pushDownStartWith:
 // Probably I missed something and obviously this should be changed.
 
 predicate match {
-  case sources.IsNull(name) if canMakeFilterOn(name) =>
+  case sources.IsNull(name) if canMakeFilterOn(name, null) =>
 makeEq.lift(nameToType(name)).map(_(name, null))
-  case sources.IsNotNull(name) if canMakeFilterOn(name) =>
+  case sources.IsNotNull(name) if canMakeFilterOn(name, null) =>
 makeNotEq.lift(nameToType(name)).map(_(name, null))
 
-  case sources.EqualTo(name, value) if canMakeFilterOn(name) =>
+  case sources.EqualTo(name, value) if canMakeFilterOn(name, value) =>
 makeEq.lift(nameToType(name)).map(_(name, value))
-  case sources.Not(sources.EqualTo(name, value)) if 
canMakeFilterOn(name) =>
+  case sources.Not(sources.EqualTo(name, value)) if 
canMakeFilterOn(name, value) =>
 makeNotEq.lift(nameToType(name)).map(_(name, value))
 
-  case sources.EqualNullSafe(name, value) if canMakeFilterOn(name) =>
+  case sources.EqualNullSafe(name, value) if canMakeFilterOn(name, 
value) =>
 makeEq.lift(nameToType(name)).map(_(name, value))
-  case sources.Not(sources.EqualNullSafe(name, value)) if 
canMakeFilterOn(name) =>
+  case sources.Not(sources.EqualNullSafe(name, value)) if 
canMakeFilterOn(name, value) =>
 makeNotEq.lift(nameToType(name)).map(_(name, value))
--- End diff --

Since makeNotEq is also used for EqualNullSafe, I think it should handle 
null values as well.
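As a sketch of what that would look like for one of the new decimal cases (the same `Option` guard the String and Date cases already use; assumes the imports and helpers in `ParquetFilters.scala`):

```scala
case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
  (n: String, v: Any) => FilterApi.notEq(
    intColumn(n),
    // v is null for IsNotNull and Not(EqualNullSafe(_, null)), so guard it.
    Option(v).map(d => decimalToInt32(d.asInstanceOf[JBigDecimal])).orNull)
```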


---




[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-07-11 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r201763489
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 ---
@@ -37,41 +39,64 @@ import org.apache.spark.unsafe.types.UTF8String
 /**
  * Some utility function to convert Spark data source filters to Parquet 
filters.
  */
-private[parquet] class ParquetFilters(pushDownDate: Boolean, 
pushDownStartWith: Boolean) {
+private[parquet] class ParquetFilters(
+pushDownDate: Boolean,
+pushDownDecimal: Boolean,
+pushDownStartWith: Boolean) {
 
   private case class ParquetSchemaType(
   originalType: OriginalType,
   primitiveTypeName: PrimitiveTypeName,
-  decimalMetadata: DecimalMetadata)
-
-  private val ParquetBooleanType = ParquetSchemaType(null, BOOLEAN, null)
-  private val ParquetByteType = ParquetSchemaType(INT_8, INT32, null)
-  private val ParquetShortType = ParquetSchemaType(INT_16, INT32, null)
-  private val ParquetIntegerType = ParquetSchemaType(null, INT32, null)
-  private val ParquetLongType = ParquetSchemaType(null, INT64, null)
-  private val ParquetFloatType = ParquetSchemaType(null, FLOAT, null)
-  private val ParquetDoubleType = ParquetSchemaType(null, DOUBLE, null)
-  private val ParquetStringType = ParquetSchemaType(UTF8, BINARY, null)
-  private val ParquetBinaryType = ParquetSchemaType(null, BINARY, null)
-  private val ParquetDateType = ParquetSchemaType(DATE, INT32, null)
+  length: Int,
+  decimalMeta: DecimalMetadata)
+
+  private val ParquetBooleanType = ParquetSchemaType(null, BOOLEAN, 0, 
null)
+  private val ParquetByteType = ParquetSchemaType(INT_8, INT32, 0, null)
+  private val ParquetShortType = ParquetSchemaType(INT_16, INT32, 0, null)
+  private val ParquetIntegerType = ParquetSchemaType(null, INT32, 0, null)
+  private val ParquetLongType = ParquetSchemaType(null, INT64, 0, null)
+  private val ParquetFloatType = ParquetSchemaType(null, FLOAT, 0, null)
+  private val ParquetDoubleType = ParquetSchemaType(null, DOUBLE, 0, null)
+  private val ParquetStringType = ParquetSchemaType(UTF8, BINARY, 0, null)
+  private val ParquetBinaryType = ParquetSchemaType(null, BINARY, 0, null)
+  private val ParquetDateType = ParquetSchemaType(DATE, INT32, 0, null)
 
   private def dateToDays(date: Date): SQLDate = {
 DateTimeUtils.fromJavaDate(date)
   }
 
+  private def decimalToInt32(decimal: JBigDecimal): Integer = 
decimal.unscaledValue().intValue()
+
+  private def decimalToInt64(decimal: JBigDecimal): JLong = 
decimal.unscaledValue().longValue()
+
+  private def decimalToByteArray(decimal: JBigDecimal, numBytes: Int): 
Binary = {
+val decimalBuffer = new Array[Byte](numBytes)
+val bytes = decimal.unscaledValue().toByteArray
+
+val fixedLengthBytes = if (bytes.length == numBytes) {
+  bytes
+} else {
+  val signByte = if (bytes.head < 0) -1: Byte else 0: Byte
+  java.util.Arrays.fill(decimalBuffer, 0, numBytes - bytes.length, 
signByte)
+  System.arraycopy(bytes, 0, decimalBuffer, numBytes - bytes.length, 
bytes.length)
+  decimalBuffer
+}
+Binary.fromReusedByteArray(fixedLengthBytes, 0, numBytes)
+  }
+
   private val makeEq: PartialFunction[ParquetSchemaType, (String, Any) => 
FilterPredicate] = {
--- End diff --

Since `makeEq` is called for `EqualsNullSafe` and `valueCanMakeFilterOn` 
allows null values through, I think these could be null, like the String case. 
I think this should use the `Option` pattern from String for all values, unless 
I'm missing some reason why these will never be null.
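For example, the fixed-length decimal case could mirror the existing String/Date pattern (a sketch based on the helpers in this diff, not the final code):

```scala
case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, length, _) if pushDownDecimal =>
  (n: String, v: Any) => FilterApi.eq(
    binaryColumn(n),
    // Null-safe: EqualNullSafe can pass a null value through to makeEq.
    Option(v).map(d => decimalToByteArray(d.asInstanceOf[JBigDecimal], length)).orNull)
```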


---




[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-07-11 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r201761849
  
--- Diff: sql/core/benchmarks/FilterPushdownBenchmark-results.txt ---
@@ -292,120 +292,120 @@ Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
 
Select 1 decimal(9, 2) row (value = 7864320): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative

-Parquet Vectorized                    3785 / 3867          4.2         240.6       1.0X
-Parquet Vectorized (Pushdown)         3820 / 3928          4.1         242.9       1.0X
-Native ORC Vectorized                 3981 / 4049          4.0         253.1       1.0X
-Native ORC Vectorized (Pushdown)       702 /  735         22.4          44.6       5.4X
+Parquet Vectorized                    4407 / 4852          3.6         280.2       1.0X
+Parquet Vectorized (Pushdown)         1602 / 1634          9.8         101.8       2.8X
--- End diff --

Maybe it is that the data is more dense, so we need to read more values in 
the row group that contains the one we're looking for?


---




[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-07-11 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r201756667
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 ---
@@ -37,41 +39,64 @@ import org.apache.spark.unsafe.types.UTF8String
 /**
  * Some utility function to convert Spark data source filters to Parquet 
filters.
  */
-private[parquet] class ParquetFilters(pushDownDate: Boolean, 
pushDownStartWith: Boolean) {
+private[parquet] class ParquetFilters(
+pushDownDate: Boolean,
+pushDownDecimal: Boolean,
+pushDownStartWith: Boolean) {
 
   private case class ParquetSchemaType(
   originalType: OriginalType,
   primitiveTypeName: PrimitiveTypeName,
-  decimalMetadata: DecimalMetadata)
-
-  private val ParquetBooleanType = ParquetSchemaType(null, BOOLEAN, null)
-  private val ParquetByteType = ParquetSchemaType(INT_8, INT32, null)
-  private val ParquetShortType = ParquetSchemaType(INT_16, INT32, null)
-  private val ParquetIntegerType = ParquetSchemaType(null, INT32, null)
-  private val ParquetLongType = ParquetSchemaType(null, INT64, null)
-  private val ParquetFloatType = ParquetSchemaType(null, FLOAT, null)
-  private val ParquetDoubleType = ParquetSchemaType(null, DOUBLE, null)
-  private val ParquetStringType = ParquetSchemaType(UTF8, BINARY, null)
-  private val ParquetBinaryType = ParquetSchemaType(null, BINARY, null)
-  private val ParquetDateType = ParquetSchemaType(DATE, INT32, null)
+  length: Int,
+  decimalMeta: DecimalMetadata)
+
+  private val ParquetBooleanType = ParquetSchemaType(null, BOOLEAN, 0, 
null)
+  private val ParquetByteType = ParquetSchemaType(INT_8, INT32, 0, null)
+  private val ParquetShortType = ParquetSchemaType(INT_16, INT32, 0, null)
+  private val ParquetIntegerType = ParquetSchemaType(null, INT32, 0, null)
+  private val ParquetLongType = ParquetSchemaType(null, INT64, 0, null)
+  private val ParquetFloatType = ParquetSchemaType(null, FLOAT, 0, null)
+  private val ParquetDoubleType = ParquetSchemaType(null, DOUBLE, 0, null)
+  private val ParquetStringType = ParquetSchemaType(UTF8, BINARY, 0, null)
+  private val ParquetBinaryType = ParquetSchemaType(null, BINARY, 0, null)
+  private val ParquetDateType = ParquetSchemaType(DATE, INT32, 0, null)
 
   private def dateToDays(date: Date): SQLDate = {
 DateTimeUtils.fromJavaDate(date)
   }
 
+  private def decimalToInt32(decimal: JBigDecimal): Integer = 
decimal.unscaledValue().intValue()
+
+  private def decimalToInt64(decimal: JBigDecimal): JLong = 
decimal.unscaledValue().longValue()
+
+  private def decimalToByteArray(decimal: JBigDecimal, numBytes: Int): 
Binary = {
+val decimalBuffer = new Array[Byte](numBytes)
+val bytes = decimal.unscaledValue().toByteArray
+
+val fixedLengthBytes = if (bytes.length == numBytes) {
+  bytes
+} else {
+  val signByte = if (bytes.head < 0) -1: Byte else 0: Byte
+  java.util.Arrays.fill(decimalBuffer, 0, numBytes - bytes.length, 
signByte)
+  System.arraycopy(bytes, 0, decimalBuffer, numBytes - bytes.length, 
bytes.length)
+  decimalBuffer
+}
+Binary.fromReusedByteArray(fixedLengthBytes, 0, numBytes)
--- End diff --

This byte array is not reused; it is allocated each time this function runs. This should use the `fromConstantByteArray` variant.
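A sketch of the same helper with that change applied (only the last call differs from the diff above):

```scala
private def decimalToByteArray(decimal: JBigDecimal, numBytes: Int): Binary = {
  val decimalBuffer = new Array[Byte](numBytes)
  val bytes = decimal.unscaledValue().toByteArray

  val fixedLengthBytes = if (bytes.length == numBytes) {
    bytes
  } else {
    // Sign-extend the two's-complement value into a buffer of the file's fixed length.
    val signByte = if (bytes.head < 0) -1: Byte else 0: Byte
    java.util.Arrays.fill(decimalBuffer, 0, numBytes - bytes.length, signByte)
    System.arraycopy(bytes, 0, decimalBuffer, numBytes - bytes.length, bytes.length)
    decimalBuffer
  }
  // The buffer is allocated per call and never mutated afterwards, so it can be
  // handed to Parquet as a constant; fromReusedByteArray would signal that the
  // caller intends to overwrite it.
  Binary.fromConstantByteArray(fixedLengthBytes, 0, numBytes)
}
```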


---




[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-07-11 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r201755545
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 ---
@@ -225,12 +316,44 @@ private[parquet] class ParquetFilters(pushDownDate: 
Boolean, pushDownStartWith:
   def createFilter(schema: MessageType, predicate: sources.Filter): 
Option[FilterPredicate] = {
 val nameToType = getFieldMap(schema)
 
+def isDecimalMatched(value: Any, decimalMeta: DecimalMetadata): 
Boolean = value match {
+  case decimal: JBigDecimal =>
+decimal.scale == decimalMeta.getScale
+  case _ => false
+}
+
+// Since SPARK-24716, ParquetFilter accepts parquet file schema to 
convert to
+// data source Filter. This must make sure that filter value matched 
the Filter.
+// If doesn't matched, then the schema used to read the file is 
incorrect,
+// which would cause data corruption.
+def valueCanMakeFilterOn(name: String, value: Any): Boolean = {
+  value == null || (nameToType(name) match {
+case ParquetBooleanType => value.isInstanceOf[JBoolean]
+case ParquetByteType | ParquetShortType | ParquetIntegerType => 
value.isInstanceOf[Number]
+case ParquetLongType => value.isInstanceOf[JLong]
+case ParquetFloatType => value.isInstanceOf[JFloat]
+case ParquetDoubleType => value.isInstanceOf[JDouble]
+case ParquetStringType => value.isInstanceOf[String]
+case ParquetBinaryType => value.isInstanceOf[Array[Byte]]
+case ParquetDateType => value.isInstanceOf[Date]
--- End diff --

Why is there no support for timestamp?


---




[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-07-11 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r201755353
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 ---
@@ -225,12 +316,44 @@ private[parquet] class ParquetFilters(pushDownDate: 
Boolean, pushDownStartWith:
   def createFilter(schema: MessageType, predicate: sources.Filter): 
Option[FilterPredicate] = {
 val nameToType = getFieldMap(schema)
 
+def isDecimalMatched(value: Any, decimalMeta: DecimalMetadata): 
Boolean = value match {
+  case decimal: JBigDecimal =>
+decimal.scale == decimalMeta.getScale
+  case _ => false
+}
+
+// Since SPARK-24716, ParquetFilter accepts parquet file schema to 
convert to
+// data source Filter. This must make sure that filter value matched 
the Filter.
+// If doesn't matched, then the schema used to read the file is 
incorrect,
+// which would cause data corruption.
+def valueCanMakeFilterOn(name: String, value: Any): Boolean = {
+  value == null || (nameToType(name) match {
+case ParquetBooleanType => value.isInstanceOf[JBoolean]
+case ParquetByteType | ParquetShortType | ParquetIntegerType => 
value.isInstanceOf[Number]
+case ParquetLongType => value.isInstanceOf[JLong]
+case ParquetFloatType => value.isInstanceOf[JFloat]
+case ParquetDoubleType => value.isInstanceOf[JDouble]
+case ParquetStringType => value.isInstanceOf[String]
+case ParquetBinaryType => value.isInstanceOf[Array[Byte]]
+case ParquetDateType => value.isInstanceOf[Date]
+case ParquetSchemaType(DECIMAL, INT32, 0, decimalMeta) =>
--- End diff --

Can the decimal cases be collapsed to a single case on 
`ParquetSchemaType(DECIMAL, _, _, decimalMetadata)`?
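Something like this, as a sketch (whether the physical type can safely be ignored here depends on the INT32/INT64/FIXED_LEN_BYTE_ARRAY handling staying elsewhere):

```scala
case ParquetSchemaType(DECIMAL, _, _, decimalMeta) =>
  isDecimalMatched(value, decimalMeta)
```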


---




[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-07-11 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r201754882
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 ---
@@ -225,12 +316,44 @@ private[parquet] class ParquetFilters(pushDownDate: 
Boolean, pushDownStartWith:
   def createFilter(schema: MessageType, predicate: sources.Filter): 
Option[FilterPredicate] = {
 val nameToType = getFieldMap(schema)
 
+def isDecimalMatched(value: Any, decimalMeta: DecimalMetadata): 
Boolean = value match {
+  case decimal: JBigDecimal =>
+decimal.scale == decimalMeta.getScale
+  case _ => false
+}
+
+// Since SPARK-24716, ParquetFilter accepts parquet file schema to 
convert to
--- End diff --

Is this issue reference correct? The PR says this is for SPARK-24549.


---




[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-07-11 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r201754998
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 ---
@@ -202,6 +283,16 @@ private[parquet] class ParquetFilters(pushDownDate: 
Boolean, pushDownStartWith:
 case ParquetDateType if pushDownDate =>
   (n: String, v: Any) =>
 FilterApi.gtEq(intColumn(n), 
dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer])
+
+case ParquetSchemaType(DECIMAL, INT32, 0, _) if pushDownDecimal =>
--- End diff --

Why match 0 instead of _?


---




[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-07-07 Thread wangyum
Github user wangyum commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r200813391
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 ---
@@ -82,6 +120,30 @@ private[parquet] class ParquetFilters(pushDownDate: 
Boolean, pushDownStartWith:
   (n: String, v: Any) => FilterApi.eq(
 intColumn(n),
 Option(v).map(date => 
dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
+
+case ParquetSchemaType(DECIMAL, INT32, decimal) if pushDownDecimal =>
--- End diff --

Add a check to `canMakeFilterOn` and add a test case:
```scala
val decimal = new JBigDecimal(10).setScale(scale)
assert(decimal.scale() === scale)
assertResult(Some(lt(intColumn("cdecimal1"), 1000: Integer))) {
  parquetFilters.createFilter(parquetSchema, sources.LessThan("cdecimal1", decimal))
}

val decimal1 = new JBigDecimal(10).setScale(scale + 1)
assert(decimal1.scale() === scale + 1)

assertResult(None) {
  parquetFilters.createFilter(parquetSchema, sources.LessThan("cdecimal1", decimal1))
}
```


---




[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-07-05 Thread wangyum
Github user wangyum commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r200526464
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 ---
@@ -82,6 +120,30 @@ private[parquet] class ParquetFilters(pushDownDate: 
Boolean, pushDownStartWith:
   (n: String, v: Any) => FilterApi.eq(
 intColumn(n),
 Option(v).map(date => 
dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
+
+case ParquetSchemaType(DECIMAL, INT32, decimal) if pushDownDecimal =>
--- End diff --

I see. I will do it.


---




[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-07-05 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r200419939
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 ---
@@ -82,6 +120,30 @@ private[parquet] class ParquetFilters(pushDownDate: 
Boolean, pushDownStartWith:
   (n: String, v: Any) => FilterApi.eq(
 intColumn(n),
 Option(v).map(date => 
dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
+
+case ParquetSchemaType(DECIMAL, INT32, decimal) if pushDownDecimal =>
--- End diff --

That doesn't validate the value against the decimal scale from the file, 
which is what I'm suggesting. The decimal scale must match exactly and this is 
a good place to check because this has the file information. If the scale 
doesn't match, then the schema used to read this file is incorrect, which would 
cause data corruption.

In my opinion, it is better to add a check if it is cheap instead of 
debating whether or not some other part of the code covers the case. If this 
were happening per record then I would opt for a different strategy, but 
because this is at the file level it is a good idea to add it here.
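The check being suggested is essentially a per-file scale comparison, which later revisions of this PR add as `isDecimalMatched` (shown here as a self-contained sketch):

```scala
import java.math.{BigDecimal => JBigDecimal}
import org.apache.parquet.schema.DecimalMetadata

// Accept the pushed-down value only when its scale matches the file's decimal
// scale; otherwise skip the pushdown instead of reading with a wrong schema.
def isDecimalMatched(value: Any, decimalMeta: DecimalMetadata): Boolean = value match {
  case decimal: JBigDecimal => decimal.scale == decimalMeta.getScale
  case _ => false
}
```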


---




[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-07-05 Thread wangyum
Github user wangyum commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r200275960
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 ---
@@ -37,26 +39,50 @@ import org.apache.spark.unsafe.types.UTF8String
 /**
  * Some utility function to convert Spark data source filters to Parquet 
filters.
  */
-private[parquet] class ParquetFilters(pushDownDate: Boolean, 
pushDownStartWith: Boolean) {
+private[parquet] class ParquetFilters(
+pushDownDate: Boolean,
+pushDownDecimal: Boolean,
+pushDownStartWith: Boolean) {
 
   private case class ParquetSchemaType(
   originalType: OriginalType,
   primitiveTypeName: PrimitiveTypeName,
-  decimalMetadata: DecimalMetadata)
--- End diff --

We don't need `DecimalMetadata` here.


---




[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-07-05 Thread wangyum
Github user wangyum commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r200246162
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 ---
@@ -82,6 +120,30 @@ private[parquet] class ParquetFilters(pushDownDate: 
Boolean, pushDownStartWith:
   (n: String, v: Any) => FilterApi.eq(
 intColumn(n),
 Option(v).map(date => 
dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
+
+case ParquetSchemaType(DECIMAL, INT32, decimal) if pushDownDecimal =>
--- End diff --

It seems invalid values are already filtered out by:
https://github.com/apache/spark/blob/e76b0124fbe463def00b1dffcfd8fd47e04772fe/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala#L439


---




[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-07-04 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r200181894
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 ---
@@ -82,6 +120,30 @@ private[parquet] class ParquetFilters(pushDownDate: 
Boolean, pushDownStartWith:
   (n: String, v: Any) => FilterApi.eq(
 intColumn(n),
 Option(v).map(date => 
dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
+
+case ParquetSchemaType(DECIMAL, INT32, decimal) if pushDownDecimal =>
--- End diff --

Since this uses the file schema, I think it should validate that the file 
uses the same scale as the value passed in. That's a cheap sanity check to 
ensure correctness.


---




[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-07-04 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r200181749
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 ---
@@ -82,6 +120,30 @@ private[parquet] class ParquetFilters(pushDownDate: 
Boolean, pushDownStartWith:
   (n: String, v: Any) => FilterApi.eq(
 intColumn(n),
 Option(v).map(date => 
dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
+
+case ParquetSchemaType(DECIMAL, INT32, decimal) if pushDownDecimal =>
+  (n: String, v: Any) => FilterApi.eq(
+intColumn(n),
+
Option(v).map(_.asInstanceOf[JBigDecimal].unscaledValue().intValue()
+  .asInstanceOf[Integer]).orNull)
+case ParquetSchemaType(DECIMAL, INT64, decimal) if pushDownDecimal =>
+  (n: String, v: Any) => FilterApi.eq(
+longColumn(n),
+
Option(v).map(_.asInstanceOf[JBigDecimal].unscaledValue().longValue()
+  .asInstanceOf[java.lang.Long]).orNull)
+// Legacy DecimalType
+case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, decimal) if 
pushDownDecimal &&
--- End diff --

The binary used for the legacy type and for fixed-length storage should be 
the same, so I don't understand why there are two different conversion methods. 
Also, because this is using the Parquet schema now, there's no need to base the 
length of this binary on what older versions of Spark did -- in other words, if 
the underlying Parquet type is fixed, then just convert the decimal to that 
size fixed without worrying about legacy types.

I think this should pass in the fixed array's length and convert the 
BigDecimal value to that length array for all cases. That works no matter what 
the file contains.
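As a sketch of where the length would come from if it is taken from the file rather than from Spark's legacy sizing logic (`getTypeLength` is the Parquet API for FIXED_LEN_BYTE_ARRAY columns; the helper name is illustrative):

```scala
import org.apache.parquet.schema.PrimitiveType

// For a FIXED_LEN_BYTE_ARRAY column, the file schema already records the byte
// length, so the conversion can target exactly that size.
def fixedLengthOf(parquetType: PrimitiveType): Int = parquetType.getTypeLength
```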


---




[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-07-04 Thread wangyum
Github user wangyum commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r200170975
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 ---
@@ -82,6 +120,30 @@ private[parquet] class ParquetFilters(pushDownDate: 
Boolean, pushDownStartWith:
   (n: String, v: Any) => FilterApi.eq(
 intColumn(n),
 Option(v).map(date => 
dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
+
+case ParquetSchemaType(DECIMAL, INT32, decimal) if pushDownDecimal =>
--- End diff --

DecimalType carries variable metadata (`decimalMetadata`), so it seems
difficult to define a constant for it like the other types above.


---




[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-07-02 Thread wangyum
Github user wangyum commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r199442189
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
 ---
@@ -359,6 +369,70 @@ class ParquetFilterSuite extends QueryTest with 
ParquetTest with SharedSQLContex
 }
   }
 
+  test("filter pushdown - decimal") {
+Seq(true, false).foreach { legacyFormat =>
+  withSQLConf(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> 
legacyFormat.toString) {
+Seq(s"_1 decimal(${Decimal.MAX_INT_DIGITS}, 2)", // 
32BitDecimalType
+  s"_1 decimal(${Decimal.MAX_LONG_DIGITS}, 2)",  // 
64BitDecimalType
+  "_1 decimal(38, 18)"   // 
ByteArrayDecimalType
+).foreach { schemaDDL =>
+  val schema = StructType.fromDDL(schemaDDL)
+  val rdd =
+spark.sparkContext.parallelize((1 to 4).map(i => Row(new java.math.BigDecimal(i))))
+  val dataFrame = spark.createDataFrame(rdd, schema)
+  testDecimalPushDown(dataFrame) { implicit df =>
+assert(df.schema === schema)
+checkFilterPredicate('_1.isNull, classOf[Eq[_]], 
Seq.empty[Row])
+checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 
4).map(Row.apply(_)))
+
+checkFilterPredicate('_1 === 1, classOf[Eq[_]], 1)
+checkFilterPredicate('_1 <=> 1, classOf[Eq[_]], 1)
+checkFilterPredicate('_1 =!= 1, classOf[NotEq[_]], (2 to 
4).map(Row.apply(_)))
+
+checkFilterPredicate('_1 < 2, classOf[Lt[_]], 1)
+checkFilterPredicate('_1 > 3, classOf[Gt[_]], 4)
+checkFilterPredicate('_1 <= 1, classOf[LtEq[_]], 1)
+checkFilterPredicate('_1 >= 4, classOf[GtEq[_]], 4)
+
+checkFilterPredicate(Literal(1) === '_1, classOf[Eq[_]], 1)
+checkFilterPredicate(Literal(1) <=> '_1, classOf[Eq[_]], 1)
+checkFilterPredicate(Literal(2) > '_1, classOf[Lt[_]], 1)
+checkFilterPredicate(Literal(3) < '_1, classOf[Gt[_]], 4)
+checkFilterPredicate(Literal(1) >= '_1, classOf[LtEq[_]], 1)
+checkFilterPredicate(Literal(4) <= '_1, classOf[GtEq[_]], 4)
+
+checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4)
+checkFilterPredicate('_1 < 2 || '_1 > 3, 
classOf[Operators.Or], Seq(Row(1), Row(4)))
+  }
+}
+  }
+}
+  }
+
+  test("incompatible parquet file format will throw exeception") {
--- End diff --

I have created a PR: https://github.com/apache/spark/pull/21696
After that PR, decimal support should look like this: 
https://github.com/wangyum/spark/blob/refactor-decimal-pushdown/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala#L118-L146


---




[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-06-28 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r198907669
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
 ---
@@ -359,6 +369,70 @@ class ParquetFilterSuite extends QueryTest with 
ParquetTest with SharedSQLContex
 }
   }
 
+  test("filter pushdown - decimal") {
+Seq(true, false).foreach { legacyFormat =>
+  withSQLConf(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> 
legacyFormat.toString) {
+Seq(s"_1 decimal(${Decimal.MAX_INT_DIGITS}, 2)", // 
32BitDecimalType
+  s"_1 decimal(${Decimal.MAX_LONG_DIGITS}, 2)",  // 
64BitDecimalType
+  "_1 decimal(38, 18)"   // 
ByteArrayDecimalType
+).foreach { schemaDDL =>
+  val schema = StructType.fromDDL(schemaDDL)
+  val rdd =
+spark.sparkContext.parallelize((1 to 4).map(i => Row(new java.math.BigDecimal(i))))
+  val dataFrame = spark.createDataFrame(rdd, schema)
+  testDecimalPushDown(dataFrame) { implicit df =>
+assert(df.schema === schema)
+checkFilterPredicate('_1.isNull, classOf[Eq[_]], 
Seq.empty[Row])
+checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 
4).map(Row.apply(_)))
+
+checkFilterPredicate('_1 === 1, classOf[Eq[_]], 1)
+checkFilterPredicate('_1 <=> 1, classOf[Eq[_]], 1)
+checkFilterPredicate('_1 =!= 1, classOf[NotEq[_]], (2 to 
4).map(Row.apply(_)))
+
+checkFilterPredicate('_1 < 2, classOf[Lt[_]], 1)
+checkFilterPredicate('_1 > 3, classOf[Gt[_]], 4)
+checkFilterPredicate('_1 <= 1, classOf[LtEq[_]], 1)
+checkFilterPredicate('_1 >= 4, classOf[GtEq[_]], 4)
+
+checkFilterPredicate(Literal(1) === '_1, classOf[Eq[_]], 1)
+checkFilterPredicate(Literal(1) <=> '_1, classOf[Eq[_]], 1)
+checkFilterPredicate(Literal(2) > '_1, classOf[Lt[_]], 1)
+checkFilterPredicate(Literal(3) < '_1, classOf[Gt[_]], 4)
+checkFilterPredicate(Literal(1) >= '_1, classOf[LtEq[_]], 1)
+checkFilterPredicate(Literal(4) <= '_1, classOf[GtEq[_]], 4)
+
+checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4)
+checkFilterPredicate('_1 < 2 || '_1 > 3, 
classOf[Operators.Or], Seq(Row(1), Row(4)))
+  }
+}
+  }
+}
+  }
+
+  test("incompatible parquet file format will throw exeception") {
--- End diff --

If we can detect the case where the data was written with the legacy format,
then why do we need a property to read with the legacy format? Why not do the
right thing without a property?
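
A rough sketch of what "doing the right thing without a property" could mean
here, keying off only the physical type recorded in the file footer (assumed
code, not part of this PR):

    import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._

    // The footer already records how each decimal column was physically stored:
    // the legacy Spark writer always used FIXED_LEN_BYTE_ARRAY, while the standard
    // layout uses INT32/INT64 for small precisions. Matching on the physical type
    // removes the need for a reader-side flag.
    parquetSchema.getType(fieldName).asPrimitiveType().getPrimitiveTypeName match {
      case INT32                => // encode the literal as an unscaled Int
      case INT64                => // encode the literal as an unscaled Long
      case FIXED_LEN_BYTE_ARRAY => // encode the literal as a fixed-length Binary
      case BINARY               => // Parquet's own variable-length decimal encoding
      case _                    => // not a decimal layout we can push down; skip it
    }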


---




[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-06-28 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r198906232
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
 ---
@@ -359,6 +369,70 @@ class ParquetFilterSuite extends QueryTest with 
ParquetTest with SharedSQLContex
 }
   }
 
+  test("filter pushdown - decimal") {
+Seq(true, false).foreach { legacyFormat =>
+  withSQLConf(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> 
legacyFormat.toString) {
+Seq(s"_1 decimal(${Decimal.MAX_INT_DIGITS}, 2)", // 
32BitDecimalType
--- End diff --

Since this is providing a column name, it would be better to use something 
more readable than _1.


---




[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-06-28 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r198904779
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 ---
@@ -62,6 +98,30 @@ private[parquet] class ParquetFilters(pushDownDate: 
Boolean) {
   (n: String, v: Any) => FilterApi.eq(
 intColumn(n),
 Option(v).map(date => 
dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
+case decimal: DecimalType
+  if pushDownDecimal && (DecimalType.is32BitDecimalType(decimal) && 
!readLegacyFormat) =>
+  (n: String, v: Any) => FilterApi.eq(
+intColumn(n),
+
Option(v).map(_.asInstanceOf[JBigDecimal].unscaledValue().intValue()
--- End diff --

Does this need to validate the scale of the decimal, or is the scale adjusted
in the analyzer?


---




[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-06-28 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r198904504
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 ---
@@ -62,6 +98,30 @@ private[parquet] class ParquetFilters(pushDownDate: 
Boolean) {
   (n: String, v: Any) => FilterApi.eq(
 intColumn(n),
 Option(v).map(date => 
dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
+case decimal: DecimalType
+  if pushDownDecimal && (DecimalType.is32BitDecimalType(decimal) && 
!readLegacyFormat) =>
+  (n: String, v: Any) => FilterApi.eq(
+intColumn(n),
+
Option(v).map(_.asInstanceOf[JBigDecimal].unscaledValue().intValue()
+  .asInstanceOf[Integer]).orNull)
+case decimal: DecimalType
+  if pushDownDecimal && (DecimalType.is64BitDecimalType(decimal) && 
!readLegacyFormat) =>
+  (n: String, v: Any) => FilterApi.eq(
+longColumn(n),
+
Option(v).map(_.asInstanceOf[JBigDecimal].unscaledValue().longValue()
+.asInstanceOf[java.lang.Long]).orNull)
+case decimal: DecimalType
+  if pushDownDecimal && ((DecimalType.is32BitDecimalType(decimal) && 
readLegacyFormat)
--- End diff --

Please add comments here to explain what differs when `readLegacyFormat` is 
true.
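
Something along these lines (the exact wording is an assumption) would probably
cover it:

    // Files written with spark.sql.parquet.writeLegacyFormat=true store every
    // decimal, regardless of precision, as a FIXED_LEN_BYTE_ARRAY sized from the
    // precision. Files written without the legacy flag store precision <= 9 as
    // INT32 and precision <= 18 as INT64, so the pushed-down literal has to be
    // encoded to match the physical type the file actually uses; readLegacyFormat
    // tells the filter builder which of the two layouts to assume.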


---




[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-06-28 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r198904089
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -378,6 +378,22 @@ object SQLConf {
 .booleanConf
 .createWithDefault(true)
 
+  val PARQUET_FILTER_PUSHDOWN_DECIMAL_ENABLED =
+buildConf("spark.sql.parquet.filterPushdown.decimal")
+  .doc(s"If true, enables Parquet filter push-down optimization for 
Decimal. " +
+"The default value is false to compatible with legacy parquet 
format. " +
+s"This configuration only has an effect when 
'${PARQUET_FILTER_PUSHDOWN_ENABLED.key}' is " +
+"enabled and Decimal statistics are generated(Since Spark 2.4).")
+  .internal()
+  .booleanConf
+  .createWithDefault(true)
+
+  val PARQUET_READ_LEGACY_FORMAT = 
buildConf("spark.sql.parquet.readLegacyFormat")
--- End diff --

This property name doesn't mention pushdown, but the description says it only
applies to push-down. Can you make the property name clearer?
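
For example, a name that ties the flag to decimal push-down (purely
illustrative -- the name and default below are assumptions, not the PR's final
choice):

    val PARQUET_FILTER_PUSHDOWN_LEGACY_DECIMAL =
      buildConf("spark.sql.parquet.filterPushdown.readLegacyDecimalFormat")
        .doc("If true, decimal filters are pushed down assuming the files were " +
          s"written with '${PARQUET_WRITE_LEGACY_FORMAT.key}' enabled, i.e. every " +
          "decimal column is stored as a fixed-length byte array.")
        .internal()
        .booleanConf
        .createWithDefault(false)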


---
