This is an automated email from the ASF dual-hosted git repository.
huaxingao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new fa62ec1df2 [Hive] Fix newly added encryption keys getting lost in
transactions (#14427)
fa62ec1df2 is described below
commit fa62ec1df2d4e56f06c18c42e96264abaf756deb
Author: Adam Szita <[email protected]>
AuthorDate: Tue Oct 28 17:50:23 2025 +0100
[Hive] Fix newly added encryption keys getting lost in transactions (#14427)
Committing transactions triggers a refresh() call on
the underlying table operations.
HiveTableOperations recreates its encryption manager on
such invocations, so any new encryption key stored in
the encryption manager's transient state will be lost.
Effectively, appends within transactions will result in
encrypted manifest lists with their encryption key
being thrown away.
---
.../apache/iceberg/hive/HiveTableOperations.java | 39 +++++++++++++++++++---
.../iceberg/spark/sql/TestTableEncryption.java | 20 +++++++++++
2 files changed, 55 insertions(+), 4 deletions(-)
diff --git
a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
index c2fb656cc3..5e17243dc8 100644
---
a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
+++
b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
@@ -22,6 +22,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
@@ -54,6 +55,7 @@ import org.apache.iceberg.hadoop.ConfigProperties;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.thrift.TException;
import org.slf4j.Logger;
@@ -86,7 +88,13 @@ public class HiveTableOperations extends
BaseMetastoreTableOperations
private EncryptingFileIO encryptingFileIO;
private String tableKeyId;
private int encryptionDekLength;
- private List<EncryptedKey> encryptedKeysFromMetadata;
+
+ // keys loaded from the latest metadata
+ private Optional<List<EncryptedKey>> encryptedKeysFromMetadata =
Optional.empty();
+
+ // keys added to EM (e.g. as a result of a FileAppend) but not committed
into the latest metadata
+ // yet
+ private Optional<List<EncryptedKey>> encryptedKeysPending = Optional.empty();
protected HiveTableOperations(
Configuration conf,
@@ -146,9 +154,13 @@ public class HiveTableOperations extends
BaseMetastoreTableOperations
encryptionProperties.put(TableProperties.ENCRYPTION_TABLE_KEY,
tableKeyId);
encryptionProperties.put(
TableProperties.ENCRYPTION_DEK_LENGTH,
String.valueOf(encryptionDekLength));
+
+ List<EncryptedKey> keys = Lists.newLinkedList();
+ encryptedKeysFromMetadata.ifPresent(keys::addAll);
+ encryptedKeysPending.ifPresent(keys::addAll);
+
encryptionManager =
- EncryptionUtil.createEncryptionManager(
- encryptedKeysFromMetadata, encryptionProperties,
keyManagementClient);
+ EncryptionUtil.createEncryptionManager(keys, encryptionProperties,
keyManagementClient);
} else {
return PlaintextEncryptionManager.instance();
}
@@ -202,7 +214,26 @@ public class HiveTableOperations extends
BaseMetastoreTableOperations
? Integer.parseInt(dekLengthFromHMS)
: TableProperties.ENCRYPTION_DEK_LENGTH_DEFAULT;
- encryptedKeysFromMetadata = current().encryptionKeys();
+ encryptedKeysFromMetadata =
Optional.ofNullable(current().encryptionKeys());
+
+ if (encryptionManager != null) {
+ encryptedKeysPending = Optional.of(Lists.newLinkedList());
+
+ Set<String> keyIdsFromMetadata =
+ encryptedKeysFromMetadata.orElseGet(Lists::newLinkedList).stream()
+ .map(EncryptedKey::keyId)
+ .collect(Collectors.toSet());
+
+ for (EncryptedKey keyFromEM :
EncryptionUtil.encryptionKeys(encryptionManager).values()) {
+ if (!keyIdsFromMetadata.contains(keyFromEM.keyId())) {
+ encryptedKeysPending.get().add(keyFromEM);
+ }
+ }
+
+ } else {
+ encryptedKeysPending = Optional.empty();
+ }
+
// Force re-creation of encryption manager with updated keys
encryptingFileIO = null;
encryptionManager = null;
diff --git
a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestTableEncryption.java
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestTableEncryption.java
index 695a41681b..c71bd28706 100644
---
a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestTableEncryption.java
+++
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestTableEncryption.java
@@ -29,6 +29,7 @@ import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
@@ -36,6 +37,7 @@ import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
+import org.apache.iceberg.Transaction;
import org.apache.iceberg.encryption.Ciphers;
import org.apache.iceberg.encryption.UnitestKMS;
import org.apache.iceberg.io.InputFile;
@@ -114,6 +116,24 @@ public class TestTableEncryption extends CatalogTestBase {
assertThat(currentDataFiles(table)).isNotEmpty();
}
+ @TestTemplate
+ public void testTransaction() {
+ catalog.initialize(catalogName, catalogConfig);
+
+ Table table = catalog.loadTable(tableIdent);
+
+ List<DataFile> dataFiles = currentDataFiles(table);
+ Transaction transaction = table.newTransaction();
+ AppendFiles append = transaction.newAppend();
+
+ // add an arbitrary datafile
+ append.appendFile(dataFiles.get(0));
+ append.commit();
+ transaction.commitTransaction();
+
+ assertThat(currentDataFiles(table).size()).isEqualTo(dataFiles.size() + 1);
+ }
+
@TestTemplate
public void testInsertAndDelete() {
sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0), (6, 'f',
float('NaN'))", tableName);