[ 
https://issues.apache.org/jira/browse/SPARK-22504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16250621#comment-16250621
 ] 

xuchuanyin commented on SPARK-22504:
------------------------------------

[~srowen] thanks for your reply. My opinion is as below:

how do you ensure the new table is cleaned up in case of failure?
A: We can just use `drop` on the temp tables in case of failure. As for 'how to 
make sure', I would have to say that it depends on the realization of `drop`.

how do you make sure the old table is deleted?
A: The answer is the same as above.

what about the implications of having two of the tables' storage at once?
A: I think there is no other impacts other than have two copy of data for a 
while and some metadata operation.

I think the current semantics are correct and as expected in case of a failure.
A: I still can't aggree with you with this opinion.
`insert overwrite` differs from `insert` that it will truncate the target data, 
in another word, it will replace the old data with the newer one. As for 
`replace`, the old data can and only can be drop if the replace succeed, if the 
replace failed, the old data should remain unchanged -- It tries to keep the 
operation (weak) atomic.
Besides, in the testcase I provided in the issue description, the failure of 
overwrite will cause the origin data missing -- I don't think that's the user 
wanted.

SparkSQL currently does not provide `update` operation on a table. User who 
wants to update the existed table can only read from it and overwrite it at 
last. If SparkSQL won't support the above semantic, then it is the user's 
responsibility to keep the operation atomic.


> Optimization in overwrite table in case of failure
> --------------------------------------------------
>
>                 Key: SPARK-22504
>                 URL: https://issues.apache.org/jira/browse/SPARK-22504
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.1.0
>            Reporter: xuchuanyin
>
> Optimization in overwrite table in case of failure
> # SCENARIO
> Currently, `Overwrite` operation in spark is performed by following steps: 
> 1. DROP : drop old table
> 2. WRITE: create and write data into new table
> If some runtime error occurs in Step2, then the origin table will be lost 
> along with its data -- I think this will be a serious problem if someone 
> perform `read-update-flushback` actions. The problem can be reproduced by the 
> following code:
> ```scala
> 01: test("test spark df overwrite failed") {
> 02:     // prepare table
> 03:     val tableName = "test_spark_overwrite_failed"
> 04:     sql(s"DROP TABLE IF EXISTS $tableName")
> 05:     sql(s"CREATE TABLE IF NOT EXISTS $tableName ( field_int int, 
> field_string String)" +
> 06:         s" STORED AS parquet").collect()
> 07: 
> 08:     // load data first
> 09:     val schema = StructType(
> 10:       Seq(StructField("field_int", DataTypes.IntegerType, nullable = 
> false),
> 11:         StructField("field_string", DataTypes.StringType, nullable = 
> false)))
> 12:     val rdd1 = sqlContext.sparkContext.parallelize(
> 13:       Row(20, "q") ::
> 14:       Row(21, "qw") ::
> 15:       Row(23, "qwe") :: Nil)
> 16:     val dataFrame = sqlContext.createDataFrame(rdd1, schema)
> 17:     
> dataFrame.write.format("parquet").mode(SaveMode.Overwrite).saveAsTable(tableName)
> 18:     sql(s"SELECT * FROM $tableName").show()
> 19: 
> 20:     // load data again, the following data will cause failure in data 
> loading
> 21:     try {
> 22:       val rdd2 = sqlContext.sparkContext.parallelize(
> 23:         Row(31, "qwer") ::
> 24:         Row(null, "qwer") ::
> 25:         Row(32, "long_than_5") :: Nil)
> 26:       val dataFrame2 = sqlContext.createDataFrame(rdd2, schema)
> 27: 
> 28:       
> dataFrame2.write.format("parquet").mode(SaveMode.Overwrite).saveAsTable(tableName)
> 29:     } catch {
> 30:       case e: Exception => LOGGER.error(e, "write overwrite failure")
> 31:     }
> 32:     // table `test_spark_overwrite_failed` has been dropped
> 33:     sql(s"show tables").show(20, truncate = false)
> 34:     // the content is empty even if table exists. We want it to be the 
> same as 
> 35:     sql(s"SELECT * FROM $tableName").show()
> 36:   }
> ```
> In Line24, we creata a `null` element while the schema is `notnull` -- This 
> will cause runtime error in loading data.
> In Line33, table `test_spark_overwrite_failed` has already been dropped and 
> no longger exists in the current table. And of course Line35 will fail.
> Instead, we want Line35 to show the origin data just as Line18.
> # ANALYZE
> I am thinking of optimizing `overwrite` in spark -- The goal is to keep the 
> old data until the load has finished successfully. The old data can only be 
> cleaned when the load is successful.
> Since sparksql already support `rename` operation, we can optimize 
> `overwrite` in the following steps:
> 1. WRITE: create and write data to tempTable
> 2. SWAP: swap temptable1 with targetTable by using rename operation
> 3. CLEAN: clean up old data
> If step1 works fine, then swap tempTable with targetTable and clean up old 
> data; otherwise, keep the target table not changed.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to