Fwd: Dataset schema incompatibility bug when reading column partitioned data

2019-04-11 Thread Dávid Szakállas
+dev for more visibility. Is this a known issue? Is there a plan for a fix?

Thanks,
David

> Begin forwarded message:
> 
> From: Dávid Szakállas 
> Subject: Dataset schema incompatibility bug when reading column partitioned 
> data
> Date: March 29, 2019 14:15:27 CET
> To: u...@spark.apache.org
> 
> We observed the following bug on Spark 2.4.0:
> 
> scala> import org.apache.spark.sql.types._
> 
> scala> spark.createDataset(Seq((1,2))).write.partitionBy("_1").parquet("foo.parquet")
> 
> scala> val schema = StructType(Seq(StructField("_1", IntegerType), StructField("_2", IntegerType)))
> 
> scala> spark.read.schema(schema).parquet("foo.parquet").as[(Int, Int)].show
> +---+---+
> | _2| _1|
> +---+---+
> |  2|  1|
> +---+---+
> 
> That is, when reading column-partitioned Parquet files the explicitly
> specified schema is not adhered to; instead, the partitioning columns are
> appended to the end of the column list. This is quite a severe issue, as some
> operations, such as union, fail if the columns are in a different order in
> the two datasets. Thus we have to work around the issue with a select:
> 
> val columnNames = schema.fields.map(_.name)
> ds.select(columnNames.head, columnNames.tail: _*)
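> 
> For reference, here is the same workaround as a small reusable helper (a
> rough sketch; the helper name is arbitrary, not a Spark API):
> 
> import org.apache.spark.sql.DataFrame
> import org.apache.spark.sql.types.StructType
> 
> // Reorder a DataFrame's columns to match the field order of an expected schema.
> def reorderToSchema(df: DataFrame, expected: StructType): DataFrame = {
>   val names = expected.fields.map(_.name)
>   df.select(names.head, names.tail: _*)
> }
> 
> // In spark-shell, where spark.implicits._ is already in scope:
> val fixed = reorderToSchema(spark.read.schema(schema).parquet("foo.parquet"), schema)
>   .as[(Int, Int)]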
> 
> 
> Thanks, 
> David Szakallas
> Data Engineer | Whitepages, Inc.



Re: Dataset schema incompatibility bug when reading column partitioned data

2019-04-11 Thread Bruce Robbins
I see a Jira:

https://issues.apache.org/jira/browse/SPARK-21021

On Thu, Apr 11, 2019 at 9:08 AM Dávid Szakállas wrote:

> +dev for more visibility. Is this a known issue? Is there a plan for a fix?
>
> Thanks,
> David
>
> Begin forwarded message:
>
> *From: *Dávid Szakállas 
> *Subject: **Dataset schema incompatibility bug when reading column
> partitioned data*
> *Date: *March 29, 2019 14:15:27 CET
> *To: *u...@spark.apache.org
>
> We observed the following bug on Spark 2.4.0:
>
> scala> import org.apache.spark.sql.types._
>
> scala> spark.createDataset(Seq((1,2))).write.partitionBy("_1").parquet("foo.parquet")
>
> scala> val schema = StructType(Seq(StructField("_1", IntegerType), StructField("_2", IntegerType)))
>
> scala> spark.read.schema(schema).parquet("foo.parquet").as[(Int, Int)].show
> +---+---+
> | _2| _1|
> +---+---+
> |  2|  1|
> +---+---+
>
>
> That is, when reading column-partitioned Parquet files the explicitly
> specified schema is not adhered to; instead, the partitioning columns are
> appended to the end of the column list. This is quite a severe issue, as some
> operations, such as union, fail if the columns are in a different order in the
> two datasets. Thus we have to work around the issue with a select:
>
> val columnNames = schema.fields.map(_.name)
> ds.select(columnNames.head, columnNames.tail: _*)
>
>
> Thanks,
> David Szakallas
> Data Engineer | Whitepages, Inc.
>
>
>


Re: Dataset schema incompatibility bug when reading column partitioned data

2019-04-11 Thread Ryan Blue
I think the confusion is that the schema passed to spark.read is not a
projection schema. I don’t think it is even used in this case because the
Parquet dataset has its own schema. You’re getting the schema of the table.
I think the correct behavior is to reject a user-specified schema in this
case.
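
Until that happens, a deterministic column order can still be obtained by
letting Spark derive the table schema and then projecting explicitly. A
minimal sketch (reusing the `schema` value from the original message, and
only the stable DataFrame API):

import org.apache.spark.sql.functions.col

// Read with the schema Spark derives from the files and the partition
// layout, then select the columns in the order the caller expects.
val df = spark.read.parquet("foo.parquet")
  .select(schema.fieldNames.map(col): _*)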

On Thu, Apr 11, 2019 at 11:04 AM Bruce Robbins wrote:

> I see a Jira:
>
> https://issues.apache.org/jira/browse/SPARK-21021
>
> On Thu, Apr 11, 2019 at 9:08 AM Dávid Szakállas wrote:
>
>> +dev for more visibility. Is this a known issue? Is there a plan for a
>> fix?
>>
>> Thanks,
>> David
>>
>> Begin forwarded message:
>>
>> *From: *Dávid Szakállas 
>> *Subject: **Dataset schema incompatibility bug when reading column
>> partitioned data*
>> *Date: *March 29, 2019 14:15:27 CET
>> *To: *u...@spark.apache.org
>>
>> We observed the following bug on Spark 2.4.0:
>>
>> scala> import org.apache.spark.sql.types._
>>
>> scala> spark.createDataset(Seq((1,2))).write.partitionBy("_1").parquet("foo.parquet")
>>
>> scala> val schema = StructType(Seq(StructField("_1", IntegerType), StructField("_2", IntegerType)))
>>
>> scala> spark.read.schema(schema).parquet("foo.parquet").as[(Int, Int)].show
>> +---+---+
>> | _2| _1|
>> +---+---+
>> |  2|  1|
>> +---+---+
>>
>>
>> That is, when reading column-partitioned Parquet files the explicitly
>> specified schema is not adhered to; instead, the partitioning columns are
>> appended to the end of the column list. This is quite a severe issue, as some
>> operations, such as union, fail if the columns are in a different order in the
>> two datasets. Thus we have to work around the issue with a select:
>>
>> val columnNames = schema.fields.map(_.name)
>> ds.select(columnNames.head, columnNames.tail: _*)
>>
>>
>> Thanks,
>> David Szakallas
>> Data Engineer | Whitepages, Inc.
>>
>>
>>

-- 
Ryan Blue
Software Engineer
Netflix


Re: Dataset schema incompatibility bug when reading column partitioned data

2019-04-13 Thread Felix Cheung
I kinda agree it is confusing when a parameter is not used...


From: Ryan Blue 
Sent: Thursday, April 11, 2019 11:07:25 AM
To: Bruce Robbins
Cc: Dávid Szakállas; Spark Dev List
Subject: Re: Dataset schema incompatibility bug when reading column partitioned 
data


I think the confusion is that the schema passed to spark.read is not a 
projection schema. I don’t think it is even used in this case because the 
Parquet dataset has its own schema. You’re getting the schema of the table. I 
think the correct behavior is to reject a user-specified schema in this case.

On Thu, Apr 11, 2019 at 11:04 AM Bruce Robbins <bersprock...@gmail.com> wrote:
I see a Jira:

https://issues.apache.org/jira/browse/SPARK-21021

On Thu, Apr 11, 2019 at 9:08 AM Dávid Szakállas <david.szakal...@gmail.com> wrote:
+dev for more visibility. Is this a known issue? Is there a plan for a fix?

Thanks,
David

Begin forwarded message:

From: Dávid Szakállas <david.szakal...@gmail.com>
Subject: Dataset schema incompatibility bug when reading column partitioned data
Date: March 29, 2019 14:15:27 CET
To: u...@spark.apache.org

We observed the following bug on Spark 2.4.0:


scala> import org.apache.spark.sql.types._

scala> spark.createDataset(Seq((1,2))).write.partitionBy("_1").parquet("foo.parquet")

scala> val schema = StructType(Seq(StructField("_1", IntegerType), StructField("_2", IntegerType)))

scala> spark.read.schema(schema).parquet("foo.parquet").as[(Int, Int)].show
+---+---+
| _2| _1|
+---+---+
|  2|  1|
+---+---+

That is, when reading column-partitioned Parquet files the explicitly specified
schema is not adhered to; instead, the partitioning columns are appended to the
end of the column list. This is quite a severe issue, as some operations, such
as union, fail if the columns are in a different order in the two datasets.
Thus we have to work around the issue with a select:

val columnNames = schema.fields.map(_.name)
ds.select(columnNames.head, columnNames.tail: _*)


Thanks,
David Szakallas
Data Engineer | Whitepages, Inc.



--
Ryan Blue
Software Engineer
Netflix