[jira] [Commented] (SPARK-37191) Allow merging DecimalTypes with different precision values

2021-11-01 Thread Ivan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-37191?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17437061#comment-17437061
 ] 

Ivan commented on SPARK-37191:
--

This is somewhat related to https://issues.apache.org/jira/browse/SPARK-32317, 
although the issue there is a bit different - it is about merging decimals with 
different scales.

> Allow merging DecimalTypes with different precision values 
> ---
>
> Key: SPARK-37191
> URL: https://issues.apache.org/jira/browse/SPARK-37191
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.3, 3.1.0, 3.1.1, 3.2.0
>Reporter: Ivan
>Priority: Major
> Fix For: 3.3.0
>
>
> When merging DecimalTypes with different precision but the same scale, one 
> would get the following error:
> {code:java}
> Failed to merge fields 'col' and 'col'. Failed to merge decimal types with 
> incompatible precision 17 and 12   at 
> org.apache.spark.sql.types.StructType$.$anonfun$merge$2(StructType.scala:652)
>   at scala.Option.map(Option.scala:230)
>   at 
> org.apache.spark.sql.types.StructType$.$anonfun$merge$1(StructType.scala:644)
>   at 
> org.apache.spark.sql.types.StructType$.$anonfun$merge$1$adapted(StructType.scala:641)
>   at 
> scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
>   at 
> scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
>   at org.apache.spark.sql.types.StructType$.merge(StructType.scala:641)
>   at org.apache.spark.sql.types.StructType.merge(StructType.scala:550) 
> {code}
>  
> We could allow merging DecimalType values with different precision when the 
> scale is the same for both types: there should be no data correctness issues, 
> since the narrower type is simply widened, for example, DECIMAL(12, 2) -> 
> DECIMAL(17, 2). This does not hold when the scale is different - whether such 
> an upcast is safe depends on the actual values.
>  
> Repro code:
> {code:java}
> import org.apache.spark.sql.types._
> val schema1 = StructType(StructField("col", DecimalType(17, 2)) :: Nil)
> val schema2 = StructType(StructField("col", DecimalType(12, 2)) :: Nil)
> schema1.merge(schema2) {code}
>  
> This also affects Parquet schema merge which is where this issue was 
> discovered originally:
> {code:java}
> import java.math.BigDecimal
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.types._
> val data1 = sc.parallelize(Row(new BigDecimal("123456789.11")) :: Nil, 1)
> val schema1 = StructType(StructField("col", DecimalType(17, 2)) :: Nil)
> val data2 = sc.parallelize(Row(new BigDecimal("123456789.11")) :: Nil, 1)
> val schema2 = StructType(StructField("col", DecimalType(12, 2)) :: Nil)
> spark.createDataFrame(data2, 
> schema2).write.parquet("/tmp/decimal-test.parquet")
> spark.createDataFrame(data1, 
> schema1).write.mode("append").parquet("/tmp/decimal-test.parquet")
> // Reading the DataFrame fails
> spark.read.option("mergeSchema", 
> "true").parquet("/tmp/decimal-test.parquet").show()
> >>>
> Failed merging schema:
> root
>  |-- col: decimal(17,2) (nullable = true)
> Caused by: Failed to merge fields 'col' and 'col'. Failed to merge decimal 
> types with incompatible precision 12 and 17
> {code}
>  






[jira] [Updated] (SPARK-37191) Allow merging DecimalTypes with different precision values

2021-11-01 Thread Ivan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37191?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan updated SPARK-37191:
-
Description: 
When merging DecimalTypes with different precision but the same scale, one 
would get the following error:
{code:java}
Failed to merge fields 'col' and 'col'. Failed to merge decimal types with 
incompatible precision 17 and 12 at 
org.apache.spark.sql.types.StructType$.$anonfun$merge$2(StructType.scala:652)
at scala.Option.map(Option.scala:230)
at 
org.apache.spark.sql.types.StructType$.$anonfun$merge$1(StructType.scala:644)
at 
org.apache.spark.sql.types.StructType$.$anonfun$merge$1$adapted(StructType.scala:641)
at 
scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at 
scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
at org.apache.spark.sql.types.StructType$.merge(StructType.scala:641)
at org.apache.spark.sql.types.StructType.merge(StructType.scala:550) 
{code}
 

We could allow merging DecimalType values with different precision when the scale 
is the same for both types: there should be no data correctness issues, since the 
narrower type is simply widened, for example, DECIMAL(12, 2) -> DECIMAL(17, 2). 
This does not hold when the scale is different - whether such an upcast is safe 
depends on the actual values.
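
To illustrate the proposal, here is a minimal sketch (a hypothetical helper, not 
Spark's actual StructType.merge logic) of widening the precision when the scales 
match:
{code:java}
import org.apache.spark.sql.types._

// Hypothetical helper sketching the proposed rule: when the scales match, take the
// larger precision so that values of either input type fit without loss.
def widenSameScaleDecimals(left: DecimalType, right: DecimalType): DecimalType = {
  require(left.scale == right.scale,
    s"Refusing to merge decimals with different scales: ${left.scale} vs ${right.scale}")
  DecimalType(math.max(left.precision, right.precision), left.scale)
}

// widenSameScaleDecimals(DecimalType(17, 2), DecimalType(12, 2)) == DecimalType(17, 2)
{code}
Under such a rule, merging DECIMAL(17, 2) with DECIMAL(12, 2) would yield 
DECIMAL(17, 2) instead of failing.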

 

Repro code:
{code:java}
import org.apache.spark.sql.types._

val schema1 = StructType(StructField("col", DecimalType(17, 2)) :: Nil)
val schema2 = StructType(StructField("col", DecimalType(12, 2)) :: Nil)
schema1.merge(schema2) {code}
 

This also affects Parquet schema merge which is where this issue was discovered 
originally:
{code:java}
import java.math.BigDecimal
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val data1 = sc.parallelize(Row(new BigDecimal("123456789.11")) :: Nil, 1)
val schema1 = StructType(StructField("col", DecimalType(17, 2)) :: Nil)

val data2 = sc.parallelize(Row(new BigDecimal("123456789.11")) :: Nil, 1)
val schema2 = StructType(StructField("col", DecimalType(12, 2)) :: Nil)

spark.createDataFrame(data2, schema2).write.parquet("/tmp/decimal-test.parquet")
spark.createDataFrame(data1, 
schema1).write.mode("append").parquet("/tmp/decimal-test.parquet")

// Reading the DataFrame fails
spark.read.option("mergeSchema", 
"true").parquet("/tmp/decimal-test.parquet").show()

>>>
Failed merging schema:
root
 |-- col: decimal(17,2) (nullable = true)

Caused by: Failed to merge fields 'col' and 'col'. Failed to merge decimal 
types with incompatible precision 12 and 17



{code}
 

  was:
When merging DecimalTypes with different precision but the same scale, one 
would get the following error:
{code:java}
Failed to merge fields 'col' and 'col'. Failed to merge decimal types with 
incompatible precision 17 and 12 at 
org.apache.spark.sql.types.StructType$.$anonfun$merge$2(StructType.scala:652)
at scala.Option.map(Option.scala:230)
at 
org.apache.spark.sql.types.StructType$.$anonfun$merge$1(StructType.scala:644)
at 
org.apache.spark.sql.types.StructType$.$anonfun$merge$1$adapted(StructType.scala:641)
at 
scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at 
scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
at org.apache.spark.sql.types.StructType$.merge(StructType.scala:641)
at org.apache.spark.sql.types.StructType.merge(StructType.scala:550) 
{code}
 

We could allow merging DecimalType values with different precision when the scale 
is the same for both types: there should be no data correctness issues, since the 
narrower type is simply widened, for example, DECIMAL(12, 2) -> DECIMAL(17, 2). 
This does not hold when the scale is different - whether such an upcast is safe 
depends on the actual values.

 

Repro code:
{code:java}
import org.apache.spark.sql.types._

val schema1 = StructType(StructField("col", DecimalType(17, 2)) :: Nil)
val schema2 = StructType(StructField("col", DecimalType(12, 2)) :: Nil)
schema1.merge(schema2) {code}
 

This also affects Parquet schema merge which is where this issue was discovered 
originally:
{code:java}
import java.math.BigDecimal
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val data1 = sc.parallelize(Row(new BigDecimal("123456789.11")) :: Nil, 1)
val schema1 = StructType(StructField("col", DecimalType(17, 2)) :: Nil)

val data2 = sc.parallelize(Row(new BigDecimal("123456789.11")) :: Nil, 1)
val schema2 = StructType(StructField("col", DecimalType(12, 2)) :: Nil)

spark.createDataFrame(data2, schema2).write.parquet("/tmp/decimal-test.parquet")
spark.createDataFrame(data1, 

[jira] [Updated] (SPARK-37191) Allow merging DecimalTypes with different precision values

2021-11-01 Thread Ivan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37191?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan updated SPARK-37191:
-
Description: 
When merging DecimalTypes with different precision but the same scale, one 
would get the following error:
{code:java}
Failed to merge fields 'col' and 'col'. Failed to merge decimal types with 
incompatible precision 17 and 12 at 
org.apache.spark.sql.types.StructType$.$anonfun$merge$2(StructType.scala:652)
at scala.Option.map(Option.scala:230)
at 
org.apache.spark.sql.types.StructType$.$anonfun$merge$1(StructType.scala:644)
at 
org.apache.spark.sql.types.StructType$.$anonfun$merge$1$adapted(StructType.scala:641)
at 
scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at 
scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
at org.apache.spark.sql.types.StructType$.merge(StructType.scala:641)
at org.apache.spark.sql.types.StructType.merge(StructType.scala:550) 
{code}
 

We could allow merging DecimalType values with different precision when the scale 
is the same for both types: there should be no data correctness issues, since the 
narrower type is simply widened, for example, DECIMAL(12, 2) -> DECIMAL(17, 2). 
This does not hold when the scale is different - whether such an upcast is safe 
depends on the actual values.

 

Repro code:
{code:java}
import org.apache.spark.sql.types._

val schema1 = StructType(StructField("col", DecimalType(17, 2)) :: Nil)
val schema2 = StructType(StructField("col", DecimalType(12, 2)) :: Nil)
schema1.merge(schema2) {code}
 

This also affects Parquet schema merge which is where this issue was discovered 
originally:
{code:java}
import java.math.BigDecimal
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val data1 = sc.parallelize(Row(new BigDecimal("123456789.11")) :: Nil, 1)
val schema1 = StructType(StructField("col", DecimalType(17, 2)) :: Nil)

val data2 = sc.parallelize(Row(new BigDecimal("123456789.11")) :: Nil, 1)
val schema2 = StructType(StructField("col", DecimalType(12, 2)) :: Nil)

spark.createDataFrame(data2, schema2).write.parquet("/tmp/decimal-test.parquet")
spark.createDataFrame(data1, 
schema1).write.mode("append").parquet("/tmp/decimal-test.parquet")

// Reading the DataFrame fails
spark.read.option("mergeSchema", 
"true").parquet("/mnt/ivan/decimal-test.parquet").show()

>>>
Failed merging schema:
root
 |-- col: decimal(17,2) (nullable = true)

Caused by: Failed to merge fields 'col' and 'col'. Failed to merge decimal 
types with incompatible precision 12 and 17



{code}
 

> Allow merging DecimalTypes with different precision values 
> ---
>
> Key: SPARK-37191
> URL: https://issues.apache.org/jira/browse/SPARK-37191
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.3, 3.1.0, 3.1.1, 3.2.0
>Reporter: Ivan
>Priority: Major
> Fix For: 3.3.0
>
>
> When merging DecimalTypes with different precision but the same scale, one 
> would get the following error:
> {code:java}
> Failed to merge fields 'col' and 'col'. Failed to merge decimal types with 
> incompatible precision 17 and 12   at 
> org.apache.spark.sql.types.StructType$.$anonfun$merge$2(StructType.scala:652)
>   at scala.Option.map(Option.scala:230)
>   at 
> org.apache.spark.sql.types.StructType$.$anonfun$merge$1(StructType.scala:644)
>   at 
> org.apache.spark.sql.types.StructType$.$anonfun$merge$1$adapted(StructType.scala:641)
>   at 
> scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
>   at 
> scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
>   at org.apache.spark.sql.types.StructType$.merge(StructType.scala:641)
>   at org.apache.spark.sql.types.StructType.merge(StructType.scala:550) 
> {code}
>  
> We could allow merging DecimalType values with different precision when the 
> scale is the same for both types: there should be no data correctness issues, 
> since the narrower type is simply widened, for example, DECIMAL(12, 2) -> 
> DECIMAL(17, 2). This does not hold when the scale is different - whether such 
> an upcast is safe depends on the actual values.
>  
> Repro code:
> {code:java}
> import org.apache.spark.sql.types._
> val schema1 = StructType(StructField("col", DecimalType(17, 2)) :: Nil)
> val schema2 = StructType(StructField("col", DecimalType(12, 2)) :: Nil)
> schema1.merge(schema2) {code}
>  
> This also affects Parquet schema merge which is where this issue was 
> discovered originally:
> {code:java}
> import java.math.BigDecimal

[jira] [Created] (SPARK-37191) Allow merging DecimalTypes with different precision values

2021-11-01 Thread Ivan (Jira)
Ivan created SPARK-37191:


 Summary: Allow merging DecimalTypes with different precision 
values 
 Key: SPARK-37191
 URL: https://issues.apache.org/jira/browse/SPARK-37191
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.2.0, 3.1.1, 3.1.0, 3.0.3
Reporter: Ivan
 Fix For: 3.3.0









[jira] [Updated] (SPARK-36803) ClassCastException: optional int32 col-0 is not a group when reading legacy Parquet files

2021-09-19 Thread Ivan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan updated SPARK-36803:
-
Description: 
When reading Parquet files that have been written in legacy mode, in combination 
with schema evolution, we observed that 2-level LIST annotated types are traversed 
incorrectly. 

The root cause is the imprecise check on the underlying element type for Array 
types (and potentially Map types but I have not checked those yet) that happens 
here: 
[https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala#L606]

The issue is only reproducible with schema evolution using the parquet-mr reader, 
when there are two schemas like this:

File 1:
{code:java}
root
 |-- col-0: array (nullable = true)
 ||-- element: struct (containsNull = false)
 |||-- col-0: integer (nullable = true)
{code}
File 2:
{code:java}
root
 |-- col-0: array (nullable = true)
 ||-- element: struct (containsNull = false)
 |||-- col-0: integer (nullable = true)
 |||-- col-1: integer (nullable = true){code}
 

When ParquetRowConverter tries to unwrap ArrayType, it checks whether the 
underlying Parquet and Spark types match. However, in the case above the requested 
Spark schema includes both fields, resulting in a mismatch and a failure to read 
File 1:
{noformat}
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in 
stage 11.0 failed 1 times, most recent failure: Lost task 1.0 in stage 11.0 
(TID 18) (ip-1-2-3-4.us-west-2.compute.internal executor driver): 
java.lang.ClassCastException: optional int32 col-0 is not a group
at org.apache.parquet.schema.Type.asGroupType(Type.java:248)
at 
org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.org$apache$spark$sql$execution$datasources$parquet$ParquetRowConverter$$newConverter(ParquetRowConverter.scala:424)
at 
org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$ParquetArrayConverter$ElementConverter.(ParquetRowConverter.scala:633)
at 
org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$ParquetArrayConverter.(ParquetRowConverter.scala:616)
at 
org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.org$apache$spark$sql$execution$datasources$parquet$ParquetRowConverter$$newConverter(ParquetRowConverter.scala:390)
at 
org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.$anonfun$fieldConverters$1(ParquetRowConverter.scala:214)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at 
org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.(ParquetRowConverter.scala:210){noformat}
This happens due to L606 in ParquetRowConverter: 
{code:java}
DataType.equalsIgnoreCompatibleNullability(guessedElementType, elementType)
{code}
The code assumes that we are working with 3-level lists and would incorrectly 
remove the “dummy” level from the Parquet schema.

The actual error varies depending on column names - in this case the struct type 
name matches the primitive type name, so we end up with "optional int32 col-0 is 
not a group". In other cases, it could fail with IndexOutOfBoundsException or 
NoSuchElementException when the column name is not found in the struct.

The reason it works with 3-level lists is that 
DataType.equalsIgnoreCompatibleNullability(guessedElementType, elementType) 
always evaluates to false, so we remove the “dummy” level and perform a struct 
match which takes schema evolution into account.
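
Concretely, for the File 1 / File 2 schemas above, the two element types that end 
up being compared look roughly like this (a sketch - the variable names mirror the 
converter's parameters, the values are inferred from the schemas above):
{code:java}
import org.apache.spark.sql.types._

// Element type guessed from the Parquet file written with the File 1 schema.
val guessedElementType = StructType(
  StructField("col-0", IntegerType, nullable = true) :: Nil)

// Element type requested by Spark after schema evolution (File 2 / merged schema).
val elementType = StructType(
  StructField("col-0", IntegerType, nullable = true) ::
  StructField("col-1", IntegerType, nullable = true) :: Nil)

// A strict structural comparison of the two types fails, so the converter assumes
// a 3-level list and incorrectly strips the “dummy” level from the legacy schema.
println(guessedElementType == elementType)  // false
{code}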

 

Repro:
{code:java}
import org.apache.spark.sql._
import org.apache.spark.sql.types._

val schema1 = StructType(
  StructField("col-0", ArrayType(
StructType(
  StructField("col-0", IntegerType, true) :: Nil
), 
containsNull = false
  )) :: Nil
)
val rdd1 = sc.parallelize(Row(Array(Row(1))) :: Nil, 1)
val df1 = spark.createDataFrame(rdd1, schema1)

df1.write.parquet("/tmp/legacy-parquet")

val schema2 = StructType(
  StructField("col-0", ArrayType(
StructType(
  StructField("col-0", IntegerType, true) :: StructField("col-1", 
IntegerType, true) :: Nil
), 
containsNull = false
  )) :: Nil
)
val rdd2 = sc.parallelize(Row(Array(Row(1, 2))) :: Nil, 1)
val df2 = spark.createDataFrame(rdd2, schema2)

df2.write.mode("append").parquet("/tmp/legacy-parquet")

// Fails with: Caused by: ClassCastException: optional int32 col-0 is not a 
group
display(spark.read.schema(schema2).parquet("/tmp/legacy-parquet"))
{code}
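
Note that the repro assumes the files are written with 2-level (legacy) lists; if 
that is not already the default in your environment, enabling the legacy writer 
before the writes would presumably be needed (the reporter's exact setup is not 
shown here):
{code:java}
// Write Parquet lists in the legacy 2-level layout so the repro matches the
// "legacy mode" described above. Set this before writing df1/df2.
spark.conf.set("spark.sql.parquet.writeLegacyFormat", "true")
{code}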
 

  was:

[jira] [Updated] (SPARK-36803) ClassCastException: optional int32 col-0 is not a group when reading legacy Parquet files

2021-09-19 Thread Ivan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan updated SPARK-36803:
-
Description: 
When reading Parquet files that have been written in legacy mode, in combination 
with schema evolution, we observed that 2-level LIST annotated types are traversed 
incorrectly. 

The root cause is the imprecise check on the underlying element type for Array 
types (and potentially Map types but I have not checked those yet) that happens 
here: 
[https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala#L606]

The issue is only reproducible with schema evolution when there are two schemas 
like this:

File 1:
{code:java}
root
 |-- col-0: array (nullable = true)
 ||-- element: struct (containsNull = false)
 |||-- col-0: integer (nullable = true)
{code}
File 2:
{code:java}
root
 |-- col-0: array (nullable = true)
 ||-- element: struct (containsNull = false)
 |||-- col-0: integer (nullable = true)
 |||-- col-1: integer (nullable = true){code}
 

When ParquetRowConverter tries to unwrap ArrayType, it checks whether the 
underlying Parquet and Spark types match. However, in the case above the requested 
Spark schema includes both fields, resulting in a mismatch and a failure to read 
File 1:
{noformat}
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in 
stage 11.0 failed 1 times, most recent failure: Lost task 1.0 in stage 11.0 
(TID 18) (ip-1-2-3-4.us-west-2.compute.internal executor driver): 
java.lang.ClassCastException: optional int32 col-0 is not a group
at org.apache.parquet.schema.Type.asGroupType(Type.java:248)
at 
org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.org$apache$spark$sql$execution$datasources$parquet$ParquetRowConverter$$newConverter(ParquetRowConverter.scala:424)
at 
org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$ParquetArrayConverter$ElementConverter.(ParquetRowConverter.scala:633)
at 
org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$ParquetArrayConverter.(ParquetRowConverter.scala:616)
at 
org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.org$apache$spark$sql$execution$datasources$parquet$ParquetRowConverter$$newConverter(ParquetRowConverter.scala:390)
at 
org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.$anonfun$fieldConverters$1(ParquetRowConverter.scala:214)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at 
org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.(ParquetRowConverter.scala:210){noformat}
This happens due to L606 in ParquetRowConverter: 
{code:java}
DataType.equalsIgnoreCompatibleNullability(guessedElementType, elementType)
{code}
The code assumes that we are working with 3-level lists and would incorrectly 
remove the “dummy” level from the Parquet schema.

The actual error varies depending on column names - in this case the struct type 
name matches the primitive type name, so we end up with "optional int32 col-0 is 
not a group". In other cases, it could fail with IndexOutOfBoundsException or 
NoSuchElementException when the column name is not found in the struct.

The reason it works with 3-level lists is that 
DataType.equalsIgnoreCompatibleNullability(guessedElementType, elementType) 
always evaluates to false, so we remove the “dummy” level and perform a struct 
match which takes schema evolution into account.

 

Repro:
{code:java}

import org.apache.spark.sql._
import org.apache.spark.sql.types._

val schema1 = StructType(
  StructField("col-0", ArrayType(
StructType(
  StructField("col-0", IntegerType, true) :: Nil
), 
containsNull = false
  )) :: Nil
)
val rdd1 = sc.parallelize(Row(Array(Row(1))) :: Nil, 1)
val df1 = spark.createDataFrame(rdd1, schema1)

df1.write.parquet("/tmp/legacy-parquet")

val schema2 = StructType(
  StructField("col-0", ArrayType(
StructType(
  StructField("col-0", IntegerType, true) :: StructField("col-1", 
IntegerType, true) :: Nil
), 
containsNull = false
  )) :: Nil
)
val rdd2 = sc.parallelize(Row(Array(Row(1, 2))) :: Nil, 1)
val df2 = spark.createDataFrame(rdd2, schema2)

df2.write.mode("append").parquet("/tmp/legacy-parquet")

// Fails with: Caused by: ClassCastException: optional int32 col-0 is not a 
group
display(spark.read.schema(schema2).parquet("/tmp/legacy-parquet"))
{code}

  

  was:
When reading Parquet files 

[jira] [Updated] (SPARK-36803) ClassCastException: optional int32 col-0 is not a group when reading legacy Parquet files

2021-09-19 Thread Ivan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan updated SPARK-36803:
-
Description: 
When reading Parquet files that have been written in legacy mode, in combination 
with schema evolution, we observed that 2-level LIST annotated types are traversed 
incorrectly. 

The root cause is the imprecise check on the underlying element type for Array 
types (and potentially Map types but I have not checked those yet) that happens 
here: 
[https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala#L606]

The issue is only reproducible with schema evolution when there are two schemas 
like this:

File 1:
{code:java}
root
 |-- col-0: array (nullable = true)
 ||-- element: struct (containsNull = false)
 |||-- col-0: integer (nullable = true)
{code}
File 2:
{code:java}
root
 |-- col-0: array (nullable = true)
 ||-- element: struct (containsNull = false)
 |||-- col-0: integer (nullable = true)
 |||-- col-1: integer (nullable = true){code}
 

When ParquetRowConverter tries to unwrap ArrayType, it checks whether the 
underlying Parquet and Spark types match. However, in the case above the requested 
Spark schema includes both fields, resulting in a mismatch and a failure to read 
File 1:
{noformat}
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in 
stage 11.0 failed 1 times, most recent failure: Lost task 1.0 in stage 11.0 
(TID 18) (ip-1-2-3-4.us-west-2.compute.internal executor driver): 
java.lang.ClassCastException: optional int32 col-0 is not a group
at org.apache.parquet.schema.Type.asGroupType(Type.java:248)
at 
org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.org$apache$spark$sql$execution$datasources$parquet$ParquetRowConverter$$newConverter(ParquetRowConverter.scala:424)
at 
org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$ParquetArrayConverter$ElementConverter.(ParquetRowConverter.scala:633)
at 
org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$ParquetArrayConverter.(ParquetRowConverter.scala:616)
at 
org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.org$apache$spark$sql$execution$datasources$parquet$ParquetRowConverter$$newConverter(ParquetRowConverter.scala:390)
at 
org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.$anonfun$fieldConverters$1(ParquetRowConverter.scala:214)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at 
org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.(ParquetRowConverter.scala:210){noformat}
This happens due to L606 in ParquetRowConverter: 
{code:java}
DataType.equalsIgnoreCompatibleNullability(guessedElementType, elementType)
{code}
The code assumes that we are working with 3-level lists and would incorrectly 
remove the “dummy” level from the Parquet schema.

The actual error varies depending on column names - in this case the struct type 
name matches the primitive type name, so we end up with "optional int32 col-0 is 
not a group". In other cases, it could fail with IndexOutOfBoundsException or 
NoSuchElementException when the column name is not found in the struct.

The reason it works with 3-level lists is that 
DataType.equalsIgnoreCompatibleNullability(guessedElementType, elementType) 
always evaluates to false, so we remove the “dummy” level and perform a struct 
match which takes schema evolution into account.
  

  was:
When reading Parquet files that have been written in legacy mode, in combination 
with schema evolution, we observed that 2-level LIST annotated types are traversed 
incorrectly. 

The root cause is the imprecise check on the underlying element type for Array 
types (and potentially Map types but I have not checked those yet) that happens 
here: 
[https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala#L606]

The issue is only reproducible with schema evolution when there are two schemas 
like this:

File 1:

 

 
{code:java}
root
 |-- col-0: array (nullable = true)
 ||-- element: struct (containsNull = false)
 |||-- col-0: integer (nullable = true)
{code}
File 2:
{code:java}
root
 |-- col-0: array (nullable = true)
 ||-- element: struct (containsNull = false)
 |||-- col-0: integer (nullable = true)
 |||-- col-1: integer (nullable = true){code}
 

When 

[jira] [Updated] (SPARK-36803) ClassCastException: optional int32 col-0 is not a group when reading legacy Parquet files

2021-09-19 Thread Ivan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan updated SPARK-36803:
-
Description: 
When reading Parquet files that have been written in legacy mode, in combination 
with schema evolution, we observed that 2-level LIST annotated types are traversed 
incorrectly. 

The root cause is the imprecise check on the underlying element type for Array 
types (and potentially Map types but I have not checked those yet) that happens 
here: 
[https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala#L606]

The issue is only reproducible with schema evolution when there are two schemas 
like this:

File 1:

 

 
{code:java}
root
 |-- col-0: array (nullable = true)
 ||-- element: struct (containsNull = false)
 |||-- col-0: integer (nullable = true)
{code}
File 2:
{code:java}
root
 |-- col-0: array (nullable = true)
 ||-- element: struct (containsNull = false)
 |||-- col-0: integer (nullable = true)
 |||-- col-1: integer (nullable = true){code}
 

When ParquetRowConverter tries to unwrap ArrayType, it checks whether the 
underlying Parquet and Spark types match. However, in the case above the requested 
Spark schema includes both fields, resulting in a mismatch and a failure to read 
File 1:
{noformat}
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in 
stage 11.0 failed 1 times, most recent failure: Lost task 1.0 in stage 11.0 
(TID 18) (ip-1-2-3-4.us-west-2.compute.internal executor driver): 
java.lang.ClassCastException: optional int32 col-0 is not a group
at org.apache.parquet.schema.Type.asGroupType(Type.java:248)
at 
org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.org$apache$spark$sql$execution$datasources$parquet$ParquetRowConverter$$newConverter(ParquetRowConverter.scala:424)
at 
org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$ParquetArrayConverter$ElementConverter.(ParquetRowConverter.scala:633)
at 
org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$ParquetArrayConverter.(ParquetRowConverter.scala:616)
at 
org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.org$apache$spark$sql$execution$datasources$parquet$ParquetRowConverter$$newConverter(ParquetRowConverter.scala:390)
at 
org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.$anonfun$fieldConverters$1(ParquetRowConverter.scala:214)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at 
org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.(ParquetRowConverter.scala:210){noformat}
Now the check at line 606 in ParquetRowConverter: 
{code:java}
DataType.equalsIgnoreCompatibleNullability(guessedElementType, elementType)
{code}
would return false because it compares the types exactly and in this case they 
don’t match. The code assumes that we are working with 3-level lists and would 
incorrectly remove the “dummy” level from the Parquet schema.

The actual error varies depending on column names - in this case the struct type 
name matches the primitive type name, so we end up with "optional int32 col-0 is 
not a group". In other cases, it could fail with IndexOutOfBoundsException or 
NoSuchElementException when the column name is not found in the struct.

The reason it works with 3-level lists is that 
DataType.equalsIgnoreCompatibleNullability(guessedElementType, elementType) 
always evaluates to false, so we remove the “dummy” level and perform a struct 
match which takes schema evolution into account.
  

  was:
When reading Parquet files that have been written in legacy mode, in combination 
with schema evolution, we observed that 2-level LIST annotated types are traversed 
incorrectly. 

The root cause is the imprecise check on the underlying element type for Array 
types (and potentially Map types but I have not checked those yet) that happens 
here: 
[https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala#L606.|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala#L606]

When we write arrays in legacy mode, we use 2-level wrapping like this:

 

{{optional group col-0 (LIST) \{
  repeated group array {
optional group col-0 {
  optional float col-0;
}
  }
}}}

It works just fine if the corresponding Spark 

[jira] [Updated] (SPARK-36803) ClassCastException: optional int32 col-0 is not a group when reading legacy Parquet files

2021-09-19 Thread Ivan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan updated SPARK-36803:
-
Description: 
When reading Parquet files that have been written in legacy mode, in combination 
with schema evolution, we observed that 2-level LIST annotated types are traversed 
incorrectly. 

The root cause is the imprecise check on the underlying element type for Array 
types (and potentially Map types but I have not checked those yet) that happens 
here: 
[https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala#L606.|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala#L606]

When we write arrays in legacy mode, we use 2-level wrapping like this:

 

{{optional group col-0 (LIST) \{
  repeated group array {
optional group col-0 {
  optional float col-0;
}
  }
}}}

It works just fine if the corresponding Spark schema for all of the Parquet 
files is like this:

 

{{ArrayType(StructType(
  StructField(col-0, StructType(
StructField(col-0, FloatType, true)
  ))
))}}

When ParquetRowConverter tries to unwrap ArrayType, it checks if the underlying 
types between Parquet and Spark match. In this case, they do, so it is all good.

The problem arises when, due to schema evolution, the Parquet schema does not 
match the Spark one, for example:

 

{{ArrayType(StructType(
  StructField(col-1, LongType, true), <-- added field
  StructField(col-0, StructType(
StructField(col-0, FloatType, true)
  ))
))}}

Now the check at line 606 in ParquetRowConverter: 
DataType.equalsIgnoreCompatibleNullability(guessedElementType, elementType) 
would return false because it compares the types exactly and in this case they 
don’t match. The code assumes that we are working with 3-level lists and would 
incorrectly remove the “dummy” level from the Parquet schema, leaving us with the 
following:

 

{{optional float col-0
 
StructType(StructField(col-0, FloatType, true))}}

The actual error varies depending on column names - in this case the struct type 
name matches the primitive type name, so we end up with "optional float col-0 is 
not a group". In other cases, it could fail with IndexOutOfBoundsException or 
NoSuchElementException when the column name is not found in the struct.

 

The reason it works with 3-level lists is that 
DataType.equalsIgnoreCompatibleNullability(guessedElementType, elementType) 
always evaluates to false, so we remove the “dummy” level and perform a struct 
match which takes schema evolution into account here: 
[https://github.com/databricks/runtime/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala#L210].

So the check DataType.equalsIgnoreCompatibleNullability should probably be 
replaced with something like DataType.partiallyContainsSchema, where we would 
check that the guessedElementType is a subset of the elementType.
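
A rough sketch of what such a check could look like (this only illustrates the 
proposed semantics - the name partiallyContainsSchema and the exact rules here are 
assumptions, not an existing Spark API):
{code:java}
import org.apache.spark.sql.types._

// Returns true when every field of the guessed (file) type exists in the requested
// (Spark) type with a compatible type, so schema evolution that only adds columns
// is still treated as a match. Leaf types are compared with plain equality here;
// a real implementation would reuse Spark's nullability-aware helpers.
def partiallyContainsSchema(guessed: DataType, requested: DataType): Boolean =
  (guessed, requested) match {
    case (g: StructType, r: StructType) =>
      g.fields.forall { gf =>
        r.fields.exists(rf => rf.name == gf.name &&
          partiallyContainsSchema(gf.dataType, rf.dataType))
      }
    case (ArrayType(ge, _), ArrayType(re, _)) =>
      partiallyContainsSchema(ge, re)
    case (MapType(gk, gv, _), MapType(rk, rv, _)) =>
      partiallyContainsSchema(gk, rk) && partiallyContainsSchema(gv, rv)
    case (g, r) =>
      g == r
  }
{code}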

IMHO, this creates an impression that the code works rather incidentally for 
legacy mode due to the check.

 

Logs for converting the offending type for different cases:
 
 

> ClassCastException: optional int32 col-0 is not a group when reading legacy 
> Parquet files 
> --
>
> Key: SPARK-36803
> URL: https://issues.apache.org/jira/browse/SPARK-36803
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.1.2
>Reporter: Ivan
>Priority: Major
>
> When reading Parquet files that have been written in legacy mode, in combination 
> with schema evolution, we observed that 2-level LIST annotated types are 
> traversed incorrectly. 
> The root cause is the imprecise check on the underlying element type for 
> Array types (and potentially Map types but I have not checked those yet) that 
> happens here: 
> [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala#L606.|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala#L606]
> When we write arrays in legacy mode, we use 2-level wrapping like this:
>  
> {{optional group col-0 (LIST) \{
>   repeated group array {
> optional group col-0 {
>   optional float col-0;
> }
>   }
> }}}
> It works just fine if the corresponding Spark schema for all of the Parquet 
> files is like this:
>  
> {{ArrayType(StructType(
>   StructField(col-0, StructType(
> StructField(col-0, FloatType, true)
>   ))
> ))}}
> When ParquetRowConverter tries to unwrap ArrayType, it checks if the 
> underlying types between Parquet and Spark match. In this case, they do, so 
> it is all good.
> The problem 

[jira] [Created] (SPARK-36803) ClassCastException: optional int32 col-0 is not a group when reading legacy Parquet files

2021-09-19 Thread Ivan (Jira)
Ivan created SPARK-36803:


 Summary: ClassCastException: optional int32 col-0 is not a group 
when reading legacy Parquet files 
 Key: SPARK-36803
 URL: https://issues.apache.org/jira/browse/SPARK-36803
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.1.2
Reporter: Ivan









[jira] [Created] (SPARK-36163) Propagate correct JDBC properties in JDBC connector provider and add "connectionProvider" option

2021-07-15 Thread Ivan (Jira)
Ivan created SPARK-36163:


 Summary: Propagate correct JDBC properties in JDBC connector 
provider and add "connectionProvider" option
 Key: SPARK-36163
 URL: https://issues.apache.org/jira/browse/SPARK-36163
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.1.2, 3.1.1, 3.1.0
Reporter: Ivan


There are a couple of issues with JDBC connection providers. The first is a bug 
caused by 
[https://github.com/apache/spark/commit/c3ce9701b458511255072c72b9b245036fa98653]
 where we would pass all properties, including JDBC data source keys, to the 
JDBC driver which results in errors like {{java.sql.SQLException: Unrecognized 
connection property 'url'}}.

Connection properties are supposed to include only vendor properties; the url 
config is a JDBC option and should be excluded.

The fix would be replacing {{jdbcOptions.asProperties.asScala.foreach}} with 
{{jdbcOptions.asConnectionProperties.asScala.foreach}} which is java.sql.Driver 
friendly.
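
For illustration, a simplified sketch of the difference (stand-in values only, not 
the actual JDBCOptions implementation): asProperties carries every data source 
option, including Spark-only keys such as url and dbtable, while 
asConnectionProperties is meant to carry only vendor properties that are safe to 
hand to java.sql.Driver:
{code:java}
import java.util.Properties

// Stand-in for jdbcOptions.asProperties: every option the user passed to the data
// source, including Spark-only keys that the JDBC driver does not understand.
val allOptions = new Properties()
allOptions.put("url", "jdbc:postgresql://db-host:5432/mydb")  // hypothetical URL
allOptions.put("dbtable", "public.events")                    // hypothetical table
allOptions.put("user", "app")

// Stand-in for jdbcOptions.asConnectionProperties: only vendor properties remain,
// so passing these to java.sql.Driver.connect() no longer triggers
// "Unrecognized connection property 'url'".
val connectionProperties = new Properties()
connectionProperties.put("user", "app")
{code}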

 

I also investigated the problem with multiple providers and I think there are a 
couple of oversights in the {{ConnectionProvider}} implementation. It is missing 
two things:
 * Any {{JdbcConnectionProvider}} should take precedence over 
{{BasicConnectionProvider}}. {{BasicConnectionProvider}} should only be 
selected if there was no match found when inferring providers that can handle 
JDBC url.

 * There is currently no way to select a specific provider that you want, 
similar to how you can select a JDBC driver. The use case is, for example, 
having connection providers for two databases that handle the same URL but have 
slightly different semantics, where you want to select one in some cases and the 
other in others.

 ** I think the first point could be discarded when the second one is addressed.

You can technically use {{spark.sql.sources.disabledJdbcConnProviderList}} to 
exclude ones that don’t need to be included, but I am not quite sure why it was 
done that way - it is much simpler to allow users to enforce the provider they 
want.

This ticket fixes it by adding a {{connectionProvider}} option to the JDBC data 
source that allows users to select a particular provider when the ambiguity 
arises.
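
A hypothetical usage sketch of the proposed option (the exact option name and 
accepted values are whatever the eventual PR defines; the URL and table below are 
placeholders):
{code:java}
// Explicitly pick a JDBC connection provider when more than one provider can
// handle the same URL. "connectionProvider" is the option proposed in this ticket;
// "basic" stands in for a provider name.
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://db-host:5432/mydb")
  .option("dbtable", "public.events")
  .option("connectionProvider", "basic")
  .load()
{code}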






[jira] [Commented] (SPARK-36163) Propagate correct JDBC properties in JDBC connector provider and add "connectionProvider" option

2021-07-15 Thread Ivan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17381385#comment-17381385
 ] 

Ivan commented on SPARK-36163:
--

I will open a PR for this shortly.

> Propagate correct JDBC properties in JDBC connector provider and add 
> "connectionProvider" option
> 
>
> Key: SPARK-36163
> URL: https://issues.apache.org/jira/browse/SPARK-36163
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.0, 3.1.1, 3.1.2
>Reporter: Ivan
>Priority: Major
>
> There are a couple of issues with JDBC connection providers. The first is a 
> bug caused by 
> [https://github.com/apache/spark/commit/c3ce9701b458511255072c72b9b245036fa98653]
>  where we would pass all properties, including JDBC data source keys, to the 
> JDBC driver which results in errors like {{java.sql.SQLException: 
> Unrecognized connection property 'url'}}.
> Connection properties are supposed to include only vendor properties; the url 
> config is a JDBC option and should be excluded.
> The fix would be replacing {{jdbcOptions.asProperties.asScala.foreach}} with 
> {{jdbcOptions.asConnectionProperties.asScala.foreach}} which is 
> java.sql.Driver friendly.
>  
> I also investigated the problem with multiple providers and I think there are 
> a couple of oversights in {{ConnectionProvider}} implementation. I think it 
> is missing two things:
>  * Any {{JdbcConnectionProvider}} should take precedence over 
> {{BasicConnectionProvider}}. {{BasicConnectionProvider}} should only be 
> selected if there was no match found when inferring providers that can handle 
> JDBC url.
>  * There is currently no way to select a specific provider that you want, 
> similar to how you can select a JDBC driver. The use case is, for example, 
> having connection providers for two databases that handle the same URL but 
> have slightly different semantics and you want to select one in one case and 
> the other one in others.
>  ** I think the first point could be discarded when the second one is 
> addressed.
> You can technically use {{spark.sql.sources.disabledJdbcConnProviderList}} to 
> exclude ones that don’t need to be included, but I am not quite sure why it 
> was done that way - it is much simpler to allow users to enforce the provider 
> they want.
> This ticket fixes it by adding a {{connectionProvider}} option to the JDBC 
> data source that allows users to select a particular provider when the 
> ambiguity arises.


