This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 419b02a836 [iceberg] Introduce integration for AWS Glue (#4624)
419b02a836 is described below
commit 419b02a836da34e2050a1d6c56a57e3ea32d7e99
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Dec 2 21:09:48 2024 +0800
[iceberg] Introduce integration for AWS Glue (#4624)
---
docs/content/migration/iceberg-compatibility.md | 11 +++++++
.../iceberg/AbstractIcebergCommitCallback.java | 35 +++++++++++----------
.../org/apache/paimon/iceberg/IcebergOptions.java | 6 ++++
.../iceberg/IcebergHiveMetadataCommitter.java | 8 ++---
.../IcebergHive23MetadataCommitterITCase.java | 9 +++++-
.../IcebergHive31MetadataCommitterITCase.java | 9 +++++-
.../IcebergHiveMetadataCommitterITCaseBase.java | 36 ++++++++++++++++++++++
7 files changed, 92 insertions(+), 22 deletions(-)
diff --git a/docs/content/migration/iceberg-compatibility.md
b/docs/content/migration/iceberg-compatibility.md
index d745607148..01a03a4526 100644
--- a/docs/content/migration/iceberg-compatibility.md
+++ b/docs/content/migration/iceberg-compatibility.md
@@ -383,9 +383,20 @@ you also need to set some (or all) of the following table
options when creating
<td>Boolean</td>
<td>Should use the legacy manifest version to generate Iceberg's 1.4
manifest files.</td>
</tr>
+ <tr>
+ <td><h5>metadata.iceberg.hive-client-class</h5></td>
+ <td style="word-wrap:
break-word;">org.apache.hadoop.hive.metastore.HiveMetaStoreClient</td>
+ <td>String</td>
+ <td>Hive client class name for Iceberg Hive Catalog.</td>
+ </tr>
</tbody>
</table>
+## AWS Glue Catalog
+
+You can use Hive Catalog to connect to the AWS Glue metastore by setting
`'metadata.iceberg.hive-client-class'` to
+`'com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient'`.
+
## AWS Athena
AWS Athena may use an old manifest reader to read Iceberg manifests by name, so
we should let Paimon produce legacy Iceberg
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java
b/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java
index 1b952c1716..7ea6cbe057 100644
---
a/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java
+++
b/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java
@@ -112,22 +112,7 @@ public abstract class AbstractIcebergCommitCallback
implements CommitCallback {
break;
case HADOOP_CATALOG:
case HIVE_CATALOG:
- Path dbPath = table.location().getParent();
- final String dbSuffix = ".db";
- if (dbPath.getName().endsWith(dbSuffix)) {
- String dbName =
- dbPath.getName()
- .substring(0, dbPath.getName().length() -
dbSuffix.length());
- String tableName = table.location().getName();
- Path separatePath =
- new Path(
- dbPath.getParent(),
- String.format("iceberg/%s/%s/metadata",
dbName, tableName));
- this.pathFactory = new IcebergPathFactory(separatePath);
- } else {
- throw new UnsupportedOperationException(
- "Storage type ICEBERG_WAREHOUSE can only be used
on Paimon tables in a Paimon warehouse.");
- }
+ this.pathFactory = new
IcebergPathFactory(catalogTableMetadataPath(table));
break;
default:
throw new UnsupportedOperationException(
@@ -152,6 +137,24 @@ public abstract class AbstractIcebergCommitCallback
implements CommitCallback {
this.manifestList = IcebergManifestList.create(table, pathFactory);
}
+ public static Path catalogTableMetadataPath(FileStoreTable table) {
+ Path icebergDBPath = catalogDatabasePath(table);
+ return new Path(icebergDBPath, String.format("%s/metadata",
table.location().getName()));
+ }
+
+ public static Path catalogDatabasePath(FileStoreTable table) {
+ Path dbPath = table.location().getParent();
+ final String dbSuffix = ".db";
+ if (dbPath.getName().endsWith(dbSuffix)) {
+ String dbName =
+ dbPath.getName().substring(0, dbPath.getName().length() -
dbSuffix.length());
+ return new Path(dbPath.getParent(), String.format("iceberg/%s/",
dbName));
+ } else {
+ throw new UnsupportedOperationException(
+ "Storage type ICEBERG_WAREHOUSE can only be used on Paimon
tables in a Paimon warehouse.");
+ }
+ }
+
@Override
public void close() throws Exception {}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java
b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java
index c0ceed97ba..4b59e29c8c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java
@@ -84,6 +84,12 @@ public class IcebergOptions {
.withDescription(
"Should use the legacy manifest version to
generate Iceberg's 1.4 manifest files.");
+ public static final ConfigOption<String> HIVE_CLIENT_CLASS =
+ key("metadata.iceberg.hive-client-class")
+ .stringType()
+
.defaultValue("org.apache.hadoop.hive.metastore.HiveMetaStoreClient")
+ .withDescription("Hive client class name for Iceberg Hive
Catalog.");
+
/** Where to store Iceberg metadata. */
public enum StorageType implements DescribedEnum {
DISABLED("disabled", "Disable Iceberg compatibility support."),
diff --git
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitter.java
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitter.java
index d913f729e3..ddd21384cb 100644
---
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitter.java
+++
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitter.java
@@ -22,7 +22,6 @@ import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.client.ClientPool;
import org.apache.paimon.fs.Path;
import org.apache.paimon.hive.HiveCatalog;
-import org.apache.paimon.hive.HiveCatalogFactory;
import org.apache.paimon.hive.HiveTypeUtils;
import org.apache.paimon.hive.pool.CachedClientPool;
import org.apache.paimon.options.Options;
@@ -49,6 +48,8 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.stream.Collectors;
+import static
org.apache.paimon.iceberg.AbstractIcebergCommitCallback.catalogDatabasePath;
+
/**
* {@link IcebergMetadataCommitter} to commit Iceberg metadata to Hive
metastore, so the table can
* be visited by Iceberg's Hive catalog.
@@ -98,9 +99,7 @@ public class IcebergHiveMetadataCommitter implements
IcebergMetadataCommitter {
this.clients =
new CachedClientPool(
- hiveConf,
- options,
-
HiveCatalogFactory.METASTORE_CLIENT_CLASS.defaultValue());
+ hiveConf, options,
options.getString(IcebergOptions.HIVE_CLIENT_CLASS));
}
@Override
@@ -158,6 +157,7 @@ public class IcebergHiveMetadataCommitter implements
IcebergMetadataCommitter {
private void createDatabase(String databaseName) throws Exception {
Database database = new Database();
database.setName(databaseName);
+ database.setLocationUri(catalogDatabasePath(table).toString());
clients.execute(client -> client.createDatabase(database));
}
diff --git
a/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/iceberg/IcebergHive23MetadataCommitterITCase.java
b/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/iceberg/IcebergHive23MetadataCommitterITCase.java
index a9e4ba9454..7d726e75a1 100644
---
a/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/iceberg/IcebergHive23MetadataCommitterITCase.java
+++
b/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/iceberg/IcebergHive23MetadataCommitterITCase.java
@@ -18,5 +18,12 @@
package org.apache.paimon.iceberg;
+import org.apache.paimon.hive.CreateFailHiveMetaStoreClient;
+
/** IT cases for {@link IcebergHiveMetadataCommitter} in Hive 2.3. */
-public class IcebergHive23MetadataCommitterITCase extends
IcebergHiveMetadataCommitterITCaseBase {}
+public class IcebergHive23MetadataCommitterITCase extends
IcebergHiveMetadataCommitterITCaseBase {
+ @Override
+ protected String createFailHiveMetaStoreClient() {
+ return CreateFailHiveMetaStoreClient.class.getName();
+ }
+}
diff --git
a/paimon-hive/paimon-hive-connector-3.1/src/test/java/org/apache/paimon/iceberg/IcebergHive31MetadataCommitterITCase.java
b/paimon-hive/paimon-hive-connector-3.1/src/test/java/org/apache/paimon/iceberg/IcebergHive31MetadataCommitterITCase.java
index 6f4b0afd1a..0634adfad3 100644
---
a/paimon-hive/paimon-hive-connector-3.1/src/test/java/org/apache/paimon/iceberg/IcebergHive31MetadataCommitterITCase.java
+++
b/paimon-hive/paimon-hive-connector-3.1/src/test/java/org/apache/paimon/iceberg/IcebergHive31MetadataCommitterITCase.java
@@ -18,5 +18,12 @@
package org.apache.paimon.iceberg;
+import org.apache.paimon.hive.CreateFailHiveMetaStoreClient;
+
/** IT cases for {@link IcebergHiveMetadataCommitter} in Hive 3.1. */
-public class IcebergHive31MetadataCommitterITCase extends
IcebergHiveMetadataCommitterITCaseBase {}
+public class IcebergHive31MetadataCommitterITCase extends
IcebergHiveMetadataCommitterITCaseBase {
+ @Override
+ protected String createFailHiveMetaStoreClient() {
+ return CreateFailHiveMetaStoreClient.class.getName();
+ }
+}
diff --git
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitterITCaseBase.java
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitterITCaseBase.java
index fab2277575..d0c64c5d3b 100644
---
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitterITCaseBase.java
+++
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitterITCaseBase.java
@@ -104,6 +104,12 @@ public abstract class
IcebergHiveMetadataCommitterITCaseBase {
Row.of(2, 1, "cat"),
Row.of(2, 2, "elephant")),
collect(tEnv.executeSql("SELECT * FROM my_iceberg.test_db.t
ORDER BY pt, id")));
+
+ Assert.assertTrue(
+ hiveShell
+ .executeQuery("DESC DATABASE EXTENDED test_db")
+ .toString()
+ .contains("iceberg/test_db"));
}
@Test
@@ -150,6 +156,36 @@ public abstract class
IcebergHiveMetadataCommitterITCaseBase {
"SELECT data, id, pt FROM my_iceberg.test_db.t
WHERE id > 1 ORDER BY pt, id")));
}
+ @Test
+ public void testCustomMetastoreClass() {
+ TableEnvironment tEnv =
+ TableEnvironmentImpl.create(
+
EnvironmentSettings.newInstance().inBatchMode().build());
+ tEnv.executeSql(
+ "CREATE CATALOG my_paimon WITH ( 'type' = 'paimon',
'warehouse' = '"
+ + path
+ + "' )");
+ tEnv.executeSql("CREATE DATABASE my_paimon.test_db");
+ tEnv.executeSql(
+ String.format(
+ "CREATE TABLE my_paimon.test_db.t ( pt INT, id INT,
data STRING ) PARTITIONED BY (pt) WITH "
+ + "( "
+ + "'metadata.iceberg.storage' =
'hive-catalog', "
+ + "'metadata.iceberg.uri' = '', "
+ + "'file.format' = 'avro', "
+ + "'metadata.iceberg.hive-client-class' =
'%s')",
+ createFailHiveMetaStoreClient()));
+ Assert.assertThrows(
+ Exception.class,
+ () ->
+ tEnv.executeSql(
+ "INSERT INTO my_paimon.test_db.t
VALUES "
+ + "(1, 1, 'apple'), (1, 2,
'pear'), (2, 1, 'cat'), (2, 2, 'dog')")
+ .await());
+ }
+
+ protected abstract String createFailHiveMetaStoreClient();
+
private List<Row> collect(TableResult result) throws Exception {
List<Row> rows = new ArrayList<>();
try (CloseableIterator<Row> it = result.collect()) {