[jira] [Commented] (SPARK-19563) avoid unnecessary sort in FileFormatWriter

2017-09-22 Thread Charles Pritchard (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176870#comment-16176870
 ] 

Charles Pritchard commented on SPARK-19563:
---

Did this make it into Spark 2.1.1?

> avoid unnecessary sort in FileFormatWriter
> ---
>
> Key: SPARK-19563
> URL: https://issues.apache.org/jira/browse/SPARK-19563
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Wenchen Fan
>Assignee: Wenchen Fan
> Fix For: 2.2.0
>
>







[jira] [Commented] (SPARK-19352) Sorting issues on relatively big datasets

2017-04-10 Thread Charles Pritchard (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15963367#comment-15963367
 ] 

Charles Pritchard commented on SPARK-19352:
---

Does this fix the issue in SPARK-18934?

> Sorting issues on relatively big datasets
> -
>
> Key: SPARK-19352
> URL: https://issues.apache.org/jira/browse/SPARK-19352
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0
> Environment: Spark version 2.1.0
> Using Scala version 2.11.8, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_102
> macOS 10.12.3
>Reporter: Ivan Gozali
>
> _More details, including the script to generate the synthetic dataset 
> (requires pandas and numpy) are in this GitHub gist._
> https://gist.github.com/igozali/d327a85646abe7ab10c2ae479bed431f
> Given a relatively large synthetic time series dataset of various users 
> (4.1GB), when attempting to:
> * partition this dataset by user ID
> * sort the time series data for each user by timestamp
> * write each partition to a single CSV file
> then some files are unsorted in a very specific manner. In one of the 
> supposedly sorted files, the rows looked as follows:
> {code}
> 2014-01-01T00:00:00.000-08:00,-0.07,0.39,-0.39
> 2014-12-31T02:07:30.000-08:00,0.34,-0.62,-0.22
> 2014-01-01T00:00:05.000-08:00,-0.07,-0.52,0.47
> 2014-12-31T02:07:35.000-08:00,-0.15,-0.13,-0.14
> 2014-01-01T00:00:10.000-08:00,-1.31,-1.17,2.24
> 2014-12-31T02:07:40.000-08:00,-1.28,0.88,-0.43
> {code}
> The above is attempted using the following Scala/Spark code:
> {code}
> val inpth = "/tmp/gen_data_3cols_small"
> spark
> .read
> .option("inferSchema", "true")
> .option("header", "true")
> .csv(inpth)
> .repartition($"userId")
> .sortWithinPartitions("timestamp")
> .write
> .partitionBy("userId")
> .option("header", "true")
> .csv(inpth + "_sorted")
> {code}
> This issue is not seen with a smaller dataset (354MB, with the same number of 
> columns) generated by reducing the time span.






[jira] [Commented] (SPARK-19352) Sorting issues on relatively big datasets

2017-04-10 Thread Charles Pritchard (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15963365#comment-15963365
 ] 

Charles Pritchard commented on SPARK-19352:
---

[~cloud_fan] Yes, Hive relies on sorting optimizations for running map-side 
joins. DISTRIBUTE BY and SORT BY can be used to manually write data out as 
single sorted files per partition, as sketched below.
Hive will also ensure sorting when running INSERT OVERWRITE statements, when the 
table is created with PARTITIONED BY... CLUSTERED BY... SORTED BY... INTO 1 BUCKETS.

Spark also reads the Hive metastore to detect when files are already sorted, 
and applies optimizations accordingly.
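
For concreteness, here is a rough sketch of that pattern issued through Spark 
SQL. The table and column names (events_sorted, raw_events, user_id, ts) are 
hypothetical, and support for Hive bucketing DDL varies by Spark version, so 
treat this as an illustration of the Hive-side contract rather than a verified 
recipe.

{code}
// Sketch only; all table/column names are hypothetical.
// In Hive, the target table could additionally be declared
// CLUSTERED BY (ts) SORTED BY (ts) INTO 1 BUCKETS to record the sort contract.
spark.sql("""
  CREATE TABLE IF NOT EXISTS events_sorted (ts STRING, value DOUBLE)
  PARTITIONED BY (user_id STRING)
  STORED AS ORC
""")

// Allow dynamic partition inserts.
spark.sql("SET hive.exec.dynamic.partition.mode=nonstrict")

// DISTRIBUTE BY routes each user's rows to a single reducer; SORT BY orders the
// rows within that reducer, so each partition is written as sorted output.
spark.sql("""
  INSERT OVERWRITE TABLE events_sorted PARTITION (user_id)
  SELECT ts, value, user_id
  FROM raw_events
  DISTRIBUTE BY user_id
  SORT BY ts
""")
{code}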

> Sorting issues on relatively big datasets
> -
>
> Key: SPARK-19352
> URL: https://issues.apache.org/jira/browse/SPARK-19352
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0
> Environment: Spark version 2.1.0
> Using Scala version 2.11.8, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_102
> macOS 10.12.3
>Reporter: Ivan Gozali
>
> _More details, including the script to generate the synthetic dataset 
> (requires pandas and numpy) are in this GitHub gist._
> https://gist.github.com/igozali/d327a85646abe7ab10c2ae479bed431f
> Given a relatively large synthetic time series dataset of various users 
> (4.1GB), when attempting to:
> * partition this dataset by user ID
> * sort the time series data for each user by timestamp
> * write each partition to a single CSV file
> then some files are unsorted in a very specific manner. In one of the 
> supposedly sorted files, the rows looked as follows:
> {code}
> 2014-01-01T00:00:00.000-08:00,-0.07,0.39,-0.39
> 2014-12-31T02:07:30.000-08:00,0.34,-0.62,-0.22
> 2014-01-01T00:00:05.000-08:00,-0.07,-0.52,0.47
> 2014-12-31T02:07:35.000-08:00,-0.15,-0.13,-0.14
> 2014-01-01T00:00:10.000-08:00,-1.31,-1.17,2.24
> 2014-12-31T02:07:40.000-08:00,-1.28,0.88,-0.43
> {code}
> The above is attempted using the following Scala/Spark code:
> {code}
> val inpth = "/tmp/gen_data_3cols_small"
> spark
> .read
> .option("inferSchema", "true")
> .option("header", "true")
> .csv(inpth)
> .repartition($"userId")
> .sortWithinPartitions("timestamp")
> .write
> .partitionBy("userId")
> .option("header", "true")
> .csv(inpth + "_sorted")
> {code}
> This issue is not seen with a smaller dataset (354MB, with the same number of 
> columns) generated by reducing the time span.






[jira] [Commented] (SPARK-18934) Writing to dynamic partitions does not preserve sort order if spill occurs

2017-04-07 Thread Charles Pritchard (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15961390#comment-15961390
 ] 

Charles Pritchard commented on SPARK-18934:
---

Possibly fixed by SPARK-19563 (https://issues.apache.org/jira/browse/SPARK-19563); 
per a comment in https://issues.apache.org/jira/browse/SPARK-19352, this appears 
to be out of scope in Spark.

> Writing to dynamic partitions does not preserve sort order if spill occurs
> --
>
> Key: SPARK-18934
> URL: https://issues.apache.org/jira/browse/SPARK-18934
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.2
>Reporter: Junegunn Choi
>
> When writing to dynamic partitions, the task sorts the input data by the 
> partition key (and the bucket key, if one is used), so that it can write to 
> one partition at a time using a single writer. If a spill occurs during the 
> process, {{UnsafeSorterSpillMerger}} is used to merge the partial sequences 
> of data.
> However, the merge process only considers the partition key, so the sort 
> order within a partition specified via {{sortWithinPartitions}} or {{SORT 
> BY}} is not preserved.
> We can reproduce the problem in the Spark shell. Make sure to start the shell 
> in local mode with a small driver memory (e.g. 1G) so that spills occur.
> {code}
> // FileFormatWriter
> sc.parallelize(1 to 1000).toDS.withColumn("part", 'value.mod(2))
>   .repartition(1, 'part).sortWithinPartitions("value")
>   .write.mode("overwrite").format("orc").partitionBy("part")
>   .saveAsTable("test_sort_within")
> spark.read.table("test_sort_within").show
> {code}
> {noformat}
> +-------+----+
> |  value|part|
> +-------+----+
> |      2|   0|
> |8388610|   0|
> |      4|   0|
> |8388612|   0|
> |      6|   0|
> |8388614|   0|
> |      8|   0|
> |8388616|   0|
> |     10|   0|
> |8388618|   0|
> |     12|   0|
> |8388620|   0|
> |     14|   0|
> |8388622|   0|
> |     16|   0|
> |8388624|   0|
> |     18|   0|
> |8388626|   0|
> |     20|   0|
> |8388628|   0|
> +-------+----+
> {noformat}
> We can confirm the issue using an ORC dump.
> {noformat}
> > java -jar orc-tools-1.3.0-SNAPSHOT-uber.jar meta -d 
> > part=0/part-r-0-96c022f0-a173-40cc-b2e5-9d02fed4213e.snappy.orc | head 
> > -20
> {"value":2}
> {"value":8388610}
> {"value":4}
> {"value":8388612}
> {"value":6}
> {"value":8388614}
> {"value":8}
> {"value":8388616}
> {"value":10}
> {"value":8388618}
> {"value":12}
> {"value":8388620}
> {"value":14}
> {"value":8388622}
> {"value":16}
> {"value":8388624}
> {"value":18}
> {"value":8388626}
> {"value":20}
> {"value":8388628}
> {noformat}
> {{SparkHiveDynamicPartitionWriterContainer}} has the same problem.
> {code}
> // Insert into an existing Hive table with dynamic partitions
> //   CREATE TABLE TEST_SORT_WITHIN (VALUE INT) PARTITIONED BY (PART INT) 
> STORED AS ORC
> spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
> sc.parallelize(1 to 1000).toDS.withColumn("part", 'value.mod(2))
>   .repartition(1, 'part).sortWithinPartitions("value")
>   .write.mode("overwrite").insertInto("test_sort_within")
> spark.read.table("test_sort_within").show
> {code}
> I was able to fix the problem by appending a numeric index column to the 
> sorting key, which effectively makes the sort stable. I'll create a pull 
> request on GitHub, but since I'm not really familiar with the internals of 
> Spark, I'm not sure if my approach is valid or idiomatic. So please let me 
> know if there are better ways to handle this, or if you want to address the 
> issue differently.
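
As a user-level illustration of that idea (not the reporter's actual patch, 
which lives inside FileFormatWriter's sorter), one might append a monotonically 
increasing index and use it as a secondary sort key so that ties and merge order 
are resolved deterministically. The snippet below mirrors the reproduction above 
and is a sketch only.

{code}
// Sketch only: the "index column as a tiebreaker" idea, expressed at the user level.
import spark.implicits._
import org.apache.spark.sql.functions.monotonically_increasing_id

sc.parallelize(1 to 1000).toDS.withColumn("part", 'value.mod(2))
  .withColumn("idx", monotonically_increasing_id())   // tiebreaker column
  .repartition(1, 'part)
  .sortWithinPartitions("value", "idx")                // stable ordering per partition
  .write.mode("overwrite").format("orc").partitionBy("part")
  .saveAsTable("test_sort_within_stable")
{code}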






[jira] [Commented] (SPARK-19352) Sorting issues on relatively big datasets

2017-04-07 Thread Charles Pritchard (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15961389#comment-15961389
 ] 

Charles Pritchard commented on SPARK-19352:
---

[~cloud_fan] Is there something on the roadmap to get that guarantee? We need 
guaranteed sorting from a general performance perspective, and it is also a 
baseline feature of Hive ("SORT BY") to be able to sort data into a single file 
within a partition.

> Sorting issues on relatively big datasets
> -
>
> Key: SPARK-19352
> URL: https://issues.apache.org/jira/browse/SPARK-19352
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0
> Environment: Spark version 2.1.0
> Using Scala version 2.11.8, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_102
> macOS 10.12.3
>Reporter: Ivan Gozali
>
> _More details, including the script to generate the synthetic dataset 
> (requires pandas and numpy) are in this GitHub gist._
> https://gist.github.com/igozali/d327a85646abe7ab10c2ae479bed431f
> Given a relatively large synthetic time series dataset of various users 
> (4.1GB), when attempting to:
> * partition this dataset by user ID
> * sort the time series data for each user by timestamp
> * write each partition to a single CSV file
> then some files are unsorted in a very specific manner. In one of the 
> supposedly sorted files, the rows looked as follows:
> {code}
> 2014-01-01T00:00:00.000-08:00,-0.07,0.39,-0.39
> 2014-12-31T02:07:30.000-08:00,0.34,-0.62,-0.22
> 2014-01-01T00:00:05.000-08:00,-0.07,-0.52,0.47
> 2014-12-31T02:07:35.000-08:00,-0.15,-0.13,-0.14
> 2014-01-01T00:00:10.000-08:00,-1.31,-1.17,2.24
> 2014-12-31T02:07:40.000-08:00,-1.28,0.88,-0.43
> {code}
> The above is attempted using the following Scala/Spark code:
> {code}
> val inpth = "/tmp/gen_data_3cols_small"
> spark
> .read
> .option("inferSchema", "true")
> .option("header", "true")
> .csv(inpth)
> .repartition($"userId")
> .sortWithinPartitions("timestamp")
> .write
> .partitionBy("userId")
> .option("header", "true")
> .csv(inpth + "_sorted")
> {code}
> This issue is not seen with a smaller dataset (354MB, with the same number of 
> columns) generated by reducing the time span.






[jira] [Commented] (SPARK-18934) Writing to dynamic partitions does not preserve sort order if spill occurs

2017-01-04 Thread Charles Pritchard (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15800323#comment-15800323
 ] 

Charles Pritchard commented on SPARK-18934:
---

The use case and other issues may be related to SPARK-15420: 
https://github.com/apache/spark/pull/13206


> Writing to dynamic partitions does not preserve sort order if spill occurs
> --
>
> Key: SPARK-18934
> URL: https://issues.apache.org/jira/browse/SPARK-18934
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.2
>Reporter: Junegunn Choi
>
> When writing to dynamic partitions, the task sorts the input data by the 
> partition key (and the bucket key, if one is used), so that it can write to 
> one partition at a time using a single writer. If a spill occurs during the 
> process, {{UnsafeSorterSpillMerger}} is used to merge the partial sequences 
> of data.
> However, the merge process only considers the partition key, so the sort 
> order within a partition specified via {{sortWithinPartitions}} or {{SORT 
> BY}} is not preserved.
> We can reproduce the problem in the Spark shell. Make sure to start the shell 
> in local mode with a small driver memory (e.g. 1G) so that spills occur.
> {code}
> // FileFormatWriter
> sc.parallelize(1 to 1000).toDS.withColumn("part", 'value.mod(2))
>   .repartition(1, 'part).sortWithinPartitions("value")
>   .write.mode("overwrite").format("orc").partitionBy("part")
>   .saveAsTable("test_sort_within")
> spark.read.table("test_sort_within").show
> {code}
> {noformat}
> +-------+----+
> |  value|part|
> +-------+----+
> |      2|   0|
> |8388610|   0|
> |      4|   0|
> |8388612|   0|
> |      6|   0|
> |8388614|   0|
> |      8|   0|
> |8388616|   0|
> |     10|   0|
> |8388618|   0|
> |     12|   0|
> |8388620|   0|
> |     14|   0|
> |8388622|   0|
> |     16|   0|
> |8388624|   0|
> |     18|   0|
> |8388626|   0|
> |     20|   0|
> |8388628|   0|
> +-------+----+
> {noformat}
> We can confirm the issue using an ORC dump.
> {noformat}
> > java -jar orc-tools-1.3.0-SNAPSHOT-uber.jar meta -d 
> > part=0/part-r-0-96c022f0-a173-40cc-b2e5-9d02fed4213e.snappy.orc | head 
> > -20
> {"value":2}
> {"value":8388610}
> {"value":4}
> {"value":8388612}
> {"value":6}
> {"value":8388614}
> {"value":8}
> {"value":8388616}
> {"value":10}
> {"value":8388618}
> {"value":12}
> {"value":8388620}
> {"value":14}
> {"value":8388622}
> {"value":16}
> {"value":8388624}
> {"value":18}
> {"value":8388626}
> {"value":20}
> {"value":8388628}
> {noformat}
> {{SparkHiveDynamicPartitionWriterContainer}} has the same problem.
> {code}
> // Insert into an existing Hive table with dynamic partitions
> //   CREATE TABLE TEST_SORT_WITHIN (VALUE INT) PARTITIONED BY (PART INT) 
> STORED AS ORC
> spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
> sc.parallelize(1 to 1000).toDS.withColumn("part", 'value.mod(2))
>   .repartition(1, 'part).sortWithinPartitions("value")
>   .write.mode("overwrite").insertInto("test_sort_within")
> spark.read.table("test_sort_within").show
> {code}
> I was able to fix the problem by appending a numeric index column to the 
> sorting key, which effectively makes the sort stable. I'll create a pull 
> request on GitHub, but since I'm not really familiar with the internals of 
> Spark, I'm not sure if my approach is valid or idiomatic. So please let me 
> know if there are better ways to handle this, or if you want to address the 
> issue differently.






[jira] [Commented] (SPARK-6593) Provide option for HadoopRDD to skip corrupted files

2016-09-13 Thread Charles Pritchard (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1543#comment-1543
 ] 

Charles Pritchard commented on SPARK-6593:
--

Something appears to have changed between 1.5.1 and 2.0: in 2.0 I have files 
that fail with "Unexpected end of input stream", whereas they are read without 
error in 1.5.1. Those files also trigger exceptions with command-line 
zcat/gzip.

> Provide option for HadoopRDD to skip corrupted files
> 
>
> Key: SPARK-6593
> URL: https://issues.apache.org/jira/browse/SPARK-6593
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 1.3.0
>Reporter: Dale Richardson
>Priority: Minor
>
> When reading a large number of gzip files from HDFS, e.g. with 
> sc.textFile("hdfs:///user/cloudera/logs*.gz"), if the Hadoop input libraries 
> report an exception then the entire job is cancelled. As default behaviour 
> this is probably for the best, but it would be nice, in circumstances where 
> you know it will be OK, to have the option to skip the corrupted file and 
> continue the job.
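
For reference, later Spark releases expose options along these lines. A minimal 
sketch follows, assuming the configuration keys spark.sql.files.ignoreCorruptFiles 
and spark.files.ignoreCorruptFiles are available in the running version (they were 
added after this issue was filed); verify against the docs for your release.

{code}
// Sketch only: skip files that throw while being read, instead of failing the job.

// DataFrame/Dataset reads honour the SQL-side flag:
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
val logs = spark.read.textFile("hdfs:///user/cloudera/logs*.gz")

// Core reads (sc.textFile / HadoopRDD) honour a separate flag, set at launch:
//   spark-submit --conf spark.files.ignoreCorruptFiles=true ...
{code}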






[jira] [Commented] (SPARK-14482) Change default compression codec for Parquet from gzip to snappy

2016-09-13 Thread Charles Pritchard (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14482?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15488848#comment-15488848
 ] 

Charles Pritchard commented on SPARK-14482:
---

I don't think this fully made it into the manual: the change affects the default 
compression codec for Parquet, but I believe the manual still lists the default 
as gzip.

> Change default compression codec for Parquet from gzip to snappy
> 
>
> Key: SPARK-14482
> URL: https://issues.apache.org/jira/browse/SPARK-14482
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Reporter: Reynold Xin
>Assignee: Reynold Xin
> Fix For: 2.0.0
>
>
> Based on our tests, gzip decompression is very slow (< 100 MB/s), making 
> queries decompression-bound. Snappy can decompress at ~500 MB/s on a single 
> core.
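
For anyone verifying their own setup, a minimal sketch of pinning the codec 
explicitly rather than relying on the version's default; the codec choice and 
output path below are illustrative, not prescribed by this issue.

{code}
// Set the session-wide Parquet codec explicitly.
spark.conf.set("spark.sql.parquet.compression.codec", "snappy")

// A per-write override works as well (output path is illustrative).
spark.range(1000).toDF("id")
  .write.mode("overwrite")
  .option("compression", "snappy")
  .parquet("/tmp/compression_check")
{code}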


