This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new a81ba17  Support atomic CTAS and RTAS with SparkSessionCatalog (#1183)
a81ba17 is described below

commit a81ba17e15b593c91efe6a629197150f2fff097d
Author: Ryan Blue <[email protected]>
AuthorDate: Wed Jul 8 17:25:54 2020 -0700

    Support atomic CTAS and RTAS with SparkSessionCatalog (#1183)
---
 .../apache/iceberg/spark/RollbackStagedTable.java  | 137 +++++++++++++++++++++
 .../apache/iceberg/spark/SparkSessionCatalog.java  | 111 +++++++++++++++--
 .../iceberg/spark/sql/TestCreateTableAsSelect.java |  28 +----
 3 files changed, 246 insertions(+), 30 deletions(-)

diff --git a/spark3/src/main/java/org/apache/iceberg/spark/RollbackStagedTable.java b/spark3/src/main/java/org/apache/iceberg/spark/RollbackStagedTable.java
new file mode 100644
index 0000000..a27d06e
--- /dev/null
+++ b/spark3/src/main/java/org/apache/iceberg/spark/RollbackStagedTable.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.StagedTable;
+import org.apache.spark.sql.connector.catalog.SupportsDelete;
+import org.apache.spark.sql.connector.catalog.SupportsRead;
+import org.apache.spark.sql.connector.catalog.SupportsWrite;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCapability;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.WriteBuilder;
+import org.apache.spark.sql.sources.Filter;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+/**
+ * An implementation of StagedTable that mimics the behavior of Spark's non-atomic CTAS and RTAS.
+ * <p>
+ * A Spark catalog can implement StagingTableCatalog to support atomic operations by producing a
+ * StagedTable. But if a catalog implements StagingTableCatalog, Spark expects the catalog to be
+ * able to produce a StagedTable for any table loaded by the catalog. This assumption doesn't
+ * always hold, as in the case of {@link SparkSessionCatalog}: it supports atomic operations and
+ * can produce a StagedTable for Iceberg tables, but it wraps the session catalog and cannot
+ * necessarily produce a working StagedTable implementation for the tables that the session
+ * catalog loads.
+ * <p>
+ * The work-around is this class, which implements the StagedTable interface but does not have
+ * atomic behavior. Instead, the StagedTable interface is used to implement the behavior of the
+ * non-atomic SQL plans: create the table, write to it, and drop the table to roll back on failure.
+ * <p>
+ * This StagedTable implements SupportsRead, SupportsWrite, and SupportsDelete by passing calls
+ * through to the real table. Implementing those interfaces is safe because Spark will not use
+ * them unless the table reports the corresponding capabilities from {@link #capabilities()}.
+ */
+public class RollbackStagedTable implements StagedTable, SupportsRead, SupportsWrite, SupportsDelete {
+  private final TableCatalog catalog;
+  private final Identifier ident;
+  private final Table table;
+
+  public RollbackStagedTable(TableCatalog catalog, Identifier ident, Table table) {
+    this.catalog = catalog;
+    this.ident = ident;
+    this.table = table;
+  }
+
+  @Override
+  public void commitStagedChanges() {
+    // the changes have already been committed to the table at the end of the write
+  }
+
+  @Override
+  public void abortStagedChanges() {
+    // roll back changes by dropping the table
+    catalog.dropTable(ident);
+  }
+
+  @Override
+  public String name() {
+    return table.name();
+  }
+
+  @Override
+  public StructType schema() {
+    return table.schema();
+  }
+
+  @Override
+  public Transform[] partitioning() {
+    return table.partitioning();
+  }
+
+  @Override
+  public Map<String, String> properties() {
+    return table.properties();
+  }
+
+  @Override
+  public Set<TableCapability> capabilities() {
+    return table.capabilities();
+  }
+
+  @Override
+  public void deleteWhere(Filter[] filters) {
+    call(SupportsDelete.class, t -> t.deleteWhere(filters));
+  }
+
+  @Override
+  public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
+    return callReturning(SupportsRead.class, t -> t.newScanBuilder(options));
+  }
+
+  @Override
+  public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
+    return callReturning(SupportsWrite.class, t -> t.newWriteBuilder(info));
+  }
+
+  private <T> void call(Class<? extends T> requiredClass, Consumer<T> task) {
+    callReturning(requiredClass, inst -> {
+      task.accept(inst);
+      return null;
+    });
+  }
+
+  private <T, R> R callReturning(Class<? extends T> requiredClass, Function<T, R> task) {
+    if (requiredClass.isInstance(table)) {
+      return task.apply(requiredClass.cast(table));
+    } else {
+      throw new UnsupportedOperationException(String.format(
+          "Table does not implement %s: %s (%s)",
+          requiredClass.getSimpleName(), table.name(), table.getClass().getName()));
+    }
+  }
+}
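
For readers unfamiliar with the StagedTable contract, the following is a minimal
sketch of the sequence Spark's atomic CTAS plan follows. It is illustrative only,
not part of this commit; "writeQuery" is a hypothetical stand-in for the physical
write that Spark executes.

    import java.util.Collections;
    import org.apache.spark.sql.connector.catalog.Identifier;
    import org.apache.spark.sql.connector.catalog.StagedTable;
    import org.apache.spark.sql.connector.catalog.StagingTableCatalog;
    import org.apache.spark.sql.connector.expressions.Transform;
    import org.apache.spark.sql.types.StructType;

    class AtomicCtasSketch {
      // stage the table, run the write, then commit; abort on any failure.
      // with RollbackStagedTable, commit is a no-op (the data is already
      // written) and abort drops the table, reproducing the non-atomic
      // create-write-drop behavior of the session catalog.
      static void createTableAsSelect(StagingTableCatalog catalog, Identifier ident,
                                      StructType schema, Runnable writeQuery) throws Exception {
        StagedTable staged = catalog.stageCreate(
            ident, schema, new Transform[0], Collections.emptyMap());
        try {
          writeQuery.run();             // Spark would execute the SELECT and write here
          staged.commitStagedChanges(); // no-op for RollbackStagedTable
        } catch (Exception e) {
          staged.abortStagedChanges();  // RollbackStagedTable drops the new table
          throw e;
        }
      }
    }
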
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java b/spark3/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
index 9079fae..ea0f0d9 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
@@ -28,6 +28,8 @@ import org.apache.spark.sql.connector.catalog.CatalogExtension;
 import org.apache.spark.sql.connector.catalog.CatalogPlugin;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.NamespaceChange;
+import org.apache.spark.sql.connector.catalog.StagedTable;
+import org.apache.spark.sql.connector.catalog.StagingTableCatalog;
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
 import org.apache.spark.sql.connector.catalog.Table;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
@@ -42,14 +44,16 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
  * @param <T> CatalogPlugin class to avoid casting to TableCatalog and SupportsNamespaces.
  */
 public class SparkSessionCatalog<T extends TableCatalog & SupportsNamespaces>
-    implements TableCatalog, SupportsNamespaces, CatalogExtension {
+    implements StagingTableCatalog, SupportsNamespaces, CatalogExtension {
   private static final String[] DEFAULT_NAMESPACE = new String[] {"default"};
 
   private String catalogName = null;
   private TableCatalog icebergCatalog = null;
+  private StagingTableCatalog asStagingCatalog = null;
   private T sessionCatalog = null;
   private boolean createParquetAsIceberg = false;
   private boolean createAvroAsIceberg = false;
+  private boolean createOrcAsIceberg = false;
 
   /**
    * Build a {@link SparkCatalog} to be used for Iceberg operations.
@@ -121,18 +125,90 @@ public class SparkSessionCatalog<T extends TableCatalog & SupportsNamespaces>
                            Map<String, String> properties)
       throws TableAlreadyExistsException, NoSuchNamespaceException {
     String provider = properties.get("provider");
-    if (provider == null || "iceberg".equalsIgnoreCase(provider)) {
+    if (useIceberg(provider)) {
       return icebergCatalog.createTable(ident, schema, partitions, properties);
+    } else {
+      // delegate to the session catalog
+      return sessionCatalog.createTable(ident, schema, partitions, properties);
+    }
+  }
 
-    } else if (createParquetAsIceberg && "parquet".equalsIgnoreCase(provider)) {
-      return icebergCatalog.createTable(ident, schema, partitions, properties);
+  @Override
+  public StagedTable stageCreate(Identifier ident, StructType schema, Transform[] partitions,
+                                 Map<String, String> properties)
+      throws TableAlreadyExistsException, NoSuchNamespaceException {
+    String provider = properties.get("provider");
+    TableCatalog catalog;
+    if (useIceberg(provider)) {
+      if (asStagingCatalog != null) {
+        return asStagingCatalog.stageCreate(ident, schema, partitions, properties);
+      }
+      catalog = icebergCatalog;
+    } else {
+      catalog = sessionCatalog;
+    }
 
-    } else if (createAvroAsIceberg && "avro".equalsIgnoreCase(provider)) {
-      return icebergCatalog.createTable(ident, schema, partitions, properties);
+    // create the table with the chosen catalog, then wrap it in a staged table that will drop it to roll back
+    Table table = catalog.createTable(ident, schema, partitions, properties);
+    return new RollbackStagedTable(catalog, ident, table);
+  }
 
+  @Override
+  public StagedTable stageReplace(Identifier ident, StructType schema, Transform[] partitions,
+                                  Map<String, String> properties)
+      throws NoSuchNamespaceException, NoSuchTableException {
+    String provider = properties.get("provider");
+    TableCatalog catalog;
+    if (useIceberg(provider)) {
+      if (asStagingCatalog != null) {
+        return asStagingCatalog.stageReplace(ident, schema, partitions, properties);
+      }
+      catalog = icebergCatalog;
     } else {
-      // delegate to the session catalog
-      return sessionCatalog.createTable(ident, schema, partitions, properties);
+      catalog = sessionCatalog;
+    }
+
+    // attempt to drop the table and fail if it doesn't exist
+    if (!catalog.dropTable(ident)) {
+      throw new NoSuchTableException(ident);
+    }
+
+    try {
+      // create the table with the chosen catalog, then wrap it in a staged table that will drop it to roll back
+      Table table = catalog.createTable(ident, schema, partitions, properties);
+      return new RollbackStagedTable(catalog, ident, table);
+
+    } catch (TableAlreadyExistsException e) {
+      // the table was dropped, but a table with this name now exists again; retry the replace
+      return stageReplace(ident, schema, partitions, properties);
+    }
+  }
+
+  @Override
+  public StagedTable stageCreateOrReplace(Identifier ident, StructType schema, Transform[] partitions,
+                                          Map<String, String> properties) throws NoSuchNamespaceException {
+    String provider = properties.get("provider");
+    TableCatalog catalog;
+    if (useIceberg(provider)) {
+      if (asStagingCatalog != null) {
+        return asStagingCatalog.stageCreateOrReplace(ident, schema, partitions, properties);
+      }
+      catalog = icebergCatalog;
+    } else {
+      catalog = sessionCatalog;
+    }
+
+    // drop the table if it exists
+    catalog.dropTable(ident);
+
+    try {
+      // create the table with the chosen catalog, then wrap it in a staged table that will drop it to roll back
+      Table table = catalog.createTable(ident, schema, partitions, properties);
+      return new RollbackStagedTable(catalog, ident, table);
+
+    } catch (TableAlreadyExistsException e) {
+      // the table was dropped, but a table with this name now exists again; retry the create-or-replace
+      return stageCreateOrReplace(ident, schema, partitions, properties);
     }
   }
 
@@ -167,8 +243,13 @@ public class SparkSessionCatalog<T extends TableCatalog & SupportsNamespaces>
   public final void initialize(String name, CaseInsensitiveStringMap options) {
     this.catalogName = name;
     this.icebergCatalog = buildSparkCatalog(name, options);
+    if (icebergCatalog instanceof StagingTableCatalog) {
+      this.asStagingCatalog = (StagingTableCatalog) icebergCatalog;
+    }
+
+    this.createParquetAsIceberg = options.getBoolean("parquet-enabled", createParquetAsIceberg);
+    this.createAvroAsIceberg = options.getBoolean("avro-enabled", createAvroAsIceberg);
+    this.createOrcAsIceberg = options.getBoolean("orc-enabled", createOrcAsIceberg);
   }
 
   @Override
@@ -185,4 +266,18 @@ public class SparkSessionCatalog<T extends TableCatalog & SupportsNamespaces>
   public String name() {
     return catalogName;
   }
+
+  private boolean useIceberg(String provider) {
+    if (provider == null || "iceberg".equalsIgnoreCase(provider)) {
+      return true;
+    } else if (createParquetAsIceberg && "parquet".equalsIgnoreCase(provider)) {
+      return true;
+    } else if (createAvroAsIceberg && "avro".equalsIgnoreCase(provider)) {
+      return true;
+    } else if (createOrcAsIceberg && "orc".equalsIgnoreCase(provider)) {
+      return true;
+    }
+
+    return false;
+  }
 }
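
As a usage sketch (not part of this commit), SparkSessionCatalog is installed by
replacing the built-in spark_catalog. The property keys below match the options
read in initialize() above; the "type" value and the db.target/db.source table
names are assumptions for illustration.

    import org.apache.spark.sql.SparkSession;

    public class SessionCatalogSetup {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            // wrap the built-in session catalog with Iceberg's implementation
            .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
            .config("spark.sql.catalog.spark_catalog.type", "hive") // assumes a Hive metastore
            // optional: also route parquet/avro/orc CREATE TABLE statements to Iceberg
            .config("spark.sql.catalog.spark_catalog.parquet-enabled", "true")
            .config("spark.sql.catalog.spark_catalog.avro-enabled", "true")
            .config("spark.sql.catalog.spark_catalog.orc-enabled", "true")
            .getOrCreate();

        // CTAS now goes through stageCreate, so a failed write leaves no table behind
        spark.sql("CREATE TABLE db.target USING iceberg AS SELECT * FROM db.source");
      }
    }
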
diff --git a/spark3/src/test/java/org/apache/iceberg/spark/sql/TestCreateTableAsSelect.java b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestCreateTableAsSelect.java
index 2bde262..d02b852 100644
--- a/spark3/src/test/java/org/apache/iceberg/spark/sql/TestCreateTableAsSelect.java
+++ b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestCreateTableAsSelect.java
@@ -108,20 +108,15 @@ public class TestCreateTableAsSelect extends SparkCatalogTestBase {
         "SELECT id, data, CASE WHEN (id %% 2) = 0 THEN 'even' ELSE 'odd' END 
AS part " +
         "FROM %s ORDER BY 3, 1", tableName, sourceName);
 
-    // spark_catalog does not use an atomic replace, so the table history and old spec is dropped
-    // the other catalogs do use atomic replace, so the spec id is incremented
-    boolean isAtomic = !"spark_catalog".equals(catalogName);
-
     Schema expectedSchema = new Schema(
         Types.NestedField.optional(1, "id", Types.LongType.get()),
         Types.NestedField.optional(2, "data", Types.StringType.get()),
         Types.NestedField.optional(3, "part", Types.StringType.get())
     );
 
-    int specId = isAtomic ? 1 : 0;
     PartitionSpec expectedSpec = PartitionSpec.builderFor(expectedSchema)
         .identity("part")
-        .withSpecId(specId)
+        .withSpecId(1)
         .build();
 
     Table rtasTable = validationCatalog.loadTable(tableIdent);
@@ -138,7 +133,7 @@ public class TestCreateTableAsSelect extends SparkCatalogTestBase {
         sql("SELECT * FROM %s ORDER BY id", tableName));
 
     Assert.assertEquals("Table should have expected snapshots",
-        isAtomic ? 2 : 1, Iterables.size(rtasTable.snapshots()));
+        2, Iterables.size(rtasTable.snapshots()));
   }
 
   @Test
@@ -156,9 +151,6 @@ public class TestCreateTableAsSelect extends SparkCatalogTestBase {
         "SELECT 2 * id as id, data, CASE WHEN ((2 * id) %% 2) = 0 THEN 'even' 
ELSE 'odd' END AS part " +
         "FROM %s ORDER BY 3, 1", tableName, sourceName);
 
-    // spark_catalog does not use an atomic replace, so the table history is dropped
-    boolean isAtomic = !"spark_catalog".equals(catalogName);
-
     Schema expectedSchema = new Schema(
         Types.NestedField.optional(1, "id", Types.LongType.get()),
         Types.NestedField.optional(2, "data", Types.StringType.get()),
@@ -184,7 +176,7 @@ public class TestCreateTableAsSelect extends SparkCatalogTestBase {
         sql("SELECT * FROM %s ORDER BY id", tableName));
 
     Assert.assertEquals("Table should have expected snapshots",
-        isAtomic ? 2 : 1, Iterables.size(rtasTable.snapshots()));
+        2, Iterables.size(rtasTable.snapshots()));
   }
 
   @Test
@@ -226,20 +218,15 @@ public class TestCreateTableAsSelect extends SparkCatalogTestBase {
         .using("iceberg")
         .replace();
 
-    // spark_catalog does not use an atomic replace, so the table history and old spec is dropped
-    // the other catalogs do use atomic replace, so the spec id is incremented
-    boolean isAtomic = !"spark_catalog".equals(catalogName);
-
     Schema expectedSchema = new Schema(
         Types.NestedField.optional(1, "id", Types.LongType.get()),
         Types.NestedField.optional(2, "data", Types.StringType.get()),
         Types.NestedField.optional(3, "part", Types.StringType.get())
     );
 
-    int specId = isAtomic ? 1 : 0;
     PartitionSpec expectedSpec = PartitionSpec.builderFor(expectedSchema)
         .identity("part")
-        .withSpecId(specId)
+        .withSpecId(1)
         .build();
 
     Table rtasTable = validationCatalog.loadTable(tableIdent);
@@ -256,7 +243,7 @@ public class TestCreateTableAsSelect extends SparkCatalogTestBase {
         sql("SELECT * FROM %s ORDER BY id", tableName));
 
     Assert.assertEquals("Table should have expected snapshots",
-        isAtomic ? 2 : 1, Iterables.size(rtasTable.snapshots()));
+        2, Iterables.size(rtasTable.snapshots()));
   }
 
   @Test
@@ -289,9 +276,6 @@ public class TestCreateTableAsSelect extends SparkCatalogTestBase {
         .using("iceberg")
         .createOrReplace();
 
-    // spark_catalog does not use an atomic replace, so the table history is dropped
-    boolean isAtomic = !"spark_catalog".equals(catalogName);
-
     Schema expectedSchema = new Schema(
         Types.NestedField.optional(1, "id", Types.LongType.get()),
         Types.NestedField.optional(2, "data", Types.StringType.get()),
@@ -317,6 +301,6 @@ public class TestCreateTableAsSelect extends SparkCatalogTestBase {
         sql("SELECT * FROM %s ORDER BY id", tableName));
 
     Assert.assertEquals("Table should have expected snapshots",
-        isAtomic ? 2 : 1, Iterables.size(rtasTable.snapshots()));
+        2, Iterables.size(rtasTable.snapshots()));
   }
 }
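
With these changes, REPLACE TABLE AS SELECT against an Iceberg table in
spark_catalog goes through stageReplace on the wrapped Iceberg catalog, so the
table keeps its history: the spec id is incremented to 1 and both snapshots
survive, as the updated assertions above expect. Continuing with the "spark"
session from the configuration sketch earlier (table names are placeholders):

    // atomic RTAS: the previous table metadata is preserved rather than dropped
    spark.sql("REPLACE TABLE db.target USING iceberg PARTITIONED BY (part) AS " +
        "SELECT id, data, CASE WHEN (id % 2) = 0 THEN 'even' ELSE 'odd' END AS part " +
        "FROM db.source");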
