[ https://issues.apache.org/jira/browse/SPARK-16032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15347172#comment-15347172 ]
Ryan Blue commented on SPARK-16032:
-----------------------------------

bq. I am not sure applying by-name resolution just to partition columns is a good idea.

After looking into it more, I agree in principle: in the long term we don't want to mix by-position column matching with by-name partitioning. But I'm less certain it's a good idea right now. The more I look at it, the more I agree with you about what is "right", but I'm still concerned about how to move forward from where we are, given the way people are currently using the API.

I think we've already established that it isn't clear that the DataFrameWriter API relies on position. I actually think most people aren't choosing between by-position and by-name resolution at all; they use whatever they can get working. My first use of the API was to build a partitioned table from an unpartitioned table, which failed. When I went looking for a solution, {{partitionBy}} was the obvious choice (suggested by my IDE) and, sure enough, it fixed the problem by [moving the partition columns by name|https://github.com/apache/spark/blob/v1.6.1/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala#L180] to the end. This fix is common because it works and is more obvious than thinking about column order; as I noted above, it isn't clear that {{insertInto}} uses position.

The pattern of using {{partitionBy}} with {{insertInto}} has also become a best practice for maintaining ETL jobs in Spark. Consider this table setup, where data lands in {{src}} in batches and we move it to {{dest}} for long-term storage in Parquet.
Here's some example DDL:

{code:lang=sql}
CREATE TABLE src (id string, timestamp bigint, other_properties map<string, string>);

CREATE TABLE dest (id string, timestamp bigint, c1 string, c2 int)
PARTITIONED BY (utc_dateint int, utc_hour int);
{code}

The Spark code for this ETL job should be this:

{code:lang=scala}
spark.table("src")
  .withColumn("c1", $"other_properties".getItem("c1"))
  .withColumn("c2", $"other_properties".getItem("c2").cast(IntegerType))
  .withColumn("date", dateint($"timestamp"))
  .withColumn("hour", hour($"timestamp"))
  .drop("other_properties")
  .write.insertInto("dest")
{code}

But users are likely to try this next version instead, because it isn't obvious that partition columns go after data columns; they are two separate lists in the DDL.

{code:lang=scala}
spark.table("src")
  .withColumn("date", dateint($"timestamp"))
  .withColumn("hour", hour($"timestamp"))
  .withColumn("c1", $"other_properties".getItem("c1"))
  .withColumn("c2", $"other_properties".getItem("c2").cast(IntegerType))
  .drop("other_properties")
  .write.insertInto("dest")
{code}

And again, the most obvious fix is to add {{partitionBy}} to specify the partition columns, which looks to users like a match for Hive's {{PARTITION (date, hour)}} syntax. Users then get the impression that {{partitionBy}} is equivalent to {{PARTITION}}, though in reality Hive resolves by position.

{code:lang=scala}
spark.table("src")
  .withColumn("date", dateint($"timestamp"))
  .withColumn("hour", hour($"timestamp"))
  .withColumn("c1", $"other_properties".getItem("c1"))
  .withColumn("c2", $"other_properties".getItem("c2").cast(IntegerType))
  .drop("other_properties")
  .write.partitionBy("date", "hour").insertInto("dest")
{code}

Another reason to use {{partitionBy}} is maintaining ETL over time. When structure changes, so does column order. Say I want to add a dedup step so I get just one row per ID per day.
My first attempt, based on getting the column order right to begin with, looks like this:

{code:lang=scala}
// column orders change, causing the query to break
spark.table("src")
  .withColumn("date", dateint($"timestamp")) // moved to before dropDuplicates
  .dropDuplicates("date", "id")              // added to dedup records
  .withColumn("c1", $"other_properties".getItem("c1"))
  .withColumn("c2", $"other_properties".getItem("c2").cast(IntegerType))
  .withColumn("hour", hour($"timestamp"))
  .drop("other_properties")
  .write.insertInto("dest")
{code}

The result is crazy partitioning, because {{c2}} and {{hour}} now sit in the partition positions. Bad partitioning is the most obvious symptom of wrong column order, and when I look into it, I find that {{partitionBy}} fixes it. In many cases it's the first method I'll try because I see bad partition values. This doesn't always fix the rest of the query, but it does solve the partitioning problem I observed. (Also: other ordering problems are hidden because {{Cast}} is inserted instead of the safer {{UpCast}}.)

Users will also _choose_ this over the right solution, which is to add a {{select}}:

{code:lang=scala}
// column orders change
spark.table("src")
  .withColumn("date", dateint($"timestamp")) // moved to before dropDuplicates
  .dropDuplicates("date", "id")              // added to dedup records
  .withColumn("c1", $"other_properties".getItem("c1"))
  .withColumn("c2", $"other_properties".getItem("c2").cast(IntegerType))
  .withColumn("hour", hour($"timestamp"))
  .select("id", "timestamp", "c1", "c2", "date", "hour") // reorder for the insert
  .write.insertInto("dest")
{code}

The reason {{partitionBy}} is more attractive is that {{select}} requires listing all of the columns I want, in the right order. When tables are 20+ columns, which is not unusual, that's annoying. And to come up with this fix in the first place, a user needs to know that order matters. While I agree that we shouldn't be mixing semantics, removing support for {{partitionBy}} with {{insertInto}} will break the jobs that do.
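The scrambling itself is easy to model outside Spark. This toy sketch in plain Scala (illustrative values only, not Spark internals) shows how by-position resolution maps a reordered query onto the destination schema, versus what by-name resolution would do:

```scala
// Destination column order, and a query whose output columns have drifted.
val destSchema   = Seq("id", "c1", "c2", "date", "hour")
val queryColumns = Seq("id", "date", "hour", "c1", "c2")
val row = Map("id" -> "a", "date" -> "20160624", "hour" -> "3",
              "c1" -> "x", "c2" -> "7")

// By-position (what insertInto does today): pair destination columns with
// query columns by index, silently scrambling the values.
val byPosition = destSchema.zip(queryColumns.map(row)).toMap

// By-name: look each destination column up by name instead.
val byName = destSchema.map(c => c -> row(c)).toMap
```

With by-position resolution the {{date}} value lands in {{c1}} and the table gets partitioned by the {{c1}} and {{c2}} values; by-name resolution recovers the intended row.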
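The {{select}} fix also doesn't have to mean hand-maintaining a 20+ column list. A minimal sketch of a helper (hypothetical, not part of the Spark API; {{ColumnAlign}} and {{alignOrder}} are made-up names) that derives the select order from the destination table's columns:

```scala
object ColumnAlign {
  // Return the destination's column order, failing fast (with the missing
  // names) if the query can't supply every destination column.
  def alignOrder(dfColumns: Seq[String], destColumns: Seq[String]): Seq[String] = {
    val missing = destColumns.filterNot(dfColumns.contains)
    require(missing.isEmpty, s"missing columns: ${missing.mkString(", ")}")
    destColumns
  }
}
```

The insert would then be {{df.select(ColumnAlign.alignOrder(df.columns, destColumns).map(col): _*).write.insertInto("dest")}}, with {{destColumns}} read from the catalog. It's still by-position underneath; only the ordering is derived instead of hand-written, and it assumes the query's names line up with the destination's, which the {{date}}/{{utc_dateint}} mismatch above shows isn't guaranteed.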
So, what is the most reasonable way to fix this? The options are:
1. Throw an exception if {{insertInto}} (by-position) is used with {{partitionBy}} (by-name), and document the right way to use {{insertInto}}.
2. Change {{insertInto}} to use by-name resolution to match {{partitionBy}}. (I think this would break fewer jobs moving to 2.0.)
3. Let {{insertInto}} and {{partitionBy}} continue working as they do right now and fix it later.
4. Remove (or rename?) {{insertInto}}, because the rest of {{DataFrameWriter}} resolves by name and users expect it.
5. ???

I don't have the answer, but I know that option 1 isn't ideal. If jobs are going to break moving to 2.0, I'd like to know which ones before runtime.

_Note:_ My patch to detect table partitioning introduced a [bug|https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala#L174-L180] that's still present and requires the original {{partitionBy}} column names to match the final table's column names. That check should be taken out because the columns may be renamed in the pre-insert rules. It hasn't been released; I'll open a PR for this.

> Audit semantics of various insertion operations related to partitioned tables
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-16032
>                 URL: https://issues.apache.org/jira/browse/SPARK-16032
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.0.0
>            Reporter: Cheng Lian
>            Assignee: Wenchen Fan
>            Priority: Critical
>         Attachments: [SPARK-16032] Spark SQL table insertion auditing - Google Docs.pdf
>
>
> We found that the semantics of various insertion operations related to partitioned tables can be inconsistent. This is an umbrella ticket for all related tickets.
--
This message was sent by Atlassian JIRA (v6.3.4#6332)