This is an automated email from the ASF dual-hosted git repository.

russellspitzer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 82607f198 Spark: Provide Procedure for Catalog Register Procedure (#4810)
82607f198 is described below

commit 82607f1984a8f5813cfada08c285f96eb274b3b2
Author: Russell Spitzer <[email protected]>
AuthorDate: Fri Jun 3 08:09:44 2022 -0500

    Spark: Provide Procedure for Catalog Register Procedure (#4810)
    
    * Spark: Provide Procedure for Catalog Register Procedure
    
    Adds the ability to invoke a Catalog's register method via a Spark
    procedure. This allows a user to create a catalog entry for a metadata.json
    file which already exists but does not have a corresponding catalog identifier.
---
 .../java/org/apache/iceberg/CachingCatalog.java    |   7 ++
 .../extensions/TestRegisterTableProcedure.java     |  89 ++++++++++++++++++
 .../java/org/apache/iceberg/spark/BaseCatalog.java |   3 +-
 .../java/org/apache/iceberg/spark/Spark3Util.java  |  18 ++++
 .../org/apache/iceberg/spark/SparkCatalog.java     |   5 +
 .../apache/iceberg/spark/SparkSessionCatalog.java  |   9 ++
 .../spark/procedures/RegisterTableProcedure.java   | 104 +++++++++++++++++++++
 .../iceberg/spark/procedures/SparkProcedures.java  |   1 +
 .../iceberg/spark/source/HasIcebergCatalog.java    |  31 ++++++
 .../org/apache/iceberg/spark/TestSpark3Util.java   |  10 ++
 version.txt                                        |   1 +
 11 files changed, 277 insertions(+), 1 deletion(-)

diff --git a/core/src/main/java/org/apache/iceberg/CachingCatalog.java b/core/src/main/java/org/apache/iceberg/CachingCatalog.java
index 99ff076cd..668ae1ff3 100644
--- a/core/src/main/java/org/apache/iceberg/CachingCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/CachingCatalog.java
@@ -182,6 +182,13 @@ public class CachingCatalog implements Catalog {
     tableCache.invalidateAll(metadataTableIdentifiers(canonicalized));
   }
 
+  @Override
+  public Table registerTable(TableIdentifier identifier, String 
metadataFileLocation) {
+    Table table = catalog.registerTable(identifier, metadataFileLocation);
+    invalidateTable(identifier);
+    return table;
+  }
+
   private Iterable<TableIdentifier> metadataTableIdentifiers(TableIdentifier 
ident) {
     ImmutableList.Builder<TableIdentifier> builder = ImmutableList.builder();
 
diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRegisterTableProcedure.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRegisterTableProcedure.java
new file mode 100644
index 000000000..d1f1905e0
--- /dev/null
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRegisterTableProcedure.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.extensions;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.hive.HiveTableOperations;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.parser.ParseException;
+import org.apache.spark.sql.functions;
+import org.apache.spark.sql.types.DataTypes;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestRegisterTableProcedure extends SparkExtensionsTestBase {
+
+  private final String targetName;
+
+  public TestRegisterTableProcedure(
+      String catalogName,
+      String implementation,
+      Map<String, String> config) {
+    super(catalogName, implementation, config);
+    targetName = tableName("register_table");
+  }
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @After
+  public void dropTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+    sql("DROP TABLE IF EXISTS %s", targetName);
+  }
+
+  @Test
+  public void testRegisterTable() throws NoSuchTableException, ParseException {
+    Assume.assumeTrue("Register only implemented on Hive Catalogs",
+        spark.conf().get("spark.sql.catalog." + catalogName + 
".type").equals("hive"));
+
+    long numRows = 1000;
+
+    sql("CREATE TABLE %s (id int, data string) using ICEBERG", tableName);
+    spark.range(0, numRows)
+        .withColumn("data", functions.col("id").cast(DataTypes.StringType))
+        .writeTo(tableName)
+        .append();
+
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+    long originalFileCount = (long) scalarSql("SELECT COUNT(*) from %s.files", 
tableName);
+    long currentSnapshotId = table.currentSnapshot().snapshotId();
+    String metadataJson = ((HiveTableOperations) (((HasTableOperations) 
table).operations())).currentMetadataLocation();
+
+    List<Object[]> result = sql("CALL %s.system.register_table('%s', '%s')", 
catalogName, targetName, metadataJson);
+    Assert.assertEquals("Current Snapshot is not correct", currentSnapshotId, 
result.get(0)[0]);
+
+    List<Object[]> original = sql("SELECT * FROM %s", tableName);
+    List<Object[]> registered = sql("SELECT * FROM %s", targetName);
+    assertEquals("Registered table rows should match original table rows", 
original, registered);
+    Assert.assertEquals("Should have the right row count in the procedure 
result",
+        numRows, result.get(0)[1]);
+    Assert.assertEquals("Should have the right datafile count in the procedure 
result",
+        originalFileCount, result.get(0)[2]);
+  }
+}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/BaseCatalog.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/BaseCatalog.java
index 2942a52a1..43447e346 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/BaseCatalog.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/BaseCatalog.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.spark;
 
 import org.apache.iceberg.spark.procedures.SparkProcedures;
 import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
+import org.apache.iceberg.spark.source.HasIcebergCatalog;
 import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.StagingTableCatalog;
@@ -28,7 +29,7 @@ import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
 import org.apache.spark.sql.connector.iceberg.catalog.Procedure;
 import org.apache.spark.sql.connector.iceberg.catalog.ProcedureCatalog;
 
-abstract class BaseCatalog implements StagingTableCatalog, ProcedureCatalog, 
SupportsNamespaces {
+abstract class BaseCatalog implements StagingTableCatalog, ProcedureCatalog, 
SupportsNamespaces, HasIcebergCatalog {
 
   @Override
   public Procedure loadProcedure(Identifier ident) throws 
NoSuchProcedureException {
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java
index f84e1ae10..94146dbf2 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java
@@ -35,6 +35,7 @@ import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.UpdateProperties;
 import org.apache.iceberg.UpdateSchema;
+import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.expressions.BoundPredicate;
@@ -50,6 +51,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.io.BaseEncoding;
 import org.apache.iceberg.spark.SparkTableUtil.SparkPartition;
+import org.apache.iceberg.spark.source.HasIcebergCatalog;
 import org.apache.iceberg.spark.source.SparkTable;
 import org.apache.iceberg.transforms.PartitionSpecVisitor;
 import org.apache.iceberg.transforms.SortOrderVisitor;
@@ -628,6 +630,22 @@ public class Spark3Util {
     return toIcebergTable(sparkTable);
   }
 
+  /**
+   * Returns the underlying Iceberg Catalog object represented by a Spark 
Catalog
+   * @param spark SparkSession used for looking up catalog reference
+   * @param catalogName The name of the Spark Catalog being referenced
+   * @return the Iceberg catalog class being wrapped by the Spark Catalog
+   */
+  public static Catalog loadIcebergCatalog(SparkSession spark, String 
catalogName) {
+    CatalogPlugin catalogPlugin = 
spark.sessionState().catalogManager().catalog(catalogName);
+    Preconditions.checkArgument(catalogPlugin instanceof HasIcebergCatalog,
+        String.format("Cannot load Iceberg catalog from catalog %s because it 
does not contain an Iceberg Catalog. " +
+                          "Actual Class: %s",
+            catalogName, catalogPlugin.getClass().getName()));
+    return ((HasIcebergCatalog) catalogPlugin).icebergCatalog();
+  }
+
+
   public static CatalogAndIdentifier catalogAndIdentifier(SparkSession spark, 
String name) throws ParseException {
     return catalogAndIdentifier(spark, name, 
spark.sessionState().catalogManager().currentCatalog());
   }
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
index b8e98171a..c9f7eef6d 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
@@ -621,4 +621,9 @@ public class SparkCatalog extends BaseCatalog {
         tables.buildTable(((PathIdentifier) ident).location(), schema) :
         icebergCatalog.buildTable(buildIdentifier(ident), schema);
   }
+
+  @Override
+  public Catalog icebergCatalog() {
+    return icebergCatalog;
+  }
 }
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
index 8245ef1a0..edc544762 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
@@ -20,7 +20,9 @@
 package org.apache.iceberg.spark;
 
 import java.util.Map;
+import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.source.HasIcebergCatalog;
 import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
@@ -302,4 +304,11 @@ public class SparkSessionCatalog<T extends TableCatalog & SupportsNamespaces>
         "Please make sure your are replacing Spark's default catalog, named 
'spark_catalog'.");
     return sessionCatalog;
   }
+
+  @Override
+  public Catalog icebergCatalog() {
+    Preconditions.checkArgument(icebergCatalog instanceof HasIcebergCatalog,
+        "Cannot return underlying Iceberg Catalog, wrapped catalog does not 
contain an Iceberg Catalog");
+    return ((HasIcebergCatalog) icebergCatalog).icebergCatalog();
+  }
 }
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RegisterTableProcedure.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RegisterTableProcedure.java
new file mode 100644
index 000000000..d7f1f56d9
--- /dev/null
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RegisterTableProcedure.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
+import org.apache.iceberg.spark.source.HasIcebergCatalog;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+class RegisterTableProcedure extends BaseProcedure {
+  private static final ProcedureParameter[] PARAMETERS = new 
ProcedureParameter[]{
+      ProcedureParameter.required("table", DataTypes.StringType),
+      ProcedureParameter.required("metadata_file", DataTypes.StringType)
+  };
+
+  private static final StructType OUTPUT_TYPE = new StructType(new 
StructField[]{
+      new StructField("Current SnapshotId", DataTypes.LongType, true, 
Metadata.empty()),
+      new StructField("Rows", DataTypes.LongType, true, Metadata.empty()),
+      new StructField("Datafiles", DataTypes.LongType, true, Metadata.empty())
+  });
+
+  private RegisterTableProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  public static ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<RegisterTableProcedure>() {
+      @Override
+      protected RegisterTableProcedure doBuild() {
+        return new RegisterTableProcedure(tableCatalog());
+      }
+    };
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    TableIdentifier tableName = 
Spark3Util.identifierToTableIdentifier(toIdentifier(args.getString(0), 
"table"));
+    String metadataFile = args.getString(1);
+    Preconditions.checkArgument(tableCatalog() instanceof HasIcebergCatalog,
+        "Cannot use Register Table in a non-Iceberg catalog");
+    Preconditions.checkArgument(metadataFile != null && 
!metadataFile.isEmpty(),
+        "Cannot handle an empty argument metadata_file");
+
+    Catalog icebergCatalog = ((HasIcebergCatalog) 
tableCatalog()).icebergCatalog();
+    Table table = icebergCatalog.registerTable(tableName, metadataFile);
+    Long currentSnapshotId = null;
+    Long totalDataFiles = null;
+    Long totalRecords = null;
+
+    Snapshot currentSnapshot = table.currentSnapshot();
+    if (currentSnapshot != null) {
+      currentSnapshotId = currentSnapshot.snapshotId();
+      totalDataFiles = 
Long.parseLong(currentSnapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP));
+      totalRecords = 
Long.parseLong(currentSnapshot.summary().get(SnapshotSummary.TOTAL_RECORDS_PROP));
+    }
+
+    return new InternalRow[] {newInternalRow(currentSnapshotId, totalRecords, 
totalDataFiles)};
+  }
+
+  @Override
+  public String description() {
+    return "RegisterTableProcedure";
+  }
+}
+
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
index 4ce9460b9..1e944fcf0 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
@@ -53,6 +53,7 @@ public class SparkProcedures {
     mapBuilder.put("snapshot", SnapshotTableProcedure::builder);
     mapBuilder.put("add_files", AddFilesProcedure::builder);
     mapBuilder.put("ancestors_of", AncestorsOfProcedure::builder);
+    mapBuilder.put("register_table", RegisterTableProcedure::builder);
     return mapBuilder.build();
   }
 
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/HasIcebergCatalog.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/HasIcebergCatalog.java
new file mode 100644
index 000000000..e2579a005
--- /dev/null
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/HasIcebergCatalog.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+
+public interface HasIcebergCatalog extends TableCatalog {
+
+  /**
+   * Returns the underlying {@link org.apache.iceberg.catalog.Catalog} backing 
this Spark Catalog
+   */
+  Catalog icebergCatalog();
+}
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSpark3Util.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSpark3Util.java
index 8d57fe112..be6130f63 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSpark3Util.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSpark3Util.java
@@ -19,10 +19,12 @@
 
 package org.apache.iceberg.spark;
 
+import org.apache.iceberg.CachingCatalog;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.SortOrderParser;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.types.Types;
 import org.junit.Assert;
 import org.junit.Test;
@@ -92,6 +94,14 @@ public class TestSpark3Util extends SparkTestBase {
     Assert.assertTrue(table.name().equals(tableFullName));
   }
 
+  @Test
+  public void testLoadIcebergCatalog() throws Exception {
+    spark.conf().set("spark.sql.catalog.test_cat", 
SparkCatalog.class.getName());
+    spark.conf().set("spark.sql.catalog.test_cat.type", "hive");
+    Catalog catalog = Spark3Util.loadIcebergCatalog(spark, "test_cat");
+    Assert.assertTrue("Should retrieve underlying catalog class", catalog 
instanceof CachingCatalog);
+  }
+
   private SortOrder buildSortOrder(String transform, Schema schema, int 
sourceId) {
     String jsonString = "{\n" +
             "  \"order-id\" : 10,\n" +
diff --git a/version.txt b/version.txt
new file mode 100644
index 000000000..9beb74d49
--- /dev/null
+++ b/version.txt
@@ -0,0 +1 @@
+0.13.2

Reply via email to