cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] 
Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2
URL: https://github.com/apache/spark/pull/24798#discussion_r304719952
 
 

 ##########
 File path: 
sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala
 ##########
 @@ -196,7 +201,105 @@ private class InMemoryTable(
   }
 }
 
-private class BufferedRows extends WriterCommitMessage with InputPartition 
with Serializable {
+object TestInMemoryTableCatalog {
+  val SIMULATE_FAILED_WRITE_OPTION = "spark.sql.test.simulateFailedWrite"
+  val SIMULATE_FAILED_CREATE_PROPERTY = "spark.sql.test.simulateFailedCreate"
+
+  def maybeSimulateFailedTableCreation(tableProperties: util.Map[String, 
String]): Unit = {
+    if ("true".equalsIgnoreCase(
+      
tableProperties.get(TestInMemoryTableCatalog.SIMULATE_FAILED_CREATE_PROPERTY))) 
{
+      throw new IllegalStateException("Manual create table failure.")
+    }
+  }
+
+  def maybeSimulateFailedTableWrite(tableOptions: CaseInsensitiveStringMap): 
Unit = {
+    if (tableOptions.getBoolean(
+      TestInMemoryTableCatalog.SIMULATE_FAILED_WRITE_OPTION, false)) {
+      throw new IllegalStateException("Manual write to table failure.")
+    }
+  }
+}
+
+class TestStagingInMemoryCatalog
+  extends TestInMemoryTableCatalog with StagingTableCatalog {
+
+  override def stageCreate(
+      ident: Identifier,
+      schema: StructType,
+      partitions: Array[Transform],
+      properties: util.Map[String, String]): StagedTable = {
+    newStagedTable(ident, schema, partitions, properties, replaceIfExists = 
false)
+  }
+
+  override def stageReplace(
+      ident: Identifier,
+      schema: StructType,
+      partitions: Array[Transform],
+      properties: util.Map[String, String]): StagedTable = {
+    newStagedTable(ident, schema, partitions, properties, replaceIfExists = 
true)
+  }
+
+  private def newStagedTable(
+      ident: Identifier,
+      schema: StructType,
+      partitions: Array[Transform],
+      properties: util.Map[String, String],
+      replaceIfExists: Boolean): StagedTable = {
+    import CatalogV2Implicits.IdentifierHelper
+    if (partitions.nonEmpty) {
+      throw new UnsupportedOperationException(
+        s"Catalog $name: Partitioned tables are not supported")
+    }
+
+    TestInMemoryTableCatalog.maybeSimulateFailedTableCreation(properties)
+
+    new TestStagedTable(
+      ident,
+      new InMemoryTable(s"$name.${ident.quoted}", schema, properties),
+      replaceIfExists)
+  }
+
+  private class TestStagedTable(
+      ident: Identifier,
+      delegateTable: InMemoryTable,
+      replaceIfExists: Boolean)
+    extends StagedTable with SupportsWrite with SupportsRead {
+
+    override def commitStagedChanges(): Unit = {
+      if (droppedTables.contains(ident)) {
 
 Review comment:
   Think about a REPLACE TABLE and DROP TABLE happen at the same time. It 
doesn't matter which one gets executed first, but the final result match be 
reachable by one certain execution order.
   
   If REPLACE TABLE executes first, then there should be no table at the end as 
it's dropped.
   If DROP TABLE executes first, then REPLACE TABLE should fail and there is 
still no table at the end.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to