[ 
https://issues.apache.org/jira/browse/SPARK-15632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cheng Lian resolved SPARK-15632.
--------------------------------
       Resolution: Fixed
    Fix Version/s: 2.0.0

Issue resolved by pull request 13529
[https://github.com/apache/spark/pull/13529]

> Dataset typed filter operation changes query plan schema
> --------------------------------------------------------
>
>                 Key: SPARK-15632
>                 URL: https://issues.apache.org/jira/browse/SPARK-15632
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>    Affects Versions: 2.0.0
>            Reporter: Cheng Lian
>            Assignee: Xiang Zhong
>             Fix For: 2.0.0
>
>
> h1. Overview
> Filter operations should never change query plan schema. However, Dataset 
> typed filter operation does introduce schema change in some cases. 
> Furthermore, all the following aspects of the schema may be changed:
> # field order,
> # field number,
> # field data type,
> # field name, and
> # field nullability
> This is mostly because we wrap the actual {{Filter}} operator with a 
> {{SerializeFromObject}}/{{DeserializeToObject}} pair (query plan fragment 
> illustrated as following), which performs a bunch of magic tricks.
> {noformat}
> SerializeFromObject
>  Filter
>   DeserializeToObject
>    <child-plan>
> {noformat}
> h1. Reproduction
> h2. Field order, field number, and field data type change
> {code}
> case class A(b: Double, a: String)
> val data = Seq(
>   "{ 'a': 'foo', 'b': 1, 'c': 'extra' }",
>   "{ 'a': 'bar', 'b': 2, 'c': 'extra' }",
>   "{ 'a': 'bar', 'c': 'extra' }"
> )
> val df1 = spark.read.json(sc.parallelize(data))
> df1.printSchema()
> // root
> //  |-- a: string (nullable = true)
> //  |-- b: long (nullable = true)
> //  |-- c: string (nullable = true)
> val ds1 = df1.as[A]
> ds1.printSchema()
> // root
> //  |-- a: string (nullable = true)
> //  |-- b: long (nullable = true)
> //  |-- c: string (nullable = true)
> val ds2 = ds1.filter(_.b > 1)    // <- Here comes the trouble maker
> ds2.printSchema()
> // root                             <- 1. Reordered `a` and `b`, and
> //  |-- b: double (nullable = true)    2. dropped `c`, and
> //  |-- a: string (nullable = true)    3. up-casted `b` from long to double
> val df2 = ds2.toDF()
> df2.printSchema()
> // root                             <- (Same as above)
> //  |-- b: double (nullable = true)
> //  |-- a: string (nullable = true)
> {code}
> h3. Field order change
> {{DeserializeToObject}} resolves the encoder deserializer expression by 
> *name*. Thus field order in input query plan doesn't matter.
> h3. Field number change
> Same as above, fields not referred by the encoder are silently dropped while 
> resolving deserializer expressions by name.
> h3. Field data type change
> When generating deserializer expressions, we allows "sane" implicit coercions 
> (e.g. integer to long, and long to double) by inserting {{UpCast}} operators. 
> Thus actual field data types in input query plan don't matter either as long 
> as there are valid implicit coercions.
> h2. Field name and nullability change
> {code}
> val ds3 = spark.range(10)
> ds3.printSchema()
> // root
> //  |-- id: long (nullable = false)
> val ds4 = ds3.filter(_ > 3)
> ds4.printSchema()
> // root
> //  |-- value: long (nullable = true)  4. Name changed from `id` to `value`, 
> and
> //                                     5. nullability changed from false to 
> true
> {code}
> h3. Field name change
> Primitive encoders like {{Encoder\[Long\]}} doesn't have a named field, thus 
> they always has only a single field with hard-coded name "value". On the 
> other hand, when serializing domain objects back to rows, schema of 
> {{SerializeFromObject}} is solely determined by the encoder. Thus the 
> original name "id" becomes "value".
> h3. Nullability change
> [PR #11880|https://github.com/apache/spark/pull/11880] updated return type of 
> {{SparkSession.range}} from {{Dataset\[Long\]}} to 
> {{Dataset\[java.lang.Long\]}} due to 
> [SI-4388|https://issues.scala-lang.org/browse/SI-4388]. As a consequence, 
> although the underlying {{Range}} operator produces non-nullable output, the 
> result encoder is nullable since {{java.lang.Long}} is nullable. Thus, we 
> observe nullability change after typed filtering because serializer 
> expression is derived from encoder rather than the query plan.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to