[ https://issues.apache.org/jira/browse/SPARK-18934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15763010#comment-15763010 ]
Apache Spark commented on SPARK-18934:
--------------------------------------

User 'junegunn' has created a pull request for this issue:
https://github.com/apache/spark/pull/16347

> Writing to dynamic partitions does not preserve sort order if spill occurs
> --------------------------------------------------------------------------
>
>                 Key: SPARK-18934
>                 URL: https://issues.apache.org/jira/browse/SPARK-18934
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.0.2
>            Reporter: Junegunn Choi
>
> When writing to dynamic partitions, the task sorts the input data by the
> partition key (and the bucket key, if bucketing is used) so that it can
> write to one partition at a time using a single writer. If a spill occurs
> during the process, {{UnsafeSorterSpillMerger}} is used to merge the
> partial sequences of data.
> However, the merge process only considers the partition key, so the sort
> order within a partition specified via {{sortWithinPartitions}} or
> {{SORT BY}} is not preserved.
> We can reproduce the problem in the Spark shell. Make sure to start the
> shell in local mode with a small driver memory (e.g. 1G) so that spills
> occur.
> {code}
> // FileFormatWriter
> sc.parallelize(1 to 10000000).toDS.withColumn("part", 'value.mod(2))
>   .repartition(1, 'part).sortWithinPartitions("value")
>   .write.mode("overwrite").format("orc").partitionBy("part")
>   .saveAsTable("test_sort_within")
> spark.read.table("test_sort_within").show
> {code}
> {noformat}
> +-------+----+
> |  value|part|
> +-------+----+
> |      2|   0|
> |8388610|   0|
> |      4|   0|
> |8388612|   0|
> |      6|   0|
> |8388614|   0|
> |      8|   0|
> |8388616|   0|
> |     10|   0|
> |8388618|   0|
> |     12|   0|
> |8388620|   0|
> |     14|   0|
> |8388622|   0|
> |     16|   0|
> |8388624|   0|
> |     18|   0|
> |8388626|   0|
> |     20|   0|
> |8388628|   0|
> +-------+----+
> {noformat}
> We can confirm the issue using an ORC dump.
> {noformat}
> $ java -jar orc-tools-1.3.0-SNAPSHOT-uber.jar meta -d \
>     part=0/part-r-00000-96c022f0-a173-40cc-b2e5-9d02fed4213e.snappy.orc | head -20
> {"value":2}
> {"value":8388610}
> {"value":4}
> {"value":8388612}
> {"value":6}
> {"value":8388614}
> {"value":8}
> {"value":8388616}
> {"value":10}
> {"value":8388618}
> {"value":12}
> {"value":8388620}
> {"value":14}
> {"value":8388622}
> {"value":16}
> {"value":8388624}
> {"value":18}
> {"value":8388626}
> {"value":20}
> {"value":8388628}
> {noformat}
> {{SparkHiveDynamicPartitionWriterContainer}} has the same problem.
> {code}
> // Insert into an existing Hive table with dynamic partitions
> // CREATE TABLE TEST_SORT_WITHIN (VALUE INT) PARTITIONED BY (PART INT)
> //   STORED AS ORC
> spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
> sc.parallelize(1 to 10000000).toDS.withColumn("part", 'value.mod(2))
>   .repartition(1, 'part).sortWithinPartitions("value")
>   .write.mode("overwrite").insertInto("test_sort_within")
> spark.read.table("test_sort_within").show
> {code}
> I was able to fix the problem by appending a numeric index column to the
> sort key, which effectively makes the sort stable. I'll create a pull
> request on GitHub, but since I'm not really familiar with the internals of
> Spark, I'm not sure if my approach is valid or idiomatic. Please let me
> know if there are better ways to handle this, or if you want to address
> the issue differently.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
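[Editorial note] The failure mode the reporter describes, and the index-based stabilization, can be sketched outside Spark. The following is a minimal Python simulation, not Spark code: spill_merge is a hypothetical stand-in for the priority queue of spill readers inside {{UnsafeSorterSpillMerger}}, records are (part, index, value) tuples, and the input is assumed to have been sorted by value within each partition before spilling, with each spill run holding a consecutive, partition-key-sorted chunk of that input. Merging on the partition key alone interleaves the runs within a partition; appending the record index to the merge key restores the original order:

```python
import heapq

def spill_merge(runs, key):
    # Simulates a merger that keeps a min-heap of run iterators, ordered by
    # `key` of each iterator's current record. A monotonically increasing
    # sequence number breaks ties, so a just-reinserted run falls behind
    # other runs holding an equal key: equal keys interleave across runs.
    heap, seq = [], 0
    for run in runs:
        it = iter(run)
        rec = next(it, None)
        if rec is not None:
            heapq.heappush(heap, (key(rec), seq, rec, it))
            seq += 1
    merged = []
    while heap:
        _, _, rec, it = heapq.heappop(heap)
        merged.append(rec)
        rec = next(it, None)
        if rec is not None:
            heapq.heappush(heap, (key(rec), seq, rec, it))
            seq += 1
    return merged

# Records are (part, index, value). The writer's input was sorted by value
# within each partition; each spill run holds a consecutive chunk of that
# input, sorted by partition key (a stable sort preserves index order).
run1 = [(0, 1, 2), (0, 3, 4), (1, 0, 1), (1, 2, 3)]  # values 1..4
run2 = [(0, 5, 6), (0, 7, 8), (1, 4, 5), (1, 6, 7)]  # values 5..8

# Merging on the partition key alone: runs interleave within a partition.
buggy = [v for (_, _, v) in spill_merge([run1, run2], key=lambda r: r[0])]
# Appending the record index to the merge key makes the merge stable.
fixed = [v for (_, _, v) in spill_merge([run1, run2],
                                        key=lambda r: (r[0], r[1]))]

print(buggy)  # part 0 values come out as 2, 6, 4, 8: out of order
print(fixed)  # part 0 values come out as 2, 4, 6, 8: order preserved
```

This only models the tie-breaking behavior; the exact interleaving in Spark depends on the internals of the real merger's PriorityQueue. It also shows why the index is needed at all: the writer's sort knows nothing about the {{sortWithinPartitions}} columns, so the value itself cannot simply be added to the merge key, but a per-record ordinal can.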