This is an automated email from the ASF dual-hosted git repository.
huaxingao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new cbd35799dc Encryption: Simplify Hive key handling and add transaction tests (#14752)
cbd35799dc is described below
commit cbd35799dc70e989528b1c14d640fe91cdafa52d
Author: Sreesh Maheshwar <[email protected]>
AuthorDate: Wed Dec 10 17:16:23 2025 +0000
Encryption: Simplify Hive key handling and add transaction tests (#14752)
* Encryption: Simplify Hive key handling and add transaction tests
* spotless
---
.../apache/iceberg/hive/HiveTableOperations.java | 30 +++-------
.../iceberg/spark/sql/TestTableEncryption.java | 64 ++++++++++++++++++++--
2 files changed, 67 insertions(+), 27 deletions(-)
diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
index 62340fe401..a97d82bb6f 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
@@ -91,12 +91,7 @@ public class HiveTableOperations extends BaseMetastoreTableOperations
private String tableKeyId;
private int encryptionDekLength;
- // keys loaded from the latest metadata
- private Optional<List<EncryptedKey>> encryptedKeysFromMetadata = Optional.empty();
-
- // keys added to EM (e.g. as a result of a FileAppend) but not committed into the latest metadata
- // yet
- private Optional<List<EncryptedKey>> encryptedKeysPending = Optional.empty();
+ private List<EncryptedKey> encryptedKeys = List.of();
protected HiveTableOperations(
Configuration conf,
@@ -157,12 +152,9 @@ public class HiveTableOperations extends BaseMetastoreTableOperations
encryptionProperties.put(
TableProperties.ENCRYPTION_DEK_LENGTH,
String.valueOf(encryptionDekLength));
- List<EncryptedKey> keys = Lists.newLinkedList();
- encryptedKeysFromMetadata.ifPresent(keys::addAll);
- encryptedKeysPending.ifPresent(keys::addAll);
-
encryptionManager =
- EncryptionUtil.createEncryptionManager(keys, encryptionProperties, keyManagementClient);
+ EncryptionUtil.createEncryptionManager(
+ encryptedKeys, encryptionProperties, keyManagementClient);
} else {
return PlaintextEncryptionManager.instance();
}
@@ -218,24 +210,20 @@ public class HiveTableOperations extends BaseMetastoreTableOperations
? Integer.parseInt(dekLengthFromHMS)
: TableProperties.ENCRYPTION_DEK_LENGTH_DEFAULT;
- encryptedKeysFromMetadata = Optional.ofNullable(current().encryptionKeys());
+ encryptedKeys =
+ Optional.ofNullable(current().encryptionKeys())
+ .map(Lists::newLinkedList)
+ .orElseGet(Lists::newLinkedList);
if (encryptionManager != null) {
- encryptedKeysPending = Optional.of(Lists.newLinkedList());
-
Set<String> keyIdsFromMetadata =
- encryptedKeysFromMetadata.orElseGet(Lists::newLinkedList).stream()
- .map(EncryptedKey::keyId)
- .collect(Collectors.toSet());
+ encryptedKeys.stream().map(EncryptedKey::keyId).collect(Collectors.toSet());
for (EncryptedKey keyFromEM :
EncryptionUtil.encryptionKeys(encryptionManager).values()) {
if (!keyIdsFromMetadata.contains(keyFromEM.keyId())) {
- encryptedKeysPending.get().add(keyFromEM);
+ encryptedKeys.add(keyFromEM);
}
}
-
- } else {
- encryptedKeysPending = Optional.empty();
}
// Force re-creation of encryption manager with updated keys
diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestTableEncryption.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestTableEncryption.java
index 85e7f48b59..905516bff9 100644
--- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestTableEncryption.java
+++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestTableEncryption.java
@@ -112,8 +112,8 @@ public class TestTableEncryption extends CatalogTestBase {
@TestTemplate
public void testRefresh() {
- catalog.initialize(catalogName, catalogConfig);
- Table table = catalog.loadTable(tableIdent);
+ validationCatalog.initialize(catalogName, catalogConfig);
+ Table table = validationCatalog.loadTable(tableIdent);
assertThat(currentDataFiles(table)).isNotEmpty();
@@ -124,10 +124,26 @@ public class TestTableEncryption extends CatalogTestBase {
}
@TestTemplate
- public void testTransaction() {
- catalog.initialize(catalogName, catalogConfig);
+ public void testAppendTransaction() {
+ validationCatalog.initialize(catalogName, catalogConfig);
+ Table table = validationCatalog.loadTable(tableIdent);
- Table table = catalog.loadTable(tableIdent);
+ List<DataFile> dataFiles = currentDataFiles(table);
+ Transaction transaction = table.newTransaction();
+ AppendFiles append = transaction.newAppend();
+
+ // add an arbitrary datafile
+ append.appendFile(dataFiles.get(0));
+ append.commit();
+ transaction.commitTransaction();
+
+ assertThat(currentDataFiles(table)).hasSize(dataFiles.size() + 1);
+ }
+
+ @TestTemplate
+ public void testConcurrentAppendTransactions() {
+ validationCatalog.initialize(catalogName, catalogConfig);
+ Table table = validationCatalog.loadTable(tableIdent);
List<DataFile> dataFiles = currentDataFiles(table);
Transaction transaction = table.newTransaction();
@@ -135,10 +151,46 @@ public class TestTableEncryption extends CatalogTestBase {
// add an arbitrary datafile
append.appendFile(dataFiles.get(0));
+
+ // append to the table in the meantime. use a separate load to avoid shared operations
+ validationCatalog.loadTable(tableIdent).newFastAppend().appendFile(dataFiles.get(0)).commit();
+
append.commit();
transaction.commitTransaction();
- assertThat(currentDataFiles(table).size()).isEqualTo(dataFiles.size() + 1);
+ assertThat(currentDataFiles(table)).hasSize(dataFiles.size() + 2);
+ }
+
+ // See CatalogTests#testConcurrentReplaceTransactions
+ @TestTemplate
+ public void testConcurrentReplaceTransactions() {
+ validationCatalog.initialize(catalogName, catalogConfig);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ DataFile file = currentDataFiles(table).get(0);
+ Schema schema = table.schema();
+
+ // Write data for a replace transaction that will be committed later
+ Transaction secondReplace =
+ validationCatalog
+ .buildTable(tableIdent, schema)
+ .withProperty("encryption.key-id", UnitestKMS.MASTER_KEY_NAME1)
+ .replaceTransaction();
+ secondReplace.newFastAppend().appendFile(file).commit();
+
+ // Commit another replace transaction first
+ Transaction firstReplace =
+ validationCatalog
+ .buildTable(tableIdent, schema)
+ .withProperty("encryption.key-id", UnitestKMS.MASTER_KEY_NAME1)
+ .replaceTransaction();
+ firstReplace.newFastAppend().appendFile(file).commit();
+ firstReplace.commitTransaction();
+
+ secondReplace.commitTransaction();
+
+ Table afterSecondReplace = validationCatalog.loadTable(tableIdent);
+ assertThat(currentDataFiles(afterSecondReplace)).hasSize(1);
}
@TestTemplate