This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 419b02a836 [iceberg] Introduce integration for AWS Glue (#4624)
419b02a836 is described below

commit 419b02a836da34e2050a1d6c56a57e3ea32d7e99
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Dec 2 21:09:48 2024 +0800

    [iceberg] Introduce integration for AWS Glue (#4624)
---
 docs/content/migration/iceberg-compatibility.md    | 11 +++++++
 .../iceberg/AbstractIcebergCommitCallback.java     | 35 +++++++++++----------
 .../org/apache/paimon/iceberg/IcebergOptions.java  |  6 ++++
 .../iceberg/IcebergHiveMetadataCommitter.java      |  8 ++---
 .../IcebergHive23MetadataCommitterITCase.java      |  9 +++++-
 .../IcebergHive31MetadataCommitterITCase.java      |  9 +++++-
 .../IcebergHiveMetadataCommitterITCaseBase.java    | 36 ++++++++++++++++++++++
 7 files changed, 92 insertions(+), 22 deletions(-)

diff --git a/docs/content/migration/iceberg-compatibility.md b/docs/content/migration/iceberg-compatibility.md
index d745607148..01a03a4526 100644
--- a/docs/content/migration/iceberg-compatibility.md
+++ b/docs/content/migration/iceberg-compatibility.md
@@ -383,9 +383,20 @@ you also need to set some (or all) of the following table options when creating
       <td>Boolean</td>
      <td>Should use the legacy manifest version to generate Iceberg's 1.4 manifest files.</td>
     </tr>
+    <tr>
+      <td><h5>metadata.iceberg.hive-client-class</h5></td>
+      <td style="word-wrap: break-word;">org.apache.hadoop.hive.metastore.HiveMetaStoreClient</td>
+      <td>String</td>
+      <td>Hive client class name for Iceberg Hive Catalog.</td>
+    </tr>
     </tbody>
 </table>
 
+## AWS Glue Catalog
+
+You can use Hive Catalog to connect to the AWS Glue metastore by setting `'metadata.iceberg.hive-client-class'` to
+`'com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient'`.
+
 ## AWS Athena
 
 AWS Athena may use old manifest reader to read Iceberg manifest by names, we should let Paimon producing legacy Iceberg
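
As an illustrative sketch (not part of this patch), roughly how the new option would be used from a Flink TableEnvironment, modeled on the test added further below. The catalog name, warehouse URI, and table schema are placeholders; the AWS Glue client jar must be on the classpath, and depending on the environment additional Hive/Glue connection options (for example 'metadata.iceberg.uri') may still be needed.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PaimonIcebergGlueExample {
    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());

        // Placeholder warehouse location; replace with your own bucket/path.
        tEnv.executeSql(
                "CREATE CATALOG my_paimon WITH ('type' = 'paimon', 'warehouse' = 's3://my-bucket/warehouse')");
        tEnv.executeSql("CREATE DATABASE IF NOT EXISTS my_paimon.test_db");

        // 'metadata.iceberg.storage' = 'hive-catalog' commits Iceberg metadata through a Hive
        // metastore client; the new 'metadata.iceberg.hive-client-class' option swaps that
        // client for the AWS Glue implementation mentioned in the docs above.
        tEnv.executeSql(
                "CREATE TABLE my_paimon.test_db.t (pt INT, id INT, data STRING) PARTITIONED BY (pt) WITH ("
                        + " 'metadata.iceberg.storage' = 'hive-catalog',"
                        + " 'metadata.iceberg.hive-client-class' ="
                        + " 'com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient')");

        tEnv.executeSql("INSERT INTO my_paimon.test_db.t VALUES (1, 1, 'apple'), (2, 1, 'cat')")
                .await();
    }
}
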
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java
index 1b952c1716..7ea6cbe057 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java
@@ -112,22 +112,7 @@ public abstract class AbstractIcebergCommitCallback implements CommitCallback {
                 break;
             case HADOOP_CATALOG:
             case HIVE_CATALOG:
-                Path dbPath = table.location().getParent();
-                final String dbSuffix = ".db";
-                if (dbPath.getName().endsWith(dbSuffix)) {
-                    String dbName =
-                            dbPath.getName()
-                                    .substring(0, dbPath.getName().length() - dbSuffix.length());
-                    String tableName = table.location().getName();
-                    Path separatePath =
-                            new Path(
-                                    dbPath.getParent(),
-                                    String.format("iceberg/%s/%s/metadata", dbName, tableName));
-                    this.pathFactory = new IcebergPathFactory(separatePath);
-                } else {
-                    throw new UnsupportedOperationException(
-                            "Storage type ICEBERG_WAREHOUSE can only be used on Paimon tables in a Paimon warehouse.");
-                }
+                this.pathFactory = new IcebergPathFactory(catalogTableMetadataPath(table));
                 break;
             default:
                 throw new UnsupportedOperationException(
@@ -152,6 +137,24 @@ public abstract class AbstractIcebergCommitCallback implements CommitCallback {
         this.manifestList = IcebergManifestList.create(table, pathFactory);
     }
 
+    public static Path catalogTableMetadataPath(FileStoreTable table) {
+        Path icebergDBPath = catalogDatabasePath(table);
+        return new Path(icebergDBPath, String.format("%s/metadata", table.location().getName()));
+    }
+
+    public static Path catalogDatabasePath(FileStoreTable table) {
+        Path dbPath = table.location().getParent();
+        final String dbSuffix = ".db";
+        if (dbPath.getName().endsWith(dbSuffix)) {
+            String dbName =
+                    dbPath.getName().substring(0, dbPath.getName().length() - dbSuffix.length());
+            return new Path(dbPath.getParent(), String.format("iceberg/%s/", dbName));
+        } else {
+            throw new UnsupportedOperationException(
+                    "Storage type ICEBERG_WAREHOUSE can only be used on Paimon tables in a Paimon warehouse.");
+        }
+    }
+
     @Override
     public void close() throws Exception {}
 
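
To make the refactor above easier to follow, here is a self-contained sketch of the layout mapping that the new catalogTableMetadataPath/catalogDatabasePath helpers perform. It uses plain strings instead of Paimon's FileStoreTable and org.apache.paimon.fs.Path, so the class name and main() driver are illustrative only.

public class IcebergWarehouseLayoutSketch {

    // Maps "<warehouse>/<db>.db/<table>" to "<warehouse>/iceberg/<db>/<table>/metadata",
    // the same mapping as catalogTableMetadataPath above (the real code works on
    // org.apache.paimon.fs.Path and rejects locations outside a Paimon warehouse).
    static String catalogTableMetadataPath(String tableLocation) {
        int tableSlash = tableLocation.lastIndexOf('/');
        String dbDir = tableLocation.substring(0, tableSlash);      // "<warehouse>/<db>.db"
        String tableName = tableLocation.substring(tableSlash + 1); // "<table>"

        int dbSlash = dbDir.lastIndexOf('/');
        String warehouse = dbDir.substring(0, dbSlash);
        String dbDirName = dbDir.substring(dbSlash + 1);
        if (!dbDirName.endsWith(".db")) {
            throw new UnsupportedOperationException(
                    "Storage type ICEBERG_WAREHOUSE can only be used on Paimon tables in a Paimon warehouse.");
        }
        String dbName = dbDirName.substring(0, dbDirName.length() - ".db".length());
        return warehouse + "/iceberg/" + dbName + "/" + tableName + "/metadata";
    }

    public static void main(String[] args) {
        // Prints "/warehouse/iceberg/test_db/t/metadata" -- which is why the new IT case
        // expects "DESC DATABASE EXTENDED test_db" to mention "iceberg/test_db".
        System.out.println(catalogTableMetadataPath("/warehouse/test_db.db/t"));
    }
}
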
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java
index c0ceed97ba..4b59e29c8c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java
@@ -84,6 +84,12 @@ public class IcebergOptions {
                     .withDescription(
                             "Should use the legacy manifest version to 
generate Iceberg's 1.4 manifest files.");
 
+    public static final ConfigOption<String> HIVE_CLIENT_CLASS =
+            key("metadata.iceberg.hive-client-class")
+                    .stringType()
+                    .defaultValue("org.apache.hadoop.hive.metastore.HiveMetaStoreClient")
+                    .withDescription("Hive client class name for Iceberg Hive Catalog.");
+
     /** Where to store Iceberg metadata. */
     public enum StorageType implements DescribedEnum {
         DISABLED("disabled", "Disable Iceberg compatibility support."),
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitter.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitter.java
index d913f729e3..ddd21384cb 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitter.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitter.java
@@ -22,7 +22,6 @@ import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.client.ClientPool;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.hive.HiveCatalog;
-import org.apache.paimon.hive.HiveCatalogFactory;
 import org.apache.paimon.hive.HiveTypeUtils;
 import org.apache.paimon.hive.pool.CachedClientPool;
 import org.apache.paimon.options.Options;
@@ -49,6 +48,8 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.iceberg.AbstractIcebergCommitCallback.catalogDatabasePath;
+
 /**
  * {@link IcebergMetadataCommitter} to commit Iceberg metadata to Hive metastore, so the table can
  * be visited by Iceberg's Hive catalog.
@@ -98,9 +99,7 @@ public class IcebergHiveMetadataCommitter implements IcebergMetadataCommitter {
 
         this.clients =
                 new CachedClientPool(
-                        hiveConf,
-                        options,
-                        HiveCatalogFactory.METASTORE_CLIENT_CLASS.defaultValue());
+                        hiveConf, options, options.getString(IcebergOptions.HIVE_CLIENT_CLASS));
     }
 
     @Override
@@ -158,6 +157,7 @@ public class IcebergHiveMetadataCommitter implements IcebergMetadataCommitter {
     private void createDatabase(String databaseName) throws Exception {
         Database database = new Database();
         database.setName(databaseName);
+        database.setLocationUri(catalogDatabasePath(table).toString());
         clients.execute(client -> client.createDatabase(database));
     }
 
diff --git a/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/iceberg/IcebergHive23MetadataCommitterITCase.java b/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/iceberg/IcebergHive23MetadataCommitterITCase.java
index a9e4ba9454..7d726e75a1 100644
--- a/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/iceberg/IcebergHive23MetadataCommitterITCase.java
+++ b/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/iceberg/IcebergHive23MetadataCommitterITCase.java
@@ -18,5 +18,12 @@
 
 package org.apache.paimon.iceberg;
 
+import org.apache.paimon.hive.CreateFailHiveMetaStoreClient;
+
 /** IT cases for {@link IcebergHiveMetadataCommitter} in Hive 2.3. */
-public class IcebergHive23MetadataCommitterITCase extends IcebergHiveMetadataCommitterITCaseBase {}
+public class IcebergHive23MetadataCommitterITCase extends IcebergHiveMetadataCommitterITCaseBase {
+    @Override
+    protected String createFailHiveMetaStoreClient() {
+        return CreateFailHiveMetaStoreClient.class.getName();
+    }
+}
diff --git a/paimon-hive/paimon-hive-connector-3.1/src/test/java/org/apache/paimon/iceberg/IcebergHive31MetadataCommitterITCase.java b/paimon-hive/paimon-hive-connector-3.1/src/test/java/org/apache/paimon/iceberg/IcebergHive31MetadataCommitterITCase.java
index 6f4b0afd1a..0634adfad3 100644
--- a/paimon-hive/paimon-hive-connector-3.1/src/test/java/org/apache/paimon/iceberg/IcebergHive31MetadataCommitterITCase.java
+++ b/paimon-hive/paimon-hive-connector-3.1/src/test/java/org/apache/paimon/iceberg/IcebergHive31MetadataCommitterITCase.java
@@ -18,5 +18,12 @@
 
 package org.apache.paimon.iceberg;
 
+import org.apache.paimon.hive.CreateFailHiveMetaStoreClient;
+
 /** IT cases for {@link IcebergHiveMetadataCommitter} in Hive 3.1. */
-public class IcebergHive31MetadataCommitterITCase extends IcebergHiveMetadataCommitterITCaseBase {}
+public class IcebergHive31MetadataCommitterITCase extends IcebergHiveMetadataCommitterITCaseBase {
+    @Override
+    protected String createFailHiveMetaStoreClient() {
+        return CreateFailHiveMetaStoreClient.class.getName();
+    }
+}
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitterITCaseBase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitterITCaseBase.java
index fab2277575..d0c64c5d3b 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitterITCaseBase.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitterITCaseBase.java
@@ -104,6 +104,12 @@ public abstract class IcebergHiveMetadataCommitterITCaseBase {
                         Row.of(2, 1, "cat"),
                         Row.of(2, 2, "elephant")),
                collect(tEnv.executeSql("SELECT * FROM my_iceberg.test_db.t ORDER BY pt, id")));
+
+        Assert.assertTrue(
+                hiveShell
+                        .executeQuery("DESC DATABASE EXTENDED test_db")
+                        .toString()
+                        .contains("iceberg/test_db"));
     }
 
     @Test
@@ -150,6 +156,36 @@ public abstract class IcebergHiveMetadataCommitterITCaseBase {
                                 "SELECT data, id, pt FROM my_iceberg.test_db.t 
WHERE id > 1 ORDER BY pt, id")));
     }
 
+    @Test
+    public void testCustomMetastoreClass() {
+        TableEnvironment tEnv =
+                TableEnvironmentImpl.create(
+                        EnvironmentSettings.newInstance().inBatchMode().build());
+        tEnv.executeSql(
+                "CREATE CATALOG my_paimon WITH ( 'type' = 'paimon', 'warehouse' = '"
+                        + path
+                        + "' )");
+        tEnv.executeSql("CREATE DATABASE my_paimon.test_db");
+        tEnv.executeSql(
+                String.format(
+                        "CREATE TABLE my_paimon.test_db.t ( pt INT, id INT, data STRING ) PARTITIONED BY (pt) WITH "
+                                + "( "
+                                + "'metadata.iceberg.storage' = 'hive-catalog', "
+                                + "'metadata.iceberg.uri' = '', "
+                                + "'file.format' = 'avro', "
+                                + "'metadata.iceberg.hive-client-class' = '%s')",
+                        createFailHiveMetaStoreClient()));
+        Assert.assertThrows(
+                Exception.class,
+                () ->
+                        tEnv.executeSql(
+                                        "INSERT INTO my_paimon.test_db.t VALUES "
+                                                + "(1, 1, 'apple'), (1, 2, 'pear'), (2, 1, 'cat'), (2, 2, 'dog')")
+                                .await());
+                                .await());
+    }
+
+    protected abstract String createFailHiveMetaStoreClient();
+
     private List<Row> collect(TableResult result) throws Exception {
         List<Row> rows = new ArrayList<>();
         try (CloseableIterator<Row> it = result.collect()) {
