[GitHub] [spark] cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2
cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2 URL: https://github.com/apache/spark/pull/24798#discussion_r305604944 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala ## @@ -89,15 +92,145 @@ case class CreateTableAsSelectExec( case _ => // table does not support writes - throw new SparkException(s"Table implementation does not support writes: ${ident.quoted}") + throw new SparkException( +s"Table implementation does not support writes: ${ident.quoted}") } +})(catchBlock = { + catalog.dropTable(ident) +}) + } +} + +/** + * Physical plan node for v2 create table as select, when the catalog is determined to support + * staging table creation. + * + * A new table will be created using the schema of the query, and rows from the query are appended. + * The CTAS operation is atomic. The creation of the table is staged and the commit of the write + * should bundle the commitment of the metadata and the table contents in a single unit. If the + * write fails, the table is instructed to roll back all staged changes. + */ +case class AtomicCreateTableAsSelectExec( +catalog: StagingTableCatalog, +ident: Identifier, +partitioning: Seq[Transform], +query: SparkPlan, +properties: Map[String, String], +writeOptions: CaseInsensitiveStringMap, +ifNotExists: Boolean) extends AtomicTableWriteExec { + + override protected def doExecute(): RDD[InternalRow] = { +if (catalog.tableExists(ident)) { + if (ifNotExists) { +return sparkContext.parallelize(Seq.empty, 1) + } + + throw new TableAlreadyExistsException(ident) +} +val stagedTable = catalog.stageCreate( + ident, query.schema, partitioning.toArray, properties.asJava) +writeToStagedTable(stagedTable, writeOptions, ident) + } +} + +/** + * Physical plan node for v2 replace table as select when the catalog does not support staging + * table replacement. 
+ * + * A new table will be created using the schema of the query, and rows from the query are appended. + * If the table exists, its contents and schema should be replaced with the schema and the contents + * of the query. This is a non-atomic implementation that drops the table and then runs non-atomic + * CTAS. For an atomic implementation for catalogs with the appropriate support, see + * ReplaceTableAsSelectStagingExec. + */ +case class ReplaceTableAsSelectExec( +catalog: TableCatalog, +ident: Identifier, +partitioning: Seq[Transform], +query: SparkPlan, +properties: Map[String, String], +writeOptions: CaseInsensitiveStringMap, +orCreate: Boolean) extends AtomicTableWriteExec { + + import org.apache.spark.sql.catalog.v2.CatalogV2Implicits.IdentifierHelper + + override protected def doExecute(): RDD[InternalRow] = { +// Note that this operation is potentially unsafe, but these are the strict semantics of +// RTAS if the catalog does not support atomic operations. +// +// There are numerous cases we concede to where the table will be dropped and irrecoverable: +// +// 1. Creating the new table fails, +// 2. Writing to the new table fails, +// 3. The table returned by catalog.createTable doesn't support writing. 
+if (catalog.tableExists(ident)) { + catalog.dropTable(ident) +} else if (!orCreate) { + throw new CannotReplaceMissingTableException(ident) +} +val createdTable = catalog.createTable( + ident, query.schema, partitioning.toArray, properties.asJava) +Utils.tryWithSafeFinallyAndFailureCallbacks({ + createdTable match { +case table: SupportsWrite => + val batchWrite = table.newWriteBuilder(writeOptions) +.withInputDataSchema(query.schema) +.withQueryId(UUID.randomUUID().toString) +.buildForBatch() + + doWrite(batchWrite) +case _ => + // table does not support writes + throw new SparkException( +s"Table implementation does not support writes: ${ident.quoted}") + } })(catchBlock = { catalog.dropTable(ident) }) } } +/** + * + * Physical plan node for v2 replace table as select when the catalog supports staging + * table replacement. + * + * A new table will be created using the schema of the query, and rows from the query are appended. + * If the table exists, its contents and schema should be replaced with the schema and the contents + * of the query. This implementation is atomic. The table replacement is staged, and the commit + * operation at the end should perform the replacement of the table's metadata and contents. If the + * write fails, the table is instructed to roll back staged changes and any previously written table + * is left untouched. + */ +case
[GitHub] [spark] cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2
cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2 URL: https://github.com/apache/spark/pull/24798#discussion_r305214108 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala ## @@ -89,15 +92,145 @@ case class CreateTableAsSelectExec( case _ => // table does not support writes - throw new SparkException(s"Table implementation does not support writes: ${ident.quoted}") + throw new SparkException( +s"Table implementation does not support writes: ${ident.quoted}") } +})(catchBlock = { + catalog.dropTable(ident) +}) + } +} + +/** + * Physical plan node for v2 create table as select, when the catalog is determined to support + * staging table creation. + * + * A new table will be created using the schema of the query, and rows from the query are appended. + * The CTAS operation is atomic. The creation of the table is staged and the commit of the write + * should bundle the commitment of the metadata and the table contents in a single unit. If the + * write fails, the table is instructed to roll back all staged changes. + */ +case class AtomicCreateTableAsSelectExec( +catalog: StagingTableCatalog, +ident: Identifier, +partitioning: Seq[Transform], +query: SparkPlan, +properties: Map[String, String], +writeOptions: CaseInsensitiveStringMap, +ifNotExists: Boolean) extends AtomicTableWriteExec { + + override protected def doExecute(): RDD[InternalRow] = { +if (catalog.tableExists(ident)) { + if (ifNotExists) { +return sparkContext.parallelize(Seq.empty, 1) + } + + throw new TableAlreadyExistsException(ident) +} +val stagedTable = catalog.stageCreate( + ident, query.schema, partitioning.toArray, properties.asJava) +writeToStagedTable(stagedTable, writeOptions, ident) + } +} + +/** + * Physical plan node for v2 replace table as select when the catalog does not support staging + * table replacement. 
+ * + * A new table will be created using the schema of the query, and rows from the query are appended. + * If the table exists, its contents and schema should be replaced with the schema and the contents + * of the query. This is a non-atomic implementation that drops the table and then runs non-atomic + * CTAS. For an atomic implementation for catalogs with the appropriate support, see + * ReplaceTableAsSelectStagingExec. + */ +case class ReplaceTableAsSelectExec( +catalog: TableCatalog, +ident: Identifier, +partitioning: Seq[Transform], +query: SparkPlan, +properties: Map[String, String], +writeOptions: CaseInsensitiveStringMap, +orCreate: Boolean) extends AtomicTableWriteExec { + + import org.apache.spark.sql.catalog.v2.CatalogV2Implicits.IdentifierHelper + + override protected def doExecute(): RDD[InternalRow] = { +// Note that this operation is potentially unsafe, but these are the strict semantics of +// RTAS if the catalog does not support atomic operations. +// +// There are numerous cases we concede to where the table will be dropped and irrecoverable: +// +// 1. Creating the new table fails, +// 2. Writing to the new table fails, +// 3. The table returned by catalog.createTable doesn't support writing. 
+if (catalog.tableExists(ident)) { + catalog.dropTable(ident) +} else if (!orCreate) { + throw new CannotReplaceMissingTableException(ident) +} +val createdTable = catalog.createTable( + ident, query.schema, partitioning.toArray, properties.asJava) +Utils.tryWithSafeFinallyAndFailureCallbacks({ + createdTable match { +case table: SupportsWrite => + val batchWrite = table.newWriteBuilder(writeOptions) +.withInputDataSchema(query.schema) +.withQueryId(UUID.randomUUID().toString) +.buildForBatch() + + doWrite(batchWrite) +case _ => + // table does not support writes + throw new SparkException( +s"Table implementation does not support writes: ${ident.quoted}") + } })(catchBlock = { catalog.dropTable(ident) }) } } +/** + * + * Physical plan node for v2 replace table as select when the catalog supports staging + * table replacement. + * + * A new table will be created using the schema of the query, and rows from the query are appended. + * If the table exists, its contents and schema should be replaced with the schema and the contents + * of the query. This implementation is atomic. The table replacement is staged, and the commit + * operation at the end should perform the replacement of the table's metadata and contents. If the + * write fails, the table is instructed to roll back staged changes and any previously written table + * is left untouched. + */ +case
[GitHub] [spark] cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2
cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2 URL: https://github.com/apache/spark/pull/24798#discussion_r305213786 ## File path: sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/StagingTableCatalog.java ## @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalog.v2; + +import java.util.Map; + +import org.apache.spark.sql.catalog.v2.expressions.Transform; +import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; +import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException; +import org.apache.spark.sql.sources.v2.StagedTable; +import org.apache.spark.sql.sources.v2.SupportsWrite; +import org.apache.spark.sql.sources.v2.writer.BatchWrite; +import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; + +/** + * An optional mix-in for implementations of {@link TableCatalog} that support staging creation of + * the table before committing the table's metadata along with its contents in CREATE TABLE AS + * SELECT or REPLACE TABLE AS SELECT operations. + * + * It is highly recommended to implement this trait whenever possible so that CREATE TABLE AS + * SELECT and REPLACE TABLE AS SELECT operations are atomic. For example, when one runs a REPLACE + * TABLE AS SELECT operation, if the catalog does not implement this trait, the planner will first + * drop the table via {@link TableCatalog#dropTable(Identifier)}, then create the table via + * {@link TableCatalog#createTable(Identifier, StructType, Transform[], Map)}, and then perform + * the write via {@link SupportsWrite#newWriteBuilder(CaseInsensitiveStringMap)}. However, if the + * write operation fails, the catalog will have already dropped the table, and the planner cannot + * roll back the dropping of the table. + * + * If the catalog implements this plugin, the catalog can implement the methods to "stage" the + * creation and the replacement of a table. 
After the table's + * {@link BatchWrite#commit(WriterCommitMessage[])} is called, + * {@link StagedTable#commitStagedChanges()} is called, at which point the staged table can + * complete both the data write and the metadata swap operation atomically. + */ +public interface StagingTableCatalog extends TableCatalog { + + /** + * Stage the creation of a table, preparing it to be committed into the metastore. + * + * When the table is committed, the contents of any writes performed by the Spark planner are + * committed along with the metadata about the table passed into this method's arguments. If the + * table exists when this method is called, the method should throw an exception accordingly. If + * another process concurrently creates the table before this table's staged changes are + * committed, an exception should be thrown by {@link StagedTable#commitStagedChanges()}. + * + * @param ident a table identifier + * @param schema the schema of the new table, as a struct type + * @param partitions transforms to use for partitioning data in the table + * @param properties a string map of table properties + * @return metadata for the new table + * @throws TableAlreadyExistsException If a table or view already exists for the identifier + * @throws UnsupportedOperationException If a requested partition transform is not supported + * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional) + */ + StagedTable stageCreate( + Identifier ident, + StructType schema, + Transform[] partitions, + Map properties) throws TableAlreadyExistsException, NoSuchNamespaceException; + + /** + * Stage the replacement of a table, preparing it to be committed into the metastore when the + * returned table's {@link StagedTable#commitStagedChanges()} is called. + * + * When the table is committed, the contents of any writes performed by the Spark planner are + * committed along with the metadata about the table passed into this method's arguments. I
[GitHub] [spark] cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2
cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2 URL: https://github.com/apache/spark/pull/24798#discussion_r305213047 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala ## @@ -2294,6 +2303,69 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } } + /** + * Replace a table, returning a [[ReplaceTableStatement]] logical plan. + * + * Expected format: + * {{{ + * REPLACE TABLE [IF NOT EXISTS] [db_name.]table_name Review comment: this doesn't match the actual syntax now. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2
cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2 URL: https://github.com/apache/spark/pull/24798#discussion_r304725602 ## File path: sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala ## @@ -196,7 +201,105 @@ private class InMemoryTable( } } -private class BufferedRows extends WriterCommitMessage with InputPartition with Serializable { +object TestInMemoryTableCatalog { + val SIMULATE_FAILED_WRITE_OPTION = "spark.sql.test.simulateFailedWrite" + val SIMULATE_FAILED_CREATE_PROPERTY = "spark.sql.test.simulateFailedCreate" + + def maybeSimulateFailedTableCreation(tableProperties: util.Map[String, String]): Unit = { +if ("true".equalsIgnoreCase( + tableProperties.get(TestInMemoryTableCatalog.SIMULATE_FAILED_CREATE_PROPERTY))) { + throw new IllegalStateException("Manual create table failure.") +} + } + + def maybeSimulateFailedTableWrite(tableOptions: CaseInsensitiveStringMap): Unit = { +if (tableOptions.getBoolean( + TestInMemoryTableCatalog.SIMULATE_FAILED_WRITE_OPTION, false)) { + throw new IllegalStateException("Manual write to table failure.") +} + } +} + +class TestStagingInMemoryCatalog + extends TestInMemoryTableCatalog with StagingTableCatalog { + + override def stageCreate( + ident: Identifier, + schema: StructType, + partitions: Array[Transform], + properties: util.Map[String, String]): StagedTable = { +newStagedTable(ident, schema, partitions, properties, replaceIfExists = false) + } + + override def stageReplace( + ident: Identifier, + schema: StructType, + partitions: Array[Transform], + properties: util.Map[String, String]): StagedTable = { +newStagedTable(ident, schema, partitions, properties, replaceIfExists = true) + } + + private def newStagedTable( + ident: Identifier, + schema: StructType, + partitions: Array[Transform], + properties: util.Map[String, String], + replaceIfExists: Boolean): StagedTable = { +import 
CatalogV2Implicits.IdentifierHelper +if (partitions.nonEmpty) { + throw new UnsupportedOperationException( +s"Catalog $name: Partitioned tables are not supported") +} + +TestInMemoryTableCatalog.maybeSimulateFailedTableCreation(properties) + +new TestStagedTable( + ident, + new InMemoryTable(s"$name.${ident.quoted}", schema, properties), + replaceIfExists) + } + + private class TestStagedTable( + ident: Identifier, + delegateTable: InMemoryTable, + replaceIfExists: Boolean) +extends StagedTable with SupportsWrite with SupportsRead { + +override def commitStagedChanges(): Unit = { + if (droppedTables.contains(ident)) { Review comment: BTW, I think my proposal works for REPLACE TABLE, right? `stageCreate` is for CTAS, and I think your current code (without tracking dropped tables) already works.
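To make the `stageCreate` point concrete, here is a minimal sketch of a staged create whose commit needs no record of dropped tables. It only has to check, at commit time, that nobody else has created the table in the meantime, which is the behavior the `StagingTableCatalog` Javadoc in this PR asks of `commitStagedChanges()`. Everything in the block (`ToyStagingCatalog`, `StagedCreate`, the string schemas) is a hypothetical stand-in, not Spark's API or the test catalog from the PR.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical toy catalog; the names are illustrative and not Spark's API.
class ToyStagingCatalog {
    // Committed tables: table name -> schema description.
    private final ConcurrentMap<String, String> tables = new ConcurrentHashMap<>();

    boolean tableExists(String name) {
        return tables.containsKey(name);
    }

    // Staging only records the intent; the catalog is untouched until commit.
    StagedCreate stageCreate(String name, String schema) {
        if (tables.containsKey(name)) {
            throw new IllegalStateException("Table already exists: " + name);
        }
        return new StagedCreate(name, schema);
    }

    final class StagedCreate {
        private final String name;
        private final String schema;

        StagedCreate(String name, String schema) {
            this.name = name;
            this.schema = schema;
        }

        // putIfAbsent performs the existence check and the publish atomically,
        // so a concurrent creator is detected without any extra bookkeeping.
        void commitStagedChanges() {
            if (tables.putIfAbsent(name, schema) != null) {
                throw new IllegalStateException("Table was created concurrently: " + name);
            }
        }
    }
}

class StagedCreateDemo {
    // Two writers stage the same table; only the first commit may succeed.
    static boolean secondCommitFails() {
        ToyStagingCatalog catalog = new ToyStagingCatalog();
        ToyStagingCatalog.StagedCreate first = catalog.stageCreate("t", "a INT");
        ToyStagingCatalog.StagedCreate second = catalog.stageCreate("t", "b STRING");
        first.commitStagedChanges();
        try {
            second.commitStagedChanges();
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("second commit rejected: " + secondCommitFails());
    }
}
```

The sketch relies on a single atomic check-and-publish step at commit time; a real catalog would need its metastore to offer an equivalent primitive.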
[GitHub] [spark] cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2
cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2 URL: https://github.com/apache/spark/pull/24798#discussion_r304719952 ## File path: sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala ## @@ -196,7 +201,105 @@ private class InMemoryTable( } } -private class BufferedRows extends WriterCommitMessage with InputPartition with Serializable { +object TestInMemoryTableCatalog { + val SIMULATE_FAILED_WRITE_OPTION = "spark.sql.test.simulateFailedWrite" + val SIMULATE_FAILED_CREATE_PROPERTY = "spark.sql.test.simulateFailedCreate" + + def maybeSimulateFailedTableCreation(tableProperties: util.Map[String, String]): Unit = { +if ("true".equalsIgnoreCase( + tableProperties.get(TestInMemoryTableCatalog.SIMULATE_FAILED_CREATE_PROPERTY))) { + throw new IllegalStateException("Manual create table failure.") +} + } + + def maybeSimulateFailedTableWrite(tableOptions: CaseInsensitiveStringMap): Unit = { +if (tableOptions.getBoolean( + TestInMemoryTableCatalog.SIMULATE_FAILED_WRITE_OPTION, false)) { + throw new IllegalStateException("Manual write to table failure.") +} + } +} + +class TestStagingInMemoryCatalog + extends TestInMemoryTableCatalog with StagingTableCatalog { + + override def stageCreate( + ident: Identifier, + schema: StructType, + partitions: Array[Transform], + properties: util.Map[String, String]): StagedTable = { +newStagedTable(ident, schema, partitions, properties, replaceIfExists = false) + } + + override def stageReplace( + ident: Identifier, + schema: StructType, + partitions: Array[Transform], + properties: util.Map[String, String]): StagedTable = { +newStagedTable(ident, schema, partitions, properties, replaceIfExists = true) + } + + private def newStagedTable( + ident: Identifier, + schema: StructType, + partitions: Array[Transform], + properties: util.Map[String, String], + replaceIfExists: Boolean): StagedTable = { +import 
CatalogV2Implicits.IdentifierHelper +if (partitions.nonEmpty) { + throw new UnsupportedOperationException( +s"Catalog $name: Partitioned tables are not supported") +} + +TestInMemoryTableCatalog.maybeSimulateFailedTableCreation(properties) + +new TestStagedTable( + ident, + new InMemoryTable(s"$name.${ident.quoted}", schema, properties), + replaceIfExists) + } + + private class TestStagedTable( + ident: Identifier, + delegateTable: InMemoryTable, + replaceIfExists: Boolean) +extends StagedTable with SupportsWrite with SupportsRead { + +override def commitStagedChanges(): Unit = { + if (droppedTables.contains(ident)) { Review comment: Think about a REPLACE TABLE and a DROP TABLE happening at the same time. It doesn't matter which one gets executed first, but the final result must be reachable by some serial execution order. If REPLACE TABLE executes first, then there should be no table at the end, as it's dropped. If DROP TABLE executes first, then REPLACE TABLE should fail and there is still no table at the end.
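The two serial orders described above can be checked with a small sketch. It assumes a toy catalog in which a staged replace touches nothing until commit, and the commit fails if the table no longer exists. `ToyReplaceCatalog`, `commitReplace`, and the string schemas are hypothetical names for illustration only, not Spark's API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy catalog; the names are illustrative and not Spark's API.
class ToyReplaceCatalog {
    // Committed tables: table name -> schema description.
    private final Map<String, String> tables = new HashMap<>();

    synchronized void createTable(String name, String schema) {
        tables.put(name, schema);
    }

    synchronized boolean dropTable(String name) {
        return tables.remove(name) != null;
    }

    synchronized boolean tableExists(String name) {
        return tables.containsKey(name);
    }

    // Commit of a staged REPLACE TABLE: it checks existence at commit time,
    // so a DROP that ran in between makes the replace fail.
    synchronized void commitReplace(String name, String newSchema) {
        if (!tables.containsKey(name)) {
            throw new IllegalStateException("Cannot replace missing table: " + name);
        }
        tables.put(name, newSchema);
    }
}

class ReplaceVsDropDemo {
    // Serial order 1: REPLACE commits first, then DROP removes the new table.
    static boolean tableExistsAfterReplaceThenDrop() {
        ToyReplaceCatalog catalog = new ToyReplaceCatalog();
        catalog.createTable("t", "old schema");
        catalog.commitReplace("t", "new schema");
        catalog.dropTable("t");
        return catalog.tableExists("t");
    }

    // Serial order 2: DROP runs first, so the REPLACE commit must fail.
    static boolean tableExistsAfterDropThenReplace() {
        ToyReplaceCatalog catalog = new ToyReplaceCatalog();
        catalog.createTable("t", "old schema");
        catalog.dropTable("t");
        try {
            catalog.commitReplace("t", "new schema");
        } catch (IllegalStateException expected) {
            // REPLACE TABLE fails because the table is gone.
        }
        return catalog.tableExists("t");
    }

    public static void main(String[] args) {
        System.out.println(tableExistsAfterReplaceThenDrop());
        System.out.println(tableExistsAfterDropThenReplace());
    }
}
```

In both orders the table is absent at the end, which is the outcome the comment requires. A commit that silently recreated the table after a concurrent DROP would produce a state that neither order can reach.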
[GitHub] [spark] cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2
cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2 URL: https://github.com/apache/spark/pull/24798#discussion_r304719952 ## File path: sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala ## @@ -196,7 +201,105 @@ private class InMemoryTable( } } -private class BufferedRows extends WriterCommitMessage with InputPartition with Serializable { +object TestInMemoryTableCatalog { + val SIMULATE_FAILED_WRITE_OPTION = "spark.sql.test.simulateFailedWrite" + val SIMULATE_FAILED_CREATE_PROPERTY = "spark.sql.test.simulateFailedCreate" + + def maybeSimulateFailedTableCreation(tableProperties: util.Map[String, String]): Unit = { +if ("true".equalsIgnoreCase( + tableProperties.get(TestInMemoryTableCatalog.SIMULATE_FAILED_CREATE_PROPERTY))) { + throw new IllegalStateException("Manual create table failure.") +} + } + + def maybeSimulateFailedTableWrite(tableOptions: CaseInsensitiveStringMap): Unit = { +if (tableOptions.getBoolean( + TestInMemoryTableCatalog.SIMULATE_FAILED_WRITE_OPTION, false)) { + throw new IllegalStateException("Manual write to table failure.") +} + } +} + +class TestStagingInMemoryCatalog + extends TestInMemoryTableCatalog with StagingTableCatalog { + + override def stageCreate( + ident: Identifier, + schema: StructType, + partitions: Array[Transform], + properties: util.Map[String, String]): StagedTable = { +newStagedTable(ident, schema, partitions, properties, replaceIfExists = false) + } + + override def stageReplace( + ident: Identifier, + schema: StructType, + partitions: Array[Transform], + properties: util.Map[String, String]): StagedTable = { +newStagedTable(ident, schema, partitions, properties, replaceIfExists = true) + } + + private def newStagedTable( + ident: Identifier, + schema: StructType, + partitions: Array[Transform], + properties: util.Map[String, String], + replaceIfExists: Boolean): StagedTable = { +import 
CatalogV2Implicits.IdentifierHelper +if (partitions.nonEmpty) { + throw new UnsupportedOperationException( +s"Catalog $name: Partitioned tables are not supported") +} + +TestInMemoryTableCatalog.maybeSimulateFailedTableCreation(properties) + +new TestStagedTable( + ident, + new InMemoryTable(s"$name.${ident.quoted}", schema, properties), + replaceIfExists) + } + + private class TestStagedTable( + ident: Identifier, + delegateTable: InMemoryTable, + replaceIfExists: Boolean) +extends StagedTable with SupportsWrite with SupportsRead { + +override def commitStagedChanges(): Unit = { + if (droppedTables.contains(ident)) { Review comment: Think about a REPLACE TABLE and a DROP TABLE happening at the same time. It doesn't matter which one gets executed first, but the final result must be reachable by some definite execution order. If REPLACE TABLE executes first, then there should be no table at the end, as it's dropped. If DROP TABLE executes first, then REPLACE TABLE should fail and there is still no table at the end. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2
cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2 URL: https://github.com/apache/spark/pull/24798#discussion_r304708250 ## File path: sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala ## @@ -196,7 +201,105 @@ private class InMemoryTable( } } -private class BufferedRows extends WriterCommitMessage with InputPartition with Serializable { +object TestInMemoryTableCatalog { + val SIMULATE_FAILED_WRITE_OPTION = "spark.sql.test.simulateFailedWrite" + val SIMULATE_FAILED_CREATE_PROPERTY = "spark.sql.test.simulateFailedCreate" + + def maybeSimulateFailedTableCreation(tableProperties: util.Map[String, String]): Unit = { +if ("true".equalsIgnoreCase( + tableProperties.get(TestInMemoryTableCatalog.SIMULATE_FAILED_CREATE_PROPERTY))) { + throw new IllegalStateException("Manual create table failure.") +} + } + + def maybeSimulateFailedTableWrite(tableOptions: CaseInsensitiveStringMap): Unit = { +if (tableOptions.getBoolean( + TestInMemoryTableCatalog.SIMULATE_FAILED_WRITE_OPTION, false)) { + throw new IllegalStateException("Manual write to table failure.") +} + } +} + +class TestStagingInMemoryCatalog + extends TestInMemoryTableCatalog with StagingTableCatalog { + + override def stageCreate( + ident: Identifier, + schema: StructType, + partitions: Array[Transform], + properties: util.Map[String, String]): StagedTable = { +newStagedTable(ident, schema, partitions, properties, replaceIfExists = false) + } + + override def stageReplace( + ident: Identifier, + schema: StructType, + partitions: Array[Transform], + properties: util.Map[String, String]): StagedTable = { +newStagedTable(ident, schema, partitions, properties, replaceIfExists = true) + } + + private def newStagedTable( + ident: Identifier, + schema: StructType, + partitions: Array[Transform], + properties: util.Map[String, String], + replaceIfExists: Boolean): StagedTable = { +import 
CatalogV2Implicits.IdentifierHelper +if (partitions.nonEmpty) { + throw new UnsupportedOperationException( +s"Catalog $name: Partitioned tables are not supported") +} + +TestInMemoryTableCatalog.maybeSimulateFailedTableCreation(properties) + +new TestStagedTable( + ident, + new InMemoryTable(s"$name.${ident.quoted}", schema, properties), + replaceIfExists) + } + + private class TestStagedTable( + ident: Identifier, + delegateTable: InMemoryTable, + replaceIfExists: Boolean) +extends StagedTable with SupportsWrite with SupportsRead { + +override def commitStagedChanges(): Unit = { + if (droppedTables.contains(ident)) { Review comment: It's weird to record all the dropped tables in the history. I think a simple version is
```
if (replaceIfExists) {
  if (!tables.containsKey(ident)) {
    throw new RuntimeException("table already dropped")
  }
  tables.put(ident, delegateTable)
}
```
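To make the two execution orders concrete, here is a minimal Java sketch of a commit that fails once the table has been dropped. It is not the PR's code: `InMemoryCatalogSketch`, `commitReplace`, and the `String` stand-ins for `Identifier` and `InMemoryTable` are assumptions made for illustration. It also swaps the `containsKey`/`put` pair for `ConcurrentHashMap.replace`, so that the existence check and the write happen in one step.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the test catalog; not the PR's implementation.
final class InMemoryCatalogSketch {
    // Plays the role of the catalog's `tables` map (identifier -> table).
    final Map<String, String> tables = new ConcurrentHashMap<>();

    void dropTable(String ident) {
        tables.remove(ident);
    }

    // Commit of a staged REPLACE TABLE: succeeds only if the table still exists.
    void commitReplace(String ident, String stagedTable) {
        // replace(key, value) returns null when the key has no mapping,
        // so a concurrent DROP TABLE makes this commit fail.
        if (tables.replace(ident, stagedTable) == null) {
            throw new IllegalStateException("table already dropped: " + ident);
        }
    }

    public static void main(String[] args) {
        // Order 1: REPLACE commits first, then DROP. No table at the end.
        InMemoryCatalogSketch first = new InMemoryCatalogSketch();
        first.tables.put("t", "old");
        first.commitReplace("t", "new");
        first.dropTable("t");
        System.out.println(first.tables.containsKey("t"));

        // Order 2: DROP first, then the REPLACE commit fails. Still no table.
        InMemoryCatalogSketch second = new InMemoryCatalogSketch();
        second.tables.put("t", "old");
        second.dropTable("t");
        try {
            second.commitReplace("t", "new");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
        System.out.println(second.tables.containsKey("t"));
    }
}
```

Either order ends with no table, which is the property the review asks for, and no history of dropped tables is needed.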
[GitHub] [spark] cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2
cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2 URL: https://github.com/apache/spark/pull/24798#discussion_r304390373 ## File path: sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala ## @@ -196,7 +196,103 @@ private class InMemoryTable( } } -private class BufferedRows extends WriterCommitMessage with InputPartition with Serializable { +object TestInMemoryTableCatalog { + val SIMULATE_FAILED_WRITE_OPTION = "spark.sql.test.simulateFailedWrite" + val SIMULATE_FAILED_CREATE_PROPERTY = "spark.sql.test.simulateFailedCreate" + + def maybeSimulateFailedTableCreation(tableProperties: util.Map[String, String]): Unit = { +if (tableProperties.containsKey(TestInMemoryTableCatalog.SIMULATE_FAILED_CREATE_PROPERTY) + && tableProperties.get(TestInMemoryTableCatalog.SIMULATE_FAILED_CREATE_PROPERTY) + .equalsIgnoreCase("true")) { + throw new IllegalStateException("Manual create table failure.") +} + } + + def maybeSimulateFailedTableWrite(tableOptions: CaseInsensitiveStringMap): Unit = { +val shouldSimulateFailedWrite = tableOptions + .getBoolean(TestInMemoryTableCatalog.SIMULATE_FAILED_WRITE_OPTION, false) +if (shouldSimulateFailedWrite) { + throw new IllegalStateException("Manual write to table failure.") +} + } +} + +class TestStagingInMemoryCatalog + extends TestInMemoryTableCatalog with StagingTableCatalog { + + override def stageCreate( + ident: Identifier, + schema: StructType, + partitions: Array[Transform], + properties: util.Map[String, String]): StagedTable = { +newStagedTable(ident, schema, partitions, properties, replaceIfExists = false) + } + + override def stageReplace( + ident: Identifier, + schema: StructType, + partitions: Array[Transform], + properties: util.Map[String, String]): StagedTable = { +newStagedTable(ident, schema, partitions, properties, replaceIfExists = true) + } + + private def newStagedTable( + ident: Identifier, + schema: 
StructType, + partitions: Array[Transform], + properties: util.Map[String, String], + replaceIfExists: Boolean): StagedTable = { +import CatalogV2Implicits.IdentifierHelper +if (partitions.nonEmpty) { + throw new UnsupportedOperationException( +s"Catalog $name: Partitioned tables are not supported") +} + +TestInMemoryTableCatalog.maybeSimulateFailedTableCreation(properties) + +new TestStagedTable( + ident, + new InMemoryTable(s"$name.${ident.quoted}", schema, properties), + replaceIfExists) + } + + private class TestStagedTable( + ident: Identifier, + delegateTable: InMemoryTable, + replaceIfExists: Boolean) +extends StagedTable with SupportsWrite with SupportsRead { + +override def commitStagedChanges(): Unit = { + if (replaceIfExists) { +tables.put(ident, delegateTable) Review comment: nit: when committing REPLACE TABLE, we should fail if the table has already been dropped by others.
[GitHub] [spark] cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2
cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2 URL: https://github.com/apache/spark/pull/24798#discussion_r304389548 ## File path: sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala ## @@ -196,7 +196,103 @@ private class InMemoryTable( } } -private class BufferedRows extends WriterCommitMessage with InputPartition with Serializable { +object TestInMemoryTableCatalog { + val SIMULATE_FAILED_WRITE_OPTION = "spark.sql.test.simulateFailedWrite" + val SIMULATE_FAILED_CREATE_PROPERTY = "spark.sql.test.simulateFailedCreate" + + def maybeSimulateFailedTableCreation(tableProperties: util.Map[String, String]): Unit = { +if (tableProperties.containsKey(TestInMemoryTableCatalog.SIMULATE_FAILED_CREATE_PROPERTY) + && tableProperties.get(TestInMemoryTableCatalog.SIMULATE_FAILED_CREATE_PROPERTY) + .equalsIgnoreCase("true")) { Review comment: nit: we can just write `"true".equalsIgnoreCase(tableProperties.get(TestInMemoryTableCatalog.SIMULATE_FAILED_CREATE_PROPERTY))`. If the key doesn't exist, `get` returns null and `"true".equalsIgnoreCase(null)` returns false.
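A small Java sketch of the null-safe form suggested above, for reference. The class name `SimulatedFailureFlag` and the method `shouldFailCreate` are made up for this example; only the property key and the `equalsIgnoreCase` idiom come from the diff and the comment.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper illustrating the idiom; not part of the PR.
final class SimulatedFailureFlag {
    // Property key as it appears in the test catalog in the diff.
    static final String SIMULATE_FAILED_CREATE_PROPERTY =
        "spark.sql.test.simulateFailedCreate";

    // Map.get returns null for a missing key, and String.equalsIgnoreCase(null)
    // returns false instead of throwing, so no containsKey guard is needed.
    static boolean shouldFailCreate(Map<String, String> tableProperties) {
        return "true".equalsIgnoreCase(
            tableProperties.get(SIMULATE_FAILED_CREATE_PROPERTY));
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        System.out.println(shouldFailCreate(props)); // key absent
        props.put(SIMULATE_FAILED_CREATE_PROPERTY, "TRUE");
        System.out.println(shouldFailCreate(props)); // key present, any case
    }
}
```

Putting the literal on the left is what makes the call safe: the receiver is never null, so the lookup result can be passed straight in.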
[GitHub] [spark] cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2
cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2 URL: https://github.com/apache/spark/pull/24798#discussion_r304388426 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala ## @@ -89,15 +92,140 @@ case class CreateTableAsSelectExec( case _ => // table does not support writes - throw new SparkException(s"Table implementation does not support writes: ${ident.quoted}") + throw new SparkException( +s"Table implementation does not support writes: ${ident.quoted}") } +})(catchBlock = { + catalog.dropTable(ident) +}) + } +} + +/** + * Physical plan node for v2 create table as select, when the catalog is determined to support + * staging table creation. + * + * A new table will be created using the schema of the query, and rows from the query are appended. + * The CTAS operation is atomic. The creation of the table is staged and the commit of the write + * should bundle the commitment of the metadata and the table contents in a single unit. If the + * write fails, the table is instructed to roll back all staged changes. + */ +case class AtomicCreateTableAsSelectExec( +catalog: StagingTableCatalog, +ident: Identifier, +partitioning: Seq[Transform], +query: SparkPlan, +properties: Map[String, String], +writeOptions: CaseInsensitiveStringMap, +ifNotExists: Boolean) extends AtomicTableWriteExec { + + override protected def doExecute(): RDD[InternalRow] = { +if (catalog.tableExists(ident)) { + if (ifNotExists) { +return sparkContext.parallelize(Seq.empty, 1) + } + + throw new TableAlreadyExistsException(ident) +} +val stagedTable = catalog.stageCreate( + ident, query.schema, partitioning.toArray, properties.asJava) +writeToStagedTable(stagedTable, writeOptions, ident) + } +} + +/** + * Physical plan node for v2 replace table as select when the catalog does not support staging + * table replacement. 
+ * + * A new table will be created using the schema of the query, and rows from the query are appended. + * If the table exists, its contents and schema should be replaced with the schema and the contents + * of the query. This is a non-atomic implementation that drops the table and then runs non-atomic Review comment: IMO the non-atomic version is allowed to have undefined behavior when a failure happens midway. But it should behave like the atomic version if no failure happens.
[GitHub] [spark] cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2
cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2 URL: https://github.com/apache/spark/pull/24798#discussion_r304387491 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala ## @@ -89,15 +92,140 @@ case class CreateTableAsSelectExec( case _ => // table does not support writes - throw new SparkException(s"Table implementation does not support writes: ${ident.quoted}") + throw new SparkException( +s"Table implementation does not support writes: ${ident.quoted}") } +})(catchBlock = { + catalog.dropTable(ident) +}) + } +} + +/** + * Physical plan node for v2 create table as select, when the catalog is determined to support + * staging table creation. + * + * A new table will be created using the schema of the query, and rows from the query are appended. + * The CTAS operation is atomic. The creation of the table is staged and the commit of the write + * should bundle the commitment of the metadata and the table contents in a single unit. If the + * write fails, the table is instructed to roll back all staged changes. + */ +case class AtomicCreateTableAsSelectExec( +catalog: StagingTableCatalog, +ident: Identifier, +partitioning: Seq[Transform], +query: SparkPlan, +properties: Map[String, String], +writeOptions: CaseInsensitiveStringMap, +ifNotExists: Boolean) extends AtomicTableWriteExec { + + override protected def doExecute(): RDD[InternalRow] = { +if (catalog.tableExists(ident)) { + if (ifNotExists) { +return sparkContext.parallelize(Seq.empty, 1) + } + + throw new TableAlreadyExistsException(ident) +} +val stagedTable = catalog.stageCreate( + ident, query.schema, partitioning.toArray, properties.asJava) +writeToStagedTable(stagedTable, writeOptions, ident) + } +} + +/** + * Physical plan node for v2 replace table as select when the catalog does not support staging + * table replacement. 
+ * + * A new table will be created using the schema of the query, and rows from the query are appended. + * If the table exists, its contents and schema should be replaced with the schema and the contents + * of the query. This is a non-atomic implementation that drops the table and then runs non-atomic Review comment: According to https://github.com/apache/spark/pull/24798/files#r302746896 , this is a broken implementation. RTAS should be able to query any existing tables, including the one that is being replaced. If we do want to have a non-atomic version, how about
1. create a table with a random but unique name (like UUID), insert data to it
2. drop the target table
3. rename the table created in step 1 to the target table.
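The three steps above can be sketched in Java against a plain map acting as the catalog. Everything here is hypothetical: `RenameBasedReplaceSketch`, `replaceTableAsSelect`, and the map-of-rows catalog are illustration only and do not correspond to Spark's `TableCatalog` API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical sketch of a rename-based, non-atomic RTAS.
final class RenameBasedReplaceSketch {
    // catalog: table name -> rows. query: may read any table, including target.
    static void replaceTableAsSelect(
            Map<String, List<String>> catalog,
            String target,
            Function<Map<String, List<String>>, List<String>> query) {
        // Step 1: write the query result under a random, unique name.
        // The target still exists here, so the query can read it.
        String staging = target + "_" + UUID.randomUUID();
        catalog.put(staging, new ArrayList<>(query.apply(catalog)));
        // Step 2: drop the target table.
        catalog.remove(target);
        // Step 3: rename the staging table to the target name.
        catalog.put(target, catalog.remove(staging));
    }

    public static void main(String[] args) {
        Map<String, List<String>> catalog = new HashMap<>();
        catalog.put("t", Arrays.asList("a", "b"));
        // The query reads the table that is being replaced.
        replaceTableAsSelect(catalog, "t", c -> c.get("t").stream()
            .map(String::toUpperCase)
            .collect(Collectors.toList()));
        System.out.println(catalog);
    }
}
```

If the query fails in step 1, the target is untouched. A failure between steps 2 and 3 still leaves the target missing, which is why this remains a non-atomic version.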
[GitHub] [spark] cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2
cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2 URL: https://github.com/apache/spark/pull/24798#discussion_r304381665 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ## @@ -440,6 +440,44 @@ case class CreateTableAsSelect( } } +/** + * Replace a table with a v2 catalog. + * + * If the table does not exist, it will be created. The persisted table will have no contents + * as a result of this operation. + */ +case class ReplaceTable( +catalog: TableCatalog, +tableName: Identifier, +tableSchema: StructType, +partitioning: Seq[Transform], +properties: Map[String, String], +orCreate: Boolean) extends Command + +/** + * Replaces a table from a select query with a v2 catalog. + * + * If the table does not already exist, it will be created. + */ +case class ReplaceTableAsSelect( Review comment: +1, I think we should fail the query if atomic RTAS is not supported. It's a valid use case to access the table being replaced in RTAS; Spark shouldn't throw a table-not-found exception in this case, which would be quite confusing.
[GitHub] [spark] cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2
cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2 URL: https://github.com/apache/spark/pull/24798#discussion_r304381665 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ## @@ -440,6 +440,44 @@ case class CreateTableAsSelect( } } +/** + * Replace a table with a v2 catalog. + * + * If the table does not exist, it will be created. The persisted table will have no contents + * as a result of this operation. + */ +case class ReplaceTable( +catalog: TableCatalog, +tableName: Identifier, +tableSchema: StructType, +partitioning: Seq[Transform], +properties: Map[String, String], +orCreate: Boolean) extends Command + +/** + * Replaces a table from a select query with a v2 catalog. + * + * If the table does not already exist, it will be created. + */ +case class ReplaceTableAsSelect( Review comment: +1, I think we should fail the query if atomic REPLACE TABLE is not supported. It's a valid use case to access the table being replaced in RTAS; Spark shouldn't throw a table-not-found exception in this case, which would be quite confusing.
[GitHub] [spark] cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2
cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2 URL: https://github.com/apache/spark/pull/24798#discussion_r304378690 ## File path: sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/StagingTableCatalog.java ## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalog.v2; + +import java.util.Map; + +import org.apache.spark.sql.catalog.v2.expressions.Transform; +import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; +import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException; +import org.apache.spark.sql.sources.v2.StagedTable; +import org.apache.spark.sql.sources.v2.SupportsWrite; +import org.apache.spark.sql.sources.v2.writer.BatchWrite; +import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; + +/** + * An optional mix-in for implementations of {@link TableCatalog} that support staging creation of + * the a table before committing the table's metadata along with its contents in CREATE TABLE AS + * SELECT or REPLACE TABLE AS SELECT operations. + * + * It is highly recommended to implement this trait whenever possible so that CREATE TABLE AS + * SELECT and REPLACE TABLE AS SELECT operations are atomic. For example, when one runs a REPLACE + * TABLE AS SELECT operation, if the catalog does not implement this trait, the planner will first + * drop the table via {@link TableCatalog#dropTable(Identifier)}, then create the table via + * {@link TableCatalog#createTable(Identifier, StructType, Transform[], Map)}, and then perform + * the write via {@link SupportsWrite#newWriteBuilder(CaseInsensitiveStringMap)}. However, if the + * write operation fails, the catalog will have already dropped the table, and the planner cannot + * roll back the dropping of the table. + * + * If the catalog implements this plugin, the catalog can implement the methods to "stage" the + * creation and the replacement of a table. 
After the table's + * {@link BatchWrite#commit(WriterCommitMessage[])} is called, + * {@link StagedTable#commitStagedChanges()} is called, at which point the staged table can + * complete both the data write and the metadata swap operation atomically. + */ +public interface StagingTableCatalog extends TableCatalog { + + /** + * Stage the creation of a table, preparing it to be committed into the metastore. + * + * When the table is committed, the contents of any writes performed by the Spark planner are + * committed along with the metadata about the table passed into this method's arguments. If the + * table exists when this method is called, the method should throw an exception accordingly. If + * another process concurrently creates the table before this table's staged changes are + * committed, an exception should be thrown by {@link StagedTable#commitStagedChanges()}. + * + * @param ident a table identifier + * @param schema the schema of the new table, as a struct type + * @param partitions transforms to use for partitioning data in the table + * @param properties a string map of table properties + * @return metadata for the new table + * @throws TableAlreadyExistsException If a table or view already exists for the identifier + * @throws UnsupportedOperationException If a requested partition transform is not supported + * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional) + */ + StagedTable stageCreate( + Identifier ident, + StructType schema, + Transform[] partitions, + Map properties) throws TableAlreadyExistsException, NoSuchNamespaceException; + + /** + * Stage the replacement of a table, preparing it to be committed into the metastore when the + * returned table's {@link StagedTable#commitStagedChanges()} is called. + * + * When the table is committed, the contents of any writes performed by the Spark planner are + * committed along with the metadata about the table passed into this method's arguments. I
[GitHub] [spark] cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2
cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2 URL: https://github.com/apache/spark/pull/24798#discussion_r302494084 ## File path: sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/StagingTableCatalog.java ## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalog.v2; + +import java.util.Map; + +import org.apache.spark.sql.catalog.v2.expressions.Transform; +import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; +import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException; +import org.apache.spark.sql.sources.v2.StagedTable; +import org.apache.spark.sql.sources.v2.SupportsWrite; +import org.apache.spark.sql.sources.v2.writer.BatchWrite; +import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; + +/** + * An optional mix-in for implementations of {@link TableCatalog} that support staging creation of + * the a table before committing the table's metadata along with its contents in CREATE TABLE AS + * SELECT or REPLACE TABLE AS SELECT operations. + * + * It is highly recommended to implement this trait whenever possible so that CREATE TABLE AS + * SELECT and REPLACE TABLE AS SELECT operations are atomic. For example, when one runs a REPLACE + * TABLE AS SELECT operation, if the catalog does not implement this trait, the planner will first + * drop the table via {@link TableCatalog#dropTable(Identifier)}, then create the table via + * {@link TableCatalog#createTable(Identifier, StructType, Transform[], Map)}, and then perform + * the write via {@link SupportsWrite#newWriteBuilder(CaseInsensitiveStringMap)}. However, if the + * write operation fails, the catalog will have already dropped the table, and the planner cannot + * roll back the dropping of the table. + * + * If the catalog implements this plugin, the catalog can implement the methods to "stage" the + * creation and the replacement of a table. 
After the table's + * {@link BatchWrite#commit(WriterCommitMessage[])} is called, + * {@link StagedTable#commitStagedChanges()} is called, at which point the staged table can + * complete both the data write and the metadata swap operation atomically. + */ +public interface StagingTableCatalog extends TableCatalog { Review comment: Can we move this API out to a new PR and implement atomic CTAS? I think REPLACE TABLE is not a blocker to this API and we don't have to do them together. This can also help us move forward faster, since designing a new SQL syntax (REPLACE TABLE) usually needs more time to get consensus.
[GitHub] [spark] cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2
cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2 URL: https://github.com/apache/spark/pull/24798#discussion_r302494084 ## File path: sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/StagingTableCatalog.java ## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalog.v2; + +import java.util.Map; + +import org.apache.spark.sql.catalog.v2.expressions.Transform; +import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; +import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException; +import org.apache.spark.sql.sources.v2.StagedTable; +import org.apache.spark.sql.sources.v2.SupportsWrite; +import org.apache.spark.sql.sources.v2.writer.BatchWrite; +import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; + +/** + * An optional mix-in for implementations of {@link TableCatalog} that support staging creation of + * the a table before committing the table's metadata along with its contents in CREATE TABLE AS + * SELECT or REPLACE TABLE AS SELECT operations. + * + * It is highly recommended to implement this trait whenever possible so that CREATE TABLE AS + * SELECT and REPLACE TABLE AS SELECT operations are atomic. For example, when one runs a REPLACE + * TABLE AS SELECT operation, if the catalog does not implement this trait, the planner will first + * drop the table via {@link TableCatalog#dropTable(Identifier)}, then create the table via + * {@link TableCatalog#createTable(Identifier, StructType, Transform[], Map)}, and then perform + * the write via {@link SupportsWrite#newWriteBuilder(CaseInsensitiveStringMap)}. However, if the + * write operation fails, the catalog will have already dropped the table, and the planner cannot + * roll back the dropping of the table. + * + * If the catalog implements this plugin, the catalog can implement the methods to "stage" the + * creation and the replacement of a table. 
After the table's + * {@link BatchWrite#commit(WriterCommitMessage[])} is called, + * {@link StagedTable#commitStagedChanges()} is called, at which point the staged table can + * complete both the data write and the metadata swap operation atomically. + */ +public interface StagingTableCatalog extends TableCatalog { Review comment: Can we move this API out to a new PR and implement atomic CTAS? I think REPLACE TABLE is not very related to this API and we don't have to do them together. This can also help us move forward faster, since designing a new SQL syntax (REPLACE TABLE) usually needs more time to get consensus.
[GitHub] [spark] cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2
cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2 URL: https://github.com/apache/spark/pull/24798#discussion_r300295226 ## File path: sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 ## @@ -261,6 +269,10 @@ createTableHeader : CREATE TEMPORARY? EXTERNAL? TABLE (IF NOT EXISTS)? multipartIdentifier ; +replaceTableHeader Review comment: I'm worried about creating new SQL syntax in Spark. AFAIK the closest existing syntax is `CREATE OR REPLACE TABLE`, which is implemented in [DB2](https://www.ibm.com/support/knowledgecenter/en/ssw_ibm_i_72/sqlp/rbafyreplacetable.htm) and [Google BigQuery](https://cloud.google.com/bigquery/docs/reference/standard-sql/data-definition-language). It is not standard SQL syntax, so it's not surprising that Oracle doesn't support it. If Spark wants an API for replacing a table, I think it's more reasonable to follow DB2 and BigQuery here.
[GitHub] [spark] cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2
cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2 URL: https://github.com/apache/spark/pull/24798#discussion_r300290331 ## File path: sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 ## @@ -258,6 +266,10 @@ createTableHeader : CREATE TEMPORARY? EXTERNAL? TABLE (IF NOT EXISTS)? multipartIdentifier ; +replaceTableHeader +: REPLACE TEMPORARY? TABLE multipartIdentifier Review comment: Let's get rid of `TEMPORARY TABLE`. It was a mistake, and we've removed almost everything about `TEMPORARY TABLE` from Spark; only a few parser rules are left for backward-compatibility reasons. To clarify, there is no `TEMPORARY TABLE` in Spark and there never has been. Spark only has TABLE, VIEW and TEMP VIEW.