This is an automated email from the ASF dual-hosted git repository. viirya pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push: new 02a5e56 [SPARK-36210][SQL] Preserve column insertion order in Dataset.withColumns 02a5e56 is described below commit 02a5e56ba75cf6a5ddac82a5f4283be0402a5388 Author: Koert Kuipers <ko...@tresata.com> AuthorDate: Tue Jul 20 09:09:22 2021 -0700 [SPARK-36210][SQL] Preserve column insertion order in Dataset.withColumns ### What changes were proposed in this pull request? Preserve the insertion order of columns in Dataset.withColumns ### Why are the changes needed? It is the expected behavior. We preserve insertion order in all other places. ### Does this PR introduce _any_ user-facing change? No. Currently Dataset.withColumns is not actually used anywhere to insert more than one column. This change is to make sure it behaves as expected when it is used for that purpose in future. ### How was this patch tested? Added test in DatasetSuite Closes #33423 from koertkuipers/feat-withcolumns-preserve-order. Authored-by: Koert Kuipers <ko...@tresata.com> Signed-off-by: Liang-Chi Hsieh <vii...@gmail.com> (cherry picked from commit bf680bf25aae9619d462caee05c41cc33909338a) Signed-off-by: Liang-Chi Hsieh <vii...@gmail.com> --- .../src/main/scala/org/apache/spark/sql/Dataset.scala | 6 +++--- .../test/scala/org/apache/spark/sql/DatasetSuite.scala | 17 +++++++++++++++++ 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 5d83d1e..0d6fde3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -2422,10 +2422,10 @@ class Dataset[T] private[sql]( val resolver = sparkSession.sessionState.analyzer.resolver val output = queryExecution.analyzed.output - val columnMap = colNames.zip(cols).toMap + val columnSeq = colNames.zip(cols) val replacedAndExistingColumns = output.map { field => - columnMap.find { case (colName, _) => + columnSeq.find { case (colName, _) => resolver(field.name, colName) } match { case Some((colName: String, col: Column)) => col.as(colName) @@ -2433,7 +2433,7 @@ class Dataset[T] private[sql]( } } - val newColumns = columnMap.filter { case (colName, col) => + val newColumns = columnSeq.filter { case (colName, col) => !output.exists(f => resolver(f.name, colName)) }.map { case (colName, col) => col.as(colName) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 1b8bb3f..074a517 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -1998,6 +1998,23 @@ class DatasetSuite extends QueryTest joined, (1, 1), (2, 2), (3, 3)) } + + test("SPARK-36210: withColumns preserve insertion ordering") { + val df = Seq(1, 2, 3).toDS() + + val colNames = (1 to 10).map(i => s"value${i}") + val cols = (1 to 10).map(i => col("value") + i) + + val inserted = df.withColumns(colNames, cols) + + assert(inserted.columns === "value" +: colNames) + + checkDataset( + inserted.as[(Int, Int, Int, Int, Int, Int, Int, Int, Int, Int, Int)], + (1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11), + (2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12), + (3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13)) + } } case class Bar(a: Int)