[ https://issues.apache.org/jira/browse/SPARK-18934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15763010#comment-15763010 ]

Apache Spark commented on SPARK-18934:
--------------------------------------

User 'junegunn' has created a pull request for this issue:
https://github.com/apache/spark/pull/16347

> Writing to dynamic partitions does not preserve sort order if spill occurs
> --------------------------------------------------------------------------
>
>                 Key: SPARK-18934
>                 URL: https://issues.apache.org/jira/browse/SPARK-18934
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.0.2
>            Reporter: Junegunn Choi
>
> When writing to dynamic partitions, the task sorts the input data by the 
> partition key (and by the bucket key as well, if bucketing is used) so that 
> it can write to one partition at a time using a single writer. If a spill 
> occurs during the process, {{UnsafeSorterSpillMerger}} is used to merge the 
> partial sequences of data.
> However, the merge only compares the partition key, so the sort order within 
> a partition specified via {{sortWithinPartitions}} or {{SORT BY}} is not 
> preserved.
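> A simplified illustration (plain Scala, not Spark's actual merge code) of 
> why the order breaks: each spill run is sorted by value within the 
> partition, but a merge that compares only the partition key sees every head 
> row as equal and may emit them in any interleaving. Alternating between the 
> runs, as below, is one legal outcome:
> {code}
> // Two spill runs, each already sorted by value within part=0.
> val run1 = Iterator((0, 2), (0, 4), (0, 6))                   // (part, value)
> val run2 = Iterator((0, 8388610), (0, 8388612), (0, 8388614))
> // With a partition-key-only comparator, all heads tie, so the merger
> // is free to alternate between runs:
> val merged = run1.zip(run2).flatMap { case (a, b) => Iterator(a, b) }
> merged.foreach(println)
> // (0,2), (0,8388610), (0,4), (0,8388612), ... -- the pattern reproduced below
> {code}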
> We can reproduce the problem in the Spark shell. Make sure to start the 
> shell in local mode with a small driver memory (e.g. 1G) so that spills occur.
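> For example, one way to start such a shell (standard spark-shell options):
> {noformat}
> ./bin/spark-shell --master local --driver-memory 1g
> {noformat}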
> {code}
> // FileFormatWriter
> sc.parallelize(1 to 10000000).toDS.withColumn("part", 'value.mod(2))
>   .repartition(1, 'part).sortWithinPartitions("value")
>   .write.mode("overwrite").format("orc").partitionBy("part")
>   .saveAsTable("test_sort_within")
> spark.read.table("test_sort_within").show
> {code}
> {noformat}
> +-------+----+
> |  value|part|
> +-------+----+
> |      2|   0|
> |8388610|   0|
> |      4|   0|
> |8388612|   0|
> |      6|   0|
> |8388614|   0|
> |      8|   0|
> |8388616|   0|
> |     10|   0|
> |8388618|   0|
> |     12|   0|
> |8388620|   0|
> |     14|   0|
> |8388622|   0|
> |     16|   0|
> |8388624|   0|
> |     18|   0|
> |8388626|   0|
> |     20|   0|
> |8388628|   0|
> +-------+----+
> {noformat}
> We can confirm the issue by dumping the ORC file.
> {noformat}
> > java -jar orc-tools-1.3.0-SNAPSHOT-uber.jar meta -d part=0/part-r-00000-96c022f0-a173-40cc-b2e5-9d02fed4213e.snappy.orc | head -20
> {"value":2}
> {"value":8388610}
> {"value":4}
> {"value":8388612}
> {"value":6}
> {"value":8388614}
> {"value":8}
> {"value":8388616}
> {"value":10}
> {"value":8388618}
> {"value":12}
> {"value":8388620}
> {"value":14}
> {"value":8388622}
> {"value":16}
> {"value":8388624}
> {"value":18}
> {"value":8388626}
> {"value":20}
> {"value":8388628}
> {noformat}
> {{SparkHiveDynamicPartitionWriterContainer}} has the same problem.
> {code}
> // Insert into an existing Hive table with dynamic partitions
> //   CREATE TABLE TEST_SORT_WITHIN (VALUE INT) PARTITIONED BY (PART INT) STORED AS ORC
> spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
> sc.parallelize(1 to 10000000).toDS.withColumn("part", 'value.mod(2))
>   .repartition(1, 'part).sortWithinPartitions("value")
>   .write.mode("overwrite").insertInto("test_sort_within")
> spark.read.table("test_sort_within").show
> {code}
> I was able to fix the problem by appending a numeric index column to the 
> sorting key, which effectively makes the sort stable. I'll create a pull 
> request on GitHub, but since I'm not really familiar with the internals of 
> Spark, I'm not sure if my approach is valid or idiomatic. Please let me know 
> if there are better ways to handle this, or if you'd prefer to address the 
> issue differently.
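> A minimal sketch of the idea (plain Scala, not the actual patch): tag each 
> record with its position before sorting, and make that index the last 
> component of the sort key. Ties on the partition key then keep their 
> original, already value-sorted order through any merge.
> {code}
> case class Rec(part: Int, value: Int)
> // Input already sorted by value within each partition, as the writer expects.
> val input = Seq(Rec(0, 2), Rec(0, 4), Rec(1, 1), Rec(1, 3))
> // (part, index) is a total order, so the sort is effectively stable:
> // rows with the same partition key preserve their input order.
> val restored = input.zipWithIndex
>   .sortBy { case (r, i) => (r.part, i) }
>   .map(_._1)
> {code}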


