[ https://issues.apache.org/jira/browse/SPARK-44512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17785350#comment-17785350 ]
Dongjoon Hyun commented on SPARK-44512:
---------------------------------------

I reproduced [~leeyc0]'s report as follows.

*APACHE SPARK 3.5.0*
{code}
scala> spark.createDataset(Seq((3L, "a", "r"), (3L, "b", "r"), (2L, "b", "q"), (2L, "a", "q"), (1L, "a", "p"), (1L, "b", "p"))).sort("_1").select("_2", "_3").write.mode("overwrite").partitionBy("_2").text("/tmp/t")

$ cat t/_2=a/*
r
q
p
{code}

{code}
scala> sql("set spark.sql.optimizer.plannedWrite.enabled=false")

scala> spark.createDataset(Seq((3L, "a", "r"), (3L, "b", "r"), (2L, "b", "q"), (2L, "a", "q"), (1L, "a", "p"), (1L, "b", "p"))).sort("_1").select("_2", "_3").write.mode("overwrite").partitionBy("_2").text("/tmp/t")

$ cat t/_2=a/*
p
q
r
{code}

*APACHE SPARK 3.4.1*
{code}
scala> spark.createDataset(Seq((3L, "a", "r"), (3L, "b", "r"), (2L, "b", "q"), (2L, "a", "q"), (1L, "a", "p"), (1L, "b", "p"))).sort("_1").select("_2", "_3").write.mode("overwrite").partitionBy("_2").text("/tmp/t")

$ cat t/_2=a/*
r
q
p
{code}

{code}
scala> sql("set spark.sql.optimizer.plannedWrite.enabled=false")

scala> spark.createDataset(Seq((3L, "a", "r"), (3L, "b", "r"), (2L, "b", "q"), (2L, "a", "q"), (1L, "a", "p"), (1L, "b", "p"))).sort("_1").select("_2", "_3").write.mode("overwrite").partitionBy("_2").text("/tmp/t")

$ cat t/_2=a/*
p
q
r
{code}

*APACHE SPARK 3.3.3*
{code}
scala> spark.createDataset(Seq((3L, "a", "r"), (3L, "b", "r"), (2L, "b", "q"), (2L, "a", "q"), (1L, "a", "p"), (1L, "b", "p"))).sort("_1").select("_2", "_3").write.mode("overwrite").partitionBy("_2").text("/tmp/t")

$ cat t/_2=a/*
p
q
r
{code}

> dataset.sort.select.write.partitionBy sorts wrong column
> --------------------------------------------------------
>
>                 Key: SPARK-44512
>                 URL: https://issues.apache.org/jira/browse/SPARK-44512
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer, SQL
>    Affects Versions: 3.4.1
>            Reporter: Yiu-Chung Lee
>            Priority: Blocker
>              Labels: correctness
>         Attachments: Test-Details-for-Query-0.png, Test-Details-for-Query-1.png
>
>
> (In this example the dataset is of type Tuple3, and the columns are named _1,
> _2 and _3.)
>
> I found -then when AQE is enabled,- that the following code does not produce
> sorted output (.drop() also has the same problem), unless
> spark.sql.optimizer.plannedWrite.enabled is set to false.
> After further investigation, Spark actually sorted the wrong column in the
> following code.
> {{dataset.sort("_1")}}
> {{.select("_2", "_3")}}
> {{.write()}}
> {{.partitionBy("_2")}}
> {{.text("output");}}
>
> (the following workaround is no longer necessary)
> -However, if I insert an identity mapper between select and write, the output
> would be sorted as expected.-
> -{{dataset = dataset.sort("_1")}}-
> -{{.select("_2", "_3");}}-
> -{{dataset.map((MapFunction<Row, Row>) row -> row, dataset.encoder())}}-
> -{{.write()}}-
> -{{.partitionBy("_2")}}-
> -{{.text("output")}}-
> Below is the complete code that reproduces the problem.