[GitHub] spark issue #16347: [SPARK-18934][SQL] Writing to dynamic partitions does no...

2017-06-19 Thread junegunn
Github user junegunn commented on the issue:

https://github.com/apache/spark/pull/16347
  
Hive makes sure that the output file is properly sorted by the column
specified in the `SORT BY` clause by having only one reduce task (and hence
one output file) for each partition.

```
STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: __
            Statistics: Num rows: 183663543 Data size: 313697356092 Basic stats: COMPLETE Column stats: PARTIAL
            Select Operator
              expressions: __ (type: bigint), _ (type: string), ___ (type: string), _ (type: string), ___ (type: string), ___ (type: string), _ (type: string), _ (type: string), __ (type: string), _ (type: string), _ (type: string), ___ (type: string), _ (type: string), _ (type: string), _ (type: string), __ (type: string), _ (type: string), __ (type: string), ___ (type: string)
              outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13, _col14, _col15, _col16, _col17, _col18
              Statistics: Num rows: 183663543 Data size: 313697356092 Basic stats: COMPLETE Column stats: PARTIAL
              Reduce Output Operator
                key expressions: _col0 (type: bigint)
                sort order: +
                Map-reduce partition columns: _col18 (type: string)
                Statistics: Num rows: 183663543 Data size: 313697356092 Basic stats: COMPLETE Column stats: PARTIAL
                value expressions: _col1 (type: string), _col2 (type: string), _col3 (type: string), _col4 (type: string), _col5 (type: string), _col6 (type: string), _col7 (type: string), _col8 (type: string), _col9 (type: string), _col10 (type: string), _col11 (type: string), _col12 (type: string), _col13 (type: string), _col14 (type: string), _col15 (type: string), _col16 (type: string), _col17 (type: string), _col18 (type: string)
      Execution mode: vectorized
      Reduce Operator Tree:
        Select Operator
          expressions: KEY.reducesinkkey0 (type: bigint), VALUE._col0 (type: string), VALUE._col1 (type: string), VALUE._col2 (type: string), VALUE._col3 (type: string), VALUE._col4 (type: string), VALUE._col5 (type: string), VALUE._col6 (type: string), VALUE._col7 (type: string), VALUE._col8 (type: string), VALUE._col9 (type: string), VALUE._col10 (type: string), VALUE._col11 (type: string), VALUE._col12 (type: string), VALUE._col13 (type: string), VALUE._col14 (type: string), VALUE._col15 (type: string), VALUE._col16 (type: string), VALUE._col17 (type: string)
          outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13, _col14, _col15, _col16, _col17, _col18
          Statistics: Num rows: 183663543 Data size: 33794091912 Basic stats: COMPLETE Column stats: PARTIAL
          File Output Operator
            compressed: false
            Statistics: Num rows: 183663543 Data size: 33794091912 Basic stats: COMPLETE Column stats: PARTIAL
            table:
                input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
                output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
                serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
                name: ___.
```

The subsequent stage simply moves the output files to the corresponding partition directories.

Since the patch no longer merges cleanly and I think I have made my point, I'm closing this.



[GitHub] spark issue #16347: [SPARK-18934][SQL] Writing to dynamic partitions does no...

2017-06-18 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/16347
  
gentle ping @junegunn on ^.



[GitHub] spark issue #16347: [SPARK-18934][SQL] Writing to dynamic partitions does no...

2017-05-23 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/16347
  
@junegunn Can you check the query plan of hive for `INSERT OVERWRITE TABLE 
... DISTRIBUTE BY ... SORT BY ...`?

In Spark SQL, the query plan looks like
```
'InsertIntoTable `table1`
+- 'Sort ['j ASC NULLS FIRST], false
   +- 'RepartitionByExpression ['i], 200
      +- table
```

The sort happens before the insertion, which explains why Spark doesn't
preserve the sort order. If in Hive the sort is part of the insertion, we
should follow that.

In the meantime, the behavior of `DataFrameWriter` looks reasonable (we need
to fix the documentation of `sortWithinPartitions`), but we should add a new
API that allows users to specify a sort ordering during writing, to be
consistent with the SQL API.



[GitHub] spark issue #16347: [SPARK-18934][SQL] Writing to dynamic partitions does no...

2017-05-23 Thread junegunn
Github user junegunn commented on the issue:

https://github.com/apache/spark/pull/16347
  
See my answer above.



[GitHub] spark issue #16347: [SPARK-18934][SQL] Writing to dynamic partitions does no...

2017-05-23 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/16347
  
is this still needed after https://github.com/apache/spark/pull/16898?



[GitHub] spark issue #16347: [SPARK-18934][SQL] Writing to dynamic partitions does no...

2017-04-11 Thread junegunn
Github user junegunn commented on the issue:

https://github.com/apache/spark/pull/16347
  
@cloud-fan It's not a problem in the context of the DataFrame API. But when it
comes to Spark SQL, it makes Spark SQL incompatible with the equivalent HiveQL
in a subtle way. At the very least, we may need to revisit the documentation,
which gives the false impression that `sortWithinPartitions` and HiveQL's
`SORT BY` are equivalent.


https://github.com/apache/spark/blob/v2.1.0/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L990



[GitHub] spark issue #16347: [SPARK-18934][SQL] Writing to dynamic partitions does no...

2017-04-11 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/16347
  
@junegunn I think it's not a problem: `df.write.xxx` is not guaranteed to
retain the ordering of `df` when writing data out.

Currently `DataFrameWriter` doesn't provide an interface to specify the
ordering of the written data; if you do want to guarantee the ordering of the
output, please use a bucketed table in Spark SQL.
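
For reference, a minimal sketch of the bucketed-table route (the table name, bucket count, and column name here are illustrative only; note that `bucketBy`/`sortBy` only work with `saveAsTable`, not with path-based writes):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
val df = spark.table("source_table") // hypothetical source table

df.write
  .bucketBy(8, "key")  // hash rows into 8 buckets by "key"
  .sortBy("key")       // sort rows within each bucket file by "key"
  .mode("overwrite")
  .saveAsTable("bucketed_table")
```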



[GitHub] spark issue #16347: [SPARK-18934][SQL] Writing to dynamic partitions does no...

2017-04-11 Thread junegunn
Github user junegunn commented on the issue:

https://github.com/apache/spark/pull/16347
  
@cloud-fan Unfortunately, yes.

```scala
// run in spark-shell; in a standalone app, import spark.implicits._ first
sc.parallelize(1 to 1000).toDS.withColumn("part", 'value.mod(2))
  .repartition(1, 'part).sortWithinPartitions("value")
  .write.mode("overwrite").format("orc").partitionBy("part")
  .saveAsTable("test_sort_within")
```

For the above case, `requiredOrdering` is `part` while `actualOrdering` is
`value`, so `SortExec` runs anyway and the ordering within the partition is not
respected if a spill occurs.

However, we now have a workaround: prepend the partition columns to the
`sortWithinPartitions` call or the `SORT BY` clause, i.e.
`.sortWithinPartitions("part", "value")`, to bypass `SortExec`, as shown below.
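
Concretely, applying the workaround to the snippet above (same spark-shell assumptions):

```scala
// identical to the snippet above, except that the partition column is
// prepended to the sort keys; the required ordering ("part") is now a
// prefix of the actual ordering ("part", "value"), so SortExec is bypassed
sc.parallelize(1 to 1000).toDS.withColumn("part", 'value.mod(2))
  .repartition(1, 'part).sortWithinPartitions("part", "value")
  .write.mode("overwrite").format("orc").partitionBy("part")
  .saveAsTable("test_sort_within")
```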



[GitHub] spark issue #16347: [SPARK-18934][SQL] Writing to dynamic partitions does no...

2017-04-10 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/16347
  
is this still a problem?



[GitHub] spark issue #16347: [SPARK-18934][SQL] Writing to dynamic partitions does no...

2017-04-03 Thread Downchuck
Github user Downchuck commented on the issue:

https://github.com/apache/spark/pull/16347
  
Is there anyone on the Spark team taking this up? This bug is painful; it's
saddened a hundred TB of data I've stacked up, and I'm really trying to avoid
more manual work. `INSERT OVERWRITE TABLE ... DISTRIBUTE BY ... SORT BY` is
how I live my life these days.




[GitHub] spark issue #16347: [SPARK-18934][SQL] Writing to dynamic partitions does no...

2017-01-19 Thread junegunn
Github user junegunn commented on the issue:

https://github.com/apache/spark/pull/16347
  
Rebased to current master. The patch is simpler thanks to the refactoring 
made in [SPARK-18243](https://issues.apache.org/jira/browse/SPARK-18243).

Anyway, I can understand your rationale for wanting an explicit API on the
writer side, but then make sure that the sort specification from
`sortWithinPartitions` is automatically propagated to the writer; otherwise the
method is no longer compatible with `SORT BY` in Hive, and [the
documentation](https://github.com/apache/spark/blob/v2.1.0/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L990)
should be corrected accordingly. Care should also be taken with the `INSERT
OVERWRITE TABLE ... DISTRIBUTE BY ... SORT BY ...` statement in Spark SQL so
that it stays compatible with the same Hive SQL.



[GitHub] spark issue #16347: [SPARK-18934][SQL] Writing to dynamic partitions does no...

2017-01-06 Thread chpritchard-expedia
Github user chpritchard-expedia commented on the issue:

https://github.com/apache/spark/pull/16347
  
@rxin - Oh, yes that'd be fantastic; `partitionBy().sortBy()` is just about all
I need to survive in this crazy world. In the meantime, I think there ought to
be a big warning label on `partitionBy` in the Spark docs.



[GitHub] spark issue #16347: [SPARK-18934][SQL] Writing to dynamic partitions does no...

2017-01-05 Thread rxin
Github user rxin commented on the issue:

https://github.com/apache/spark/pull/16347
  
What I was suggesting was to allow sort by without bucketing.
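
That is, something like the following hypothetical usage (not valid as of Spark 2.1, where `sortBy` requires `bucketBy`; names are illustrative):

```scala
// hypothetical API: sortBy without bucketBy, as suggested above
// (df: any DataFrame with "day" and "id" columns)
df.write
  .partitionBy("day")          // dynamic partition column
  .sortBy("id")                // would sort rows within each output file
  .parquet("/path/to/output")
```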




[GitHub] spark issue #16347: [SPARK-18934][SQL] Writing to dynamic partitions does no...

2017-01-05 Thread chpritchard-expedia
Github user chpritchard-expedia commented on the issue:

https://github.com/apache/spark/pull/16347
  
@rxin - `sortBy` is somewhat tied in with bucketing, which is also a little
difficult to work with. First, bucketing often relies on a column being
present, whereas in Hive (and with `repartition`), I may use a formula to split
the data into appropriate buckets that are evenly distributed.

Overall, bucketing is not well supported throughout the ecosystem. Even with
all of that, Spark doesn't particularly support semantics to say that a data
set is already sorted.

In Hive, I've had to do a lot of `PARTITIONED BY (datefield, bucket) CLUSTERED
BY (key) SORTED BY (key) INTO 1 BUCKETS`. That gets us stable, totally sorted
files for GUIDs.

In the case of Spark, this issue of `partitionBy` destroying sorting is a
painful bug. I'm now using some very large data sets, searching for keys, and
instead of returning data in a few seconds (thanks to predicate pushdown with
Parquet), Spark has to scan the entire files.
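
For context, a sketch of the kind of point lookup that suffers (hypothetical path and column names): when the files are sorted by the key, Parquet row-group min/max statistics let the reader skip almost everything; when they are not, every row group may have to be scanned.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// the equality filter is pushed down to the Parquet reader, which
// consults per-row-group statistics before decoding any data
spark.read.parquet("/data/large_table")
  .filter($"key" === "3f1a6c2e-0000-0000-0000-000000000000") // hypothetical GUID
  .show()
```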





[GitHub] spark issue #16347: [SPARK-18934][SQL] Writing to dynamic partitions does no...

2017-01-04 Thread rxin
Github user rxin commented on the issue:

https://github.com/apache/spark/pull/16347
  
Maybe we should make `DataFrameWriter.sortBy` work here.



[GitHub] spark issue #16347: [SPARK-18934][SQL] Writing to dynamic partitions does no...

2017-01-04 Thread junegunn
Github user junegunn commented on the issue:

https://github.com/apache/spark/pull/16347
  
@chpritchard-expedia The patch here fixes the problem. I don't think it's
possible to work around the issue by using the Spark API in some different way,
because we can't completely avoid memory spills at the writers. Hive doesn't
have the problem, so maybe you can consider running the same statement on Hive
if this is not something Spark wants to address.

Anyway, for anyone who's interested, I could confirm that for a sorted ORC
table built with this patch, point/range lookups on the sorted column can be
several times faster. Also, the final size of the table turned out to be
significantly smaller in this case (60% of the unsorted table) due to the
temporal locality in our data.



[GitHub] spark issue #16347: [SPARK-18934][SQL] Writing to dynamic partitions does no...

2017-01-04 Thread chpritchard-expedia
Github user chpritchard-expedia commented on the issue:

https://github.com/apache/spark/pull/16347
  
@junegunn I ran into the same issue using `partitionBy`; missed it completely
during my testing. Would you share the workaround you used? I wasn't able to
understand it from your Apache JIRA posting. At present I'm thinking about
using `mapPartitions` and writing each partition out from there.




[GitHub] spark issue #16347: [SPARK-18934][SQL] Writing to dynamic partitions does no...

2016-12-20 Thread junegunn
Github user junegunn commented on the issue:

https://github.com/apache/spark/pull/16347
  
Thanks for the comment. I was trying to implement the following Hive QL in 
Spark SQL/API:

```sql
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.mapred.mode = nonstrict;

insert overwrite table target_table
partition (day)
select * from source_table
distribute by day sort by id;
```

In Hive, `distribute by day` ensures that the records with the same "day" go
to the same reducer, and `sort by id` ensures that the input to each reducer is
sorted by "id". It works as expected. The number of reducers is no more than
the cardinality of the "day" column, and I could confirm that the generated ORC
file in each partition is sorted by "id".

However, if I run the same query or its equivalent Spark code –
[`repartition('day)` for `distribute by day`](https://github.com/apache/spark/blob/bfeccd80ef032cab3525037be3d3e42519619493/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L2423)
and [`sortWithinPartitions('id)` for `sort by id`](https://github.com/apache/spark/blob/bfeccd80ef032cab3525037be3d3e42519619493/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L990)
– on Spark, we have the right number of writer tasks, one for each partition,
and each task generates a single output file, but the generated ORC file is not
properly sorted by "id", making the ORC index ineffective.
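
Put together, the equivalent Spark code looks roughly like this (a sketch following the linked methods; assumes a spark-shell session with `spark.implicits._` in scope, and `saveAsTable` standing in for the `INSERT OVERWRITE`):

```scala
// Spark equivalent of the Hive statement above:
//   repartition('day)          <-> distribute by day
//   sortWithinPartitions('id)  <-> sort by id
spark.table("source_table")
  .repartition('day)
  .sortWithinPartitions('id)
  .write.mode("overwrite").format("orc")
  .partitionBy("day")
  .saveAsTable("target_table")
```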

> Can your use case be satisfied by adding an explicit sortBy?

`sortBy` is for bucketed tables and requires `bucketBy`, so I'm not sure if 
it's related to this issue regarding Hive compatibility.



[GitHub] spark issue #16347: [SPARK-18934][SQL] Writing to dynamic partitions does no...

2016-12-20 Thread rxin
Github user rxin commented on the issue:

https://github.com/apache/spark/pull/16347
  
Thanks for submitting the ticket. In general I don't think the
`sortWithinPartitions` property can carry over to writing out data, because one
partition actually corresponds to more than one file.

Can your use case be satisfied by adding an explicit `sortBy`?

```
df.write.sortBy(col).parquet(...)
```




[GitHub] spark issue #16347: [SPARK-18934][SQL] Writing to dynamic partitions does no...

2016-12-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/16347
  
Can one of the admins verify this patch?

