This is an automated email from the ASF dual-hosted git repository.

huaxingao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new cbd35799dc Encryption: Simplify Hive key handling and add transaction tests (#14752)
cbd35799dc is described below

commit cbd35799dc70e989528b1c14d640fe91cdafa52d
Author: Sreesh Maheshwar <[email protected]>
AuthorDate: Wed Dec 10 17:16:23 2025 +0000

    Encryption: Simplify Hive key handling and add transaction tests (#14752)
    
    * Encryption: Simplify Hive key handling and add transaction tests
    
    * spotless
---
 .../apache/iceberg/hive/HiveTableOperations.java   | 30 +++-------
 .../iceberg/spark/sql/TestTableEncryption.java     | 64 ++++++++++++++++++++--
 2 files changed, 67 insertions(+), 27 deletions(-)

diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
index 62340fe401..a97d82bb6f 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
@@ -91,12 +91,7 @@ public class HiveTableOperations extends BaseMetastoreTableOperations
   private String tableKeyId;
   private int encryptionDekLength;
 
-  // keys loaded from the latest metadata
-  private Optional<List<EncryptedKey>> encryptedKeysFromMetadata = 
Optional.empty();
-
-  // keys added to EM (e.g. as a result of a FileAppend) but not committed into the latest metadata
-  // yet
-  private Optional<List<EncryptedKey>> encryptedKeysPending = Optional.empty();
+  private List<EncryptedKey> encryptedKeys = List.of();
 
   protected HiveTableOperations(
       Configuration conf,
@@ -157,12 +152,9 @@ public class HiveTableOperations extends BaseMetastoreTableOperations
       encryptionProperties.put(
           TableProperties.ENCRYPTION_DEK_LENGTH, String.valueOf(encryptionDekLength));
 
-      List<EncryptedKey> keys = Lists.newLinkedList();
-      encryptedKeysFromMetadata.ifPresent(keys::addAll);
-      encryptedKeysPending.ifPresent(keys::addAll);
-
       encryptionManager =
-          EncryptionUtil.createEncryptionManager(keys, encryptionProperties, keyManagementClient);
+          EncryptionUtil.createEncryptionManager(
+              encryptedKeys, encryptionProperties, keyManagementClient);
     } else {
       return PlaintextEncryptionManager.instance();
     }
@@ -218,24 +210,20 @@ public class HiveTableOperations extends BaseMetastoreTableOperations
               ? Integer.parseInt(dekLengthFromHMS)
               : TableProperties.ENCRYPTION_DEK_LENGTH_DEFAULT;
 
-      encryptedKeysFromMetadata = Optional.ofNullable(current().encryptionKeys());
+      encryptedKeys =
+          Optional.ofNullable(current().encryptionKeys())
+              .map(Lists::newLinkedList)
+              .orElseGet(Lists::newLinkedList);
 
       if (encryptionManager != null) {
-        encryptedKeysPending = Optional.of(Lists.newLinkedList());
-
         Set<String> keyIdsFromMetadata =
-            encryptedKeysFromMetadata.orElseGet(Lists::newLinkedList).stream()
-                .map(EncryptedKey::keyId)
-                .collect(Collectors.toSet());
+            encryptedKeys.stream().map(EncryptedKey::keyId).collect(Collectors.toSet());
 
         for (EncryptedKey keyFromEM : EncryptionUtil.encryptionKeys(encryptionManager).values()) {
           if (!keyIdsFromMetadata.contains(keyFromEM.keyId())) {
-            encryptedKeysPending.get().add(keyFromEM);
+            encryptedKeys.add(keyFromEM);
           }
         }
-
-      } else {
-        encryptedKeysPending = Optional.empty();
       }
 
       // Force re-creation of encryption manager with updated keys
diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestTableEncryption.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestTableEncryption.java
index 85e7f48b59..905516bff9 100644
--- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestTableEncryption.java
+++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestTableEncryption.java
@@ -112,8 +112,8 @@ public class TestTableEncryption extends CatalogTestBase {
 
   @TestTemplate
   public void testRefresh() {
-    catalog.initialize(catalogName, catalogConfig);
-    Table table = catalog.loadTable(tableIdent);
+    validationCatalog.initialize(catalogName, catalogConfig);
+    Table table = validationCatalog.loadTable(tableIdent);
 
     assertThat(currentDataFiles(table)).isNotEmpty();
 
@@ -124,10 +124,26 @@ public class TestTableEncryption extends CatalogTestBase {
   }
 
   @TestTemplate
-  public void testTransaction() {
-    catalog.initialize(catalogName, catalogConfig);
+  public void testAppendTransaction() {
+    validationCatalog.initialize(catalogName, catalogConfig);
+    Table table = validationCatalog.loadTable(tableIdent);
 
-    Table table = catalog.loadTable(tableIdent);
+    List<DataFile> dataFiles = currentDataFiles(table);
+    Transaction transaction = table.newTransaction();
+    AppendFiles append = transaction.newAppend();
+
+    // add an arbitrary datafile
+    append.appendFile(dataFiles.get(0));
+    append.commit();
+    transaction.commitTransaction();
+
+    assertThat(currentDataFiles(table)).hasSize(dataFiles.size() + 1);
+  }
+
+  @TestTemplate
+  public void testConcurrentAppendTransactions() {
+    validationCatalog.initialize(catalogName, catalogConfig);
+    Table table = validationCatalog.loadTable(tableIdent);
 
     List<DataFile> dataFiles = currentDataFiles(table);
     Transaction transaction = table.newTransaction();
@@ -135,10 +151,46 @@ public class TestTableEncryption extends CatalogTestBase {
 
     // add an arbitrary datafile
     append.appendFile(dataFiles.get(0));
+
+    // append to the table in the meantime. use a separate load to avoid shared operations
+    validationCatalog.loadTable(tableIdent).newFastAppend().appendFile(dataFiles.get(0)).commit();
+
     append.commit();
     transaction.commitTransaction();
 
-    assertThat(currentDataFiles(table).size()).isEqualTo(dataFiles.size() + 1);
+    assertThat(currentDataFiles(table)).hasSize(dataFiles.size() + 2);
+  }
+
+  // See CatalogTests#testConcurrentReplaceTransactions
+  @TestTemplate
+  public void testConcurrentReplaceTransactions() {
+    validationCatalog.initialize(catalogName, catalogConfig);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    DataFile file = currentDataFiles(table).get(0);
+    Schema schema = table.schema();
+
+    // Write data for a replace transaction that will be committed later
+    Transaction secondReplace =
+        validationCatalog
+            .buildTable(tableIdent, schema)
+            .withProperty("encryption.key-id", UnitestKMS.MASTER_KEY_NAME1)
+            .replaceTransaction();
+    secondReplace.newFastAppend().appendFile(file).commit();
+
+    // Commit another replace transaction first
+    Transaction firstReplace =
+        validationCatalog
+            .buildTable(tableIdent, schema)
+            .withProperty("encryption.key-id", UnitestKMS.MASTER_KEY_NAME1)
+            .replaceTransaction();
+    firstReplace.newFastAppend().appendFile(file).commit();
+    firstReplace.commitTransaction();
+
+    secondReplace.commitTransaction();
+
+    Table afterSecondReplace = validationCatalog.loadTable(tableIdent);
+    assertThat(currentDataFiles(afterSecondReplace)).hasSize(1);
   }
 
   @TestTemplate

Reply via email to