This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c0e4d313df [core] Support configurable Iceberg metadata path
independent of storage type (#5471)
c0e4d313df is described below
commit c0e4d313df54ce891e4f3dc4b8a16d0f24b1c374
Author: jahangir-yelp <[email protected]>
AuthorDate: Tue Apr 15 15:35:33 2025 +0100
[core] Support configurable Iceberg metadata path independent of storage
type (#5471)
---
docs/content/migration/iceberg-compatibility.md | 16 +-
.../generated/iceberg_configuration.html | 6 +
.../paimon/iceberg/IcebergCommitCallback.java | 60 ++++--
.../org/apache/paimon/iceberg/IcebergOptions.java | 37 ++++
.../paimon/iceberg/IcebergCommitCallbackTest.java | 228 +++++++++++++++++++++
5 files changed, 328 insertions(+), 19 deletions(-)
diff --git a/docs/content/migration/iceberg-compatibility.md
b/docs/content/migration/iceberg-compatibility.md
index 66b6f17b2e..f624b6ff54 100644
--- a/docs/content/migration/iceberg-compatibility.md
+++ b/docs/content/migration/iceberg-compatibility.md
@@ -55,6 +55,18 @@ Set the following table options, so that Paimon tables can
generate Iceberg comp
</ul>
</td>
</tr>
+ <tr>
+ <td><h5>metadata.iceberg.storage-location</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Enum</td>
+ <td>
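+ Specifies where to store Iceberg metadata files. If not set, it defaults to <code>table-location</code> when <code>metadata.iceberg.storage</code> is <code>table-location</code>, and to <code>catalog-location</code> when it is <code>hadoop-catalog</code> or <code>hive-catalog</code>.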
+ <ul>
+ <li><code>table-location</code>: Store Iceberg metadata in each
table's directory. Useful for standalone Iceberg tables or Iceberg Java API
access. Can also be used with Hive Catalog.</li>
+ <li><code>catalog-location</code>: Store Iceberg metadata in a
separate directory. This is the default behavior when using Hive Catalog or
Hadoop Catalog.</li>
+ </ul>
+ </td>
+ </tr>
</tbody>
</table>
@@ -63,7 +75,9 @@ or `'metadata.iceberg.storage' = 'hive-catalog'`,
so that all tables can be visited as an Iceberg warehouse.
For Iceberg Java API users, you might consider setting
`'metadata.iceberg.storage' = 'table-location'`,
so you can visit each table with its table path.
-
+When `'metadata.iceberg.storage'` is `'hadoop-catalog'` or `'hive-catalog'`,
+you can optionally set `'metadata.iceberg.storage-location'` to control where the Iceberg metadata is stored.
+If it is not set, these storage types default to `'catalog-location'`.
## Append Tables
Let's walk through a simple example, where we query Paimon tables with Iceberg
connectors in Flink and Spark.
diff --git a/docs/layouts/shortcodes/generated/iceberg_configuration.html
b/docs/layouts/shortcodes/generated/iceberg_configuration.html
index caa243dc42..7eef90574c 100644
--- a/docs/layouts/shortcodes/generated/iceberg_configuration.html
+++ b/docs/layouts/shortcodes/generated/iceberg_configuration.html
@@ -98,6 +98,12 @@ under the License.
<td><p>Enum</p></td>
<td>When set, produce Iceberg metadata after a snapshot is
committed, so that Iceberg readers can read Paimon's raw data files.<br /><br
/>Possible values:<ul><li>"disabled": Disable Iceberg compatibility
support.</li><li>"table-location": Store Iceberg metadata in each table's
directory.</li><li>"hadoop-catalog": Store Iceberg metadata in a separate
directory. This directory can be specified as the warehouse directory of an
Iceberg Hadoop catalog.</li><li>"hive-catalog": Not [...]
</tr>
+ <tr>
+ <td><h5>metadata.iceberg.storage-location</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td><p>Enum</p></td>
+ <td>To store Iceberg metadata in a separate directory or under
table location<br /><br />Possible values:<ul><li>"table-location": Store
Iceberg metadata in each table's directory. Useful for standalone Iceberg
tables or Java API access. Can also be used with Hive
Catalog</li><li>"catalog-location": Store Iceberg metadata in a separate
directory. Allows integration with Hive Catalog or Hadoop
Catalog.</li></ul></td>
+ </tr>
<tr>
<td><h5>metadata.iceberg.table</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
index c20d07b9a4..9808d378f6 100644
---
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
+++
b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
@@ -108,18 +108,7 @@ public class IcebergCommitCallback implements
CommitCallback {
IcebergOptions.StorageType storageType =
table.coreOptions().toConfiguration().get(IcebergOptions.METADATA_ICEBERG_STORAGE);
- switch (storageType) {
- case TABLE_LOCATION:
- this.pathFactory = new IcebergPathFactory(new
Path(table.location(), "metadata"));
- break;
- case HADOOP_CATALOG:
- case HIVE_CATALOG:
- this.pathFactory = new
IcebergPathFactory(catalogTableMetadataPath(table));
- break;
- default:
- throw new UnsupportedOperationException(
- "Unknown storage type " + storageType.name());
- }
+ this.pathFactory = new
IcebergPathFactory(catalogTableMetadataPath(table));
IcebergMetadataCommitterFactory metadataCommitterFactory;
try {
@@ -147,13 +136,48 @@ public class IcebergCommitCallback implements
CommitCallback {
public static Path catalogDatabasePath(FileStoreTable table) {
Path dbPath = table.location().getParent();
final String dbSuffix = ".db";
- if (dbPath.getName().endsWith(dbSuffix)) {
- String dbName =
- dbPath.getName().substring(0, dbPath.getName().length() -
dbSuffix.length());
- return new Path(dbPath.getParent(), String.format("iceberg/%s/",
dbName));
- } else {
+
+ IcebergOptions.StorageType storageType =
+
table.coreOptions().toConfiguration().get(IcebergOptions.METADATA_ICEBERG_STORAGE);
+
+ if (!dbPath.getName().endsWith(dbSuffix)) {
throw new UnsupportedOperationException(
- "Storage type ICEBERG_WAREHOUSE can only be used on Paimon
tables in a Paimon warehouse.");
+ String.format(
+ "Storage type %s can only be used on Paimon tables
in a Paimon warehouse.",
+ storageType.name()));
+ }
+
+ IcebergOptions.StorageLocation storageLocation =
+ table.coreOptions()
+ .toConfiguration()
+
.getOptional(IcebergOptions.METADATA_ICEBERG_STORAGE_LOCATION)
+ .orElse(inferDefaultMetadataLocation(storageType));
+
+ switch (storageLocation) {
+ case TABLE_LOCATION:
+ return dbPath;
+ case CATALOG_STORAGE:
+ String dbName =
+ dbPath.getName()
+ .substring(0, dbPath.getName().length() -
dbSuffix.length());
+ return new Path(dbPath.getParent(),
String.format("iceberg/%s/", dbName));
+ default:
+ throw new UnsupportedOperationException(
+ "Unknown storage location " + storageLocation.name());
+ }
+ }
+
+ private static IcebergOptions.StorageLocation inferDefaultMetadataLocation(
+ IcebergOptions.StorageType storageType) {
+ switch (storageType) {
+ case TABLE_LOCATION:
+ return IcebergOptions.StorageLocation.TABLE_LOCATION;
+ case HIVE_CATALOG:
+ case HADOOP_CATALOG:
+ return IcebergOptions.StorageLocation.CATALOG_STORAGE;
+ default:
+ throw new UnsupportedOperationException(
+ "Unknown storage type: " + storageType.name());
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java
b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java
index 6835440d84..e5166b9394 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java
@@ -37,6 +37,13 @@ public class IcebergOptions {
"When set, produce Iceberg metadata after a
snapshot is committed, "
+ "so that Iceberg readers can read
Paimon's raw data files.");
+ public static final ConfigOption<StorageLocation>
METADATA_ICEBERG_STORAGE_LOCATION =
+ key("metadata.iceberg.storage-location")
+ .enumType(StorageLocation.class)
+ .noDefaultValue()
+ .withDescription(
+ "To store Iceberg metadata in a separate directory
or under table location");
+
public static final ConfigOption<Integer> COMPACT_MIN_FILE_NUM =
ConfigOptions.key("metadata.iceberg.compaction.min.file-num")
.intType()
@@ -157,4 +164,34 @@ public class IcebergOptions {
return TextElement.text(description);
}
}
+
+ /** Where to store Iceberg metadata. */
+ public enum StorageLocation implements DescribedEnum {
+ TABLE_LOCATION(
+ "table-location",
+ "Store Iceberg metadata in each table's directory. Useful for
standalone "
+ + "Iceberg tables or Java API access. Can also be used
with Hive Catalog"),
+ CATALOG_STORAGE(
+ "catalog-location",
+ "Store Iceberg metadata in a separate directory. "
+ + "Allows integration with Hive Catalog or Hadoop
Catalog.");
+
+ private final String value;
+ private final String description;
+
+ StorageLocation(String value, String description) {
+ this.value = value;
+ this.description = description;
+ }
+
+ @Override
+ public String toString() {
+ return value;
+ }
+
+ @Override
+ public InlineElement getDescription() {
+ return TextElement.text(description);
+ }
+ }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCommitCallbackTest.java
b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCommitCallbackTest.java
new file mode 100644
index 0000000000..4a872c9b4e
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCommitCallbackTest.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/** Tests for {@link IcebergCommitCallback}. */
+public class IcebergCommitCallbackTest {
+
+ private FileStoreTable mockTable;
+ private CoreOptions mockCoreOptions;
+ private Options mockConfig;
+
+ @BeforeEach
+ void setUp() {
+ mockTable = mock(FileStoreTable.class);
+ when(mockTable.location()).thenReturn(new
Path("file:/tmp/paimon/mydb.db/mytable"));
+
+ mockCoreOptions = mock(CoreOptions.class);
+ when(mockTable.coreOptions()).thenReturn(mockCoreOptions);
+
+ mockConfig = mock(Options.class);
+ when(mockCoreOptions.toConfiguration()).thenReturn(mockConfig);
+ }
+
+ @ParameterizedTest(name = "StorageType: {0}")
+ @MethodSource("provideMetadataPathsWithStorageType")
+ void testCatalogTableMetadataPathWithStorageType(
+ IcebergOptions.StorageType storageType, String expectedPath) {
+
when(mockConfig.get(IcebergOptions.METADATA_ICEBERG_STORAGE)).thenReturn(storageType);
+
+ Path result =
IcebergCommitCallback.catalogTableMetadataPath(mockTable);
+
+ assertThat(result.toString()).isEqualTo(expectedPath);
+ }
+
+ @ParameterizedTest(name = "StorageType: {0}")
+ @MethodSource("provideDatabasePathsWithStorageType")
+ void testCatalogDatabasePathWithStorageType(
+ IcebergOptions.StorageType storageType, String expectedPath) {
+
when(mockConfig.get(IcebergOptions.METADATA_ICEBERG_STORAGE)).thenReturn(storageType);
+
+ Path result = IcebergCommitCallback.catalogDatabasePath(mockTable);
+
+ assertThat(result.toString()).isEqualTo(expectedPath);
+ }
+
+ @ParameterizedTest(name = "StorageType: {0}")
+ @MethodSource("provideMetadataPathsWithStorageLocation")
+ void testCatalogMetadataPathWithStorageLocation(
+ IcebergOptions.StorageType storageType,
+ IcebergOptions.StorageLocation storageLocation,
+ String expectedPath) {
+
when(mockConfig.get(IcebergOptions.METADATA_ICEBERG_STORAGE)).thenReturn(storageType);
+ if (storageLocation != null) {
+
when(mockConfig.getOptional(IcebergOptions.METADATA_ICEBERG_STORAGE_LOCATION))
+ .thenReturn(java.util.Optional.of(storageLocation));
+ } else {
+
when(mockConfig.getOptional(IcebergOptions.METADATA_ICEBERG_STORAGE_LOCATION))
+ .thenReturn(java.util.Optional.empty());
+ }
+
+ Path result =
IcebergCommitCallback.catalogTableMetadataPath(mockTable);
+
+ assertThat(result.toString()).isEqualTo(expectedPath);
+ }
+
+ @ParameterizedTest(name = "StorageType: {0}")
+ @MethodSource("provideDatabasePathsWithStorageLocation")
+ void testCatalogDatabasePathWithStorageLocation(
+ IcebergOptions.StorageType storageType,
+ IcebergOptions.StorageLocation storageLocation,
+ String expectedPath) {
+
when(mockConfig.get(IcebergOptions.METADATA_ICEBERG_STORAGE)).thenReturn(storageType);
+ if (storageLocation != null) {
+
when(mockConfig.getOptional(IcebergOptions.METADATA_ICEBERG_STORAGE_LOCATION))
+ .thenReturn(java.util.Optional.of(storageLocation));
+ } else {
+
when(mockConfig.getOptional(IcebergOptions.METADATA_ICEBERG_STORAGE_LOCATION))
+ .thenReturn(java.util.Optional.empty());
+ }
+
+ Path result = IcebergCommitCallback.catalogDatabasePath(mockTable);
+
+ assertThat(result.toString()).isEqualTo(expectedPath);
+ }
+
+ private static Stream<Arguments> provideMetadataPathsWithStorageType() {
+ return Stream.of(
+ Arguments.of(
+ IcebergOptions.StorageType.TABLE_LOCATION,
+ "file:/tmp/paimon/mydb.db/mytable/metadata"),
+ Arguments.of(
+ IcebergOptions.StorageType.HIVE_CATALOG,
+ "file:/tmp/paimon/iceberg/mydb/mytable/metadata"),
+ Arguments.of(
+ IcebergOptions.StorageType.HADOOP_CATALOG,
+ "file:/tmp/paimon/iceberg/mydb/mytable/metadata"));
+ }
+
+ private static Stream<Arguments> provideMetadataPathsWithStorageLocation()
{
+ return Stream.of(
+ // Explicitly set StorageLocation
+ Arguments.of(
+ IcebergOptions.StorageType.TABLE_LOCATION,
+ IcebergOptions.StorageLocation.TABLE_LOCATION,
+ "file:/tmp/paimon/mydb.db/mytable/metadata"),
+ Arguments.of(
+ IcebergOptions.StorageType.TABLE_LOCATION,
+ IcebergOptions.StorageLocation.CATALOG_STORAGE,
+ "file:/tmp/paimon/iceberg/mydb/mytable/metadata"),
+ Arguments.of(
+ IcebergOptions.StorageType.HIVE_CATALOG,
+ IcebergOptions.StorageLocation.TABLE_LOCATION,
+ "file:/tmp/paimon/mydb.db/mytable/metadata"),
+ Arguments.of(
+ IcebergOptions.StorageType.HIVE_CATALOG,
+ IcebergOptions.StorageLocation.CATALOG_STORAGE,
+ "file:/tmp/paimon/iceberg/mydb/mytable/metadata"),
+ Arguments.of(
+ IcebergOptions.StorageType.HADOOP_CATALOG,
+ IcebergOptions.StorageLocation.TABLE_LOCATION,
+ "file:/tmp/paimon/mydb.db/mytable/metadata"),
+ Arguments.of(
+ IcebergOptions.StorageType.HADOOP_CATALOG,
+ IcebergOptions.StorageLocation.CATALOG_STORAGE,
+ "file:/tmp/paimon/iceberg/mydb/mytable/metadata"),
+
+ // Backward compatibility: StorageLocation is not provided.
+ Arguments.of(
+ IcebergOptions.StorageType.TABLE_LOCATION,
+ null,
+ "file:/tmp/paimon/mydb.db/mytable/metadata"), //
Defaults to TABLE_LOCATION
+ Arguments.of(
+ IcebergOptions.StorageType.HIVE_CATALOG,
+ null,
+ "file:/tmp/paimon/iceberg/mydb/mytable/metadata"), //
Defaults to
+ // CATALOG_STORAGE
+ Arguments.of(
+ IcebergOptions.StorageType.HADOOP_CATALOG,
+ null,
+ "file:/tmp/paimon/iceberg/mydb/mytable/metadata") //
Defaults to
+ // CATALOG_STORAGE
+ );
+ }
+
+ private static Stream<Arguments> provideDatabasePathsWithStorageType() {
+ return Stream.of(
+ Arguments.of(IcebergOptions.StorageType.TABLE_LOCATION,
"file:/tmp/paimon/mydb.db"),
+ Arguments.of(
+ IcebergOptions.StorageType.HIVE_CATALOG,
"file:/tmp/paimon/iceberg/mydb"),
+ Arguments.of(
+ IcebergOptions.StorageType.HADOOP_CATALOG,
+ "file:/tmp/paimon/iceberg/mydb"));
+ }
+
+ private static Stream<Arguments> provideDatabasePathsWithStorageLocation()
{
+ return Stream.of(
+ Arguments.of(
+ IcebergOptions.StorageType.TABLE_LOCATION,
+ IcebergOptions.StorageLocation.TABLE_LOCATION,
+ "file:/tmp/paimon/mydb.db"),
+ Arguments.of(
+ IcebergOptions.StorageType.TABLE_LOCATION,
+ IcebergOptions.StorageLocation.CATALOG_STORAGE,
+ "file:/tmp/paimon/iceberg/mydb"),
+ Arguments.of(
+ IcebergOptions.StorageType.HIVE_CATALOG,
+ IcebergOptions.StorageLocation.TABLE_LOCATION,
+ "file:/tmp/paimon/mydb.db"),
+ Arguments.of(
+ IcebergOptions.StorageType.HIVE_CATALOG,
+ IcebergOptions.StorageLocation.CATALOG_STORAGE,
+ "file:/tmp/paimon/iceberg/mydb"),
+ Arguments.of(
+ IcebergOptions.StorageType.HADOOP_CATALOG,
+ IcebergOptions.StorageLocation.TABLE_LOCATION,
+ "file:/tmp/paimon/mydb.db"),
+ Arguments.of(
+ IcebergOptions.StorageType.HADOOP_CATALOG,
+ IcebergOptions.StorageLocation.CATALOG_STORAGE,
+ "file:/tmp/paimon/iceberg/mydb"),
+ // Backward compatibility: StorageLocation is not provided.
+ Arguments.of(
+ IcebergOptions.StorageType.TABLE_LOCATION,
+ null,
+ "file:/tmp/paimon/mydb.db"), // Defaults to
TABLE_LOCATION
+ Arguments.of(
+ IcebergOptions.StorageType.HIVE_CATALOG,
+ null,
+ "file:/tmp/paimon/iceberg/mydb"), // Defaults to
CATALOG_STORAGE
+ Arguments.of(
+ IcebergOptions.StorageType.HADOOP_CATALOG,
+ null,
+ "file:/tmp/paimon/iceberg/mydb") // Defaults to
CATALOG_STORAGE
+ );
+ }
+}