This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push: new 45ba9224602e [SPARK-47385] Fix tuple encoders with Option inputs 45ba9224602e is described below commit 45ba9224602eb18fe45e339cbb8cf2e8a4924f0b Author: Chenhao Li <chenhao...@databricks.com> AuthorDate: Thu Mar 14 14:27:36 2024 +0800 [SPARK-47385] Fix tuple encoders with Option inputs https://github.com/apache/spark/pull/40755 adds a null check on the input of the child deserializer in the tuple encoder. It breaks the deserializer for the `Option` type, because null should be deserialized into `None` rather than null. This PR adds a boolean parameter to `ExpressionEncoder.tuple` so that only the caller that https://github.com/apache/spark/pull/40755 intended to fix gets this null check. Tested with a new unit test in `DatasetSuite`. Closes #45508 from chenhao-db/SPARK-47385. Authored-by: Chenhao Li <chenhao...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> (cherry picked from commit 9986462811f160eacd766da8a4e14a9cbb4b8710) Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../spark/sql/catalyst/encoders/ExpressionEncoder.scala | 10 ++++++++-- sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 4 +++- .../src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 12 ++++++++++++ 3 files changed, 23 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala index e6477e48fe96..2e60323bdd0d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala @@ -79,8 +79,14 @@ object ExpressionEncoder { * Given a set of N encoders, constructs a new encoder that produce objects as items in an * N-tuple.
Note that these encoders should be unresolved so that information about * name/positional binding is preserved. + * When `useNullSafeDeserializer` is true, the deserialization result for a child will be null if + * the input is null. It is false by default as most deserializers handle null input properly and + * don't require an extra null check. Some of them are null-tolerant, such as the deserializer for + * `Option[T]`, and we must not set it to true in this case. */ - def tuple(encoders: Seq[ExpressionEncoder[_]]): ExpressionEncoder[_] = { + def tuple( + encoders: Seq[ExpressionEncoder[_]], + useNullSafeDeserializer: Boolean = false): ExpressionEncoder[_] = { if (encoders.length > 22) { throw QueryExecutionErrors.elementsOfTupleExceedLimitError() } @@ -125,7 +131,7 @@ object ExpressionEncoder { case GetColumnByOrdinal(0, _) => input } - if (enc.objSerializer.nullable) { + if (useNullSafeDeserializer && enc.objSerializer.nullable) { nullSafe(input, childDeserializer) } else { childDeserializer diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 233b61926c4f..8b14ed651362 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -1173,7 +1173,9 @@ class Dataset[T] private[sql]( } implicit val tuple2Encoder: Encoder[(T, U)] = - ExpressionEncoder.tuple(this.exprEnc, other.exprEnc) + ExpressionEncoder + .tuple(Seq(this.exprEnc, other.exprEnc), useNullSafeDeserializer = true) + .asInstanceOf[Encoder[(T, U)]] val leftResultExpr = { if (!this.exprEnc.isSerializedAsStructForTopLevel) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 43322b6dc972..fdbb463c0553 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala 
@@ -2197,6 +2197,18 @@ class DatasetSuite extends QueryTest ) assert(result == expected) } + + test("SPARK-47385: Tuple encoder with Option inputs") { + implicit val enc: Encoder[(SingleData, Option[SingleData])] = + Encoders.tuple(Encoders.product[SingleData], Encoders.product[Option[SingleData]]) + + val input = Seq( + (SingleData(1), Some(SingleData(1))), + (SingleData(2), None) + ) + val ds = spark.createDataFrame(input).as[(SingleData, Option[SingleData])] + checkDataset(ds, input: _*) + } } case class Bar(a: Int) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org