Junegunn Choi created SPARK-18934:
-------------------------------------

             Summary: Writing to dynamic partitions does not preserve sort 
order if spill occurs
                 Key: SPARK-18934
                 URL: https://issues.apache.org/jira/browse/SPARK-18934
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.0.2
            Reporter: Junegunn Choi


When writing to dynamic partitions, the task sorts the input data by the 
partition key (and the bucket key, if bucketing is used) so that it can write 
one partition at a time with a single writer. If a spill occurs during the 
process, {{UnsafeSorterSpillMerger}} is used to merge the partial sorted runs.

However, the merge process only compares the partition key, so the sort order 
within a partition specified via {{sortWithinPartitions}} or {{SORT BY}} is 
not preserved.
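
The effect can be illustrated with a toy merge in plain Scala (this is not the actual Spark internals, just a sketch): each spilled run is fully sorted by {{(part, value)}}, but the comparator looks only at {{part}}. On ties the real merger's priority queue may draw from either run; alternating here mimics the interleaving seen in the output below.

{code}
object MergeSketch {
  def mergeByPartOnly(a: List[(Int, Int)], b: List[(Int, Int)]): List[(Int, Int)] =
    (a, b) match {
      case (Nil, ys) => ys
      case (xs, Nil) => xs
      case (x :: xs, y :: ys) =>
        if (x._1 < y._1) x :: mergeByPartOnly(xs, y :: ys)
        else if (y._1 < x._1) y :: mergeByPartOnly(x :: xs, ys)
        // Tie on the partition key: `value` is never compared, so rows
        // from the two runs interleave arbitrarily.
        else x :: mergeByPartOnly(y :: ys, xs)
    }

  def main(args: Array[String]): Unit = {
    // Two runs, each sorted by value within part = 0.
    val run1 = List((0, 2), (0, 4), (0, 6))
    val run2 = List((0, 10), (0, 12), (0, 14))
    // Both inputs are sorted by value, but the merged result is not.
    println(mergeByPartOnly(run1, run2).map(_._2))  // List(2, 10, 4, 12, 6, 14)
  }
}
{code}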

We can reproduce the problem in the Spark shell. Make sure to start the shell 
in local mode with a small driver memory (e.g. 1G) so that spills occur.

{code}
// FileFormatWriter
sc.parallelize(1 to 10000000).toDS.withColumn("part", 'value.mod(2))
  .repartition(1, 'part).sortWithinPartitions("value")
  .write.mode("overwrite").format("orc").partitionBy("part")
  .saveAsTable("test_sort_within")

spark.read.table("test_sort_within").show
{code}

{noformat}
+-------+----+
|  value|part|
+-------+----+
|      2|   0|
|8388610|   0|
|      4|   0|
|8388612|   0|
|      6|   0|
|8388614|   0|
|      8|   0|
|8388616|   0|
|     10|   0|
|8388618|   0|
|     12|   0|
|8388620|   0|
|     14|   0|
|8388622|   0|
|     16|   0|
|8388624|   0|
|     18|   0|
|8388626|   0|
|     20|   0|
|8388628|   0|
+-------+----+
{noformat}

We can confirm the issue by dumping the ORC file.

{noformat}
> java -jar orc-tools-1.3.0-SNAPSHOT-uber.jar meta -d 
> part=0/part-r-00000-96c022f0-a173-40cc-b2e5-9d02fed4213e.snappy.orc | head -20

{"value":2}
{"value":8388610}
{"value":4}
{"value":8388612}
{"value":6}
{"value":8388614}
{"value":8}
{"value":8388616}
{"value":10}
{"value":8388618}
{"value":12}
{"value":8388620}
{"value":14}
{"value":8388622}
{"value":16}
{"value":8388624}
{"value":18}
{"value":8388626}
{"value":20}
{"value":8388628}
{noformat}

{{SparkHiveDynamicPartitionWriterContainer}} has the same problem.

{code}
// Insert into an existing Hive table with dynamic partitions
//   CREATE TABLE TEST_SORT_WITHIN (VALUE INT) PARTITIONED BY (PART INT) STORED AS ORC
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
sc.parallelize(1 to 10000000).toDS.withColumn("part", 'value.mod(2))
  .repartition(1, 'part).sortWithinPartitions("value")
  .write.mode("overwrite").insertInto("test_sort_within")

spark.read.table("test_sort_within").show
{code}

I was able to fix the problem by appending a numeric index column to the sort 
key, which effectively makes the sort stable. I'll create a pull request on 
GitHub, but since I'm not familiar with the internals of Spark, I'm not sure 
whether my approach is valid or idiomatic. Please let me know if there is a 
better way to handle this, or if you'd rather address the issue differently.
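
The idea behind the fix, sketched in plain Scala (names are hypothetical; this is not the actual patch): tag each input row with a monotonically increasing index and include it as the last sort key. The index breaks ties on the partition key, so merging spilled runs reproduces the original input order, and hence the value order established by {{sortWithinPartitions}}.

{code}
object StableMergeSketch {
  // Rows are (part, value), already in the desired order within each
  // partition (as after sortWithinPartitions). Returns the value order
  // after a simulated spill and merge.
  def valuesAfterSpillMerge(rows: Seq[(Int, Int)]): Seq[Int] = {
    // Tag each row with its input position.
    val tagged = rows.zipWithIndex.map { case ((p, v), i) => (p, i, v) }
    // Simulate a spill splitting the data into two runs.
    val (run1, run2) = tagged.splitAt(rows.length / 2)
    // Merge on (part, index): the index makes the overall sort stable,
    // even though `value` itself is never compared.
    (run2 ++ run1).sortBy { case (p, i, _) => (p, i) }.map(_._3)
  }

  def main(args: Array[String]): Unit = {
    val rows = Seq((0, 2), (0, 4), (0, 6), (0, 8))
    // Value order survives the simulated spill and merge.
    println(valuesAfterSpillMerge(rows))  // List(2, 4, 6, 8)
  }
}
{code}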




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
