[GitHub] [spark] cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2

2019-07-21 Thread GitBox
cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] 
Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2
URL: https://github.com/apache/spark/pull/24798#discussion_r305604944
 
 

 ##
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
 ##
 @@ -89,15 +92,145 @@ case class CreateTableAsSelectExec(
 
         case _ =>
           // table does not support writes
-          throw new SparkException(s"Table implementation does not support writes: ${ident.quoted}")
+          throw new SparkException(
+            s"Table implementation does not support writes: ${ident.quoted}")
       }
+    })(catchBlock = {
+      catalog.dropTable(ident)
+    })
+  }
+}
+
+/**
+ * Physical plan node for v2 create table as select, when the catalog is determined to support
+ * staging table creation.
+ *
+ * A new table will be created using the schema of the query, and rows from the query are appended.
+ * The CTAS operation is atomic. The creation of the table is staged and the commit of the write
+ * should bundle the commitment of the metadata and the table contents in a single unit. If the
+ * write fails, the table is instructed to roll back all staged changes.
+ */
+case class AtomicCreateTableAsSelectExec(
+    catalog: StagingTableCatalog,
+    ident: Identifier,
+    partitioning: Seq[Transform],
+    query: SparkPlan,
+    properties: Map[String, String],
+    writeOptions: CaseInsensitiveStringMap,
+    ifNotExists: Boolean) extends AtomicTableWriteExec {
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    if (catalog.tableExists(ident)) {
+      if (ifNotExists) {
+        return sparkContext.parallelize(Seq.empty, 1)
+      }
+
+      throw new TableAlreadyExistsException(ident)
+    }
+    val stagedTable = catalog.stageCreate(
+      ident, query.schema, partitioning.toArray, properties.asJava)
+    writeToStagedTable(stagedTable, writeOptions, ident)
+  }
+}
+
+/**
+ * Physical plan node for v2 replace table as select when the catalog does not support staging
+ * table replacement.
+ *
+ * A new table will be created using the schema of the query, and rows from the query are appended.
+ * If the table exists, its contents and schema should be replaced with the schema and the contents
+ * of the query. This is a non-atomic implementation that drops the table and then runs non-atomic
+ * CTAS. For an atomic implementation for catalogs with the appropriate support, see
+ * ReplaceTableAsSelectStagingExec.
+ */
+case class ReplaceTableAsSelectExec(
+    catalog: TableCatalog,
+    ident: Identifier,
+    partitioning: Seq[Transform],
+    query: SparkPlan,
+    properties: Map[String, String],
+    writeOptions: CaseInsensitiveStringMap,
+    orCreate: Boolean) extends AtomicTableWriteExec {
+
+  import org.apache.spark.sql.catalog.v2.CatalogV2Implicits.IdentifierHelper
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    // Note that this operation is potentially unsafe, but these are the strict semantics of
+    // RTAS if the catalog does not support atomic operations.
+    //
+    // There are numerous cases we concede to where the table will be dropped and irrecoverable:
+    //
+    // 1. Creating the new table fails,
+    // 2. Writing to the new table fails,
+    // 3. The table returned by catalog.createTable doesn't support writing.
+    if (catalog.tableExists(ident)) {
+      catalog.dropTable(ident)
+    } else if (!orCreate) {
+      throw new CannotReplaceMissingTableException(ident)
+    }
+    val createdTable = catalog.createTable(
+      ident, query.schema, partitioning.toArray, properties.asJava)
+    Utils.tryWithSafeFinallyAndFailureCallbacks({
+      createdTable match {
+        case table: SupportsWrite =>
+          val batchWrite = table.newWriteBuilder(writeOptions)
+            .withInputDataSchema(query.schema)
+            .withQueryId(UUID.randomUUID().toString)
+            .buildForBatch()
+
+          doWrite(batchWrite)
 
+        case _ =>
+          // table does not support writes
+          throw new SparkException(
+            s"Table implementation does not support writes: ${ident.quoted}")
+      }
     })(catchBlock = {
       catalog.dropTable(ident)
     })
   }
 }
 
+/**
+ *
+ * Physical plan node for v2 replace table as select when the catalog supports staging
+ * table replacement.
+ *
+ * A new table will be created using the schema of the query, and rows from the query are appended.
+ * If the table exists, its contents and schema should be replaced with the schema and the contents
+ * of the query. This implementation is atomic. The table replacement is staged, and the commit
+ * operation at the end should perform the replacement of the table's metadata and contents. If the
+ * write fails, the table is instructed to roll back staged changes and any previously written table
+ * is left untouched.
+ */
+case

[GitHub] [spark] cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2

2019-07-18 Thread GitBox
cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] 
Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2
URL: https://github.com/apache/spark/pull/24798#discussion_r305213786
 
 

 ##
 File path: sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/StagingTableCatalog.java
 ##
 @@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import java.util.Map;
+
+import org.apache.spark.sql.catalog.v2.expressions.Transform;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.sources.v2.StagedTable;
+import org.apache.spark.sql.sources.v2.SupportsWrite;
+import org.apache.spark.sql.sources.v2.writer.BatchWrite;
+import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+/**
+ * An optional mix-in for implementations of {@link TableCatalog} that support staging creation of
+ * a table before committing the table's metadata along with its contents in CREATE TABLE AS
+ * SELECT or REPLACE TABLE AS SELECT operations.
+ * <p>
+ * It is highly recommended to implement this trait whenever possible so that CREATE TABLE AS
+ * SELECT and REPLACE TABLE AS SELECT operations are atomic. For example, when one runs a REPLACE
+ * TABLE AS SELECT operation, if the catalog does not implement this trait, the planner will first
+ * drop the table via {@link TableCatalog#dropTable(Identifier)}, then create the table via
+ * {@link TableCatalog#createTable(Identifier, StructType, Transform[], Map)}, and then perform
+ * the write via {@link SupportsWrite#newWriteBuilder(CaseInsensitiveStringMap)}. However, if the
+ * write operation fails, the catalog will have already dropped the table, and the planner cannot
+ * roll back the dropping of the table.
+ * <p>
+ * If the catalog implements this plugin, the catalog can implement the methods to "stage" the
+ * creation and the replacement of a table. After the table's
+ * {@link BatchWrite#commit(WriterCommitMessage[])} is called,
+ * {@link StagedTable#commitStagedChanges()} is called, at which point the staged table can
+ * complete both the data write and the metadata swap operation atomically.
+ */
+public interface StagingTableCatalog extends TableCatalog {
+
+  /**
+   * Stage the creation of a table, preparing it to be committed into the metastore.
+   * <p>
+   * When the table is committed, the contents of any writes performed by the Spark planner are
+   * committed along with the metadata about the table passed into this method's arguments. If the
+   * table exists when this method is called, the method should throw an exception accordingly. If
+   * another process concurrently creates the table before this table's staged changes are
+   * committed, an exception should be thrown by {@link StagedTable#commitStagedChanges()}.
+   *
+   * @param ident a table identifier
+   * @param schema the schema of the new table, as a struct type
+   * @param partitions transforms to use for partitioning data in the table
+   * @param properties a string map of table properties
+   * @return metadata for the new table
+   * @throws TableAlreadyExistsException If a table or view already exists for the identifier
+   * @throws UnsupportedOperationException If a requested partition transform is not supported
+   * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
+   */
+  StagedTable stageCreate(
+      Identifier ident,
+      StructType schema,
+      Transform[] partitions,
+      Map<String, String> properties) throws TableAlreadyExistsException, NoSuchNamespaceException;
+
+  /**
+   * Stage the replacement of a table, preparing it to be committed into the metastore when the
+   * returned table's {@link StagedTable#commitStagedChanges()} is called.
+   * <p>
+   * When the table is committed, the contents of any writes performed by the Spark planner are
+   * committed along with the metadata about the table passed into this method's arguments. I

[GitHub] [spark] cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2

2019-07-18 Thread GitBox
cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] 
Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2
URL: https://github.com/apache/spark/pull/24798#discussion_r305213047
 
 

 ##
 File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 ##
 @@ -2294,6 +2303,69 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
 }
   }
 
+  /**
+   * Replace a table, returning a [[ReplaceTableStatement]] logical plan.
+   *
+   * Expected format:
+   * {{{
+   *   REPLACE TABLE [IF NOT EXISTS] [db_name.]table_name
 
 Review comment:
   this doesn't match the actual syntax now.
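   For context, a hedged sketch of what the scaladoc could say once corrected, assuming the `[CREATE OR] REPLACE TABLE` grammar that this PR targets (the clause list below is abridged and illustrative, not the exact merged docstring):
   ```scala
   /**
    * Replace a table, returning a [[ReplaceTableStatement]] logical plan.
    *
    * Expected format:
    * {{{
    *   [CREATE OR] REPLACE TABLE [db_name.]table_name
    *   USING table_provider
    *   [OPTIONS table_property_list]
    *   [PARTITIONED BY (col_name, transform(col_name), ...)]
    *   [LOCATION path]
    *   [COMMENT table_comment]
    *   [TBLPROPERTIES (property_name = property_value, ...)]
    *   [[AS] select_statement]
    * }}}
    */
   ```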





[GitHub] [spark] cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2

2019-07-17 Thread GitBox
cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] 
Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2
URL: https://github.com/apache/spark/pull/24798#discussion_r304725602
 
 

 ##
 File path: sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala
 ##
 @@ -196,7 +201,105 @@ private class InMemoryTable(
   }
 }
 
-private class BufferedRows extends WriterCommitMessage with InputPartition with Serializable {
+object TestInMemoryTableCatalog {
+  val SIMULATE_FAILED_WRITE_OPTION = "spark.sql.test.simulateFailedWrite"
+  val SIMULATE_FAILED_CREATE_PROPERTY = "spark.sql.test.simulateFailedCreate"
+
+  def maybeSimulateFailedTableCreation(tableProperties: util.Map[String, String]): Unit = {
+    if ("true".equalsIgnoreCase(
+        tableProperties.get(TestInMemoryTableCatalog.SIMULATE_FAILED_CREATE_PROPERTY))) {
+      throw new IllegalStateException("Manual create table failure.")
+    }
+  }
+
+  def maybeSimulateFailedTableWrite(tableOptions: CaseInsensitiveStringMap): Unit = {
+    if (tableOptions.getBoolean(
+        TestInMemoryTableCatalog.SIMULATE_FAILED_WRITE_OPTION, false)) {
+      throw new IllegalStateException("Manual write to table failure.")
+    }
+  }
+}
+
+class TestStagingInMemoryCatalog
+  extends TestInMemoryTableCatalog with StagingTableCatalog {
+
+  override def stageCreate(
+      ident: Identifier,
+      schema: StructType,
+      partitions: Array[Transform],
+      properties: util.Map[String, String]): StagedTable = {
+    newStagedTable(ident, schema, partitions, properties, replaceIfExists = false)
+  }
+
+  override def stageReplace(
+      ident: Identifier,
+      schema: StructType,
+      partitions: Array[Transform],
+      properties: util.Map[String, String]): StagedTable = {
+    newStagedTable(ident, schema, partitions, properties, replaceIfExists = true)
+  }
+
+  private def newStagedTable(
+      ident: Identifier,
+      schema: StructType,
+      partitions: Array[Transform],
+      properties: util.Map[String, String],
+      replaceIfExists: Boolean): StagedTable = {
+    import CatalogV2Implicits.IdentifierHelper
+    if (partitions.nonEmpty) {
+      throw new UnsupportedOperationException(
+        s"Catalog $name: Partitioned tables are not supported")
+    }
+
+    TestInMemoryTableCatalog.maybeSimulateFailedTableCreation(properties)
+
+    new TestStagedTable(
+      ident,
+      new InMemoryTable(s"$name.${ident.quoted}", schema, properties),
+      replaceIfExists)
+  }
+
+  private class TestStagedTable(
+      ident: Identifier,
+      delegateTable: InMemoryTable,
+      replaceIfExists: Boolean)
+    extends StagedTable with SupportsWrite with SupportsRead {
+
+    override def commitStagedChanges(): Unit = {
+      if (droppedTables.contains(ident)) {
 
 Review comment:
   BTW I think my proposal works for REPLACE TABLE, right? `stageCreate` is for CTAS, and I think your current code (without tracking dropped tables) already works.
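   A minimal sketch of that idea, assuming the `tables` concurrent map from `TestInMemoryTableCatalog` and no `droppedTables` tracking (the exact exception choices are assumptions):
   ```scala
   override def commitStagedChanges(): Unit = {
     if (replaceIfExists) {
       // REPLACE TABLE: commit must fail if a concurrent DROP TABLE removed the target.
       if (!tables.containsKey(ident)) {
         throw new IllegalStateException(s"Table $ident was dropped before commit")
       }
       tables.put(ident, delegateTable)
     } else {
       // CTAS: commit must fail if a concurrent CREATE TABLE beat this one.
       if (tables.putIfAbsent(ident, delegateTable) != null) {
         throw new TableAlreadyExistsException(ident)
       }
     }
   }
   ```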






[GitHub] [spark] cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2

2019-07-17 Thread GitBox
cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] 
Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2
URL: https://github.com/apache/spark/pull/24798#discussion_r304719952
 
 

 ##
 File path: sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala
 ##
 @@ -196,7 +201,105 @@ private class InMemoryTable(
   }
 }
 
-private class BufferedRows extends WriterCommitMessage with InputPartition with Serializable {
+object TestInMemoryTableCatalog {
+  val SIMULATE_FAILED_WRITE_OPTION = "spark.sql.test.simulateFailedWrite"
+  val SIMULATE_FAILED_CREATE_PROPERTY = "spark.sql.test.simulateFailedCreate"
+
+  def maybeSimulateFailedTableCreation(tableProperties: util.Map[String, String]): Unit = {
+    if ("true".equalsIgnoreCase(
+        tableProperties.get(TestInMemoryTableCatalog.SIMULATE_FAILED_CREATE_PROPERTY))) {
+      throw new IllegalStateException("Manual create table failure.")
+    }
+  }
+
+  def maybeSimulateFailedTableWrite(tableOptions: CaseInsensitiveStringMap): Unit = {
+    if (tableOptions.getBoolean(
+        TestInMemoryTableCatalog.SIMULATE_FAILED_WRITE_OPTION, false)) {
+      throw new IllegalStateException("Manual write to table failure.")
+    }
+  }
+}
+
+class TestStagingInMemoryCatalog
+  extends TestInMemoryTableCatalog with StagingTableCatalog {
+
+  override def stageCreate(
+      ident: Identifier,
+      schema: StructType,
+      partitions: Array[Transform],
+      properties: util.Map[String, String]): StagedTable = {
+    newStagedTable(ident, schema, partitions, properties, replaceIfExists = false)
+  }
+
+  override def stageReplace(
+      ident: Identifier,
+      schema: StructType,
+      partitions: Array[Transform],
+      properties: util.Map[String, String]): StagedTable = {
+    newStagedTable(ident, schema, partitions, properties, replaceIfExists = true)
+  }
+
+  private def newStagedTable(
+      ident: Identifier,
+      schema: StructType,
+      partitions: Array[Transform],
+      properties: util.Map[String, String],
+      replaceIfExists: Boolean): StagedTable = {
+    import CatalogV2Implicits.IdentifierHelper
+    if (partitions.nonEmpty) {
+      throw new UnsupportedOperationException(
+        s"Catalog $name: Partitioned tables are not supported")
+    }
+
+    TestInMemoryTableCatalog.maybeSimulateFailedTableCreation(properties)
+
+    new TestStagedTable(
+      ident,
+      new InMemoryTable(s"$name.${ident.quoted}", schema, properties),
+      replaceIfExists)
+  }
+
+  private class TestStagedTable(
+      ident: Identifier,
+      delegateTable: InMemoryTable,
+      replaceIfExists: Boolean)
+    extends StagedTable with SupportsWrite with SupportsRead {
+
+    override def commitStagedChanges(): Unit = {
+      if (droppedTables.contains(ident)) {
 
 Review comment:
   Think about a REPLACE TABLE and a DROP TABLE happening at the same time. It doesn't matter which one gets executed first, but the final result must be reachable by one of the serial execution orders.
   
   If REPLACE TABLE executes first, then there should be no table at the end, as it's dropped.
   If DROP TABLE executes first, then REPLACE TABLE should fail and there is still no table at the end.
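   A hedged, test-style sketch of that requirement against the in-memory staging catalog (ScalaTest's `intercept` and the exact exception type are assumptions):
   ```scala
   val staged = catalog.stageReplace(ident, schema, Array.empty, props)
   // A concurrent DROP TABLE commits while the replacement is still staged.
   catalog.dropTable(ident)
   // Committing now must fail, matching the "DROP TABLE ran first" serial order,
   // so no table remains at the end.
   intercept[IllegalStateException] {
     staged.commitStagedChanges()
   }
   assert(!catalog.tableExists(ident))
   ```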








[GitHub] [spark] cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2

2019-07-17 Thread GitBox
cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] 
Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2
URL: https://github.com/apache/spark/pull/24798#discussion_r304708250
 
 

 ##
 File path: sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala
 ##
 @@ -196,7 +201,105 @@ private class InMemoryTable(
   }
 }
 
-private class BufferedRows extends WriterCommitMessage with InputPartition with Serializable {
+object TestInMemoryTableCatalog {
+  val SIMULATE_FAILED_WRITE_OPTION = "spark.sql.test.simulateFailedWrite"
+  val SIMULATE_FAILED_CREATE_PROPERTY = "spark.sql.test.simulateFailedCreate"
+
+  def maybeSimulateFailedTableCreation(tableProperties: util.Map[String, String]): Unit = {
+    if ("true".equalsIgnoreCase(
+        tableProperties.get(TestInMemoryTableCatalog.SIMULATE_FAILED_CREATE_PROPERTY))) {
+      throw new IllegalStateException("Manual create table failure.")
+    }
+  }
+
+  def maybeSimulateFailedTableWrite(tableOptions: CaseInsensitiveStringMap): Unit = {
+    if (tableOptions.getBoolean(
+        TestInMemoryTableCatalog.SIMULATE_FAILED_WRITE_OPTION, false)) {
+      throw new IllegalStateException("Manual write to table failure.")
+    }
+  }
+}
+
+class TestStagingInMemoryCatalog
+  extends TestInMemoryTableCatalog with StagingTableCatalog {
+
+  override def stageCreate(
+      ident: Identifier,
+      schema: StructType,
+      partitions: Array[Transform],
+      properties: util.Map[String, String]): StagedTable = {
+    newStagedTable(ident, schema, partitions, properties, replaceIfExists = false)
+  }
+
+  override def stageReplace(
+      ident: Identifier,
+      schema: StructType,
+      partitions: Array[Transform],
+      properties: util.Map[String, String]): StagedTable = {
+    newStagedTable(ident, schema, partitions, properties, replaceIfExists = true)
+  }
+
+  private def newStagedTable(
+      ident: Identifier,
+      schema: StructType,
+      partitions: Array[Transform],
+      properties: util.Map[String, String],
+      replaceIfExists: Boolean): StagedTable = {
+    import CatalogV2Implicits.IdentifierHelper
+    if (partitions.nonEmpty) {
+      throw new UnsupportedOperationException(
+        s"Catalog $name: Partitioned tables are not supported")
+    }
+
+    TestInMemoryTableCatalog.maybeSimulateFailedTableCreation(properties)
+
+    new TestStagedTable(
+      ident,
+      new InMemoryTable(s"$name.${ident.quoted}", schema, properties),
+      replaceIfExists)
+  }
+
+  private class TestStagedTable(
+      ident: Identifier,
+      delegateTable: InMemoryTable,
+      replaceIfExists: Boolean)
+    extends StagedTable with SupportsWrite with SupportsRead {
+
+    override def commitStagedChanges(): Unit = {
+      if (droppedTables.contains(ident)) {
 
 Review comment:
   it's weird to record all the dropped tables in the history. I think a simpler version is
   ```
   if (replaceIfExists) {
     if (!tables.containsKey(ident)) {
       throw new RuntimeException("table already dropped")
     }
     tables.put(ident, delegateTable)
   }
   ```





[GitHub] [spark] cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2

2019-07-17 Thread GitBox
cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] 
Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2
URL: https://github.com/apache/spark/pull/24798#discussion_r304390373
 
 

 ##
 File path: sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala
 ##
 @@ -196,7 +196,103 @@ private class InMemoryTable(
   }
 }
 
-private class BufferedRows extends WriterCommitMessage with InputPartition with Serializable {
+object TestInMemoryTableCatalog {
+  val SIMULATE_FAILED_WRITE_OPTION = "spark.sql.test.simulateFailedWrite"
+  val SIMULATE_FAILED_CREATE_PROPERTY = "spark.sql.test.simulateFailedCreate"
+
+  def maybeSimulateFailedTableCreation(tableProperties: util.Map[String, String]): Unit = {
+    if (tableProperties.containsKey(TestInMemoryTableCatalog.SIMULATE_FAILED_CREATE_PROPERTY)
+      && tableProperties.get(TestInMemoryTableCatalog.SIMULATE_FAILED_CREATE_PROPERTY)
+      .equalsIgnoreCase("true")) {
+      throw new IllegalStateException("Manual create table failure.")
+    }
+  }
+
+  def maybeSimulateFailedTableWrite(tableOptions: CaseInsensitiveStringMap): Unit = {
+    val shouldSimulateFailedWrite = tableOptions
+      .getBoolean(TestInMemoryTableCatalog.SIMULATE_FAILED_WRITE_OPTION, false)
+    if (shouldSimulateFailedWrite) {
+      throw new IllegalStateException("Manual write to table failure.")
+    }
+  }
+}
+
+class TestStagingInMemoryCatalog
+  extends TestInMemoryTableCatalog with StagingTableCatalog {
+
+  override def stageCreate(
+      ident: Identifier,
+      schema: StructType,
+      partitions: Array[Transform],
+      properties: util.Map[String, String]): StagedTable = {
+    newStagedTable(ident, schema, partitions, properties, replaceIfExists = false)
+  }
+
+  override def stageReplace(
+      ident: Identifier,
+      schema: StructType,
+      partitions: Array[Transform],
+      properties: util.Map[String, String]): StagedTable = {
+    newStagedTable(ident, schema, partitions, properties, replaceIfExists = true)
+  }
+
+  private def newStagedTable(
+      ident: Identifier,
+      schema: StructType,
+      partitions: Array[Transform],
+      properties: util.Map[String, String],
+      replaceIfExists: Boolean): StagedTable = {
+    import CatalogV2Implicits.IdentifierHelper
+    if (partitions.nonEmpty) {
+      throw new UnsupportedOperationException(
+        s"Catalog $name: Partitioned tables are not supported")
+    }
+
+    TestInMemoryTableCatalog.maybeSimulateFailedTableCreation(properties)
+
+    new TestStagedTable(
+      ident,
+      new InMemoryTable(s"$name.${ident.quoted}", schema, properties),
+      replaceIfExists)
+  }
+
+  private class TestStagedTable(
+      ident: Identifier,
+      delegateTable: InMemoryTable,
+      replaceIfExists: Boolean)
+    extends StagedTable with SupportsWrite with SupportsRead {
+
+    override def commitStagedChanges(): Unit = {
+      if (replaceIfExists) {
+        tables.put(ident, delegateTable)
 
 Review comment:
   nit: when committing REPLACE TABLE, we should fail if the table is already 
dropped by others.





[GitHub] [spark] cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2

2019-07-17 Thread GitBox
cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] 
Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2
URL: https://github.com/apache/spark/pull/24798#discussion_r304389548
 
 

 ##
 File path: sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala
 ##
 @@ -196,7 +196,103 @@ private class InMemoryTable(
   }
 }
 
-private class BufferedRows extends WriterCommitMessage with InputPartition with Serializable {
+object TestInMemoryTableCatalog {
+  val SIMULATE_FAILED_WRITE_OPTION = "spark.sql.test.simulateFailedWrite"
+  val SIMULATE_FAILED_CREATE_PROPERTY = "spark.sql.test.simulateFailedCreate"
+
+  def maybeSimulateFailedTableCreation(tableProperties: util.Map[String, String]): Unit = {
+    if (tableProperties.containsKey(TestInMemoryTableCatalog.SIMULATE_FAILED_CREATE_PROPERTY)
+      && tableProperties.get(TestInMemoryTableCatalog.SIMULATE_FAILED_CREATE_PROPERTY)
+      .equalsIgnoreCase("true")) {
 
 Review comment:
   nit: we can just write `"true".equalsIgnoreCase(tableProperties.get(TestInMemoryTableCatalog.SIMULATE_FAILED_CREATE_PROPERTY))`
   
   if the key doesn't exist, `"true".equalsIgnoreCase(null)` returns false.
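   A quick illustration of that null-safety (a hedged snippet, not from the PR):
   ```scala
   val props = new java.util.HashMap[String, String]()
   // Key absent: get returns null, and "true".equalsIgnoreCase(null) is false.
   assert(!"true".equalsIgnoreCase(props.get("spark.sql.test.simulateFailedCreate")))
   props.put("spark.sql.test.simulateFailedCreate", "TRUE")
   // Case-insensitive match succeeds without a separate containsKey check.
   assert("true".equalsIgnoreCase(props.get("spark.sql.test.simulateFailedCreate")))
   ```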





[GitHub] [spark] cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2

2019-07-17 Thread GitBox
cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] 
Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2
URL: https://github.com/apache/spark/pull/24798#discussion_r304388426
 
 

 ##
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
 ##
 @@ -89,15 +92,140 @@ case class CreateTableAsSelectExec(
 
         case _ =>
           // table does not support writes
-          throw new SparkException(s"Table implementation does not support writes: ${ident.quoted}")
+          throw new SparkException(
+            s"Table implementation does not support writes: ${ident.quoted}")
       }
+    })(catchBlock = {
+      catalog.dropTable(ident)
+    })
+  }
+}
+
+/**
+ * Physical plan node for v2 create table as select, when the catalog is determined to support
+ * staging table creation.
+ *
+ * A new table will be created using the schema of the query, and rows from the query are appended.
+ * The CTAS operation is atomic. The creation of the table is staged and the commit of the write
+ * should bundle the commitment of the metadata and the table contents in a single unit. If the
+ * write fails, the table is instructed to roll back all staged changes.
+ */
+case class AtomicCreateTableAsSelectExec(
+    catalog: StagingTableCatalog,
+    ident: Identifier,
+    partitioning: Seq[Transform],
+    query: SparkPlan,
+    properties: Map[String, String],
+    writeOptions: CaseInsensitiveStringMap,
+    ifNotExists: Boolean) extends AtomicTableWriteExec {
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    if (catalog.tableExists(ident)) {
+      if (ifNotExists) {
+        return sparkContext.parallelize(Seq.empty, 1)
+      }
+
+      throw new TableAlreadyExistsException(ident)
+    }
+    val stagedTable = catalog.stageCreate(
+      ident, query.schema, partitioning.toArray, properties.asJava)
+    writeToStagedTable(stagedTable, writeOptions, ident)
+  }
+}
+
+/**
+ * Physical plan node for v2 replace table as select when the catalog does not support staging
+ * table replacement.
+ *
+ * A new table will be created using the schema of the query, and rows from the query are appended.
+ * If the table exists, its contents and schema should be replaced with the schema and the contents
+ * of the query. This is a non-atomic implementation that drops the table and then runs non-atomic
 
 Review comment:
   IMO the non-atomic version is allowed to have undefined behavior when a failure happens midway. But it should behave like the atomic version if no failure happens.





[GitHub] [spark] cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2

2019-07-17 Thread GitBox
cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] 
Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2
URL: https://github.com/apache/spark/pull/24798#discussion_r304387491
 
 

 ##
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
 ##
 @@ -89,15 +92,140 @@ case class CreateTableAsSelectExec(
 
         case _ =>
           // table does not support writes
-          throw new SparkException(s"Table implementation does not support writes: ${ident.quoted}")
+          throw new SparkException(
+            s"Table implementation does not support writes: ${ident.quoted}")
       }
+    })(catchBlock = {
+      catalog.dropTable(ident)
+    })
+  }
+}
+
+/**
+ * Physical plan node for v2 create table as select, when the catalog is determined to support
+ * staging table creation.
+ *
+ * A new table will be created using the schema of the query, and rows from the query are appended.
+ * The CTAS operation is atomic. The creation of the table is staged and the commit of the write
+ * should bundle the commitment of the metadata and the table contents in a single unit. If the
+ * write fails, the table is instructed to roll back all staged changes.
+ */
+case class AtomicCreateTableAsSelectExec(
+    catalog: StagingTableCatalog,
+    ident: Identifier,
+    partitioning: Seq[Transform],
+    query: SparkPlan,
+    properties: Map[String, String],
+    writeOptions: CaseInsensitiveStringMap,
+    ifNotExists: Boolean) extends AtomicTableWriteExec {
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    if (catalog.tableExists(ident)) {
+      if (ifNotExists) {
+        return sparkContext.parallelize(Seq.empty, 1)
+      }
+
+      throw new TableAlreadyExistsException(ident)
+    }
+    val stagedTable = catalog.stageCreate(
+      ident, query.schema, partitioning.toArray, properties.asJava)
+    writeToStagedTable(stagedTable, writeOptions, ident)
+  }
+}
+
+/**
+ * Physical plan node for v2 replace table as select when the catalog does not support staging
+ * table replacement.
+ *
+ * A new table will be created using the schema of the query, and rows from the query are appended.
+ * If the table exists, its contents and schema should be replaced with the schema and the contents
+ * of the query. This is a non-atomic implementation that drops the table and then runs non-atomic
 
 Review comment:
   According to https://github.com/apache/spark/pull/24798/files#r302746896 , this is a broken implementation. RTAS should be able to query any existing tables, including the one that is being replaced. If we do want to have a non-atomic version, how about the following (see the sketch after this list):
   1. create a table with a random but unique name (like a UUID), and insert the data into it
   2. drop the target table
   3. rename the table created in step 1 to the target table.
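   A hedged sketch of that three-step flow against the `TableCatalog` API (the temporary-name scheme and the `writeRows` helper are assumptions, not part of this PR):
   ```scala
   import java.util.UUID
   // 1. Create a table with a random but unique name and insert the query's rows into it.
   val tmpIdent = Identifier.of(ident.namespace, s"${ident.name}_tmp_${UUID.randomUUID()}")
   val tmpTable = catalog.createTable(
     tmpIdent, query.schema, partitioning.toArray, properties.asJava)
   writeRows(tmpTable) // hypothetical helper that runs the batch write
   // 2. Drop the target table; readers can still query it up to this point.
   if (catalog.tableExists(ident)) {
     catalog.dropTable(ident)
   }
   // 3. Rename the table created in step 1 to the target identifier.
   catalog.renameTable(tmpIdent, ident)
   ```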





[GitHub] [spark] cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2

2019-07-17 Thread GitBox
cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] 
Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2
URL: https://github.com/apache/spark/pull/24798#discussion_r304381665
 
 

 ##
 File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 ##
 @@ -440,6 +440,44 @@ case class CreateTableAsSelect(
   }
 }
 
+/**
+ * Replace a table with a v2 catalog.
+ *
+ * If the table does not exist, it will be created. The persisted table will have no contents
+ * as a result of this operation.
+ */
+case class ReplaceTable(
+    catalog: TableCatalog,
+    tableName: Identifier,
+    tableSchema: StructType,
+    partitioning: Seq[Transform],
+    properties: Map[String, String],
+    orCreate: Boolean) extends Command
+
+/**
+ * Replaces a table from a select query with a v2 catalog.
+ *
+ * If the table does not already exist, it will be created.
+ */
+case class ReplaceTableAsSelect(
 
 Review comment:
   +1, I think we should fail the query if atomic RTAS is not supported. It's a valid use case to access the table being replaced in RTAS, and Spark shouldn't throw a table-not-found exception in this case, which is quite confusing.








[GitHub] [spark] cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2

2019-07-17 Thread GitBox
cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] 
Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2
URL: https://github.com/apache/spark/pull/24798#discussion_r304378690
 
 

 ##
 File path: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/StagingTableCatalog.java
 ##
 @@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import java.util.Map;
+
+import org.apache.spark.sql.catalog.v2.expressions.Transform;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.sources.v2.StagedTable;
+import org.apache.spark.sql.sources.v2.SupportsWrite;
+import org.apache.spark.sql.sources.v2.writer.BatchWrite;
+import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+/**
+ * An optional mix-in for implementations of {@link TableCatalog} that support 
staging creation of
+ * the a table before committing the table's metadata along with its contents 
in CREATE TABLE AS
+ * SELECT or REPLACE TABLE AS SELECT operations.
+ * 
+ * It is highly recommended to implement this trait whenever possible so that 
CREATE TABLE AS
+ * SELECT and REPLACE TABLE AS SELECT operations are atomic. For example, when 
one runs a REPLACE
+ * TABLE AS SELECT operation, if the catalog does not implement this trait, 
the planner will first
+ * drop the table via {@link TableCatalog#dropTable(Identifier)}, then create 
the table via
+ * {@link TableCatalog#createTable(Identifier, StructType, Transform[], Map)}, 
and then perform
+ * the write via {@link 
SupportsWrite#newWriteBuilder(CaseInsensitiveStringMap)}. However, if the
+ * write operation fails, the catalog will have already dropped the table, and 
the planner cannot
+ * roll back the dropping of the table.
+ * <p>
+ * If the catalog implements this plugin, the catalog can implement the 
methods to "stage" the
+ * creation and the replacement of a table. After the table's
+ * {@link BatchWrite#commit(WriterCommitMessage[])} is called,
+ * {@link StagedTable#commitStagedChanges()} is called, at which point the 
staged table can
+ * complete both the data write and the metadata swap operation atomically.
+ */
+public interface StagingTableCatalog extends TableCatalog {
+
+  /**
+   * Stage the creation of a table, preparing it to be committed into the 
metastore.
+   * <p>
+   * When the table is committed, the contents of any writes performed by the 
Spark planner are
+   * committed along with the metadata about the table passed into this 
method's arguments. If the
+   * table exists when this method is called, the method should throw an 
exception accordingly. If
+   * another process concurrently creates the table before this table's staged 
changes are
+   * committed, an exception should be thrown by {@link 
StagedTable#commitStagedChanges()}.
+   *
+   * @param ident a table identifier
+   * @param schema the schema of the new table, as a struct type
+   * @param partitions transforms to use for partitioning data in the table
+   * @param properties a string map of table properties
+   * @return metadata for the new table
+   * @throws TableAlreadyExistsException If a table or view already exists for 
the identifier
+   * @throws UnsupportedOperationException If a requested partition transform 
is not supported
+   * @throws NoSuchNamespaceException If the identifier namespace does not 
exist (optional)
+   */
+  StagedTable stageCreate(
+  Identifier ident,
+  StructType schema,
+  Transform[] partitions,
+  Map<String, String> properties) throws TableAlreadyExistsException, 
NoSuchNamespaceException;
+
+  /**
+   * Stage the replacement of a table, preparing it to be committed into the 
metastore when the
+   * returned table's {@link StagedTable#commitStagedChanges()} is called.
+   * <p>
+   * When the table is committed, the contents of any writes performed by the 
Spark planner are
+   * committed along with the metadata about the table passed into this 
method's arguments. I
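
The staged-commit sequence this Javadoc describes can be sketched roughly as follows. The quoted excerpt is cut off before the replace-side methods, so the `abortStagedChanges()` rollback hook and the variable names here are assumptions, not the PR's exact code:

```scala
import scala.collection.JavaConverters._

import org.apache.spark.sql.catalog.v2.{Identifier, StagingTableCatalog}
import org.apache.spark.sql.catalog.v2.expressions.Transform
import org.apache.spark.sql.sources.v2.StagedTable
import org.apache.spark.sql.types.StructType

// Rough sketch of atomic CTAS against a StagingTableCatalog: nothing is
// visible to other readers until commitStagedChanges() bundles the data
// write and the metadata swap into one atomic operation.
def atomicCtas(
    catalog: StagingTableCatalog,
    ident: Identifier,
    schema: StructType,
    partitions: Array[Transform],
    properties: Map[String, String]): Unit = {
  val staged: StagedTable =
    catalog.stageCreate(ident, schema, partitions, properties.asJava)
  try {
    // The real planner runs BatchWrite.commit(...) against the staged
    // table's write builder here, before committing the staged changes.
    staged.commitStagedChanges()
  } catch {
    case e: Throwable =>
      staged.abortStagedChanges() // assumed rollback hook on StagedTable
      throw e
  }
}
```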

[GitHub] [spark] cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2

2019-07-11 Thread GitBox
cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] 
Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2
URL: https://github.com/apache/spark/pull/24798#discussion_r302494084
 
 

 ##
 File path: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/StagingTableCatalog.java
 ##
 @@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import java.util.Map;
+
+import org.apache.spark.sql.catalog.v2.expressions.Transform;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.sources.v2.StagedTable;
+import org.apache.spark.sql.sources.v2.SupportsWrite;
+import org.apache.spark.sql.sources.v2.writer.BatchWrite;
+import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+/**
+ * An optional mix-in for implementations of {@link TableCatalog} that support 
staging creation of
+ * a table before committing the table's metadata along with its contents 
in CREATE TABLE AS
+ * SELECT or REPLACE TABLE AS SELECT operations.
+ * <p>
+ * It is highly recommended to implement this trait whenever possible so that 
CREATE TABLE AS
+ * SELECT and REPLACE TABLE AS SELECT operations are atomic. For example, when 
one runs a REPLACE
+ * TABLE AS SELECT operation, if the catalog does not implement this trait, 
the planner will first
+ * drop the table via {@link TableCatalog#dropTable(Identifier)}, then create 
the table via
+ * {@link TableCatalog#createTable(Identifier, StructType, Transform[], Map)}, 
and then perform
+ * the write via {@link 
SupportsWrite#newWriteBuilder(CaseInsensitiveStringMap)}. However, if the
+ * write operation fails, the catalog will have already dropped the table, and 
the planner cannot
+ * roll back the dropping of the table.
+ * <p>
+ * If the catalog implements this plugin, the catalog can implement the 
methods to "stage" the
+ * creation and the replacement of a table. After the table's
+ * {@link BatchWrite#commit(WriterCommitMessage[])} is called,
+ * {@link StagedTable#commitStagedChanges()} is called, at which point the 
staged table can
+ * complete both the data write and the metadata swap operation atomically.
+ */
+public interface StagingTableCatalog extends TableCatalog {
 
 Review comment:
   Can we move this API out to a new PR and implement atomic CTAS there? I 
think REPLACE TABLE is not a blocker for this API, and we don't have to do 
them together. This would also help us move forward faster, since designing 
new SQL syntax (REPLACE TABLE) usually needs more time to reach consensus.




[GitHub] [spark] cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2

2019-07-04 Thread GitBox
cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] 
Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2
URL: https://github.com/apache/spark/pull/24798#discussion_r300295226
 
 

 ##
 File path: 
sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
 ##
 @@ -261,6 +269,10 @@ createTableHeader
 : CREATE TEMPORARY? EXTERNAL? TABLE (IF NOT EXISTS)? multipartIdentifier
 ;
 
+replaceTableHeader
 
 Review comment:
   I'm worried about creating new SQL syntax in Spark. AFAIK a similar syntax 
is `CREATE OR REPLACE TABLE`, which is implemented in 
[DB2](https://www.ibm.com/support/knowledgecenter/en/ssw_ibm_i_72/sqlp/rbafyreplacetable.htm)
 and [Google 
BigQuery](https://cloud.google.com/bigquery/docs/reference/standard-sql/data-definition-language).
   
   This is not standard SQL syntax, so it's not surprising that Oracle doesn't 
support it. If Spark wants an API for replacing tables, I think it's more 
reasonable to follow DB2 and BigQuery here.
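
For comparison, the two candidate syntaxes would look like this from a user's perspective. This is a sketch only: `spark` is an active SparkSession, and `testcat.db.t`, `foo`, and `src` are hypothetical names for a v2 catalog table, a data source, and a source table:

```scala
// DB2 / BigQuery style: CREATE remains the leading keyword.
spark.sql("CREATE OR REPLACE TABLE testcat.db.t USING foo AS SELECT id FROM src")

// Bare REPLACE TABLE style, as originally proposed in this PR.
spark.sql("REPLACE TABLE testcat.db.t USING foo AS SELECT id FROM src")
```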





[GitHub] [spark] cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2

2019-07-04 Thread GitBox
cloud-fan commented on a change in pull request #24798: [SPARK-27724][SQL] 
Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2
URL: https://github.com/apache/spark/pull/24798#discussion_r300290331
 
 

 ##
 File path: 
sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
 ##
 @@ -258,6 +266,10 @@ createTableHeader
 : CREATE TEMPORARY? EXTERNAL? TABLE (IF NOT EXISTS)? multipartIdentifier
 ;
 
+replaceTableHeader
+: REPLACE TEMPORARY? TABLE multipartIdentifier
 
 Review comment:
   Let's get rid of `TEMPORARY TABLE`. It was a mistake, and we've almost 
removed everything about `TEMPORARY TABLE` in Spark; only a few parser rules 
are left for backward-compatibility reasons.
   
   To clarify: Spark has never had a `TEMPORARY TABLE`. It only has TABLE, 
VIEW, and TEMP VIEW.
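
In test terms, the intent is roughly the following, a sketch using ScalaTest's `intercept` with the same hypothetical `spark`, `testcat.db.t`, `foo`, and `src` names as above:

```scala
import org.apache.spark.sql.catalyst.parser.ParseException
import org.scalatest.Assertions.intercept

// REPLACE TABLE should parse and run...
spark.sql("REPLACE TABLE testcat.db.t USING foo AS SELECT id FROM src")

// ...but REPLACE TEMPORARY TABLE should be rejected by the parser.
intercept[ParseException] {
  spark.sql("REPLACE TEMPORARY TABLE testcat.db.t USING foo AS SELECT id FROM src")
}
```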

