[GitHub] spark issue #16347: [SPARK-18934][SQL] Writing to dynamic partitions does no...
Github user junegunn commented on the issue: https://github.com/apache/spark/pull/16347

Hive makes sure that the output file is properly sorted by the column specified in the `SORT BY` clause by having only one reduce task (output) for each partition.

```
STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: __
            Statistics: Num rows: 183663543 Data size: 313697356092 Basic stats: COMPLETE Column stats: PARTIAL
            Select Operator
              expressions: __ (type: bigint), (type: string), ___ (type: string), _ (type: string), ___ (type: string), ___ (type: string), _ (type: string), (type: string), __ (type: string), _ (type: string), (type: string), ___ (type: string), _ (type: string), _ (type: string), (type: string), __ (type: string), _ (type: string), __ (type: string), ___ (type: string)
              outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13, _col14, _col15, _col16, _col17, _col18
              Statistics: Num rows: 183663543 Data size: 313697356092 Basic stats: COMPLETE Column stats: PARTIAL
              Reduce Output Operator
                key expressions: _col0 (type: bigint)
                sort order: +
                Map-reduce partition columns: _col18 (type: string)
                Statistics: Num rows: 183663543 Data size: 313697356092 Basic stats: COMPLETE Column stats: PARTIAL
                value expressions: _col1 (type: string), _col2 (type: string), _col3 (type: string), _col4 (type: string), _col5 (type: string), _col6 (type: string), _col7 (type: string), _col8 (type: string), _col9 (type: string), _col10 (type: string), _col11 (type: string), _col12 (type: string), _col13 (type: string), _col14 (type: string), _col15 (type: string), _col16 (type: string), _col17 (type: string), _col18 (type: string)
      Execution mode: vectorized
      Reduce Operator Tree:
        Select Operator
          expressions: KEY.reducesinkkey0 (type: bigint), VALUE._col0 (type: string), VALUE._col1 (type: string), VALUE._col2 (type: string), VALUE._col3 (type: string), VALUE._col4 (type: string), VALUE._col5 (type: string), VALUE._col6 (type: string), VALUE._col7 (type: string), VALUE._col8 (type: string), VALUE._col9 (type: string), VALUE._col10 (type: string), VALUE._col11 (type: string), VALUE._col12 (type: string), VALUE._col13 (type: string), VALUE._col14 (type: string), VALUE._col15 (type: string), VALUE._col16 (type: string), VALUE._col17 (type: string)
          outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13, _col14, _col15, _col16, _col17, _col18
          Statistics: Num rows: 183663543 Data size: 33794091912 Basic stats: COMPLETE Column stats: PARTIAL
          File Output Operator
            compressed: false
            Statistics: Num rows: 183663543 Data size: 33794091912 Basic stats: COMPLETE Column stats: PARTIAL
            table:
                input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
                output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
                serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
                name: ___.
```

The later stage simply moves the files to the corresponding directories.

Since the patch no longer merges cleanly and I think I have made my point, I'm closing this.
[GitHub] spark issue #16347: [SPARK-18934][SQL] Writing to dynamic partitions does no...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/16347

gentle ping @junegunn on ^.
[GitHub] spark issue #16347: [SPARK-18934][SQL] Writing to dynamic partitions does no...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/16347

@junegunn Can you check the query plan of Hive for `INSERT OVERWRITE TABLE ... DISTRIBUTE BY ... SORT BY ...`? In Spark SQL, the query plan looks like

```
'InsertIntoTable `table1`
+- 'Sort ['j ASC NULLS FIRST], false
   +- 'RepartitionByExpression ['i], 200
      +- table
```

The sort happens before the insertion, which explains why Spark doesn't preserve the sort order. If in Hive the sort is part of the insertion, we should follow that.

In the meantime, the behavior of `DataFrameWriter` looks reasonable (we need to fix the documentation of `sortWithinPartitions`), but we should add a new API to allow users to specify a sort ordering during writing, to be consistent with the SQL API.
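For reference, a statement of roughly the following shape would parse into the plan shown above. This is only a sketch: the session setup, the source table `src`, the target table `table1`, and the columns `i` and `j` are placeholder names taken from (or invented around) the plan, not an actual reproduction.

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: assumes a Hive-enabled session and pre-existing tables.
// `table1`, `src`, `i`, and `j` are placeholder names.
val spark = SparkSession.builder()
  .appName("distribute-by-sort-by-sketch")
  .enableHiveSupport()
  .getOrCreate()

// DISTRIBUTE BY becomes RepartitionByExpression, SORT BY becomes a
// non-global Sort in the logical plan shown above.
spark.sql(
  """INSERT OVERWRITE TABLE table1
    |SELECT * FROM src
    |DISTRIBUTE BY i
    |SORT BY j""".stripMargin)
```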
[GitHub] spark issue #16347: [SPARK-18934][SQL] Writing to dynamic partitions does no...
Github user junegunn commented on the issue: https://github.com/apache/spark/pull/16347

See my answer above.
[GitHub] spark issue #16347: [SPARK-18934][SQL] Writing to dynamic partitions does no...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/16347

Is this still needed after https://github.com/apache/spark/pull/16898?
[GitHub] spark issue #16347: [SPARK-18934][SQL] Writing to dynamic partitions does no...
Github user junegunn commented on the issue: https://github.com/apache/spark/pull/16347

@cloud-fan It's not a problem in the context of the DataFrame API. But when it comes to Spark SQL, it makes Spark SQL incompatible with the equivalent HiveQL in a subtle way. At the least, we may need to revisit the documentation that gives the false impression that `sortWithinPartitions` and HiveQL's `SORT BY` are equivalent.

https://github.com/apache/spark/blob/v2.1.0/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L990
[GitHub] spark issue #16347: [SPARK-18934][SQL] Writing to dynamic partitions does no...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/16347

@junegunn I think it's not a problem: `df.write.xxx` is not guaranteed to retain the ordering of `df` when writing the data out. Currently `DataFrameWriter` doesn't provide an interface to specify the ordering of the written data; if you do want to guarantee the ordering of the output, please use a bucketed table in Spark SQL.
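As a concrete illustration of the bucketed-table route suggested above, a minimal sketch might look like the following. The session, the toy DataFrame, the column names `i` and `j`, the bucket count, and the table name are all made up for illustration; `bucketBy`/`sortBy` on `DataFrameWriter` only take effect together with `saveAsTable`.

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch of the bucketed-table alternative: sortBy on DataFrameWriter
// is honored only together with bucketBy + saveAsTable. All names are illustrative.
val spark = SparkSession.builder().appName("bucketed-sorted-write").getOrCreate()
import spark.implicits._

val df = (1 to 1000).toDF("j").withColumn("i", $"j" % 10)

df.write
  .bucketBy(8, "i")   // hash rows into 8 buckets by column "i"
  .sortBy("j")        // sort rows within each bucket by column "j"
  .mode("overwrite")
  .format("orc")
  .saveAsTable("bucketed_sorted_table")
```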
[GitHub] spark issue #16347: [SPARK-18934][SQL] Writing to dynamic partitions does no...
Github user junegunn commented on the issue: https://github.com/apache/spark/pull/16347

@cloud-fan Unfortunately, yes.

```scala
sc.parallelize(1 to 1000).toDS.withColumn("part", 'value.mod(2))
  .repartition(1, 'part).sortWithinPartitions("value")
  .write.mode("overwrite").format("orc").partitionBy("part")
  .saveAsTable("test_sort_within")
```

For the above case, `requiredOrdering` is `part` and `actualOrdering` is `value`, so `SortExec` runs anyway and the ordering within the partition is not respected if a spill occurs.

However, we now have a workaround: prepend the partition columns to the `sortWithinPartitions` call or the `SORT BY` clause, i.e. `.sortWithinPartitions("part", "value")`, to bypass `SortExec`.
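For anyone hitting this, the workaround described above spelled out against the same toy dataset looks like this. It is a sketch that assumes the same spark-shell implicits as the snippet above; the target table name is made up.

```scala
// Workaround sketch: put the partition column first in sortWithinPartitions so
// that the required ordering ("part") is satisfied by the actual ordering
// ("part", "value") and the extra SortExec is bypassed, as described above.
sc.parallelize(1 to 1000).toDS
  .withColumn("part", 'value.mod(2))
  .repartition(1, 'part)
  .sortWithinPartitions("part", "value")   // partition column first, then the sort column
  .write.mode("overwrite").format("orc").partitionBy("part")
  .saveAsTable("test_sort_within_workaround")
```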
[GitHub] spark issue #16347: [SPARK-18934][SQL] Writing to dynamic partitions does no...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/16347

Is this still a problem?
[GitHub] spark issue #16347: [SPARK-18934][SQL] Writing to dynamic partitions does no...
Github user Downchuck commented on the issue: https://github.com/apache/spark/pull/16347

Is there anyone on the Spark team taking this up? This bug is painful; it has bitten a hundred TB of data I've stacked up, and I'm really trying to avoid more manual work. `INSERT OVERWRITE TABLE ... DISTRIBUTE BY ... SORT BY` is how I live my life these days.
[GitHub] spark issue #16347: [SPARK-18934][SQL] Writing to dynamic partitions does no...
Github user junegunn commented on the issue: https://github.com/apache/spark/pull/16347

Rebased to the current master. The patch is simpler thanks to the refactoring made in [SPARK-18243](https://issues.apache.org/jira/browse/SPARK-18243).

Anyway, I can understand your rationale for wanting an explicit API on the writer side, but then we should make sure that the sort specification from `sortWithinPartitions` is automatically propagated to the writer; otherwise the method is no longer compatible with `SORT BY` in Hive and [the documentation](https://github.com/apache/spark/blob/v2.1.0/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L990) should be corrected accordingly. Care should also be taken for the `INSERT OVERWRITE TABLE ... DISTRIBUTE BY ... SORT BY ...` statement in Spark SQL so that it stays compatible with the same Hive SQL.
[GitHub] spark issue #16347: [SPARK-18934][SQL] Writing to dynamic partitions does no...
Github user chpritchard-expedia commented on the issue: https://github.com/apache/spark/pull/16347

@rxin - Oh, yes, that'd be fantastic; `partitionBy.sortBy` is just about all I need to survive in this crazy world. In the meantime, I think there ought to be a big warning label on `partitionBy` in the Spark docs.
[GitHub] spark issue #16347: [SPARK-18934][SQL] Writing to dynamic partitions does no...
Github user rxin commented on the issue: https://github.com/apache/spark/pull/16347

What I was suggesting was to allow `sortBy` without bucketing.
[GitHub] spark issue #16347: [SPARK-18934][SQL] Writing to dynamic partitions does no...
Github user chpritchard-expedia commented on the issue: https://github.com/apache/spark/pull/16347

@rxin - `sortBy` is somewhat tied in with bucketing, which is also a little difficult to work with. First, bucketing often relies on a column being present, whereas in Hive (and with `repartition`) I may use a formula to split the data into appropriate buckets that are evenly distributed. Overall, it is not well supported throughout the ecosystem.

Even with all of that, Spark doesn't really provide semantics to say that a data set is already sorted. In Hive, I've had to do a lot of `PARTITIONED BY (datefield, bucket) CLUSTERED BY (key) SORTED BY (key) INTO 1 BUCKETS`. That gets us stable, totally sorted files for GUIDs.

In the case of Spark, this issue of `partitionBy` destroying sorting is a painful bug. I'm now working with some very large data sets, searching for keys, and instead of returning data in a few seconds (thanks to predicate pushdown with Parquet), it has to scan entire files.
[GitHub] spark issue #16347: [SPARK-18934][SQL] Writing to dynamic partitions does no...
Github user rxin commented on the issue: https://github.com/apache/spark/pull/16347

Maybe we should make `DataFrameWriter.sortBy` work here.
[GitHub] spark issue #16347: [SPARK-18934][SQL] Writing to dynamic partitions does no...
Github user junegunn commented on the issue: https://github.com/apache/spark/pull/16347

@chpritchard-expedia The patch here fixes the problem. I don't think it's possible to work around the issue by using the Spark API in some different way, because we can't completely avoid memory spills at the writers. Hive doesn't have this problem, so you might consider running the same statement on Hive if this is not something Spark wants to address.

Anyway, for anyone who's interested, I could confirm that for a sorted ORC table built with this patch, point/range lookups on the sorted column can be several times faster. The final size of the table also turned out to be significantly smaller in this case (60% of the unsorted table) due to the temporal locality in our data.
[GitHub] spark issue #16347: [SPARK-18934][SQL] Writing to dynamic partitions does no...
Github user chpritchard-expedia commented on the issue: https://github.com/apache/spark/pull/16347

@junegunn I ran into the same issue using `partitionBy`; I missed it completely during my testing. Would you share the workaround you used? I wasn't able to understand it from your Apache JIRA posting. At present I'm thinking about using `mapPartitions` and writing each partition out from there.
[GitHub] spark issue #16347: [SPARK-18934][SQL] Writing to dynamic partitions does no...
Github user junegunn commented on the issue: https://github.com/apache/spark/pull/16347

Thanks for the comment. I was trying to implement the following Hive QL in Spark SQL/API:

```sql
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.mapred.mode = nonstrict;

insert overwrite table target_table partition (day)
select * from source_table
distribute by day
sort by id;
```

In Hive, `distribute by day` ensures that the records with the same "day" go to the same reducer, and `sort by id` ensures that the input to each reducer is sorted by "id". It works as expected: the number of reducers is no more than the cardinality of the "day" column, and I could confirm that the generated ORC file in each partition is sorted by "id".

However, if I run the same query, or its equivalent Spark code ([`repartition('day)` for `distribute by day`](https://github.com/apache/spark/blob/bfeccd80ef032cab3525037be3d3e42519619493/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L2423) and [`sortWithinPartitions('id)` for `sort by id`](https://github.com/apache/spark/blob/bfeccd80ef032cab3525037be3d3e42519619493/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L990)), on Spark, we have the right number of writer tasks, one for each partition, and each task generates a single output file, but the generated ORC file is not properly sorted by "id", making the ORC index ineffective.

> Can your use case be satisfied by adding an explicit sortBy?

`sortBy` is for bucketed tables and requires `bucketBy`, so I'm not sure it's related to this issue regarding Hive compatibility.
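Spelled out, the Spark-side translation described above would look roughly like this. It is a sketch that assumes a Hive-enabled `SparkSession` named `spark` and an existing `source_table`; the final write step (`partitionBy` plus `saveAsTable`) is just one way to materialize the result and may not match the exact `INSERT OVERWRITE` setup used in the report.

```scala
import org.apache.spark.sql.functions.col

// Sketch of the DataFrame translation of the HiveQL above:
//   repartition(col("day"))    ~ DISTRIBUTE BY day
//   sortWithinPartitions("id") ~ SORT BY id
// Assumes a Hive-enabled SparkSession `spark` and an existing source_table.
spark.table("source_table")
  .repartition(col("day"))
  .sortWithinPartitions("id")
  .write
  .mode("overwrite")
  .format("orc")
  .partitionBy("day")
  .saveAsTable("target_table")
```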
[GitHub] spark issue #16347: [SPARK-18934][SQL] Writing to dynamic partitions does no...
Github user rxin commented on the issue: https://github.com/apache/spark/pull/16347

Thanks for submitting the ticket. In general I don't think the `sortWithinPartitions` property can carry over to writing out data, because one partition actually corresponds to more than one file. Can your use case be satisfied by adding an explicit sortBy?

```
df.write.sortBy(col).parquet(...)
```
[GitHub] spark issue #16347: [SPARK-18934][SQL] Writing to dynamic partitions does no...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/16347

Can one of the admins verify this patch?