[ 
https://issues.apache.org/jira/browse/SPARK-44512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17785350#comment-17785350
 ] 

Dongjoon Hyun commented on SPARK-44512:
---------------------------------------

I reproduced [~leeyc0]'s report as follows.

*APACHE SPARK 3.5.0*
{code}
scala> spark.createDataset(Seq((3L, "a", "r"), (3L, "b", "r"), (2L, "b", "q"), 
(2L, "a", "q"), (1L, "a", "p"), (1L, "b", "p"))).sort("_1").select("_2", 
"_3").write.mode("overwrite").partitionBy("_2").text("/tmp/t")
$ cat t/_2=a/*
r
q
p
{code}

{code}
scala> sql("set spark.sql.optimizer.plannedWrite.enabled=false")
scala> spark.createDataset(Seq((3L, "a", "r"), (3L, "b", "r"), (2L, "b", "q"), 
(2L, "a", "q"), (1L, "a", "p"), (1L, "b", "p"))).sort("_1").select("_2", 
"_3").write.mode("overwrite").partitionBy("_2").text("/tmp/t")
$ cat t/_2=a/*
p
q
r
{code}

*APACHE SPARK 3.4.1*
{code}
scala> spark.createDataset(Seq((3L, "a", "r"), (3L, "b", "r"), (2L, "b", "q"), 
(2L, "a", "q"), (1L, "a", "p"), (1L, "b", "p"))).sort("_1").select("_2", 
"_3").write.mode("overwrite").partitionBy("_2").text("/tmp/t")
$ cat t/_2=a/*
r
q
p
{code}

{code}
scala> sql("set spark.sql.optimizer.plannedWrite.enabled=false")
scala> spark.createDataset(Seq((3L, "a", "r"), (3L, "b", "r"), (2L, "b", "q"), 
(2L, "a", "q"), (1L, "a", "p"), (1L, "b", "p"))).sort("_1").select("_2", 
"_3").write.mode("overwrite").partitionBy("_2").text("/tmp/t")
$ cat t/_2=a/*
p
q
r
{code}

*APACHE SPARK 3.3.3*
{code}
scala> spark.createDataset(Seq((3L, "a", "r"), (3L, "b", "r"), (2L, "b", "q"), 
(2L, "a", "q"), (1L, "a", "p"), (1L, "b", "p"))).sort("_1").select("_2", 
"_3").write.mode("overwrite").partitionBy("_2").text("/tmp/t")

$ cat t/_2=a/*
p
q
r
{code}
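
The transcripts above can be folded into one spark-shell snippet that toggles the flag and reads the partition back, so the two orderings can be compared in a single session. This is only a sketch under stated assumptions: {{linesOfPartitionA}} is a hypothetical helper name (not part of Spark), the output path is on the local filesystem, and the part files are concatenated in file-name order to mimic {{cat t/_2=a/*}}.

```scala
// Paste into spark-shell (":paste" mode is convenient); `spark` is already in scope.
import java.nio.file.{Files, Paths}
import scala.collection.JavaConverters._
import spark.implicits._

val rows = Seq((3L, "a", "r"), (3L, "b", "r"), (2L, "b", "q"),
  (2L, "a", "q"), (1L, "a", "p"), (1L, "b", "p"))

// Hypothetical helper: run the same write as in the transcripts with the
// given plannedWrite setting, then return the lines under _2=a.
// Assumes `path` is a local directory (not HDFS or an object store).
def linesOfPartitionA(plannedWrite: Boolean, path: String = "/tmp/t"): Seq[String] = {
  spark.sql(s"set spark.sql.optimizer.plannedWrite.enabled=$plannedWrite")
  spark.createDataset(rows).sort("_1").select("_2", "_3")
    .write.mode("overwrite").partitionBy("_2").text(path)

  // Read the part files in name order, skipping hidden checksum files,
  // which is what the shell glob in `cat t/_2=a/*` does.
  val stream = Files.list(Paths.get(path, "_2=a"))
  try {
    stream.iterator().asScala.toList
      .filter(_.getFileName.toString.startsWith("part-"))
      .sortBy(_.getFileName.toString)
      .flatMap(p => Files.readAllLines(p).asScala)
  } finally stream.close()
}

val planned = linesOfPartitionA(plannedWrite = true)
val unplanned = linesOfPartitionA(plannedWrite = false)
println(s"plannedWrite=true:  ${planned.mkString(", ")}")
println(s"plannedWrite=false: ${unplanned.mkString(", ")}")
```

If the two printed sequences differ, the session shows the same discrepancy as the 3.5.0 and 3.4.1 transcripts above. On 3.3.3 only the default run was recorded, so no comparison is claimed for that version.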

> dataset.sort.select.write.partitionBy sorts wrong column
> --------------------------------------------------------
>
>                 Key: SPARK-44512
>                 URL: https://issues.apache.org/jira/browse/SPARK-44512
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer, SQL
>    Affects Versions: 3.4.1
>            Reporter: Yiu-Chung Lee
>            Priority: Blocker
>              Labels: correctness
>         Attachments: Test-Details-for-Query-0.png, 
> Test-Details-for-Query-1.png
>
>
> (In this example the dataset is of type Tuple3, and the columns are named _1, 
> _2 and _3)
>  
> I found -then when AQE is enabled,- that the following code does not produce 
> sorted output (.drop() also has the same problem), unless 
> spark.sql.optimizer.plannedWrite.enabled is set to false.
> After further investigation, Spark actually sorted the wrong column in the 
> following code.
> {{dataset.sort("_1")}}
> {{.select("_2", "_3")}}
> {{.write()}}
> {{.partitionBy("_2")}}
> {{.text("output");}}
>  
> (the following workaround is no longer necessary)
> -However, if I insert an identity mapper between select and write, the output 
> would be sorted as expected.-
> -{{dataset = dataset.sort("_1")}}-
> -{{.select("_2", "_3");}}-
> -{{dataset.map((MapFunction<Row, Row>) row -> row, dataset.encoder())}}-
> -{{.write()}}-
> -{{.partitionBy("_2")}}-
> -{{.text("output")}}-
> Below is the complete code that reproduces the problem.


