This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c0e4d313df [core] Support configurable Iceberg metadata path independent of storage type (#5471)
c0e4d313df is described below

commit c0e4d313df54ce891e4f3dc4b8a16d0f24b1c374
Author: jahangir-yelp <[email protected]>
AuthorDate: Tue Apr 15 15:35:33 2025 +0100

    [core] Support configurable Iceberg metadata path independent of storage type (#5471)
---
 docs/content/migration/iceberg-compatibility.md    |  16 +-
 .../generated/iceberg_configuration.html           |   6 +
 .../paimon/iceberg/IcebergCommitCallback.java      |  60 ++++--
 .../org/apache/paimon/iceberg/IcebergOptions.java  |  37 ++++
 .../paimon/iceberg/IcebergCommitCallbackTest.java  | 228 +++++++++++++++++++++
 5 files changed, 328 insertions(+), 19 deletions(-)

diff --git a/docs/content/migration/iceberg-compatibility.md b/docs/content/migration/iceberg-compatibility.md
index 66b6f17b2e..f624b6ff54 100644
--- a/docs/content/migration/iceberg-compatibility.md
+++ b/docs/content/migration/iceberg-compatibility.md
@@ -55,6 +55,18 @@ Set the following table options, so that Paimon tables can generate Iceberg comp
         </ul>
       </td>
     </tr>
+    <tr>
+      <td><h5>metadata.iceberg.storage-location</h5></td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Enum</td>
+      <td>
+        Specifies where to store Iceberg metadata files. If not set, the storage location is determined by the configured metadata.iceberg.storage type.
+        <ul>
+          <li><code>table-location</code>: Store Iceberg metadata in each table's directory. Useful for standalone Iceberg tables or Iceberg Java API access. Can also be used with Hive Catalog.</li>
+          <li><code>catalog-location</code>: Store Iceberg metadata in a separate directory. This is the default behavior when using Hive Catalog or Hadoop Catalog.</li>
+        </ul>
+      </td>
+    </tr>
     </tbody>
 </table>
 
@@ -63,7 +75,9 @@ or `'metadata.iceberg.storage' = 'hive-catalog'`,
 so that all tables can be visited as an Iceberg warehouse.
 For Iceberg Java API users, you might consider setting `'metadata.iceberg.storage' = 'table-location'`,
 so you can visit each table with its table path.
-
+When using `metadata.iceberg.storage = hadoop-catalog` or `hive-catalog`,
+you can optionally configure `metadata.iceberg.storage-location` to control where the metadata is stored.
+If not set, the default behavior depends on the storage type.
 ## Append Tables
 
 Let's walk through a simple example, where we query Paimon tables with Iceberg connectors in Flink and Spark.
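
For illustration only (not part of this commit): a minimal sketch of setting the two options programmatically, assuming Paimon's Options#set(ConfigOption, value) API; the class and method names below are hypothetical, and in practice the same keys are usually supplied as table properties.

    import org.apache.paimon.iceberg.IcebergOptions;
    import org.apache.paimon.options.Options;

    public class IcebergStorageLocationExample {
        public static Options icebergCompatibilityOptions() {
            Options options = new Options();
            // Expose the table through an Iceberg Hadoop-catalog layout ...
            options.set(
                    IcebergOptions.METADATA_ICEBERG_STORAGE,
                    IcebergOptions.StorageType.HADOOP_CATALOG);
            // ... but keep the Iceberg metadata files under each table's own
            // directory, which is what the option added by this commit enables.
            options.set(
                    IcebergOptions.METADATA_ICEBERG_STORAGE_LOCATION,
                    IcebergOptions.StorageLocation.TABLE_LOCATION);
            return options;
        }
    }
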
diff --git a/docs/layouts/shortcodes/generated/iceberg_configuration.html b/docs/layouts/shortcodes/generated/iceberg_configuration.html
index caa243dc42..7eef90574c 100644
--- a/docs/layouts/shortcodes/generated/iceberg_configuration.html
+++ b/docs/layouts/shortcodes/generated/iceberg_configuration.html
@@ -98,6 +98,12 @@ under the License.
             <td><p>Enum</p></td>
            <td>When set, produce Iceberg metadata after a snapshot is committed, so that Iceberg readers can read Paimon's raw data files.<br /><br />Possible values:<ul><li>"disabled": Disable Iceberg compatibility support.</li><li>"table-location": Store Iceberg metadata in each table's directory.</li><li>"hadoop-catalog": Store Iceberg metadata in a separate directory. This directory can be specified as the warehouse directory of an Iceberg Hadoop catalog.</li><li>"hive-catalog": Not [...]
         </tr>
+        <tr>
+            <td><h5>metadata.iceberg.storage-location</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td><p>Enum</p></td>
+            <td>Specifies whether to store Iceberg metadata in a separate directory or under the table location.<br /><br />Possible values:<ul><li>"table-location": Store Iceberg metadata in each table's directory. Useful for standalone Iceberg tables or Java API access. Can also be used with Hive Catalog.</li><li>"catalog-location": Store Iceberg metadata in a separate directory. Allows integration with Hive Catalog or Hadoop Catalog.</li></ul></td>
+        </tr>
         <tr>
             <td><h5>metadata.iceberg.table</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
index c20d07b9a4..9808d378f6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
@@ -108,18 +108,7 @@ public class IcebergCommitCallback implements CommitCallback {
 
         IcebergOptions.StorageType storageType =
                 table.coreOptions().toConfiguration().get(IcebergOptions.METADATA_ICEBERG_STORAGE);
-        switch (storageType) {
-            case TABLE_LOCATION:
-                this.pathFactory = new IcebergPathFactory(new Path(table.location(), "metadata"));
-                break;
-            case HADOOP_CATALOG:
-            case HIVE_CATALOG:
-                this.pathFactory = new IcebergPathFactory(catalogTableMetadataPath(table));
-                break;
-            default:
-                throw new UnsupportedOperationException(
-                        "Unknown storage type " + storageType.name());
-        }
+        this.pathFactory = new IcebergPathFactory(catalogTableMetadataPath(table));
 
         IcebergMetadataCommitterFactory metadataCommitterFactory;
         try {
@@ -147,13 +136,48 @@ public class IcebergCommitCallback implements CommitCallback {
     public static Path catalogDatabasePath(FileStoreTable table) {
         Path dbPath = table.location().getParent();
         final String dbSuffix = ".db";
-        if (dbPath.getName().endsWith(dbSuffix)) {
-            String dbName =
-                    dbPath.getName().substring(0, dbPath.getName().length() - dbSuffix.length());
-            return new Path(dbPath.getParent(), String.format("iceberg/%s/", dbName));
-        } else {
+
+        IcebergOptions.StorageType storageType =
+                table.coreOptions().toConfiguration().get(IcebergOptions.METADATA_ICEBERG_STORAGE);
+
+        if (!dbPath.getName().endsWith(dbSuffix)) {
             throw new UnsupportedOperationException(
-                    "Storage type ICEBERG_WAREHOUSE can only be used on Paimon 
tables in a Paimon warehouse.");
+                    String.format(
+                            "Storage type %s can only be used on Paimon tables 
in a Paimon warehouse.",
+                            storageType.name()));
+        }
+
+        IcebergOptions.StorageLocation storageLocation =
+                table.coreOptions()
+                        .toConfiguration()
+                        .getOptional(IcebergOptions.METADATA_ICEBERG_STORAGE_LOCATION)
+                        .orElse(inferDefaultMetadataLocation(storageType));
+
+        switch (storageLocation) {
+            case TABLE_LOCATION:
+                return dbPath;
+            case CATALOG_STORAGE:
+                String dbName =
+                        dbPath.getName()
+                                .substring(0, dbPath.getName().length() - dbSuffix.length());
+                return new Path(dbPath.getParent(), String.format("iceberg/%s/", dbName));
+            default:
+                throw new UnsupportedOperationException(
+                        "Unknown storage location " + storageLocation.name());
+        }
+    }
+
+    private static IcebergOptions.StorageLocation inferDefaultMetadataLocation(
+            IcebergOptions.StorageType storageType) {
+        switch (storageType) {
+            case TABLE_LOCATION:
+                return IcebergOptions.StorageLocation.TABLE_LOCATION;
+            case HIVE_CATALOG:
+            case HADOOP_CATALOG:
+                return IcebergOptions.StorageLocation.CATALOG_STORAGE;
+            default:
+                throw new UnsupportedOperationException(
+                        "Unknown storage type: " + storageType.name());
         }
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java
index 6835440d84..e5166b9394 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java
@@ -37,6 +37,13 @@ public class IcebergOptions {
                             "When set, produce Iceberg metadata after a 
snapshot is committed, "
                                     + "so that Iceberg readers can read 
Paimon's raw data files.");
 
+    public static final ConfigOption<StorageLocation> METADATA_ICEBERG_STORAGE_LOCATION =
+            key("metadata.iceberg.storage-location")
+                    .enumType(StorageLocation.class)
+                    .noDefaultValue()
+                    .withDescription(
+                            "Specifies whether to store Iceberg metadata in a separate directory or under the table location.");
+
     public static final ConfigOption<Integer> COMPACT_MIN_FILE_NUM =
             ConfigOptions.key("metadata.iceberg.compaction.min.file-num")
                     .intType()
@@ -157,4 +164,34 @@ public class IcebergOptions {
             return TextElement.text(description);
         }
     }
+
+    /** Where to store Iceberg metadata. */
+    public enum StorageLocation implements DescribedEnum {
+        TABLE_LOCATION(
+                "table-location",
+                "Store Iceberg metadata in each table's directory. Useful for 
standalone "
+                        + "Iceberg tables or Java API access. Can also be used 
with Hive Catalog"),
+        CATALOG_STORAGE(
+                "catalog-location",
+                "Store Iceberg metadata in a separate directory. "
+                        + "Allows integration with Hive Catalog or Hadoop 
Catalog.");
+
+        private final String value;
+        private final String description;
+
+        StorageLocation(String value, String description) {
+            this.value = value;
+            this.description = description;
+        }
+
+        @Override
+        public String toString() {
+            return value;
+        }
+
+        @Override
+        public InlineElement getDescription() {
+            return TextElement.text(description);
+        }
+    }
 }
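
For illustration only (not part of this commit): a condensed sketch of how the new option is resolved, mirroring the logic added to IcebergCommitCallback above; the helper class is hypothetical. An explicitly configured metadata.iceberg.storage-location wins, otherwise the location is inferred from metadata.iceberg.storage.

    import org.apache.paimon.iceberg.IcebergOptions;
    import org.apache.paimon.options.Options;

    public class StorageLocationResolutionSketch {
        static IcebergOptions.StorageLocation resolve(Options options) {
            IcebergOptions.StorageType storageType =
                    options.get(IcebergOptions.METADATA_ICEBERG_STORAGE);
            // Explicit setting takes precedence; otherwise fall back based on the storage type.
            return options.getOptional(IcebergOptions.METADATA_ICEBERG_STORAGE_LOCATION)
                    .orElseGet(() -> infer(storageType));
        }

        private static IcebergOptions.StorageLocation infer(
                IcebergOptions.StorageType storageType) {
            switch (storageType) {
                case TABLE_LOCATION:
                    return IcebergOptions.StorageLocation.TABLE_LOCATION;
                case HIVE_CATALOG:
                case HADOOP_CATALOG:
                    return IcebergOptions.StorageLocation.CATALOG_STORAGE;
                default:
                    throw new UnsupportedOperationException(
                            "Unknown storage type: " + storageType.name());
            }
        }
    }
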
diff --git a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCommitCallbackTest.java b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCommitCallbackTest.java
new file mode 100644
index 0000000000..4a872c9b4e
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCommitCallbackTest.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/** Tests for {@link IcebergCommitCallback}. */
+public class IcebergCommitCallbackTest {
+
+    private FileStoreTable mockTable;
+    private CoreOptions mockCoreOptions;
+    private Options mockConfig;
+
+    @BeforeEach
+    void setUp() {
+        mockTable = mock(FileStoreTable.class);
+        when(mockTable.location()).thenReturn(new Path("file:/tmp/paimon/mydb.db/mytable"));
+
+        mockCoreOptions = mock(CoreOptions.class);
+        when(mockTable.coreOptions()).thenReturn(mockCoreOptions);
+
+        mockConfig = mock(Options.class);
+        when(mockCoreOptions.toConfiguration()).thenReturn(mockConfig);
+    }
+
+    @ParameterizedTest(name = "StorageType: {0}")
+    @MethodSource("provideMetadataPathsWithStorageType")
+    void testCatalogTableMetadataPathWithStorageType(
+            IcebergOptions.StorageType storageType, String expectedPath) {
+        when(mockConfig.get(IcebergOptions.METADATA_ICEBERG_STORAGE)).thenReturn(storageType);
+
+        Path result = IcebergCommitCallback.catalogTableMetadataPath(mockTable);
+
+        assertThat(result.toString()).isEqualTo(expectedPath);
+    }
+
+    @ParameterizedTest(name = "StorageType: {0}")
+    @MethodSource("provideDatabasePathsWithStorageType")
+    void testCatalogDatabasePathWithStorageType(
+            IcebergOptions.StorageType storageType, String expectedPath) {
+        when(mockConfig.get(IcebergOptions.METADATA_ICEBERG_STORAGE)).thenReturn(storageType);
+
+        Path result = IcebergCommitCallback.catalogDatabasePath(mockTable);
+
+        assertThat(result.toString()).isEqualTo(expectedPath);
+    }
+
+    @ParameterizedTest(name = "StorageType: {0}")
+    @MethodSource("provideMetadataPathsWithStorageLocation")
+    void testCatalogMetadataPathWithStorageLocation(
+            IcebergOptions.StorageType storageType,
+            IcebergOptions.StorageLocation storageLocation,
+            String expectedPath) {
+        when(mockConfig.get(IcebergOptions.METADATA_ICEBERG_STORAGE)).thenReturn(storageType);
+        if (storageLocation != null) {
+            when(mockConfig.getOptional(IcebergOptions.METADATA_ICEBERG_STORAGE_LOCATION))
+                    .thenReturn(java.util.Optional.of(storageLocation));
+        } else {
+            when(mockConfig.getOptional(IcebergOptions.METADATA_ICEBERG_STORAGE_LOCATION))
+                    .thenReturn(java.util.Optional.empty());
+        }
+
+        Path result = IcebergCommitCallback.catalogTableMetadataPath(mockTable);
+
+        assertThat(result.toString()).isEqualTo(expectedPath);
+    }
+
+    @ParameterizedTest(name = "StorageType: {0}")
+    @MethodSource("provideDatabasePathsWithStorageLocation")
+    void testCatalogDatabasePathWithStorageLocation(
+            IcebergOptions.StorageType storageType,
+            IcebergOptions.StorageLocation storageLocation,
+            String expectedPath) {
+        when(mockConfig.get(IcebergOptions.METADATA_ICEBERG_STORAGE)).thenReturn(storageType);
+        if (storageLocation != null) {
+            when(mockConfig.getOptional(IcebergOptions.METADATA_ICEBERG_STORAGE_LOCATION))
+                    .thenReturn(java.util.Optional.of(storageLocation));
+        } else {
+            when(mockConfig.getOptional(IcebergOptions.METADATA_ICEBERG_STORAGE_LOCATION))
+                    .thenReturn(java.util.Optional.empty());
+        }
+
+        Path result = IcebergCommitCallback.catalogDatabasePath(mockTable);
+
+        assertThat(result.toString()).isEqualTo(expectedPath);
+    }
+
+    private static Stream<Arguments> provideMetadataPathsWithStorageType() {
+        return Stream.of(
+                Arguments.of(
+                        IcebergOptions.StorageType.TABLE_LOCATION,
+                        "file:/tmp/paimon/mydb.db/mytable/metadata"),
+                Arguments.of(
+                        IcebergOptions.StorageType.HIVE_CATALOG,
+                        "file:/tmp/paimon/iceberg/mydb/mytable/metadata"),
+                Arguments.of(
+                        IcebergOptions.StorageType.HADOOP_CATALOG,
+                        "file:/tmp/paimon/iceberg/mydb/mytable/metadata"));
+    }
+
+    private static Stream<Arguments> provideMetadataPathsWithStorageLocation() {
+        return Stream.of(
+                // Explicitly set StorageLocation
+                Arguments.of(
+                        IcebergOptions.StorageType.TABLE_LOCATION,
+                        IcebergOptions.StorageLocation.TABLE_LOCATION,
+                        "file:/tmp/paimon/mydb.db/mytable/metadata"),
+                Arguments.of(
+                        IcebergOptions.StorageType.TABLE_LOCATION,
+                        IcebergOptions.StorageLocation.CATALOG_STORAGE,
+                        "file:/tmp/paimon/iceberg/mydb/mytable/metadata"),
+                Arguments.of(
+                        IcebergOptions.StorageType.HIVE_CATALOG,
+                        IcebergOptions.StorageLocation.TABLE_LOCATION,
+                        "file:/tmp/paimon/mydb.db/mytable/metadata"),
+                Arguments.of(
+                        IcebergOptions.StorageType.HIVE_CATALOG,
+                        IcebergOptions.StorageLocation.CATALOG_STORAGE,
+                        "file:/tmp/paimon/iceberg/mydb/mytable/metadata"),
+                Arguments.of(
+                        IcebergOptions.StorageType.HADOOP_CATALOG,
+                        IcebergOptions.StorageLocation.TABLE_LOCATION,
+                        "file:/tmp/paimon/mydb.db/mytable/metadata"),
+                Arguments.of(
+                        IcebergOptions.StorageType.HADOOP_CATALOG,
+                        IcebergOptions.StorageLocation.CATALOG_STORAGE,
+                        "file:/tmp/paimon/iceberg/mydb/mytable/metadata"),
+
+                // Backward compatibility: StorageLocation is not provided.
+                Arguments.of(
+                        IcebergOptions.StorageType.TABLE_LOCATION,
+                        null,
+                        "file:/tmp/paimon/mydb.db/mytable/metadata"), // 
Defaults to TABLE_LOCATION
+                Arguments.of(
+                        IcebergOptions.StorageType.HIVE_CATALOG,
+                        null,
+                        "file:/tmp/paimon/iceberg/mydb/mytable/metadata"), // 
Defaults to
+                // CATALOG_STORAGE
+                Arguments.of(
+                        IcebergOptions.StorageType.HADOOP_CATALOG,
+                        null,
+                        "file:/tmp/paimon/iceberg/mydb/mytable/metadata") // 
Defaults to
+                // CATALOG_STORAGE
+                );
+    }
+
+    private static Stream<Arguments> provideDatabasePathsWithStorageType() {
+        return Stream.of(
+                Arguments.of(IcebergOptions.StorageType.TABLE_LOCATION, "file:/tmp/paimon/mydb.db"),
+                Arguments.of(
+                        IcebergOptions.StorageType.HIVE_CATALOG, "file:/tmp/paimon/iceberg/mydb"),
+                Arguments.of(
+                        IcebergOptions.StorageType.HADOOP_CATALOG,
+                        "file:/tmp/paimon/iceberg/mydb"));
+    }
+
+    private static Stream<Arguments> provideDatabasePathsWithStorageLocation() {
+        return Stream.of(
+                Arguments.of(
+                        IcebergOptions.StorageType.TABLE_LOCATION,
+                        IcebergOptions.StorageLocation.TABLE_LOCATION,
+                        "file:/tmp/paimon/mydb.db"),
+                Arguments.of(
+                        IcebergOptions.StorageType.TABLE_LOCATION,
+                        IcebergOptions.StorageLocation.CATALOG_STORAGE,
+                        "file:/tmp/paimon/iceberg/mydb"),
+                Arguments.of(
+                        IcebergOptions.StorageType.HIVE_CATALOG,
+                        IcebergOptions.StorageLocation.TABLE_LOCATION,
+                        "file:/tmp/paimon/mydb.db"),
+                Arguments.of(
+                        IcebergOptions.StorageType.HIVE_CATALOG,
+                        IcebergOptions.StorageLocation.CATALOG_STORAGE,
+                        "file:/tmp/paimon/iceberg/mydb"),
+                Arguments.of(
+                        IcebergOptions.StorageType.HADOOP_CATALOG,
+                        IcebergOptions.StorageLocation.TABLE_LOCATION,
+                        "file:/tmp/paimon/mydb.db"),
+                Arguments.of(
+                        IcebergOptions.StorageType.HADOOP_CATALOG,
+                        IcebergOptions.StorageLocation.CATALOG_STORAGE,
+                        "file:/tmp/paimon/iceberg/mydb"),
+                // Backward compatibility: StorageLocation is not provided.
+                Arguments.of(
+                        IcebergOptions.StorageType.TABLE_LOCATION,
+                        null,
+                        "file:/tmp/paimon/mydb.db"), // Defaults to 
TABLE_LOCATION
+                Arguments.of(
+                        IcebergOptions.StorageType.HIVE_CATALOG,
+                        null,
+                        "file:/tmp/paimon/iceberg/mydb"), // Defaults to 
CATALOG_STORAGE
+                Arguments.of(
+                        IcebergOptions.StorageType.HADOOP_CATALOG,
+                        null,
+                        "file:/tmp/paimon/iceberg/mydb") // Defaults to 
CATALOG_STORAGE
+                );
+    }
+}
