[ https://issues.apache.org/jira/browse/SPARK-22504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16249182#comment-16249182 ]
xuchuanyin commented on SPARK-22504:
------------------------------------

Does anyone have any suggestions? I'm going to work on this using renaming.

> Optimization in overwrite table in case of failure
> --------------------------------------------------
>
>                 Key: SPARK-22504
>                 URL: https://issues.apache.org/jira/browse/SPARK-22504
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.1.0
>            Reporter: xuchuanyin
>
> Optimization in overwrite table in case of failure
>
> # SCENARIO
> Currently, the `Overwrite` operation in Spark is performed in the following steps:
> 1. DROP: drop the old table
> 2. WRITE: create and write data into the new table
> If a runtime error occurs in Step 2, the original table is lost along with its data. I think this is a serious problem for anyone who performs `read-update-flushback` actions. The problem can be reproduced with the following code:
> ```scala
> 01: test("test spark df overwrite failed") {
> 02:   // prepare table
> 03:   val tableName = "test_spark_overwrite_failed"
> 04:   sql(s"DROP TABLE IF EXISTS $tableName")
> 05:   sql(s"CREATE TABLE IF NOT EXISTS $tableName (field_int int, field_string String)" +
> 06:     s" STORED AS parquet").collect()
> 07:
> 08:   // load data first
> 09:   val schema = StructType(
> 10:     Seq(StructField("field_int", DataTypes.IntegerType, nullable = false),
> 11:       StructField("field_string", DataTypes.StringType, nullable = false)))
> 12:   val rdd1 = sqlContext.sparkContext.parallelize(
> 13:     Row(20, "q") ::
> 14:     Row(21, "qw") ::
> 15:     Row(23, "qwe") :: Nil)
> 16:   val dataFrame = sqlContext.createDataFrame(rdd1, schema)
> 17:   dataFrame.write.format("parquet").mode(SaveMode.Overwrite).saveAsTable(tableName)
> 18:   sql(s"SELECT * FROM $tableName").show()
> 19:
> 20:   // load data again; the following data will cause a failure during data loading
> 21:   try {
> 22:     val rdd2 = sqlContext.sparkContext.parallelize(
> 23:       Row(31, "qwer") ::
> 24:       Row(null, "qwer") ::
> 25:       Row(32, "long_than_5") :: Nil)
> 26:     val dataFrame2 = sqlContext.createDataFrame(rdd2, schema)
> 27:
> 28:     dataFrame2.write.format("parquet").mode(SaveMode.Overwrite).saveAsTable(tableName)
> 29:   } catch {
> 30:     case e: Exception => LOGGER.error(e, "write overwrite failure")
> 31:   }
> 32:   // table `test_spark_overwrite_failed` has been dropped
> 33:   sql(s"show tables").show(20, truncate = false)
> 34:   // the content is empty even if the table exists; we want it to be the same as Line 18
> 35:   sql(s"SELECT * FROM $tableName").show()
> 36: }
> ```
> In Line 24 we create a `null` element even though the schema declares the field as not nullable, which causes a runtime error during data loading.
> By Line 33, the table `test_spark_overwrite_failed` has already been dropped and no longer exists in the current database, so of course Line 35 fails. Instead, we want Line 35 to show the original data, just as Line 18 does.
>
> # ANALYZE
> I am thinking of optimizing `overwrite` in Spark. The goal is to keep the old data until the load has finished successfully; the old data may only be cleaned up once the load succeeds.
> Since Spark SQL already supports the `rename` operation, we can optimize `overwrite` into the following steps (see the sketch below the quoted description):
> 1. WRITE: create and write the data to a temp table
> 2. SWAP: swap the temp table with the target table using rename operations
> 3. CLEAN: clean up the old data
> If Step 1 finishes successfully, swap the temp table with the target table and clean up the old data; otherwise, leave the target table unchanged.
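
A minimal sketch of the proposed WRITE/SWAP/CLEAN flow, assuming a `SparkSession` named `spark` and a catalog that supports `ALTER TABLE ... RENAME TO` (e.g. a Hive-backed metastore). The helper `safeOverwrite` and the `_tmp`/`_bak` table names are hypothetical illustrations, not the committed design:

```scala
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

def safeOverwrite(spark: SparkSession, df: DataFrame, target: String): Unit = {
  val temp = s"${target}_tmp"   // hypothetical staging table name
  val backup = s"${target}_bak" // hypothetical backup table name
  try {
    // 1. WRITE: load into a temp table first. A failure here leaves
    //    the target table and its data completely untouched.
    df.write.format("parquet").mode(SaveMode.Overwrite).saveAsTable(temp)
  } catch {
    case e: Exception =>
      spark.sql(s"DROP TABLE IF EXISTS $temp") // discard the partial load
      throw e
  }
  // 2. SWAP: two metadata-only renames replace the target with the temp table.
  spark.sql(s"ALTER TABLE $target RENAME TO $backup")
  spark.sql(s"ALTER TABLE $temp RENAME TO $target")
  // 3. CLEAN: drop the old data only after the swap has succeeded.
  spark.sql(s"DROP TABLE $backup")
}
```

With this ordering, the SWAP step is a pair of cheap catalog operations rather than a data copy, and at no point before CLEAN is the original data unrecoverable.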