cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2 URL: https://github.com/apache/spark/pull/24798#discussion_r304719952
########## File path: sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala ########## @@ -196,7 +201,105 @@ private class InMemoryTable( } } -private class BufferedRows extends WriterCommitMessage with InputPartition with Serializable { +object TestInMemoryTableCatalog { + val SIMULATE_FAILED_WRITE_OPTION = "spark.sql.test.simulateFailedWrite" + val SIMULATE_FAILED_CREATE_PROPERTY = "spark.sql.test.simulateFailedCreate" + + def maybeSimulateFailedTableCreation(tableProperties: util.Map[String, String]): Unit = { + if ("true".equalsIgnoreCase( + tableProperties.get(TestInMemoryTableCatalog.SIMULATE_FAILED_CREATE_PROPERTY))) { + throw new IllegalStateException("Manual create table failure.") + } + } + + def maybeSimulateFailedTableWrite(tableOptions: CaseInsensitiveStringMap): Unit = { + if (tableOptions.getBoolean( + TestInMemoryTableCatalog.SIMULATE_FAILED_WRITE_OPTION, false)) { + throw new IllegalStateException("Manual write to table failure.") + } + } +} + +class TestStagingInMemoryCatalog + extends TestInMemoryTableCatalog with StagingTableCatalog { + + override def stageCreate( + ident: Identifier, + schema: StructType, + partitions: Array[Transform], + properties: util.Map[String, String]): StagedTable = { + newStagedTable(ident, schema, partitions, properties, replaceIfExists = false) + } + + override def stageReplace( + ident: Identifier, + schema: StructType, + partitions: Array[Transform], + properties: util.Map[String, String]): StagedTable = { + newStagedTable(ident, schema, partitions, properties, replaceIfExists = true) + } + + private def newStagedTable( + ident: Identifier, + schema: StructType, + partitions: Array[Transform], + properties: util.Map[String, String], + replaceIfExists: Boolean): StagedTable = { + import CatalogV2Implicits.IdentifierHelper + if (partitions.nonEmpty) { + throw new UnsupportedOperationException( + s"Catalog $name: Partitioned tables are not supported") + } + + 
TestInMemoryTableCatalog.maybeSimulateFailedTableCreation(properties) + + new TestStagedTable( + ident, + new InMemoryTable(s"$name.${ident.quoted}", schema, properties), + replaceIfExists) + } + + private class TestStagedTable( + ident: Identifier, + delegateTable: InMemoryTable, + replaceIfExists: Boolean) + extends StagedTable with SupportsWrite with SupportsRead { + + override def commitStagedChanges(): Unit = { + if (droppedTables.contains(ident)) { Review comment: Think about a REPLACE TABLE and a DROP TABLE happening at the same time. It doesn't matter which one gets executed first, but the final result must be reachable by one certain execution order. If REPLACE TABLE executes first, then there should be no table at the end as it's dropped. If DROP TABLE executes first, then REPLACE TABLE should fail and there is still no table at the end. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org