[ 
https://issues.apache.org/jira/browse/SPARK-16032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15347172#comment-15347172
 ] 

Ryan Blue commented on SPARK-16032:
-----------------------------------

bq. I am not sure apply by-name resolution just to partition columns is a good idea.

I'm not sure about this. After looking into it more, I agree in principle that 
in the long term we don't want to mix by-position column matching with by-name 
partitioning. But I'm less certain it's a good idea right now. The more I look 
at it, the more I agree with you about what is "right". But I'm still concerned 
about how to move forward from where we are, given the way people are currently 
using the API.

I think we've already established that it isn't obvious that the 
DataFrameWriter API relies on position. Most people aren't thinking about the 
choice between by-position and by-name resolution at all; they use whatever 
they can get working. My first use of the API was to build a partitioned table 
from an unpartitioned table, which failed. When I went looking for a solution, 
{{partitionBy}} was the obvious choice (suggested by my IDE) and, sure enough, 
it fixed the problem by [moving the partition columns by 
name|https://github.com/apache/spark/blob/v1.6.1/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala#L180]
 to the end. This solution is common because it works and is more obvious than 
thinking about column order, since, as I noted above, it isn't clear that 
{{insertInto}} uses position.

The pattern of using {{partitionBy}} with {{insertInto}} has also become a best 
practice for maintaining ETL jobs in Spark. Consider this table setup, where 
data lands in {{src}} in batches and we move it to {{dest}} for long-term 
storage in Parquet. Here's some example DDL:

{code:lang=sql}
CREATE TABLE src (id string, timestamp bigint, other_properties map<string, 
string>);
CREATE TABLE dest (id string, timestamp bigint, c1 string, c2 int)
  PARTITIONED BY (utc_dateint int, utc_hour int);
{code}

The Spark code for this ETL job should look like this:

{code:lang=scala}
spark.table("src")
  .withColumn("c1", $"other_properties".getItem("c1"))
  .withColumn("c2", $"other_properties".getItem("c2").cast(IntegerType))
  .withColumn("date", dateint($"timestamp"))
  .withColumn("hour", hour($"timestamp"))
  .drop("other_properties")
  .write.insertInto("dest")
{code}

But users are likely to try this next version instead, because it isn't obvious 
that partition columns go after data columns; they are two separate lists in 
the DDL.

{code:lang=scala}
spark.table("src")
  .withColumn("date", dateint($"timestamp"))
  .withColumn("hour", hour($"timestamp"))
  .withColumn("c1", $"other_properties".getItem("c1"))
  .withColumn("c2", $"other_properties".getItem("c2").cast(IntegerType))
  .drop("other_properties")
  .write.insertInto("dest")
{code}

And again, the most obvious fix is to add {{partitionBy}} to specify the 
partition columns, which appears to users as a match for Hive's 
{{PARTITION("date", "hour")}} syntax. Users then get the impression that 
{{partitionBy}} is equivalent to {{PARTITION}}, though in reality Hive operates 
by position.

{code:lang=scala}
spark.table("src")
  .withColumn("date", dateint($"timestamp"))
  .withColumn("hour", hour($"timestamp"))
  .withColumn("c1", $"other_properties".getItem("c1"))
  .withColumn("c2", $"other_properties".getItem("c2").cast(IntegerType))
  .drop("other_properties")
  .write.partitionBy("date", "hour").insertInto("dest")
{code}

Another reason to use {{partitionBy}} is for maintaining ETL over time. When 
structure changes, so does column order. Say I want to add a dedup step so I 
get just one row per ID per day. My first attempt, based on getting the column 
order right to begin with, looks like this:

{code:lang=scala}
// column orders change, causing the query to break
spark.table("src")
  .withColumn("date", dateint($"timestamp")) // moved to before dropDuplicates
  .dropDuplicates(Seq("date", "id"))         // added to dedup records
  .withColumn("c1", $"other_properties".getItem("c1"))
  .withColumn("c2", $"other_properties".getItem("c2").cast(IntegerType))
  .withColumn("hour", hour($"timestamp"))
  .drop("other_properties")
  .write.insertInto("dest")
{code}

The result is that I get crazy partitioning, because {{c2}} and {{hour}} end up 
in the partition positions. The most obvious symptom of wrong column order is 
bad partitioning, and when I look into it, I find that {{partitionBy}} fixes 
it. In many cases that's the first method I'll try, because bad partition 
values are what I see. This solution doesn't always fix the whole query, but it 
does solve the partitioning problem I observed. (Also: other ordering problems 
are hidden because the analyzer inserts {{Cast}} instead of the safer 
{{UpCast}}.)
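To make that mismatch concrete, here is a toy illustration (plain Scala, not 
the Spark API; column names taken from the example above) of how by-position 
insertion pairs the query's output with the destination's columns:

{code:lang=scala}
// Toy illustration, not Spark API: by-position insertion effectively zips the
// destination's columns with the query's output columns, ignoring names.
val destColumns = Seq("id", "timestamp", "c1", "c2", "utc_dateint", "utc_hour")
val queryOutput = Seq("id", "timestamp", "date", "c1", "c2", "hour")
val pairing = destColumns.zip(queryOutput)
// The two partition columns at the end receive c2 and hour:
val partitionPairing = pairing.takeRight(2)
// partitionPairing == Seq(("utc_dateint", "c2"), ("utc_hour", "hour"))
{code}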

Users will also _choose_ this over the right solution, which is to add 
{{select}}:

{code:lang=scala}
// column orders change
spark.table("src")
  .withColumn("date", dateint($"timestamp")) // moved to before dropDuplicates
  .dropDuplicates(Seq("date", "id"))         // added to dedup records
  .withColumn("c1", $"other_properties".getItem("c1"))
  .withColumn("c2", $"other_properties".getItem("c2").cast(IntegerType))
  .withColumn("hour", hour($"timestamp"))
  .select("id", "timestamp", "c1", "c2", "date", "hour") // reorder for the insert
  .write.insertInto("dest")
{code}

The reason {{partitionBy}} is more attractive is that {{select}} requires me to 
list all of the columns I want, in the right order. When tables have 20+ 
columns, which is not unusual, that's annoying. And to come up with this fix at 
all, a user needs to know that order matters.
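A sketch of how {{select}} could be made less painful, assuming the 
DataFrame's column names already match the destination's (which they don't in 
the example above, where the partition columns are named {{date}} and 
{{hour}}): the destination's own column list, data columns followed by 
partition columns, is exactly the order {{insertInto}} expects, so nobody has 
to type 20+ names.

{code:lang=scala}
// Toy model of that fix: derive the select order from the destination's schema
// (data columns followed by partition columns) instead of listing it by hand.
val dataColumns      = Seq("id", "timestamp", "c1", "c2")
val partitionColumns = Seq("utc_dateint", "utc_hour")
val insertOrder      = dataColumns ++ partitionColumns
// In Spark this order is just spark.table("dest").columns, and the reorder
// becomes df.select(insertOrder.map(col): _*).write.insertInto("dest").
{code}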

While I agree that we shouldn't be mixing semantics, removing support for 
{{partitionBy}} with {{insertInto}} will break the jobs that use them together. 
So, what is the most reasonable way to fix this?

The options are:
1. Throw an exception if {{insertInto}} (by-position) is used with 
{{partitionBy}} (by-name), and document the right way to use {{insertInto}}.
2. Change {{insertInto}} to use by-name resolution to match {{partitionBy}}. (I 
think this would break fewer jobs moving to 2.0.)
3. Let {{insertInto}} and {{partitionBy}} continue working as they do right now 
and fix it later.
4. Remove (or rename?) {{insertInto}}, since the rest of {{DataFrameWriter}} 
resolves by name and that's what users expect.
5. ???

I don't have the answer, but I know that option 1 isn't ideal. If jobs are 
going to break moving to 2.0, I'd like to know which ones before runtime.

_Note:_ My patch to detect table partitioning introduced a 
[bug|https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala#L174-L180]
 that's still present and requires the original {{partitionBy}} column names to 
match the final table's column names. That should be taken out because the 
columns may be renamed in the pre-insert check. It hasn't been released; I'll 
open a PR for this.

> Audit semantics of various insertion operations related to partitioned tables
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-16032
>                 URL: https://issues.apache.org/jira/browse/SPARK-16032
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.0.0
>            Reporter: Cheng Lian
>            Assignee: Wenchen Fan
>            Priority: Critical
>         Attachments: [SPARK-16032] Spark SQL table insertion auditing - 
> Google Docs.pdf
>
>
> We found that semantics of various insertion operations related to partition 
> tables can be inconsistent. This is an umbrella ticket for all related 
> tickets.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
