[ 
https://issues.apache.org/jira/browse/SPARK-19352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15963365#comment-15963365
 ] 

Charles Pritchard commented on SPARK-19352:
-------------------------------------------

[~cloud_fan] Yes, Hive relies on sorting optimizations for running map-side 
joins. DISTRIBUTE BY and SORT BY can be used to manually write the data as a 
single sorted file per partition.
Hive also ensures sorting when running INSERT OVERWRITE statements on a table 
created with PARTITIONED BY ... CLUSTERED BY ... SORTED BY ... INTO 1 BUCKETS.

Spark also reads the Hive metastore to detect when files are already sorted, 
and applies optimizations based on that.
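
As a rough illustration of the guarantee DISTRIBUTE BY plus SORT BY is meant to give, here is a plain Scala sketch that uses neither Spark nor Hive. It only models the semantics: rows with the same key end up in the same group, and each group is sorted on its own. The `Row` case class, the sample values, and the names `DistributeSortSketch` and `distributeAndSort` are made up for this example.

```scala
object DistributeSortSketch {
  // Hypothetical row type; field names mirror the columns in the report.
  final case class Row(userId: Int, timestamp: String)

  // Model of "DISTRIBUTE BY userId SORT BY timestamp":
  // group rows by key, then sort each group independently.
  // No ordering across groups is implied.
  def distributeAndSort(rows: Seq[Row]): Map[Int, Seq[Row]] =
    rows.groupBy(_.userId).map { case (key, group) =>
      key -> group.sortBy(_.timestamp)
    }

  // Made-up sample data, deliberately out of order.
  val sample: Seq[Row] = Seq(
    Row(2, "2014-01-01T00:00:10"),
    Row(1, "2014-12-31T02:07:30"),
    Row(1, "2014-01-01T00:00:00"),
    Row(2, "2014-01-01T00:00:05")
  )

  val result: Map[Int, Seq[Row]] = distributeAndSort(sample)

  def main(args: Array[String]): Unit =
    result.toSeq.sortBy(_._1).foreach { case (key, group) =>
      println(s"userId=$key: " + group.map(_.timestamp).mkString(", "))
    }
}
```

The sort is per group only, which corresponds to one sorted file per partition rather than a global ordering.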

> Sorting issues on relatively big datasets
> -----------------------------------------
>
>                 Key: SPARK-19352
>                 URL: https://issues.apache.org/jira/browse/SPARK-19352
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.1.0
>         Environment: Spark version 2.1.0
> Using Scala version 2.11.8, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_102
> macOS 10.12.3
>            Reporter: Ivan Gozali
>
> _More details, including the script to generate the synthetic dataset 
> (requires pandas and numpy) are in this GitHub gist._
> https://gist.github.com/igozali/d327a85646abe7ab10c2ae479bed431f
> Given a relatively large synthetic time series dataset of various users 
> (4.1GB), when attempting to:
> * partition this dataset by user ID
> * sort the time series data for each user by timestamp
> * write each partition to a single CSV file
> then some files are unsorted in a very specific manner. In one of the 
> supposedly sorted files, the rows looked as follows:
> {code}
> 2014-01-01T00:00:00.000-08:00,-0.07,0.39,-0.39
> 2014-12-31T02:07:30.000-08:00,0.34,-0.62,-0.22
> 2014-01-01T00:00:05.000-08:00,-0.07,-0.52,0.47
> 2014-12-31T02:07:35.000-08:00,-0.15,-0.13,-0.14
> 2014-01-01T00:00:10.000-08:00,-1.31,-1.17,2.24
> 2014-12-31T02:07:40.000-08:00,-1.28,0.88,-0.43
> {code}
> The above is attempted using the following Scala/Spark code:
> {code}
> val inpth = "/tmp/gen_data_3cols_small"
> spark
>     .read
>     .option("inferSchema", "true")
>     .option("header", "true")
>     .csv(inpth)
>     .repartition($"userId")
>     .sortWithinPartitions("timestamp")
>     .write
>     .partitionBy("userId")
>     .option("header", "true")
>     .csv(inpth + "_sorted")
> {code}
> This issue does not occur with a smaller dataset generated by shortening the 
> time span (354MB, with the same number of columns).
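
The ordering problem in the sample rows quoted above can be checked mechanically. The following is a minimal plain Scala sketch. It assumes all timestamps use the same fixed-width ISO-8601 format and the same UTC offset, so that string order matches time order. `SortCheckSketch`, `firstUnsorted`, and `isSorted` are hypothetical names chosen for this example.

```scala
object SortCheckSketch {
  // Timestamps copied from the six sample rows in the report.
  val timestamps: Seq[String] = Seq(
    "2014-01-01T00:00:00.000-08:00",
    "2014-12-31T02:07:30.000-08:00",
    "2014-01-01T00:00:05.000-08:00",
    "2014-12-31T02:07:35.000-08:00",
    "2014-01-01T00:00:10.000-08:00",
    "2014-12-31T02:07:40.000-08:00"
  )

  // Index of the first element that is smaller than its predecessor, if any.
  def firstUnsorted(xs: Seq[String]): Option[Int] =
    xs.sliding(2).zipWithIndex.collectFirst {
      case (Seq(a, b), i) if a > b => i + 1
    }

  def isSorted(xs: Seq[String]): Boolean = firstUnsorted(xs).isEmpty

  // Split the excerpt into rows at even and odd positions.
  val evenRows: Seq[String] = timestamps.zipWithIndex.collect { case (t, i) if i % 2 == 0 => t }
  val oddRows: Seq[String]  = timestamps.zipWithIndex.collect { case (t, i) if i % 2 == 1 => t }

  def main(args: Array[String]): Unit = {
    println(s"first out-of-order index: ${firstUnsorted(timestamps)}")
    println(s"even rows sorted: ${isSorted(evenRows)}, odd rows sorted: ${isSorted(oddRows)}")
  }
}
```

For this excerpt the first out-of-order row is at index 2 (zero-based), while the rows at even positions and the rows at odd positions are each sorted on their own.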


