[ https://issues.apache.org/jira/browse/SPARK-15632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15316998#comment-15316998 ]
Sean Zhong commented on SPARK-15632:
------------------------------------

*Root cause analysis:*

*The root cause is that the implementation of {{dataset.as\[B\]}} is wrong.*

{code}
scala> val df = Seq((1, 2, 3)).toDF("a", "b", "c")
scala> case class B(b: Int)
scala> val b = df.as[B]
{code}

We expect {{b}} to be a "view" of the underlying {{df}} that *SHOULD* return only *ONE* column, but currently {{b}} contains all of the columns.

*What we expect:*

{code}
scala> b.show()
+---+
|  b|
+---+
|  2|
+---+
{code}

*How it behaves now:*

{code}
scala> b.show()
+---+---+---+
|  a|  b|  c|
+---+---+---+
|  1|  2|  3|
+---+---+---+
{code}
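A minimal workaround sketch (an editorial addition, not part of the original comment, assuming the {{df}} and case class {{B}} defined above): projecting down to the encoder's columns *before* calling {{as\[B\]}} yields the expected single-column view.

{code}
scala> // Workaround sketch: select the encoder's column first, then convert.
scala> val b2 = df.select("b").as[B]
scala> b2.show()
+---+
|  b|
+---+
|  2|
+---+
{code}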
> Dataset typed filter operation changes query plan schema
> ---------------------------------------------------------
>
>                 Key: SPARK-15632
>                 URL: https://issues.apache.org/jira/browse/SPARK-15632
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>    Affects Versions: 2.0.0
>            Reporter: Cheng Lian
>            Assignee: Xiang Zhong
>
> h1. Overview
> Filter operations should never change the query plan schema. However, the Dataset typed filter operation does introduce schema changes in some cases. Furthermore, all of the following aspects of the schema may change:
> # field order,
> # field number,
> # field data type,
> # field name, and
> # field nullability
> This is mostly because we wrap the actual {{Filter}} operator with a {{SerializeFromObject}}/{{DeserializeToObject}} pair (query plan fragment illustrated below), which performs a number of magic tricks.
> {noformat}
> SerializeFromObject
>   Filter
>     DeserializeToObject
>       <child-plan>
> {noformat}
> h1. Reproduction
> h2. Field order, field number, and field data type change
> {code}
> case class A(b: Double, a: String)
>
> val data = Seq(
>   "{ 'a': 'foo', 'b': 1, 'c': 'extra' }",
>   "{ 'a': 'bar', 'b': 2, 'c': 'extra' }",
>   "{ 'a': 'bar', 'c': 'extra' }"
> )
>
> val df1 = spark.read.json(sc.parallelize(data))
>
> df1.printSchema()
> // root
> // |-- a: string (nullable = true)
> // |-- b: long (nullable = true)
> // |-- c: string (nullable = true)
>
> val ds1 = df1.as[A]
>
> ds1.printSchema()
> // root
> // |-- a: string (nullable = true)
> // |-- b: long (nullable = true)
> // |-- c: string (nullable = true)
>
> val ds2 = ds1.filter(_.b > 1) // <- Here comes the trouble maker
>
> ds2.printSchema()
> // root                               <- 1. Reordered `a` and `b`, and
> // |-- b: double (nullable = true)       2. dropped `c`, and
> // |-- a: string (nullable = true)       3. up-casted `b` from long to double
>
> val df2 = ds2.toDF()
>
> df2.printSchema()
> // root                               <- (Same as above)
> // |-- b: double (nullable = true)
> // |-- a: string (nullable = true)
> {code}
> h3. Field order change
> {{DeserializeToObject}} resolves the encoder deserializer expression by *name*. Thus, field order in the input query plan doesn't matter.
> h3. Field number change
> Same as above: fields not referred to by the encoder are silently dropped while resolving deserializer expressions by name.
> h3. Field data type change
> When generating deserializer expressions, we allow "sane" implicit coercions (e.g., integer to long, and long to double) by inserting {{UpCast}} operators. Thus, the actual field data types in the input query plan don't matter either, as long as valid implicit coercions exist.
> h2. Field name and nullability change
> {code}
> val ds3 = spark.range(10)
>
> ds3.printSchema()
> // root
> // |-- id: long (nullable = false)
>
> val ds4 = ds3.filter(_ > 3)
>
> ds4.printSchema()
> // root
> // |-- value: long (nullable = true)  <- 4. Name changed from `id` to `value`, and
> //                                        5. nullability changed from false to true
> {code}
> h3. Field name change
> Primitive encoders like {{Encoder\[Long\]}} don't have a named field; they always have a single field with the hard-coded name "value". On the other hand, when serializing domain objects back to rows, the schema of {{SerializeFromObject}} is solely determined by the encoder. Thus the original name "id" becomes "value".
> h3. Nullability change
> [PR #11880|https://github.com/apache/spark/pull/11880] updated the return type of {{SparkSession.range}} from {{Dataset\[Long\]}} to {{Dataset\[java.lang.Long\]}} due to [SI-4388|https://issues.scala-lang.org/browse/SI-4388]. As a consequence, although the underlying {{Range}} operator produces non-nullable output, the result encoder is nullable, since {{java.lang.Long}} is nullable. Thus, we observe a nullability change after typed filtering, because the serializer expression is derived from the encoder rather than from the query plan.
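For contrast, a short editorial sketch (not part of the original report): the untyped, {{Column}}-based {{filter}} variant compiles to a bare {{Filter}} operator with no {{SerializeFromObject}}/{{DeserializeToObject}} pair, so none of the five changes above occur.

{code}
// Untyped predicate: no object (de)serialization is involved, so the
// schema of the underlying Range operator is preserved as-is.
val ds5 = spark.range(10).filter($"id" > 3)

ds5.printSchema()
// root
// |-- id: long (nullable = false)   <- name and nullability both preserved
{code}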