[jira] [Assigned] (SPARK-18934) Writing to dynamic partitions does not preserve sort order if spill occurs

2016-12-19 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18934?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-18934:


Assignee: (was: Apache Spark)

> Writing to dynamic partitions does not preserve sort order if spill occurs
> --
>
> Key: SPARK-18934
> URL: https://issues.apache.org/jira/browse/SPARK-18934
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.2
>Reporter: Junegunn Choi
>
> When writing to dynamic partitions, the task sorts the input data by the 
> partition key (and the bucket key, if used) so that it can write to one 
> partition at a time using a single writer. If a spill occurs during the 
> process, {{UnsafeSorterSpillMerger}} is used to merge the partial sorted 
> runs of data.
> However, the merge process only considers the partition key, so the sort 
> order within a partition specified via {{sortWithinPartitions}} or {{SORT 
> BY}} is not preserved.
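The effect of merging spilled runs on the partition key alone can be modeled in plain Python. This is a toy sketch, not Spark's actual `UnsafeSorterSpillMerger`: each "spill" is already sorted by (partition, value), but the merge comparator looks only at the partition key, so ties between runs fall back to priority-queue insertion order and rows from different spills interleave, just like the report's output.

```python
import heapq

# Two spill files; each is sorted by (partition, value), mimicking the
# per-spill sort done before writing dynamic partitions. All rows here
# belong to partition 0.
spill_a = [(0, 2), (0, 4), (0, 6), (0, 8)]
spill_b = [(0, 8388610), (0, 8388612), (0, 8388614), (0, 8388616)]

def merge_on_partition_only(runs):
    """k-way merge whose comparator looks ONLY at the partition key.
    Ties between runs are broken by insertion order into the priority
    queue, so rows from different spills interleave."""
    heap, seq, out = [], 0, []
    for it in [iter(r) for r in runs]:
        row = next(it, None)
        if row is not None:
            # Heap key is (partition, push-order): the value is ignored.
            heapq.heappush(heap, (row[0], seq, row, it))
            seq += 1
    while heap:
        _, _, row, it = heapq.heappop(heap)
        out.append(row)
        nxt = next(it, None)
        if nxt is not None:
            heapq.heappush(heap, (nxt[0], seq, nxt, it))
            seq += 1
    return out

merged = merge_on_partition_only([spill_a, spill_b])
print([v for _, v in merged])
# -> [2, 8388610, 4, 8388612, 6, 8388614, 8, 8388616]
```

The output reproduces the alternating pattern seen in the table below: each poll of the queue re-inserts the reader it just consumed, and with equal partition keys the two readers simply take turns.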
> We can reproduce the problem in the Spark shell. Make sure to start the 
> shell in local mode with a small driver heap (e.g. 1G) so that spills occur.
> {code}
> // FileFormatWriter
> sc.parallelize(1 to 1000).toDS.withColumn("part", 'value.mod(2))
>   .repartition(1, 'part).sortWithinPartitions("value")
>   .write.mode("overwrite").format("orc").partitionBy("part")
>   .saveAsTable("test_sort_within")
> spark.read.table("test_sort_within").show
> {code}
> {noformat}
> +-------+----+
> |  value|part|
> +-------+----+
> |      2|   0|
> |8388610|   0|
> |      4|   0|
> |8388612|   0|
> |      6|   0|
> |8388614|   0|
> |      8|   0|
> |8388616|   0|
> |     10|   0|
> |8388618|   0|
> |     12|   0|
> |8388620|   0|
> |     14|   0|
> |8388622|   0|
> |     16|   0|
> |8388624|   0|
> |     18|   0|
> |8388626|   0|
> |     20|   0|
> |8388628|   0|
> +-------+----+
> {noformat}
> We can confirm the issue using an ORC dump.
> {noformat}
> > java -jar orc-tools-1.3.0-SNAPSHOT-uber.jar meta -d part=0/part-r-0-96c022f0-a173-40cc-b2e5-9d02fed4213e.snappy.orc | head -20
> {"value":2}
> {"value":8388610}
> {"value":4}
> {"value":8388612}
> {"value":6}
> {"value":8388614}
> {"value":8}
> {"value":8388616}
> {"value":10}
> {"value":8388618}
> {"value":12}
> {"value":8388620}
> {"value":14}
> {"value":8388622}
> {"value":16}
> {"value":8388624}
> {"value":18}
> {"value":8388626}
> {"value":20}
> {"value":8388628}
> {noformat}
> {{SparkHiveDynamicPartitionWriterContainer}} has the same problem.
> {code}
> // Insert into an existing Hive table with dynamic partitions
> //   CREATE TABLE TEST_SORT_WITHIN (VALUE INT) PARTITIONED BY (PART INT) STORED AS ORC
> spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
> sc.parallelize(1 to 1000).toDS.withColumn("part", 'value.mod(2))
>   .repartition(1, 'part).sortWithinPartitions("value")
>   .write.mode("overwrite").insertInto("test_sort_within")
> spark.read.table("test_sort_within").show
> {code}
> I was able to fix the problem by appending a numeric index column to the 
> sort key, which effectively makes the sort stable. I'll create a pull 
> request on GitHub, but since I'm not very familiar with Spark's internals, 
> I'm not sure whether my approach is valid or idiomatic. Please let me know 
> if there is a better way to handle this, or if you would rather address the 
> issue differently.
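The reporter's stable-sort idea can also be modeled in a few lines of Python. This is a hypothetical illustration of the approach, not the actual Spark patch: tag each row with its position in the task's already-sorted stream before spilling, then merge on (partition, position) instead of the partition key alone, so the merge reproduces the original order.

```python
import heapq

# Rows sorted within partition 0 by value, as sortWithinPartitions produces.
sorted_stream = [(0, v) for v in [2, 4, 6, 8, 8388610, 8388612, 8388614, 8388616]]

# Tag each row with its position in the sorted stream (the "index column").
indexed = [(part, i, v) for i, (part, v) in enumerate(sorted_stream)]

# Suppose memory ran out halfway: the first half spilled early, the rest later.
spill_a, spill_b = indexed[:4], indexed[4:]

# With the index in the merge key, ties on the partition key are broken
# deterministically and the merge becomes stable.
merged = list(heapq.merge(spill_a, spill_b, key=lambda r: (r[0], r[1])))
print([v for _, _, v in merged])
# -> [2, 4, 6, 8, 8388610, 8388612, 8388614, 8388616]
```

Because every row carries a distinct index, the comparator never has to fall back on arbitrary priority-queue ordering, which is exactly why the interleaving from the report disappears.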



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-18934) Writing to dynamic partitions does not preserve sort order if spill occurs

2016-12-19 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18934?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-18934:


Assignee: Apache Spark

> Writing to dynamic partitions does not preserve sort order if spill occurs
> --
>
> Key: SPARK-18934
> URL: https://issues.apache.org/jira/browse/SPARK-18934
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.2
>Reporter: Junegunn Choi
>Assignee: Apache Spark
>


