cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] 
Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2
URL: https://github.com/apache/spark/pull/24798#discussion_r304708250
 
 

 ##########
 File path: 
sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala
 ##########
 @@ -196,7 +201,105 @@ private class InMemoryTable(
   }
 }
 
-private class BufferedRows extends WriterCommitMessage with InputPartition 
with Serializable {
+object TestInMemoryTableCatalog {
+  val SIMULATE_FAILED_WRITE_OPTION = "spark.sql.test.simulateFailedWrite"
+  val SIMULATE_FAILED_CREATE_PROPERTY = "spark.sql.test.simulateFailedCreate"
+
+  def maybeSimulateFailedTableCreation(tableProperties: util.Map[String, 
String]): Unit = {
+    if ("true".equalsIgnoreCase(
+      
tableProperties.get(TestInMemoryTableCatalog.SIMULATE_FAILED_CREATE_PROPERTY))) 
{
+      throw new IllegalStateException("Manual create table failure.")
+    }
+  }
+
+  def maybeSimulateFailedTableWrite(tableOptions: CaseInsensitiveStringMap): 
Unit = {
+    if (tableOptions.getBoolean(
+      TestInMemoryTableCatalog.SIMULATE_FAILED_WRITE_OPTION, false)) {
+      throw new IllegalStateException("Manual write to table failure.")
+    }
+  }
+}
+
+class TestStagingInMemoryCatalog
+  extends TestInMemoryTableCatalog with StagingTableCatalog {
+
+  override def stageCreate(
+      ident: Identifier,
+      schema: StructType,
+      partitions: Array[Transform],
+      properties: util.Map[String, String]): StagedTable = {
+    newStagedTable(ident, schema, partitions, properties, replaceIfExists = 
false)
+  }
+
+  override def stageReplace(
+      ident: Identifier,
+      schema: StructType,
+      partitions: Array[Transform],
+      properties: util.Map[String, String]): StagedTable = {
+    newStagedTable(ident, schema, partitions, properties, replaceIfExists = 
true)
+  }
+
+  private def newStagedTable(
+      ident: Identifier,
+      schema: StructType,
+      partitions: Array[Transform],
+      properties: util.Map[String, String],
+      replaceIfExists: Boolean): StagedTable = {
+    import CatalogV2Implicits.IdentifierHelper
+    if (partitions.nonEmpty) {
+      throw new UnsupportedOperationException(
+        s"Catalog $name: Partitioned tables are not supported")
+    }
+
+    TestInMemoryTableCatalog.maybeSimulateFailedTableCreation(properties)
+
+    new TestStagedTable(
+      ident,
+      new InMemoryTable(s"$name.${ident.quoted}", schema, properties),
+      replaceIfExists)
+  }
+
+  private class TestStagedTable(
+      ident: Identifier,
+      delegateTable: InMemoryTable,
+      replaceIfExists: Boolean)
+    extends StagedTable with SupportsWrite with SupportsRead {
+
+    override def commitStagedChanges(): Unit = {
+      if (droppedTables.contains(ident)) {
 
 Review comment:
   it's weird to record all the dropped tables in the history. I think a simple 
version is
   ```
   if (replaceIfExists) {
     if (!tables.containsKey(ident)) {
         throw new RuntimeException("table already dropped")
     }
     tables.put(ident, delegateTable)
   }
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to