This is an automated email from the ASF dual-hosted git repository.

huaxingao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new fa62ec1df2 [Hive] Fix newly added encryption keys getting lost in transactions (#14427)
fa62ec1df2 is described below

commit fa62ec1df2d4e56f06c18c42e96264abaf756deb
Author: Adam Szita <[email protected]>
AuthorDate: Tue Oct 28 17:50:23 2025 +0100

    [Hive] Fix newly added encryption keys getting lost in transactions (#14427)
    
    Committing a transaction triggers a refresh() call on the
    underlying table operations.
    
    HiveTableOperations recreates its encryption manager on such
    invocations, so any new encryption key held in the encryption
    manager's transient state is lost.
    
    Effectively, an append within a transaction produces an
    encrypted manifest list whose encryption key has been thrown
    away.
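    
    For illustration, a minimal sketch of the failing sequence; the
    catalog, table name, and dataFile below are hypothetical
    stand-ins for an encrypted table loaded through a Hive catalog
    (it mirrors the testTransaction case added below):
    
        // assumes an initialized Catalog backed by HMS and an existing
        // DataFile to append (both hypothetical in this sketch)
        Table table = catalog.loadTable(TableIdentifier.of("db", "encrypted_table"));
        Transaction txn = table.newTransaction();
        AppendFiles append = txn.newAppend();
        append.appendFile(dataFile); // creates a new DEK held only in the EM's transient state
        append.commit();
        // commitTransaction() triggers refresh(); before this fix the
        // encryption manager was rebuilt from metadata keys only, so
        // the pending DEK for the new manifest list was dropped
        txn.commitTransaction();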
---
 .../apache/iceberg/hive/HiveTableOperations.java   | 39 +++++++++++++++++++---
 .../iceberg/spark/sql/TestTableEncryption.java     | 20 +++++++++++
 2 files changed, 55 insertions(+), 4 deletions(-)

diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
index c2fb656cc3..5e17243dc8 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
@@ -22,6 +22,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
@@ -54,6 +55,7 @@ import org.apache.iceberg.hadoop.ConfigProperties;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.LocationProvider;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
@@ -86,7 +88,13 @@ public class HiveTableOperations extends BaseMetastoreTableOperations
   private EncryptingFileIO encryptingFileIO;
   private String tableKeyId;
   private int encryptionDekLength;
-  private List<EncryptedKey> encryptedKeysFromMetadata;
+
+  // keys loaded from the latest metadata
+  private Optional<List<EncryptedKey>> encryptedKeysFromMetadata = Optional.empty();
+
+  // keys added to EM (e.g. as a result of a FileAppend) but not committed into the latest metadata
+  // yet
+  private Optional<List<EncryptedKey>> encryptedKeysPending = Optional.empty();
 
   protected HiveTableOperations(
       Configuration conf,
@@ -146,9 +154,13 @@ public class HiveTableOperations extends BaseMetastoreTableOperations
       encryptionProperties.put(TableProperties.ENCRYPTION_TABLE_KEY, tableKeyId);
       encryptionProperties.put(
           TableProperties.ENCRYPTION_DEK_LENGTH, String.valueOf(encryptionDekLength));
+
+      List<EncryptedKey> keys = Lists.newLinkedList();
+      encryptedKeysFromMetadata.ifPresent(keys::addAll);
+      encryptedKeysPending.ifPresent(keys::addAll);
+
       encryptionManager =
-          EncryptionUtil.createEncryptionManager(
-              encryptedKeysFromMetadata, encryptionProperties, keyManagementClient);
+          EncryptionUtil.createEncryptionManager(keys, encryptionProperties, keyManagementClient);
     } else {
       return PlaintextEncryptionManager.instance();
     }
@@ -202,7 +214,26 @@ public class HiveTableOperations extends BaseMetastoreTableOperations
               ? Integer.parseInt(dekLengthFromHMS)
               : TableProperties.ENCRYPTION_DEK_LENGTH_DEFAULT;
 
-      encryptedKeysFromMetadata = current().encryptionKeys();
+      encryptedKeysFromMetadata = Optional.ofNullable(current().encryptionKeys());
+
+      if (encryptionManager != null) {
+        encryptedKeysPending = Optional.of(Lists.newLinkedList());
+
+        Set<String> keyIdsFromMetadata =
+            encryptedKeysFromMetadata.orElseGet(Lists::newLinkedList).stream()
+                .map(EncryptedKey::keyId)
+                .collect(Collectors.toSet());
+
+        for (EncryptedKey keyFromEM : EncryptionUtil.encryptionKeys(encryptionManager).values()) {
+          if (!keyIdsFromMetadata.contains(keyFromEM.keyId())) {
+            encryptedKeysPending.get().add(keyFromEM);
+          }
+        }
+
+      } else {
+        encryptedKeysPending = Optional.empty();
+      }
+
       // Force re-creation of encryption manager with updated keys
       encryptingFileIO = null;
       encryptionManager = null;
diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestTableEncryption.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestTableEncryption.java
index 695a41681b..c71bd28706 100644
--- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestTableEncryption.java
+++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestTableEncryption.java
@@ -29,6 +29,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileScanTask;
@@ -36,6 +37,7 @@ import org.apache.iceberg.MetadataTableType;
 import org.apache.iceberg.Parameters;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.Transaction;
 import org.apache.iceberg.encryption.Ciphers;
 import org.apache.iceberg.encryption.UnitestKMS;
 import org.apache.iceberg.io.InputFile;
@@ -114,6 +116,24 @@ public class TestTableEncryption extends CatalogTestBase {
     assertThat(currentDataFiles(table)).isNotEmpty();
   }
 
+  @TestTemplate
+  public void testTransaction() {
+    catalog.initialize(catalogName, catalogConfig);
+
+    Table table = catalog.loadTable(tableIdent);
+
+    List<DataFile> dataFiles = currentDataFiles(table);
+    Transaction transaction = table.newTransaction();
+    AppendFiles append = transaction.newAppend();
+
+    // add an arbitrary datafile
+    append.appendFile(dataFiles.get(0));
+    append.commit();
+    transaction.commitTransaction();
+
+    assertThat(currentDataFiles(table).size()).isEqualTo(dataFiles.size() + 1);
+  }
+
   @TestTemplate
   public void testInsertAndDelete() {
     sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0), (6, 'f', 
float('NaN'))", tableName);
