[jira] [Commented] (SPARK-44512) dataset.sort.select.write.partitionBy sorts wrong column
[ https://issues.apache.org/jira/browse/SPARK-44512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17746849#comment-17746849 ]

Yiu-Chung Lee commented on SPARK-44512:
---------------------------------------

Bumping to Blocker because I believe this is a potentially very serious issue in the query planner that may affect other queries.

> dataset.sort.select.write.partitionBy sorts wrong column
> --------------------------------------------------------
>
>                 Key: SPARK-44512
>                 URL: https://issues.apache.org/jira/browse/SPARK-44512
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer, SQL
>    Affects Versions: 3.4.1
>            Reporter: Yiu-Chung Lee
>            Priority: Blocker
>              Labels: correctness
>         Attachments: Test-Details-for-Query-0.png, Test-Details-for-Query-1.png
>
> (In this example the dataset is of type Tuple3, and the columns are named _1, _2 and _3)
>
> I found -then when AQE is enabled,- that the following code does not produce sorted output (.drop() also has the same problem), unless spark.sql.optimizer.plannedWrite.enabled is set to false.
> After further investigation, Spark actually sorted the wrong column in the following code.
> {{dataset.sort("_1")}}
> {{.select("_2", "_3")}}
> {{.write()}}
> {{.partitionBy("_2")}}
> {{.text("output");}}
>
> (the following workaround is no longer necessary)
> -However, if I insert an identity mapper between select and write, the output would be sorted as expected.-
> -{{dataset = dataset.sort("_1")}}-
> -{{.select("_2", "_3");}}-
> -{{dataset.map((MapFunction) row -> row, dataset.encoder())}}-
> -{{.write()}}-
> -{{.partitionBy("_2")}}-
> -{{.text("output")}}-
> Below is the complete code that reproduces the problem.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-44512) dataset.sort.select.write.partitionBy sorts wrong column
[ https://issues.apache.org/jira/browse/SPARK-44512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17781169#comment-17781169 ]

Dongjoon Hyun commented on SPARK-44512:
---------------------------------------

Hi, All.
Is this correctness issue valid in branch-3.4/3.5/master?
Let me set the `Target Version` to 3.4.2 so that we don't forget.
[jira] [Commented] (SPARK-44512) dataset.sort.select.write.partitionBy sorts wrong column
[ https://issues.apache.org/jira/browse/SPARK-44512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17785350#comment-17785350 ]

Dongjoon Hyun commented on SPARK-44512:
---------------------------------------

I reproduced [~leeyc0]'s report as follows.

*APACHE SPARK 3.5.0*
{code}
scala> spark.createDataset(Seq((3L, "a", "r"), (3L, "b", "r"), (2L, "b", "q"), (2L, "a", "q"), (1L, "a", "p"), (1L, "b", "p"))).sort("_1").select("_2", "_3").write.mode("overwrite").partitionBy("_2").text("/tmp/t")

$ cat t/_2=a/*
r
q
p
{code}

{code}
scala> sql("set spark.sql.optimizer.plannedWrite.enabled=false")

scala> spark.createDataset(Seq((3L, "a", "r"), (3L, "b", "r"), (2L, "b", "q"), (2L, "a", "q"), (1L, "a", "p"), (1L, "b", "p"))).sort("_1").select("_2", "_3").write.mode("overwrite").partitionBy("_2").text("/tmp/t")

$ cat t/_2=a/*
p
q
r
{code}

*APACHE SPARK 3.4.1*
{code}
scala> spark.createDataset(Seq((3L, "a", "r"), (3L, "b", "r"), (2L, "b", "q"), (2L, "a", "q"), (1L, "a", "p"), (1L, "b", "p"))).sort("_1").select("_2", "_3").write.mode("overwrite").partitionBy("_2").text("/tmp/t")

$ cat t/_2=a/*
r
q
p
{code}

{code}
scala> sql("set spark.sql.optimizer.plannedWrite.enabled=false")

scala> spark.createDataset(Seq((3L, "a", "r"), (3L, "b", "r"), (2L, "b", "q"), (2L, "a", "q"), (1L, "a", "p"), (1L, "b", "p"))).sort("_1").select("_2", "_3").write.mode("overwrite").partitionBy("_2").text("/tmp/t")

$ cat t/_2=a/*
p
q
r
{code}

*APACHE SPARK 3.3.3*
{code}
scala> spark.createDataset(Seq((3L, "a", "r"), (3L, "b", "r"), (2L, "b", "q"), (2L, "a", "q"), (1L, "a", "p"), (1L, "b", "p"))).sort("_1").select("_2", "_3").write.mode("overwrite").partitionBy("_2").text("/tmp/t")

$ cat t/_2=a/*
p
q
r
{code}
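The two outputs in the reproduction above can be compared against a small model of the data. The sketch below is plain Java with no Spark dependency, so it shows nothing about what the planner actually does; it only checks which sort key is consistent with each observed file content. The class `PartitionOrderModel` and its members are hypothetical names introduced for illustration. It assumes a stable sort: on this input, a stable sort by `_2` alone leaves the rows of partition `_2=a` in their input order, which matches the `r q p` seen on 3.4.1 and 3.5.0, while a sort by `_1` gives the `p q r` seen on 3.3.3 and with `spark.sql.optimizer.plannedWrite.enabled=false`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical model of the reproduction data; this is NOT Spark code.
public class PartitionOrderModel {

    // One input row (_1, _2, _3), mirroring the tuples in the spark-shell example.
    static final class Row {
        final long c1;
        final String c2;
        final String c3;

        Row(long c1, String c2, String c3) {
            this.c1 = c1;
            this.c2 = c2;
            this.c3 = c3;
        }
    }

    // Sort keys to compare: the column the user asked for, and the partition column.
    static final Comparator<Row> BY_1 = Comparator.comparingLong((Row r) -> r.c1);
    static final Comparator<Row> BY_2 = Comparator.comparing((Row r) -> r.c2);

    // The six rows from the reproduction, in the same order.
    static List<Row> input() {
        return Arrays.asList(
            new Row(3L, "a", "r"), new Row(3L, "b", "r"),
            new Row(2L, "b", "q"), new Row(2L, "a", "q"),
            new Row(1L, "a", "p"), new Row(1L, "b", "p"));
    }

    // Values of _3 that end up in partition `key` after a stable sort by `order`.
    static List<String> partitionContents(Comparator<Row> order, String key) {
        List<Row> rows = new ArrayList<>(input());
        rows.sort(order); // List.sort is stable, so ties keep their input order
        List<String> out = new ArrayList<>();
        for (Row r : rows) {
            if (r.c2.equals(key)) {
                out.add(r.c3);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(partitionContents(BY_1, "a")); // [p, q, r]
        System.out.println(partitionContents(BY_2, "a")); // [r, q, p]
    }
}
```

Because the input happens to be in descending `_1` order, this data set cannot distinguish "sorted by `_2` only" from "not sorted at all" within a partition; a shuffled input would be needed to separate the two.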
[jira] [Commented] (SPARK-44512) dataset.sort.select.write.partitionBy sorts wrong column
[ https://issues.apache.org/jira/browse/SPARK-44512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17785352#comment-17785352 ]

Dongjoon Hyun commented on SPARK-44512:
---------------------------------------

In general, I believe users report this as an issue because the sort column has already been removed by `.select("_2", "_3")` before `.write.mode("overwrite")`.

Although I understand that Apache Spark 3.4.0 changed the behavior as shown above, I don't think there is a contract that Apache Spark's `partitionBy` operation preserves the previous ordering.

So, let me close this issue as `Not A Problem`.
[jira] [Commented] (SPARK-44512) dataset.sort.select.write.partitionBy sorts wrong column
[ https://issues.apache.org/jira/browse/SPARK-44512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17785356#comment-17785356 ]

Yiu-Chung Lee commented on SPARK-44512:
---------------------------------------

[~dongjoon] As I mentioned before, I need to preserve the sort order by _1 when writing to file. If partitionBy has no such contract, what would you recommend?