This is an automated email from the ASF dual-hosted git repository.

huaxingao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 30ca573fe4 Encryption integration and test (#13066)
30ca573fe4 is described below

commit 30ca573fe4a7fbbe86a7ca8bd2c0fcab5c025c13
Author: ggershinsky <[email protected]>
AuthorDate: Tue Oct 21 04:39:10 2025 +0300

    Encryption integration and test (#13066)
    
    * initial commit
    
    * add missing parts
    
    * Address review comments
    
    Co-Authored-By: Sreesh Maheshwar <[email protected]>
    
    * fix style
    
    ---------
    
    Co-authored-by: Sreesh Maheshwar <[email protected]>
---
 .../iceberg/BaseMetastoreTableOperations.java      |   2 +-
 .../apache/iceberg/encryption/EncryptionUtil.java  |  12 +-
 .../encryption/StandardEncryptionManager.java      |  38 +++-
 .../java/org/apache/iceberg/hive/HiveCatalog.java  |  20 +-
 .../apache/iceberg/hive/HiveTableOperations.java   | 190 ++++++++++++++++-
 .../apache/iceberg/hive/TestHiveCommitLocks.java   |   2 +
 .../iceberg/spark/sql/TestCTASEncryption.java      | 120 +++++++++++
 .../iceberg/spark/sql/TestTableEncryption.java     | 232 +++++++++++++++++++++
 8 files changed, 606 insertions(+), 10 deletions(-)

diff --git 
a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java 
b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java
index 9fa52d52ea..bda983a6c1 100644
--- a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java
+++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java
@@ -214,7 +214,7 @@ public abstract class BaseMetastoreTableOperations extends 
BaseMetastoreOperatio
     this.shouldRefresh = false;
   }
 
-  private String metadataFileLocation(TableMetadata metadata, String filename) 
{
+  protected String metadataFileLocation(TableMetadata metadata, String 
filename) {
     String metadataLocation = 
metadata.properties().get(TableProperties.WRITE_METADATA_LOCATION);
 
     if (metadataLocation != null) {
diff --git 
a/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java 
b/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java
index 1b35cb82f0..51b73cc43d 100644
--- a/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java
+++ b/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java
@@ -75,7 +75,7 @@ public class EncryptionUtil {
     return kmsClient;
   }
 
-  static EncryptionManager createEncryptionManager(
+  public static EncryptionManager createEncryptionManager(
       List<EncryptedKey> keys, Map<String, String> tableProperties, 
KeyManagementClient kmsClient) {
     Preconditions.checkArgument(kmsClient != null, "Invalid KMS client: null");
     String tableKeyId = 
tableProperties.get(TableProperties.ENCRYPTION_TABLE_KEY);
@@ -96,7 +96,7 @@ public class EncryptionUtil {
         "Invalid data key length: %s (must be 16, 24, or 32)",
         dataKeyLength);
 
-    return new StandardEncryptionManager(tableKeyId, dataKeyLength, kmsClient);
+    return new StandardEncryptionManager(keys, tableKeyId, dataKeyLength, 
kmsClient);
   }
 
   public static EncryptedOutputFile plainAsEncryptedOutput(OutputFile 
encryptingOutputFile) {
@@ -128,6 +128,14 @@ public class EncryptionUtil {
     return ByteBuffer.wrap(decryptedKeyMetadata);
   }
 
+  public static Map<String, EncryptedKey> encryptionKeys(EncryptionManager em) 
{
+    Preconditions.checkState(
+        em instanceof StandardEncryptionManager,
+        "Retrieving encryption keys requires a StandardEncryptionManager");
+    StandardEncryptionManager sem = (StandardEncryptionManager) em;
+    return sem.encryptionKeys();
+  }
+
   /**
    * Encrypts the key metadata for a manifest list.
    *
diff --git 
a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java
 
b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java
index d000221bb6..b68cf4ed67 100644
--- 
a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java
+++ 
b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java
@@ -23,6 +23,7 @@ import com.github.benmanes.caffeine.cache.LoadingCache;
 import java.nio.ByteBuffer;
 import java.security.SecureRandom;
 import java.util.Base64;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.iceberg.TableProperties;
@@ -46,9 +47,19 @@ public class StandardEncryptionManager implements 
EncryptionManager {
     private final Map<String, EncryptedKey> encryptionKeys;
     private final LoadingCache<String, ByteBuffer> unwrappedKeyCache;
 
-    private TransientEncryptionState(KeyManagementClient kmsClient) {
+    private TransientEncryptionState(KeyManagementClient kmsClient, 
List<EncryptedKey> keys) {
       this.kmsClient = kmsClient;
       this.encryptionKeys = Maps.newLinkedHashMap();
+
+      if (keys != null) {
+        for (EncryptedKey key : keys) {
+          encryptionKeys.put(
+              key.keyId(),
+              new BaseEncryptedKey(
+                  key.keyId(), key.encryptedKeyMetadata(), 
key.encryptedById(), key.properties()));
+        }
+      }
+
       this.unwrappedKeyCache =
           Caffeine.newBuilder()
               .expireAfterWrite(1, TimeUnit.HOURS)
@@ -64,12 +75,25 @@ public class StandardEncryptionManager implements 
EncryptionManager {
   private transient volatile SecureRandom lazyRNG = null;
 
   /**
+   * @deprecated will be removed in 2.0.
+   */
+  @Deprecated
+  public StandardEncryptionManager(
+      String tableKeyId, int dataKeyLength, KeyManagementClient kmsClient) {
+    this(List.of(), tableKeyId, dataKeyLength, kmsClient);
+  }
+
+  /**
+   * @param keys encryption keys from table metadata
    * @param tableKeyId table encryption key id
    * @param dataKeyLength length of data encryption key (16/24/32 bytes)
    * @param kmsClient Client of KMS used to wrap/unwrap keys in envelope 
encryption
    */
   public StandardEncryptionManager(
-      String tableKeyId, int dataKeyLength, KeyManagementClient kmsClient) {
+      List<EncryptedKey> keys,
+      String tableKeyId,
+      int dataKeyLength,
+      KeyManagementClient kmsClient) {
     Preconditions.checkNotNull(tableKeyId, "Invalid encryption key ID: null");
     Preconditions.checkArgument(
         dataKeyLength == 16 || dataKeyLength == 24 || dataKeyLength == 32,
@@ -77,7 +101,7 @@ public class StandardEncryptionManager implements 
EncryptionManager {
         dataKeyLength);
     Preconditions.checkNotNull(kmsClient, "Invalid KMS client: null");
     this.tableKeyId = tableKeyId;
-    this.transientState = new TransientEncryptionState(kmsClient);
+    this.transientState = new TransientEncryptionState(kmsClient, keys);
     this.dataKeyLength = dataKeyLength;
   }
 
@@ -134,6 +158,14 @@ public class StandardEncryptionManager implements 
EncryptionManager {
     return transientState.kmsClient.unwrapKey(wrappedSecretKey, tableKeyId);
   }
 
+  Map<String, EncryptedKey> encryptionKeys() {
+    if (transientState == null) {
+      throw new IllegalStateException("Cannot return the encryption keys after 
serialization");
+    }
+
+    return transientState.encryptionKeys;
+  }
+
   private String keyEncryptionKeyID() {
     if (transientState == null) {
       throw new IllegalStateException("Cannot return the current key after 
serialization");
diff --git 
a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java 
b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
index 42a74f17f1..5e7a249af8 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.hive;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -45,6 +46,8 @@ import org.apache.iceberg.Transaction;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.SupportsNamespaces;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.encryption.EncryptionUtil;
+import org.apache.iceberg.encryption.KeyManagementClient;
 import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
 import org.apache.iceberg.exceptions.NoSuchIcebergViewException;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
@@ -88,6 +91,7 @@ public class HiveCatalog extends BaseMetastoreViewCatalog
   private String name;
   private Configuration conf;
   private FileIO fileIO;
+  private KeyManagementClient keyManagementClient;
   private ClientPool<IMetaStoreClient, TException> clients;
   private boolean listAllTables = false;
   private Map<String, String> catalogProperties;
@@ -122,6 +126,10 @@ public class HiveCatalog extends BaseMetastoreViewCatalog
             ? new HadoopFileIO(conf)
             : CatalogUtil.loadFileIO(fileIOImpl, properties, conf);
 
+    if (catalogProperties.containsKey(CatalogProperties.ENCRYPTION_KMS_IMPL)) {
+      this.keyManagementClient = EncryptionUtil.createKmsClient(properties);
+    }
+
     this.clients = new CachedClientPool(conf, properties);
   }
 
@@ -686,7 +694,8 @@ public class HiveCatalog extends BaseMetastoreViewCatalog
   public TableOperations newTableOps(TableIdentifier tableIdentifier) {
     String dbName = tableIdentifier.namespace().level(0);
     String tableName = tableIdentifier.name();
-    return new HiveTableOperations(conf, clients, fileIO, name, dbName, 
tableName);
+    return new HiveTableOperations(
+        conf, clients, fileIO, keyManagementClient, name, dbName, tableName);
   }
 
   @Override
@@ -816,6 +825,15 @@ public class HiveCatalog extends BaseMetastoreViewCatalog
     return catalogProperties == null ? ImmutableMap.of() : catalogProperties;
   }
 
+  @Override
+  public void close() throws IOException {
+    super.close();
+
+    if (keyManagementClient != null) {
+      keyManagementClient.close();
+    }
+  }
+
   @VisibleForTesting
   void setListAllTables(boolean listAllTables) {
     this.listAllTables = listAllTables;
diff --git 
a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java 
b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
index 3e4be78169..c2fb656cc3 100644
--- 
a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
+++ 
b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
@@ -19,6 +19,8 @@
 package org.apache.iceberg.hive;
 
 import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -32,8 +34,17 @@ import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.iceberg.BaseMetastoreOperations;
 import org.apache.iceberg.BaseMetastoreTableOperations;
 import org.apache.iceberg.ClientPool;
+import org.apache.iceberg.LocationProviders;
 import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.encryption.EncryptedKey;
+import org.apache.iceberg.encryption.EncryptingFileIO;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.encryption.EncryptionUtil;
+import org.apache.iceberg.encryption.KeyManagementClient;
+import org.apache.iceberg.encryption.PlaintextEncryptionManager;
+import org.apache.iceberg.encryption.StandardEncryptionManager;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
@@ -41,7 +52,9 @@ import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.hadoop.ConfigProperties;
 import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
 import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,18 +79,27 @@ public class HiveTableOperations extends 
BaseMetastoreTableOperations
   private final long maxHiveTablePropertySize;
   private final int metadataRefreshMaxRetries;
   private final FileIO fileIO;
+  private final KeyManagementClient keyManagementClient;
   private final ClientPool<IMetaStoreClient, TException> metaClients;
 
+  private EncryptionManager encryptionManager;
+  private EncryptingFileIO encryptingFileIO;
+  private String tableKeyId;
+  private int encryptionDekLength;
+  private List<EncryptedKey> encryptedKeysFromMetadata;
+
   protected HiveTableOperations(
       Configuration conf,
       ClientPool<IMetaStoreClient, TException> metaClients,
       FileIO fileIO,
+      KeyManagementClient keyManagementClient,
       String catalogName,
       String database,
       String table) {
     this.conf = conf;
     this.metaClients = metaClients;
     this.fileIO = fileIO;
+    this.keyManagementClient = keyManagementClient;
     this.fullName = catalogName + "." + database + "." + table;
     this.catalogName = catalogName;
     this.database = database;
@@ -97,12 +119,48 @@ public class HiveTableOperations extends 
BaseMetastoreTableOperations
 
   @Override
   public FileIO io() {
-    return fileIO;
+    if (tableKeyId == null) {
+      return fileIO;
+    }
+
+    if (encryptingFileIO == null) {
+      encryptingFileIO = EncryptingFileIO.combine(fileIO, encryption());
+    }
+
+    return encryptingFileIO;
+  }
+
+  @Override
+  public EncryptionManager encryption() {
+    if (encryptionManager != null) {
+      return encryptionManager;
+    }
+
+    if (tableKeyId != null) {
+      if (keyManagementClient == null) {
+        throw new RuntimeException(
+            "Cant create encryption manager, because key management client is 
not set");
+      }
+
+      Map<String, String> encryptionProperties = Maps.newHashMap();
+      encryptionProperties.put(TableProperties.ENCRYPTION_TABLE_KEY, 
tableKeyId);
+      encryptionProperties.put(
+          TableProperties.ENCRYPTION_DEK_LENGTH, 
String.valueOf(encryptionDekLength));
+      encryptionManager =
+          EncryptionUtil.createEncryptionManager(
+              encryptedKeysFromMetadata, encryptionProperties, 
keyManagementClient);
+    } else {
+      return PlaintextEncryptionManager.instance();
+    }
+
+    return encryptionManager;
   }
 
   @Override
   protected void doRefresh() {
     String metadataLocation = null;
+    String tableKeyIdFromHMS = null;
+    String dekLengthFromHMS = null;
     try {
       Table table = metaClients.run(client -> client.getTable(database, 
tableName));
 
@@ -112,7 +170,12 @@ public class HiveTableOperations extends 
BaseMetastoreTableOperations
       HiveOperationsBase.validateTableIsIceberg(table, fullName);
 
       metadataLocation = table.getParameters().get(METADATA_LOCATION_PROP);
-
+      /* Table key ID must be retrieved from a catalog service, and not from 
untrusted storage
+      (e.g. metadata json file) that can be tampered with. For example, an 
attacker can remove
+      the table key parameter (along with existing snapshots) in the file, 
making the writers
+      produce unencrypted files. Table key ID is taken directly from HMS 
catalog */
+      tableKeyIdFromHMS = 
table.getParameters().get(TableProperties.ENCRYPTION_TABLE_KEY);
+      dekLengthFromHMS = 
table.getParameters().get(TableProperties.ENCRYPTION_DEK_LENGTH);
     } catch (NoSuchObjectException e) {
       if (currentMetadataLocation() != null) {
         throw new NoSuchTableException("No such table: %s.%s", database, 
tableName);
@@ -129,13 +192,44 @@ public class HiveTableOperations extends 
BaseMetastoreTableOperations
     }
 
     refreshFromMetadataLocation(metadataLocation, metadataRefreshMaxRetries);
+
+    if (tableKeyIdFromHMS != null) {
+      checkEncryptionProperties(tableKeyIdFromHMS, dekLengthFromHMS);
+
+      tableKeyId = tableKeyIdFromHMS;
+      encryptionDekLength =
+          (dekLengthFromHMS != null)
+              ? Integer.parseInt(dekLengthFromHMS)
+              : TableProperties.ENCRYPTION_DEK_LENGTH_DEFAULT;
+
+      encryptedKeysFromMetadata = current().encryptionKeys();
+      // Force re-creation of encryption manager with updated keys
+      encryptingFileIO = null;
+      encryptionManager = null;
+    }
   }
 
   @SuppressWarnings({"checkstyle:CyclomaticComplexity", "MethodLength"})
   @Override
   protected void doCommit(TableMetadata base, TableMetadata metadata) {
     boolean newTable = base == null;
-    String newMetadataLocation = writeNewMetadataIfRequired(newTable, 
metadata);
+    encryptionPropsFromMetadata(metadata.properties());
+
+    String newMetadataLocation;
+    EncryptionManager encrManager = encryption();
+    if (encrManager instanceof StandardEncryptionManager) {
+      // Add new encryption keys to the metadata
+      TableMetadata.Builder builder = TableMetadata.buildFrom(metadata);
+      for (Map.Entry<String, EncryptedKey> entry :
+          EncryptionUtil.encryptionKeys(encrManager).entrySet()) {
+        builder.addEncryptionKey(entry.getValue());
+      }
+
+      newMetadataLocation = writeNewMetadataIfRequired(newTable, 
builder.build());
+    } else {
+      newMetadataLocation = writeNewMetadataIfRequired(newTable, metadata);
+    }
+
     boolean hiveEngineEnabled = hiveEngineEnabled(metadata, conf);
     boolean keepHiveStats = conf.getBoolean(ConfigProperties.KEEP_HIVE_STATS, 
false);
 
@@ -194,6 +288,10 @@ public class HiveTableOperations extends 
BaseMetastoreTableOperations
                 .collect(Collectors.toSet());
       }
 
+      if (removedProps.contains(TableProperties.ENCRYPTION_TABLE_KEY)) {
+        throw new RuntimeException("Cannot remove key in encrypted table");
+      }
+
       HMSTablePropertyHelper.updateHmsTableForIcebergTable(
           newMetadataLocation,
           tbl,
@@ -321,6 +419,54 @@ public class HiveTableOperations extends 
BaseMetastoreTableOperations
     return metaClients;
   }
 
+  @Override
+  public TableOperations temp(TableMetadata uncommittedMetadata) {
+    return new TableOperations() {
+      @Override
+      public TableMetadata current() {
+        return uncommittedMetadata;
+      }
+
+      @Override
+      public TableMetadata refresh() {
+        throw new UnsupportedOperationException(
+            "Cannot call refresh on temporary table operations");
+      }
+
+      @Override
+      public void commit(TableMetadata base, TableMetadata metadata) {
+        throw new UnsupportedOperationException("Cannot call commit on 
temporary table operations");
+      }
+
+      @Override
+      public String metadataFileLocation(String fileName) {
+        return 
HiveTableOperations.this.metadataFileLocation(uncommittedMetadata, fileName);
+      }
+
+      @Override
+      public LocationProvider locationProvider() {
+        return LocationProviders.locationsFor(
+            uncommittedMetadata.location(), uncommittedMetadata.properties());
+      }
+
+      @Override
+      public FileIO io() {
+        
HiveTableOperations.this.encryptionPropsFromMetadata(uncommittedMetadata.properties());
+        return HiveTableOperations.this.io();
+      }
+
+      @Override
+      public EncryptionManager encryption() {
+        return HiveTableOperations.this.encryption();
+      }
+
+      @Override
+      public long newSnapshotId() {
+        return HiveTableOperations.this.newSnapshotId();
+      }
+    };
+  }
+
   /**
    * Returns if the hive engine related values should be enabled on the table, 
or not.
    *
@@ -375,6 +521,44 @@ public class HiveTableOperations extends 
BaseMetastoreTableOperations
         ConfigProperties.LOCK_HIVE_ENABLED, 
TableProperties.HIVE_LOCK_ENABLED_DEFAULT);
   }
 
+  private void encryptionPropsFromMetadata(Map<String, String> 
tableProperties) {
+    if (tableKeyId == null) {
+      tableKeyId = tableProperties.get(TableProperties.ENCRYPTION_TABLE_KEY);
+    }
+
+    if (tableKeyId != null && encryptionDekLength <= 0) {
+      String dekLength = 
tableProperties.get(TableProperties.ENCRYPTION_DEK_LENGTH);
+      encryptionDekLength =
+          (dekLength == null)
+              ? TableProperties.ENCRYPTION_DEK_LENGTH_DEFAULT
+              : Integer.parseInt(dekLength);
+    }
+  }
+
+  private void checkEncryptionProperties(String encryptionKeyIdFromHMS, String 
dekLengthFromHMS) {
+    Map<String, String> propertiesFromMetadata = current().properties();
+
+    String encryptionKeyIdFromMetadata =
+        propertiesFromMetadata.get(TableProperties.ENCRYPTION_TABLE_KEY);
+    if (!Objects.equals(encryptionKeyIdFromHMS, encryptionKeyIdFromMetadata)) {
+      String errMsg =
+          String.format(
+              "Metadata file might have been modified. Encryption key id %s 
differs from HMS value %s",
+              encryptionKeyIdFromMetadata, encryptionKeyIdFromHMS);
+      throw new RuntimeException(errMsg);
+    }
+
+    String dekLengthFromMetadata =
+        propertiesFromMetadata.get(TableProperties.ENCRYPTION_DEK_LENGTH);
+    if (!Objects.equals(dekLengthFromHMS, dekLengthFromMetadata)) {
+      String errMsg =
+          String.format(
+              "Metadata file might have been modified. DEK length %s differs 
from HMS value %s",
+              dekLengthFromMetadata, dekLengthFromHMS);
+      throw new RuntimeException(errMsg);
+    }
+  }
+
   @VisibleForTesting
   HiveLock lockObject(TableMetadata metadata) {
     if (hiveLockEnabled(metadata, conf)) {
diff --git 
a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java 
b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java
index 0ffcb05709..e62ce9aeae 100644
--- 
a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java
+++ 
b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java
@@ -185,6 +185,7 @@ public class TestHiveCommitLocks {
                 overriddenHiveConf,
                 spyCachedClientPool,
                 ops.io(),
+                null,
                 catalog.name(),
                 dbName,
                 tableName));
@@ -615,6 +616,7 @@ public class TestHiveCommitLocks {
                 confWithLock,
                 spyCachedClientPool,
                 ops.io(),
+                null,
                 catalog.name(),
                 TABLE_IDENTIFIER.namespace().level(0),
                 TABLE_IDENTIFIER.name()));
diff --git 
a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestCTASEncryption.java
 
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestCTASEncryption.java
new file mode 100644
index 0000000000..6094ab0ccc
--- /dev/null
+++ 
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestCTASEncryption.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.sql;
+
+import static org.apache.iceberg.Files.localInput;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.Parameters;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.encryption.UnitestKMS;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.spark.CatalogTestBase;
+import org.apache.iceberg.spark.SparkCatalogConfig;
+import org.apache.iceberg.types.Types;
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+
+public class TestCTASEncryption extends CatalogTestBase {
+  private static Map<String, String> 
appendCatalogEncryptionProperties(Map<String, String> props) {
+    Map<String, String> newProps = Maps.newHashMap();
+    newProps.putAll(props);
+    newProps.put(CatalogProperties.ENCRYPTION_KMS_IMPL, 
UnitestKMS.class.getCanonicalName());
+    return newProps;
+  }
+
+  @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}")
+  protected static Object[][] parameters() {
+    return new Object[][] {
+      {
+        SparkCatalogConfig.HIVE.catalogName(),
+        SparkCatalogConfig.HIVE.implementation(),
+        appendCatalogEncryptionProperties(SparkCatalogConfig.HIVE.properties())
+      }
+    };
+  }
+
+  @BeforeEach
+  public void createTables() {
+    sql("CREATE TABLE %s (id bigint, data string, float float) USING iceberg 
", tableName + "1");
+    sql(
+        "INSERT INTO %s VALUES (1, 'a', 1.0), (2, 'b', 2.0), (3, 'c', 
float('NaN'))",
+        tableName + "1");
+
+    sql(
+        "CREATE TABLE %s USING iceberg "
+            + "TBLPROPERTIES ( "
+            + "'encryption.key-id'='%s')"
+            + " AS SELECT * from %s",
+        tableName, UnitestKMS.MASTER_KEY_NAME1, tableName + "1");
+  }
+
+  @AfterEach
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+    sql("DROP TABLE IF EXISTS %s", tableName + "1");
+  }
+
+  @TestTemplate
+  public void testSelect() {
+    List<Object[]> expected =
+        ImmutableList.of(row(1L, "a", 1.0F), row(2L, "b", 2.0F), row(3L, "c", 
Float.NaN));
+
+    assertEquals("Should return all expected rows", expected, sql("SELECT * 
FROM %s", tableName));
+  }
+
+  @TestTemplate
+  public void testDirectDataFileRead() {
+    List<Object[]> dataFileTable =
+        sql("SELECT file_path FROM %s.%s", tableName, 
MetadataTableType.ALL_DATA_FILES);
+    List<String> dataFiles =
+        Streams.concat(dataFileTable.stream())
+            .map(row -> (String) row[0])
+            .collect(Collectors.toList());
+
+    if (dataFiles.isEmpty()) {
+      throw new RuntimeException("No data files found for table " + tableName);
+    }
+
+    Schema schema = new Schema(optional(0, "id", Types.IntegerType.get()));
+    for (String filePath : dataFiles) {
+      assertThatThrownBy(
+              () ->
+                  Parquet.read(localInput(filePath))
+                      .project(schema)
+                      .callInit()
+                      .build()
+                      .iterator()
+                      .next())
+          .isInstanceOf(ParquetCryptoRuntimeException.class)
+          .hasMessageContaining("Trying to read file with encrypted footer. No 
keys available");
+    }
+  }
+}
diff --git 
a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestTableEncryption.java
 
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestTableEncryption.java
new file mode 100644
index 0000000000..695a41681b
--- /dev/null
+++ 
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestTableEncryption.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.sql;
+
+import static org.apache.iceberg.Files.localInput;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.Parameters;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.encryption.Ciphers;
+import org.apache.iceberg.encryption.UnitestKMS;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.SeekableInputStream;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.spark.CatalogTestBase;
+import org.apache.iceberg.spark.SparkCatalogConfig;
+import org.apache.iceberg.types.Types;
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+
+public class TestTableEncryption extends CatalogTestBase {
+  /**
+   * Returns a new mutable map holding the given catalog properties plus the KMS implementation
+   * property, which is set to the canonical class name of {@link UnitestKMS}. The input map is
+   * not modified.
+   */
+  private static Map<String, String> appendCatalogEncryptionProperties(Map<String, String> props) {
+    Map<String, String> withKms = Maps.newHashMap(props);
+    withKms.put(CatalogProperties.ENCRYPTION_KMS_IMPL, UnitestKMS.class.getCanonicalName());
+    return withKms;
+  }
+
+  /**
+   * Supplies a single parameter set: the Hive catalog name and implementation, with its
+   * properties extended by {@link #appendCatalogEncryptionProperties(Map)}.
+   */
+  @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}")
+  protected static Object[][] parameters() {
+    Map<String, String> encryptedHiveProps =
+        appendCatalogEncryptionProperties(SparkCatalogConfig.HIVE.properties());
+    Object[] hiveCatalog = {
+      SparkCatalogConfig.HIVE.catalogName(),
+      SparkCatalogConfig.HIVE.implementation(),
+      encryptedHiveProps
+    };
+    return new Object[][] {hiveCatalog};
+  }
+
+  @BeforeEach
+  public void createTables() {
+    sql(
+        "CREATE TABLE %s (id bigint, data string, float float) USING iceberg "
+            + "TBLPROPERTIES ( "
+            + "'encryption.key-id'='%s')",
+        tableName, UnitestKMS.MASTER_KEY_NAME1);
+
+    sql("INSERT INTO %s VALUES (1, 'a', 1.0), (2, 'b', 2.0), (3, 'c', 
float('NaN'))", tableName);
+  }
+
+  @AfterEach
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @TestTemplate
+  public void testSelect() {
+    List<Object[]> expected =
+        ImmutableList.of(row(1L, "a", 1.0F), row(2L, "b", 2.0F), row(3L, "c", 
Float.NaN));
+
+    assertEquals("Should return all expected rows", expected, sql("SELECT * 
FROM %s", tableName));
+  }
+
+  /**
+   * Plans a scan of the table and returns the data file of every planned task.
+   *
+   * <p>The iterable returned by {@code planFiles()} is closeable, so it is consumed inside a
+   * try-with-resources block and released before this method returns.
+   *
+   * @param table the table to scan
+   * @return the data files referenced by the planned scan tasks
+   * @throws UncheckedIOException if closing the planned tasks fails
+   */
+  private static List<DataFile> currentDataFiles(Table table) {
+    try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+      return Streams.stream(tasks).map(FileScanTask::file).collect(Collectors.toList());
+    } catch (IOException e) {
+      throw new UncheckedIOException("Failed to close scan tasks of table " + table.name(), e);
+    }
+  }
+
+  /**
+   * Loads the encrypted table through the validation catalog, appends rows through Spark SQL, and
+   * verifies that {@code refresh()} makes the newly written data files visible.
+   *
+   * <p>The data file count is captured before the insert so that the post-refresh assertion fails
+   * if the refresh did not pick up the new snapshot.
+   */
+  @TestTemplate
+  public void testRefresh() {
+    catalog.initialize(catalogName, catalogConfig);
+    Table table = catalog.loadTable(tableIdent);
+
+    List<DataFile> dataFilesBeforeInsert = currentDataFiles(table);
+    assertThat(dataFilesBeforeInsert).isNotEmpty();
+
+    sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0), (6, 'f', float('NaN'))", tableName);
+
+    table.refresh();
+    assertThat(currentDataFiles(table)).hasSizeGreaterThan(dataFilesBeforeInsert.size());
+  }
+
+  @TestTemplate
+  public void testInsertAndDelete() {
+    sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0), (6, 'f', 
float('NaN'))", tableName);
+
+    List<Object[]> expected =
+        ImmutableList.of(
+            row(1L, "a", 1.0F),
+            row(2L, "b", 2.0F),
+            row(3L, "c", Float.NaN),
+            row(4L, "d", 4.0F),
+            row(5L, "e", 5.0F),
+            row(6L, "f", Float.NaN));
+
+    assertEquals(
+        "Should return all expected rows",
+        expected,
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+
+    sql("DELETE FROM %s WHERE id < 4", tableName);
+
+    expected = ImmutableList.of(row(4L, "d", 4.0F), row(5L, "e", 5.0F), 
row(6L, "f", Float.NaN));
+
+    assertEquals(
+        "Should return all expected rows",
+        expected,
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @TestTemplate
+  public void testKeyDelete() {
+    assertThatThrownBy(
+            () -> sql("ALTER TABLE %s UNSET TBLPROPERTIES 
(`encryption.key-id`)", tableName))
+        .hasMessageContaining("Cannot remove key in encrypted table");
+  }
+
+  @TestTemplate
+  public void testDirectDataFileRead() {
+    List<Object[]> dataFileTable =
+        sql("SELECT file_path FROM %s.%s", tableName, 
MetadataTableType.ALL_DATA_FILES);
+    List<String> dataFiles =
+        Streams.concat(dataFileTable.stream())
+            .map(row -> (String) row[0])
+            .collect(Collectors.toList());
+
+    if (dataFiles.isEmpty()) {
+      throw new RuntimeException("No data files found for table " + tableName);
+    }
+
+    Schema schema = new Schema(optional(0, "id", Types.IntegerType.get()));
+    for (String filePath : dataFiles) {
+      assertThatThrownBy(
+              () ->
+                  Parquet.read(localInput(filePath))
+                      .project(schema)
+                      .callInit()
+                      .build()
+                      .iterator()
+                      .next())
+          .isInstanceOf(ParquetCryptoRuntimeException.class)
+          .hasMessageContaining("Trying to read file with encrypted footer. No 
keys available");
+    }
+  }
+
+  /**
+   * Verifies that the table's manifest files and manifest list files are encrypted.
+   *
+   * <p>Manifest paths come from the MANIFESTS metadata table. Manifest lists are found by listing
+   * the folder that holds the first manifest and picking the files whose names start with {@code
+   * snap-}. Each file is checked by {@link #checkMetadataFileEncryption(InputFile)}.
+   *
+   * <p>Missing manifests, an unlistable metadata folder, or missing manifest lists are reported as
+   * assertion failures rather than as unrelated runtime errors.
+   *
+   * @throws IOException if one of the metadata files cannot be read
+   */
+  @TestTemplate
+  public void testManifestEncryption() throws IOException {
+    List<Object[]> manifestFileTable =
+        sql("SELECT path FROM %s.%s", tableName, MetadataTableType.MANIFESTS);
+
+    List<String> manifestFiles =
+        manifestFileTable.stream().map(fields -> (String) fields[0]).collect(Collectors.toList());
+
+    assertThat(manifestFiles).as("Manifest files of table %s", tableName).isNotEmpty();
+
+    // Check encryption of manifest files
+    for (String manifestFilePath : manifestFiles) {
+      checkMetadataFileEncryption(localInput(manifestFilePath));
+    }
+
+    // The metadata folder is taken from the first manifest; a "file:" scheme prefix is stripped
+    // so that the path can be listed through java.io.File.
+    String manifestParent = new File(manifestFiles.get(0)).getParent();
+    assertThat(manifestParent).as("Metadata folder of table %s", tableName).isNotNull();
+    String metadataFolderPath = manifestParent.replaceFirst("file:", "");
+
+    // Find manifest list files; check their encryption.
+    // listFiles() returns null when the path is not a directory or cannot be read.
+    File[] listOfMetadataFiles = new File(metadataFolderPath).listFiles();
+    assertThat(listOfMetadataFiles).as("Files in folder %s", metadataFolderPath).isNotNull();
+
+    boolean foundManifestListFile = false;
+    for (File metadataFile : listOfMetadataFiles) {
+      if (metadataFile.getName().startsWith("snap-")) {
+        foundManifestListFile = true;
+        checkMetadataFileEncryption(localInput(metadataFile));
+      }
+    }
+
+    assertThat(foundManifestListFile)
+        .as("Manifest list files found for table %s", tableName)
+        .isTrue();
+  }
+
+  /**
+   * Asserts that the given file starts with the magic bytes of {@link
+   * Ciphers#GCM_STREAM_MAGIC_STRING}.
+   *
+   * <p>The stream is opened in a try-with-resources block so it is closed even when reading
+   * fails. Reading loops until the buffer is full or the stream ends, because a single {@code
+   * read} call may return fewer bytes than requested. A file shorter than the magic leaves the
+   * buffer partially zeroed, which then fails the assertion.
+   *
+   * @param file the metadata file to inspect
+   * @throws IOException if the file cannot be opened or read
+   */
+  private void checkMetadataFileEncryption(InputFile file) throws IOException {
+    byte[] expectedMagic = Ciphers.GCM_STREAM_MAGIC_STRING.getBytes(StandardCharsets.UTF_8);
+    byte[] magic = new byte[expectedMagic.length];
+
+    try (SeekableInputStream stream = file.newStream()) {
+      int bytesRead = 0;
+      while (bytesRead < magic.length) {
+        int count = stream.read(magic, bytesRead, magic.length - bytesRead);
+        if (count < 0) {
+          // End of stream before the full magic was read.
+          break;
+        }
+
+        bytesRead += count;
+      }
+    }
+
+    assertThat(magic).isEqualTo(expectedMagic);
+  }
+}


Reply via email to