This is an automated email from the ASF dual-hosted git repository.
russellspitzer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 82607f198 Spark: Provide Procedure for Catalog Register Procedure
(#4810)
82607f198 is described below
commit 82607f1984a8f5813cfada08c285f96eb274b3b2
Author: Russell Spitzer <[email protected]>
AuthorDate: Fri Jun 3 08:09:44 2022 -0500
Spark: Provide Procedure for Catalog Register Procedure (#4810)
* Spark: Provide Procedure for Catalog Register Procedure
Adds the ability to invoke a Catalog's register method via a Spark
procedure. This allows a user to create a catalog entry for a metadata.json
file that already exists but has no corresponding catalog
identifier.
---
.../java/org/apache/iceberg/CachingCatalog.java | 7 ++
.../extensions/TestRegisterTableProcedure.java | 89 ++++++++++++++++++
.../java/org/apache/iceberg/spark/BaseCatalog.java | 3 +-
.../java/org/apache/iceberg/spark/Spark3Util.java | 18 ++++
.../org/apache/iceberg/spark/SparkCatalog.java | 5 +
.../apache/iceberg/spark/SparkSessionCatalog.java | 9 ++
.../spark/procedures/RegisterTableProcedure.java | 104 +++++++++++++++++++++
.../iceberg/spark/procedures/SparkProcedures.java | 1 +
.../iceberg/spark/source/HasIcebergCatalog.java | 31 ++++++
.../org/apache/iceberg/spark/TestSpark3Util.java | 10 ++
version.txt | 1 +
11 files changed, 277 insertions(+), 1 deletion(-)
diff --git a/core/src/main/java/org/apache/iceberg/CachingCatalog.java
b/core/src/main/java/org/apache/iceberg/CachingCatalog.java
index 99ff076cd..668ae1ff3 100644
--- a/core/src/main/java/org/apache/iceberg/CachingCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/CachingCatalog.java
@@ -182,6 +182,13 @@ public class CachingCatalog implements Catalog {
tableCache.invalidateAll(metadataTableIdentifiers(canonicalized));
}
/**
 * Registers a table in the underlying catalog from an existing metadata file location,
 * then invalidates any cached entries for that identifier.
 *
 * @param identifier the identifier under which to register the table
 * @param metadataFileLocation location of an existing metadata file for the table
 * @return the registered table, as returned by the wrapped catalog
 */
@Override
public Table registerTable(TableIdentifier identifier, String metadataFileLocation) {
  // Delegate registration to the wrapped catalog, which is the source of truth.
  Table table = catalog.registerTable(identifier, metadataFileLocation);
  // Drop stale cache entries (including derived metadata-table entries) so
  // subsequent loads see the freshly registered table rather than a cached one.
  invalidateTable(identifier);
  return table;
}
private Iterable<TableIdentifier> metadataTableIdentifiers(TableIdentifier
ident) {
ImmutableList.Builder<TableIdentifier> builder = ImmutableList.builder();
diff --git
a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRegisterTableProcedure.java
b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRegisterTableProcedure.java
new file mode 100644
index 000000000..d1f1905e0
--- /dev/null
+++
b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRegisterTableProcedure.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.extensions;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.hive.HiveTableOperations;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.parser.ParseException;
+import org.apache.spark.sql.functions;
+import org.apache.spark.sql.types.DataTypes;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
/**
 * Tests for the {@code register_table} Spark procedure: writes data to a source table,
 * registers its current metadata file under a new identifier, and verifies the
 * registered table matches the original.
 */
public class TestRegisterTableProcedure extends SparkExtensionsTestBase {

  // Fully qualified identifier used as the target of register_table in each run.
  private final String targetName;

  public TestRegisterTableProcedure(
      String catalogName,
      String implementation,
      Map<String, String> config) {
    super(catalogName, implementation, config);
    targetName = tableName("register_table");
  }

  @Rule
  public TemporaryFolder temp = new TemporaryFolder();

  @After
  public void dropTables() {
    // Clean up both the source table and the registered target between tests.
    sql("DROP TABLE IF EXISTS %s", tableName);
    sql("DROP TABLE IF EXISTS %s", targetName);
  }

  @Test
  public void testRegisterTable() throws NoSuchTableException, ParseException {
    // register_table is only exercised against Hive catalogs in this suite.
    Assume.assumeTrue("Register only implemented on Hive Catalogs",
        spark.conf().get("spark.sql.catalog." + catalogName + ".type").equals("hive"));

    long numRows = 1000;

    sql("CREATE TABLE %s (id int, data string) using ICEBERG", tableName);
    spark.range(0, numRows)
        .withColumn("data", functions.col("id").cast(DataTypes.StringType))
        .writeTo(tableName)
        .append();

    Table table = Spark3Util.loadIcebergTable(spark, tableName);
    long originalFileCount = (long) scalarSql("SELECT COUNT(*) from %s.files", tableName);
    long currentSnapshotId = table.currentSnapshot().snapshotId();
    // The current metadata.json location is what register_table consumes.
    String metadataJson = ((HiveTableOperations) (((HasTableOperations) table).operations())).currentMetadataLocation();

    List<Object[]> result = sql("CALL %s.system.register_table('%s', '%s')", catalogName, targetName, metadataJson);
    // Procedure output row: [current snapshot id, total rows, total data files].
    Assert.assertEquals("Current Snapshot is not correct", currentSnapshotId, result.get(0)[0]);

    List<Object[]> original = sql("SELECT * FROM %s", tableName);
    List<Object[]> registered = sql("SELECT * FROM %s", targetName);
    assertEquals("Registered table rows should match original table rows", original, registered);
    Assert.assertEquals("Should have the right row count in the procedure result",
        numRows, result.get(0)[1]);
    Assert.assertEquals("Should have the right datafile count in the procedure result",
        originalFileCount, result.get(0)[2]);
  }
}
diff --git
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/BaseCatalog.java
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/BaseCatalog.java
index 2942a52a1..43447e346 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/BaseCatalog.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/BaseCatalog.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.spark;
import org.apache.iceberg.spark.procedures.SparkProcedures;
import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
+import org.apache.iceberg.spark.source.HasIcebergCatalog;
import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.StagingTableCatalog;
@@ -28,7 +29,7 @@ import
org.apache.spark.sql.connector.catalog.SupportsNamespaces;
import org.apache.spark.sql.connector.iceberg.catalog.Procedure;
import org.apache.spark.sql.connector.iceberg.catalog.ProcedureCatalog;
-abstract class BaseCatalog implements StagingTableCatalog, ProcedureCatalog,
SupportsNamespaces {
+abstract class BaseCatalog implements StagingTableCatalog, ProcedureCatalog,
SupportsNamespaces, HasIcebergCatalog {
@Override
public Procedure loadProcedure(Identifier ident) throws
NoSuchProcedureException {
diff --git
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java
index f84e1ae10..94146dbf2 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java
@@ -35,6 +35,7 @@ import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.UpdateProperties;
import org.apache.iceberg.UpdateSchema;
+import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.expressions.BoundPredicate;
@@ -50,6 +51,7 @@ import
org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.io.BaseEncoding;
import org.apache.iceberg.spark.SparkTableUtil.SparkPartition;
+import org.apache.iceberg.spark.source.HasIcebergCatalog;
import org.apache.iceberg.spark.source.SparkTable;
import org.apache.iceberg.transforms.PartitionSpecVisitor;
import org.apache.iceberg.transforms.SortOrderVisitor;
@@ -628,6 +630,22 @@ public class Spark3Util {
return toIcebergTable(sparkTable);
}
+ /**
+ * Returns the underlying Iceberg Catalog object represented by a Spark
Catalog
+ * @param spark SparkSession used for looking up catalog reference
+ * @param catalogName The name of the Spark Catalog being referenced
+ * @return the Iceberg catalog class being wrapped by the Spark Catalog
+ */
+ public static Catalog loadIcebergCatalog(SparkSession spark, String
catalogName) {
+ CatalogPlugin catalogPlugin =
spark.sessionState().catalogManager().catalog(catalogName);
+ Preconditions.checkArgument(catalogPlugin instanceof HasIcebergCatalog,
+ String.format("Cannot load Iceberg catalog from catalog %s because it
does not contain an Iceberg Catalog. " +
+ "Actual Class: %s",
+ catalogName, catalogPlugin.getClass().getName()));
+ return ((HasIcebergCatalog) catalogPlugin).icebergCatalog();
+ }
+
+
public static CatalogAndIdentifier catalogAndIdentifier(SparkSession spark,
String name) throws ParseException {
return catalogAndIdentifier(spark, name,
spark.sessionState().catalogManager().currentCatalog());
}
diff --git
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
index b8e98171a..c9f7eef6d 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
@@ -621,4 +621,9 @@ public class SparkCatalog extends BaseCatalog {
tables.buildTable(((PathIdentifier) ident).location(), schema) :
icebergCatalog.buildTable(buildIdentifier(ident), schema);
}
+
/**
 * Returns the wrapped Iceberg {@link Catalog}, giving procedures such as
 * register_table direct access to the native Catalog API.
 */
@Override
public Catalog icebergCatalog() {
  return icebergCatalog;
}
}
diff --git
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
index 8245ef1a0..edc544762 100644
---
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
+++
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
@@ -20,7 +20,9 @@
package org.apache.iceberg.spark;
import java.util.Map;
+import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.source.HasIcebergCatalog;
import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
@@ -302,4 +304,11 @@ public class SparkSessionCatalog<T extends TableCatalog &
SupportsNamespaces>
"Please make sure your are replacing Spark's default catalog, named
'spark_catalog'.");
return sessionCatalog;
}
+
+ @Override
+ public Catalog icebergCatalog() {
+ Preconditions.checkArgument(icebergCatalog instanceof HasIcebergCatalog,
+ "Cannot return underlying Iceberg Catalog, wrapped catalog does not
contain an Iceberg Catalog");
+ return ((HasIcebergCatalog) icebergCatalog).icebergCatalog();
+ }
}
diff --git
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RegisterTableProcedure.java
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RegisterTableProcedure.java
new file mode 100644
index 000000000..d7f1f56d9
--- /dev/null
+++
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RegisterTableProcedure.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
+import org.apache.iceberg.spark.source.HasIcebergCatalog;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+class RegisterTableProcedure extends BaseProcedure {
+ private static final ProcedureParameter[] PARAMETERS = new
ProcedureParameter[]{
+ ProcedureParameter.required("table", DataTypes.StringType),
+ ProcedureParameter.required("metadata_file", DataTypes.StringType)
+ };
+
+ private static final StructType OUTPUT_TYPE = new StructType(new
StructField[]{
+ new StructField("Current SnapshotId", DataTypes.LongType, true,
Metadata.empty()),
+ new StructField("Rows", DataTypes.LongType, true, Metadata.empty()),
+ new StructField("Datafiles", DataTypes.LongType, true, Metadata.empty())
+ });
+
+ private RegisterTableProcedure(TableCatalog tableCatalog) {
+ super(tableCatalog);
+ }
+
+ public static ProcedureBuilder builder() {
+ return new BaseProcedure.Builder<RegisterTableProcedure>() {
+ @Override
+ protected RegisterTableProcedure doBuild() {
+ return new RegisterTableProcedure(tableCatalog());
+ }
+ };
+ }
+
+ @Override
+ public ProcedureParameter[] parameters() {
+ return PARAMETERS;
+ }
+
+ @Override
+ public StructType outputType() {
+ return OUTPUT_TYPE;
+ }
+
+ @Override
+ public InternalRow[] call(InternalRow args) {
+ TableIdentifier tableName =
Spark3Util.identifierToTableIdentifier(toIdentifier(args.getString(0),
"table"));
+ String metadataFile = args.getString(1);
+ Preconditions.checkArgument(tableCatalog() instanceof HasIcebergCatalog,
+ "Cannot use Register Table in a non-Iceberg catalog");
+ Preconditions.checkArgument(metadataFile != null &&
!metadataFile.isEmpty(),
+ "Cannot handle an empty argument metadata_file");
+
+ Catalog icebergCatalog = ((HasIcebergCatalog)
tableCatalog()).icebergCatalog();
+ Table table = icebergCatalog.registerTable(tableName, metadataFile);
+ Long currentSnapshotId = null;
+ Long totalDataFiles = null;
+ Long totalRecords = null;
+
+ Snapshot currentSnapshot = table.currentSnapshot();
+ if (currentSnapshot != null) {
+ currentSnapshotId = currentSnapshot.snapshotId();
+ totalDataFiles =
Long.parseLong(currentSnapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP));
+ totalRecords =
Long.parseLong(currentSnapshot.summary().get(SnapshotSummary.TOTAL_RECORDS_PROP));
+ }
+
+ return new InternalRow[] {newInternalRow(currentSnapshotId, totalRecords,
totalDataFiles)};
+ }
+
+ @Override
+ public String description() {
+ return "RegisterTableProcedure";
+ }
+}
+
diff --git
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
index 4ce9460b9..1e944fcf0 100644
---
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
+++
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
@@ -53,6 +53,7 @@ public class SparkProcedures {
mapBuilder.put("snapshot", SnapshotTableProcedure::builder);
mapBuilder.put("add_files", AddFilesProcedure::builder);
mapBuilder.put("ancestors_of", AncestorsOfProcedure::builder);
+ mapBuilder.put("register_table", RegisterTableProcedure::builder);
return mapBuilder.build();
}
diff --git
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/HasIcebergCatalog.java
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/HasIcebergCatalog.java
new file mode 100644
index 000000000..e2579a005
--- /dev/null
+++
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/HasIcebergCatalog.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+
+public interface HasIcebergCatalog extends TableCatalog {
+
+ /**
+ * Returns the underlying {@link org.apache.iceberg.catalog.Catalog} backing
this Spark Catalog
+ */
+ Catalog icebergCatalog();
+}
diff --git
a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSpark3Util.java
b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSpark3Util.java
index 8d57fe112..be6130f63 100644
---
a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSpark3Util.java
+++
b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSpark3Util.java
@@ -19,10 +19,12 @@
package org.apache.iceberg.spark;
+import org.apache.iceberg.CachingCatalog;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.SortOrderParser;
import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
import org.junit.Test;
@@ -92,6 +94,14 @@ public class TestSpark3Util extends SparkTestBase {
Assert.assertTrue(table.name().equals(tableFullName));
}
@Test
public void testLoadIcebergCatalog() throws Exception {
  // Register a fresh Hive-backed SparkCatalog under a dedicated name so this
  // test does not depend on the suite's default catalog configuration.
  spark.conf().set("spark.sql.catalog.test_cat", SparkCatalog.class.getName());
  spark.conf().set("spark.sql.catalog.test_cat.type", "hive");
  Catalog catalog = Spark3Util.loadIcebergCatalog(spark, "test_cat");
  // NOTE(review): the assertion presumes SparkCatalog wraps its Hive catalog in a
  // CachingCatalog by default; this test pins that behavior.
  Assert.assertTrue("Should retrieve underlying catalog class", catalog instanceof CachingCatalog);
}
private SortOrder buildSortOrder(String transform, Schema schema, int
sourceId) {
String jsonString = "{\n" +
" \"order-id\" : 10,\n" +
diff --git a/version.txt b/version.txt
new file mode 100644
index 000000000..9beb74d49
--- /dev/null
+++ b/version.txt
@@ -0,0 +1 @@
+0.13.2