This is an automated email from the ASF dual-hosted git repository.
huaxingao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 30ca573fe4 Encryption integration and test (#13066)
30ca573fe4 is described below
commit 30ca573fe4a7fbbe86a7ca8bd2c0fcab5c025c13
Author: ggershinsky <[email protected]>
AuthorDate: Tue Oct 21 04:39:10 2025 +0300
Encryption integration and test (#13066)
* initial commit
* add missing parts
* Address review comments
Co-Authored-By: Sreesh Maheshwar
<[email protected]>
* fix style
---------
Co-authored-by: Sreesh Maheshwar
<[email protected]>
---
.../iceberg/BaseMetastoreTableOperations.java | 2 +-
.../apache/iceberg/encryption/EncryptionUtil.java | 12 +-
.../encryption/StandardEncryptionManager.java | 38 +++-
.../java/org/apache/iceberg/hive/HiveCatalog.java | 20 +-
.../apache/iceberg/hive/HiveTableOperations.java | 190 ++++++++++++++++-
.../apache/iceberg/hive/TestHiveCommitLocks.java | 2 +
.../iceberg/spark/sql/TestCTASEncryption.java | 120 +++++++++++
.../iceberg/spark/sql/TestTableEncryption.java | 232 +++++++++++++++++++++
8 files changed, 606 insertions(+), 10 deletions(-)
diff --git
a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java
b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java
index 9fa52d52ea..bda983a6c1 100644
--- a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java
+++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java
@@ -214,7 +214,7 @@ public abstract class BaseMetastoreTableOperations extends
BaseMetastoreOperatio
this.shouldRefresh = false;
}
- private String metadataFileLocation(TableMetadata metadata, String filename)
{
+ protected String metadataFileLocation(TableMetadata metadata, String
filename) {
String metadataLocation =
metadata.properties().get(TableProperties.WRITE_METADATA_LOCATION);
if (metadataLocation != null) {
diff --git
a/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java
b/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java
index 1b35cb82f0..51b73cc43d 100644
--- a/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java
+++ b/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java
@@ -75,7 +75,7 @@ public class EncryptionUtil {
return kmsClient;
}
- static EncryptionManager createEncryptionManager(
+ public static EncryptionManager createEncryptionManager(
List<EncryptedKey> keys, Map<String, String> tableProperties,
KeyManagementClient kmsClient) {
Preconditions.checkArgument(kmsClient != null, "Invalid KMS client: null");
String tableKeyId =
tableProperties.get(TableProperties.ENCRYPTION_TABLE_KEY);
@@ -96,7 +96,7 @@ public class EncryptionUtil {
"Invalid data key length: %s (must be 16, 24, or 32)",
dataKeyLength);
- return new StandardEncryptionManager(tableKeyId, dataKeyLength, kmsClient);
+ return new StandardEncryptionManager(keys, tableKeyId, dataKeyLength,
kmsClient);
}
public static EncryptedOutputFile plainAsEncryptedOutput(OutputFile
encryptingOutputFile) {
@@ -128,6 +128,14 @@ public class EncryptionUtil {
return ByteBuffer.wrap(decryptedKeyMetadata);
}
+ public static Map<String, EncryptedKey> encryptionKeys(EncryptionManager em)
{
+ Preconditions.checkState(
+ em instanceof StandardEncryptionManager,
+ "Retrieving encryption keys requires a StandardEncryptionManager");
+ StandardEncryptionManager sem = (StandardEncryptionManager) em;
+ return sem.encryptionKeys();
+ }
+
/**
* Encrypts the key metadata for a manifest list.
*
diff --git
a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java
b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java
index d000221bb6..b68cf4ed67 100644
---
a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java
+++
b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java
@@ -23,6 +23,7 @@ import com.github.benmanes.caffeine.cache.LoadingCache;
import java.nio.ByteBuffer;
import java.security.SecureRandom;
import java.util.Base64;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.TableProperties;
@@ -46,9 +47,19 @@ public class StandardEncryptionManager implements
EncryptionManager {
private final Map<String, EncryptedKey> encryptionKeys;
private final LoadingCache<String, ByteBuffer> unwrappedKeyCache;
- private TransientEncryptionState(KeyManagementClient kmsClient) {
+ private TransientEncryptionState(KeyManagementClient kmsClient,
List<EncryptedKey> keys) {
this.kmsClient = kmsClient;
this.encryptionKeys = Maps.newLinkedHashMap();
+
+ if (keys != null) {
+ for (EncryptedKey key : keys) {
+ encryptionKeys.put(
+ key.keyId(),
+ new BaseEncryptedKey(
+ key.keyId(), key.encryptedKeyMetadata(),
key.encryptedById(), key.properties()));
+ }
+ }
+
this.unwrappedKeyCache =
Caffeine.newBuilder()
.expireAfterWrite(1, TimeUnit.HOURS)
@@ -64,12 +75,25 @@ public class StandardEncryptionManager implements
EncryptionManager {
private transient volatile SecureRandom lazyRNG = null;
/**
+ * @deprecated will be removed in 2.0.
+ */
+ @Deprecated
+ public StandardEncryptionManager(
+ String tableKeyId, int dataKeyLength, KeyManagementClient kmsClient) {
+ this(List.of(), tableKeyId, dataKeyLength, kmsClient);
+ }
+
+ /**
+ * @param keys encryption keys from table metadata
* @param tableKeyId table encryption key id
* @param dataKeyLength length of data encryption key (16/24/32 bytes)
* @param kmsClient Client of KMS used to wrap/unwrap keys in envelope
encryption
*/
public StandardEncryptionManager(
- String tableKeyId, int dataKeyLength, KeyManagementClient kmsClient) {
+ List<EncryptedKey> keys,
+ String tableKeyId,
+ int dataKeyLength,
+ KeyManagementClient kmsClient) {
Preconditions.checkNotNull(tableKeyId, "Invalid encryption key ID: null");
Preconditions.checkArgument(
dataKeyLength == 16 || dataKeyLength == 24 || dataKeyLength == 32,
@@ -77,7 +101,7 @@ public class StandardEncryptionManager implements
EncryptionManager {
dataKeyLength);
Preconditions.checkNotNull(kmsClient, "Invalid KMS client: null");
this.tableKeyId = tableKeyId;
- this.transientState = new TransientEncryptionState(kmsClient);
+ this.transientState = new TransientEncryptionState(kmsClient, keys);
this.dataKeyLength = dataKeyLength;
}
@@ -134,6 +158,14 @@ public class StandardEncryptionManager implements
EncryptionManager {
return transientState.kmsClient.unwrapKey(wrappedSecretKey, tableKeyId);
}
+ Map<String, EncryptedKey> encryptionKeys() {
+ if (transientState == null) {
+ throw new IllegalStateException("Cannot return the encryption keys after
serialization");
+ }
+
+ return transientState.encryptionKeys;
+ }
+
private String keyEncryptionKeyID() {
if (transientState == null) {
throw new IllegalStateException("Cannot return the current key after
serialization");
diff --git
a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
index 42a74f17f1..5e7a249af8 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.hive;
+import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -45,6 +46,8 @@ import org.apache.iceberg.Transaction;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.encryption.EncryptionUtil;
+import org.apache.iceberg.encryption.KeyManagementClient;
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchIcebergViewException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
@@ -88,6 +91,7 @@ public class HiveCatalog extends BaseMetastoreViewCatalog
private String name;
private Configuration conf;
private FileIO fileIO;
+ private KeyManagementClient keyManagementClient;
private ClientPool<IMetaStoreClient, TException> clients;
private boolean listAllTables = false;
private Map<String, String> catalogProperties;
@@ -122,6 +126,10 @@ public class HiveCatalog extends BaseMetastoreViewCatalog
? new HadoopFileIO(conf)
: CatalogUtil.loadFileIO(fileIOImpl, properties, conf);
+ if (catalogProperties.containsKey(CatalogProperties.ENCRYPTION_KMS_IMPL)) {
+ this.keyManagementClient = EncryptionUtil.createKmsClient(properties);
+ }
+
this.clients = new CachedClientPool(conf, properties);
}
@@ -686,7 +694,8 @@ public class HiveCatalog extends BaseMetastoreViewCatalog
public TableOperations newTableOps(TableIdentifier tableIdentifier) {
String dbName = tableIdentifier.namespace().level(0);
String tableName = tableIdentifier.name();
- return new HiveTableOperations(conf, clients, fileIO, name, dbName,
tableName);
+ return new HiveTableOperations(
+ conf, clients, fileIO, keyManagementClient, name, dbName, tableName);
}
@Override
@@ -816,6 +825,15 @@ public class HiveCatalog extends BaseMetastoreViewCatalog
return catalogProperties == null ? ImmutableMap.of() : catalogProperties;
}
+ @Override
+ public void close() throws IOException {
+ super.close();
+
+ if (keyManagementClient != null) {
+ keyManagementClient.close();
+ }
+ }
+
@VisibleForTesting
void setListAllTables(boolean listAllTables) {
this.listAllTables = listAllTables;
diff --git
a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
index 3e4be78169..c2fb656cc3 100644
---
a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
+++
b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
@@ -19,6 +19,8 @@
package org.apache.iceberg.hive;
import java.util.Collections;
+import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
@@ -32,8 +34,17 @@ import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.iceberg.BaseMetastoreOperations;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.ClientPool;
+import org.apache.iceberg.LocationProviders;
import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.encryption.EncryptedKey;
+import org.apache.iceberg.encryption.EncryptingFileIO;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.encryption.EncryptionUtil;
+import org.apache.iceberg.encryption.KeyManagementClient;
+import org.apache.iceberg.encryption.PlaintextEncryptionManager;
+import org.apache.iceberg.encryption.StandardEncryptionManager;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
@@ -41,7 +52,9 @@ import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.hadoop.ConfigProperties;
import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,18 +79,27 @@ public class HiveTableOperations extends
BaseMetastoreTableOperations
private final long maxHiveTablePropertySize;
private final int metadataRefreshMaxRetries;
private final FileIO fileIO;
+ private final KeyManagementClient keyManagementClient;
private final ClientPool<IMetaStoreClient, TException> metaClients;
+ private EncryptionManager encryptionManager;
+ private EncryptingFileIO encryptingFileIO;
+ private String tableKeyId;
+ private int encryptionDekLength;
+ private List<EncryptedKey> encryptedKeysFromMetadata;
+
protected HiveTableOperations(
Configuration conf,
ClientPool<IMetaStoreClient, TException> metaClients,
FileIO fileIO,
+ KeyManagementClient keyManagementClient,
String catalogName,
String database,
String table) {
this.conf = conf;
this.metaClients = metaClients;
this.fileIO = fileIO;
+ this.keyManagementClient = keyManagementClient;
this.fullName = catalogName + "." + database + "." + table;
this.catalogName = catalogName;
this.database = database;
@@ -97,12 +119,48 @@ public class HiveTableOperations extends
BaseMetastoreTableOperations
@Override
public FileIO io() {
- return fileIO;
+ if (tableKeyId == null) {
+ return fileIO;
+ }
+
+ if (encryptingFileIO == null) {
+ encryptingFileIO = EncryptingFileIO.combine(fileIO, encryption());
+ }
+
+ return encryptingFileIO;
+ }
+
+ @Override
+ public EncryptionManager encryption() {
+ if (encryptionManager != null) {
+ return encryptionManager;
+ }
+
+ if (tableKeyId != null) {
+ if (keyManagementClient == null) {
+ throw new RuntimeException(
+ "Cant create encryption manager, because key management client is
not set");
+ }
+
+ Map<String, String> encryptionProperties = Maps.newHashMap();
+ encryptionProperties.put(TableProperties.ENCRYPTION_TABLE_KEY,
tableKeyId);
+ encryptionProperties.put(
+ TableProperties.ENCRYPTION_DEK_LENGTH,
String.valueOf(encryptionDekLength));
+ encryptionManager =
+ EncryptionUtil.createEncryptionManager(
+ encryptedKeysFromMetadata, encryptionProperties,
keyManagementClient);
+ } else {
+ return PlaintextEncryptionManager.instance();
+ }
+
+ return encryptionManager;
}
@Override
protected void doRefresh() {
String metadataLocation = null;
+ String tableKeyIdFromHMS = null;
+ String dekLengthFromHMS = null;
try {
Table table = metaClients.run(client -> client.getTable(database,
tableName));
@@ -112,7 +170,12 @@ public class HiveTableOperations extends
BaseMetastoreTableOperations
HiveOperationsBase.validateTableIsIceberg(table, fullName);
metadataLocation = table.getParameters().get(METADATA_LOCATION_PROP);
-
+ /* Table key ID must be retrieved from a catalog service, and not from
untrusted storage
+ (e.g. metadata json file) that can be tampered with. For example, an
attacker can remove
+ the table key parameter (along with existing snapshots) in the file,
making the writers
+ produce unencrypted files. Table key ID is taken directly from HMS
catalog */
+ tableKeyIdFromHMS =
table.getParameters().get(TableProperties.ENCRYPTION_TABLE_KEY);
+ dekLengthFromHMS =
table.getParameters().get(TableProperties.ENCRYPTION_DEK_LENGTH);
} catch (NoSuchObjectException e) {
if (currentMetadataLocation() != null) {
throw new NoSuchTableException("No such table: %s.%s", database,
tableName);
@@ -129,13 +192,44 @@ public class HiveTableOperations extends
BaseMetastoreTableOperations
}
refreshFromMetadataLocation(metadataLocation, metadataRefreshMaxRetries);
+
+ if (tableKeyIdFromHMS != null) {
+ checkEncryptionProperties(tableKeyIdFromHMS, dekLengthFromHMS);
+
+ tableKeyId = tableKeyIdFromHMS;
+ encryptionDekLength =
+ (dekLengthFromHMS != null)
+ ? Integer.parseInt(dekLengthFromHMS)
+ : TableProperties.ENCRYPTION_DEK_LENGTH_DEFAULT;
+
+ encryptedKeysFromMetadata = current().encryptionKeys();
+ // Force re-creation of encryption manager with updated keys
+ encryptingFileIO = null;
+ encryptionManager = null;
+ }
}
@SuppressWarnings({"checkstyle:CyclomaticComplexity", "MethodLength"})
@Override
protected void doCommit(TableMetadata base, TableMetadata metadata) {
boolean newTable = base == null;
- String newMetadataLocation = writeNewMetadataIfRequired(newTable,
metadata);
+ encryptionPropsFromMetadata(metadata.properties());
+
+ String newMetadataLocation;
+ EncryptionManager encrManager = encryption();
+ if (encrManager instanceof StandardEncryptionManager) {
+ // Add new encryption keys to the metadata
+ TableMetadata.Builder builder = TableMetadata.buildFrom(metadata);
+ for (Map.Entry<String, EncryptedKey> entry :
+ EncryptionUtil.encryptionKeys(encrManager).entrySet()) {
+ builder.addEncryptionKey(entry.getValue());
+ }
+
+ newMetadataLocation = writeNewMetadataIfRequired(newTable,
builder.build());
+ } else {
+ newMetadataLocation = writeNewMetadataIfRequired(newTable, metadata);
+ }
+
boolean hiveEngineEnabled = hiveEngineEnabled(metadata, conf);
boolean keepHiveStats = conf.getBoolean(ConfigProperties.KEEP_HIVE_STATS,
false);
@@ -194,6 +288,10 @@ public class HiveTableOperations extends
BaseMetastoreTableOperations
.collect(Collectors.toSet());
}
+ if (removedProps.contains(TableProperties.ENCRYPTION_TABLE_KEY)) {
+ throw new RuntimeException("Cannot remove key in encrypted table");
+ }
+
HMSTablePropertyHelper.updateHmsTableForIcebergTable(
newMetadataLocation,
tbl,
@@ -321,6 +419,54 @@ public class HiveTableOperations extends
BaseMetastoreTableOperations
return metaClients;
}
+ @Override
+ public TableOperations temp(TableMetadata uncommittedMetadata) {
+ return new TableOperations() {
+ @Override
+ public TableMetadata current() {
+ return uncommittedMetadata;
+ }
+
+ @Override
+ public TableMetadata refresh() {
+ throw new UnsupportedOperationException(
+ "Cannot call refresh on temporary table operations");
+ }
+
+ @Override
+ public void commit(TableMetadata base, TableMetadata metadata) {
+ throw new UnsupportedOperationException("Cannot call commit on
temporary table operations");
+ }
+
+ @Override
+ public String metadataFileLocation(String fileName) {
+ return
HiveTableOperations.this.metadataFileLocation(uncommittedMetadata, fileName);
+ }
+
+ @Override
+ public LocationProvider locationProvider() {
+ return LocationProviders.locationsFor(
+ uncommittedMetadata.location(), uncommittedMetadata.properties());
+ }
+
+ @Override
+ public FileIO io() {
+
HiveTableOperations.this.encryptionPropsFromMetadata(uncommittedMetadata.properties());
+ return HiveTableOperations.this.io();
+ }
+
+ @Override
+ public EncryptionManager encryption() {
+ return HiveTableOperations.this.encryption();
+ }
+
+ @Override
+ public long newSnapshotId() {
+ return HiveTableOperations.this.newSnapshotId();
+ }
+ };
+ }
+
/**
* Returns if the hive engine related values should be enabled on the table,
or not.
*
@@ -375,6 +521,44 @@ public class HiveTableOperations extends
BaseMetastoreTableOperations
ConfigProperties.LOCK_HIVE_ENABLED,
TableProperties.HIVE_LOCK_ENABLED_DEFAULT);
}
+ private void encryptionPropsFromMetadata(Map<String, String>
tableProperties) {
+ if (tableKeyId == null) {
+ tableKeyId = tableProperties.get(TableProperties.ENCRYPTION_TABLE_KEY);
+ }
+
+ if (tableKeyId != null && encryptionDekLength <= 0) {
+ String dekLength =
tableProperties.get(TableProperties.ENCRYPTION_DEK_LENGTH);
+ encryptionDekLength =
+ (dekLength == null)
+ ? TableProperties.ENCRYPTION_DEK_LENGTH_DEFAULT
+ : Integer.parseInt(dekLength);
+ }
+ }
+
+ private void checkEncryptionProperties(String encryptionKeyIdFromHMS, String
dekLengthFromHMS) {
+ Map<String, String> propertiesFromMetadata = current().properties();
+
+ String encryptionKeyIdFromMetadata =
+ propertiesFromMetadata.get(TableProperties.ENCRYPTION_TABLE_KEY);
+ if (!Objects.equals(encryptionKeyIdFromHMS, encryptionKeyIdFromMetadata)) {
+ String errMsg =
+ String.format(
+ "Metadata file might have been modified. Encryption key id %s
differs from HMS value %s",
+ encryptionKeyIdFromMetadata, encryptionKeyIdFromHMS);
+ throw new RuntimeException(errMsg);
+ }
+
+ String dekLengthFromMetadata =
+ propertiesFromMetadata.get(TableProperties.ENCRYPTION_DEK_LENGTH);
+ if (!Objects.equals(dekLengthFromHMS, dekLengthFromMetadata)) {
+ String errMsg =
+ String.format(
+ "Metadata file might have been modified. DEK length %s differs
from HMS value %s",
+ dekLengthFromMetadata, dekLengthFromHMS);
+ throw new RuntimeException(errMsg);
+ }
+ }
+
@VisibleForTesting
HiveLock lockObject(TableMetadata metadata) {
if (hiveLockEnabled(metadata, conf)) {
diff --git
a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java
b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java
index 0ffcb05709..e62ce9aeae 100644
---
a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java
+++
b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java
@@ -185,6 +185,7 @@ public class TestHiveCommitLocks {
overriddenHiveConf,
spyCachedClientPool,
ops.io(),
+ null,
catalog.name(),
dbName,
tableName));
@@ -615,6 +616,7 @@ public class TestHiveCommitLocks {
confWithLock,
spyCachedClientPool,
ops.io(),
+ null,
catalog.name(),
TABLE_IDENTIFIER.namespace().level(0),
TABLE_IDENTIFIER.name()));
diff --git
a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestCTASEncryption.java
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestCTASEncryption.java
new file mode 100644
index 0000000000..6094ab0ccc
--- /dev/null
+++
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestCTASEncryption.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.sql;
+
+import static org.apache.iceberg.Files.localInput;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.Parameters;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.encryption.UnitestKMS;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.spark.CatalogTestBase;
+import org.apache.iceberg.spark.SparkCatalogConfig;
+import org.apache.iceberg.types.Types;
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+
+public class TestCTASEncryption extends CatalogTestBase {
+ private static Map<String, String>
appendCatalogEncryptionProperties(Map<String, String> props) {
+ Map<String, String> newProps = Maps.newHashMap();
+ newProps.putAll(props);
+ newProps.put(CatalogProperties.ENCRYPTION_KMS_IMPL,
UnitestKMS.class.getCanonicalName());
+ return newProps;
+ }
+
+ @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}")
+ protected static Object[][] parameters() {
+ return new Object[][] {
+ {
+ SparkCatalogConfig.HIVE.catalogName(),
+ SparkCatalogConfig.HIVE.implementation(),
+ appendCatalogEncryptionProperties(SparkCatalogConfig.HIVE.properties())
+ }
+ };
+ }
+
+ @BeforeEach
+ public void createTables() {
+ sql("CREATE TABLE %s (id bigint, data string, float float) USING iceberg
", tableName + "1");
+ sql(
+ "INSERT INTO %s VALUES (1, 'a', 1.0), (2, 'b', 2.0), (3, 'c',
float('NaN'))",
+ tableName + "1");
+
+ sql(
+ "CREATE TABLE %s USING iceberg "
+ + "TBLPROPERTIES ( "
+ + "'encryption.key-id'='%s')"
+ + " AS SELECT * from %s",
+ tableName, UnitestKMS.MASTER_KEY_NAME1, tableName + "1");
+ }
+
+ @AfterEach
+ public void removeTables() {
+ sql("DROP TABLE IF EXISTS %s", tableName);
+ sql("DROP TABLE IF EXISTS %s", tableName + "1");
+ }
+
+ @TestTemplate
+ public void testSelect() {
+ List<Object[]> expected =
+ ImmutableList.of(row(1L, "a", 1.0F), row(2L, "b", 2.0F), row(3L, "c",
Float.NaN));
+
+ assertEquals("Should return all expected rows", expected, sql("SELECT *
FROM %s", tableName));
+ }
+
+ @TestTemplate
+ public void testDirectDataFileRead() {
+ List<Object[]> dataFileTable =
+ sql("SELECT file_path FROM %s.%s", tableName,
MetadataTableType.ALL_DATA_FILES);
+ List<String> dataFiles =
+ Streams.concat(dataFileTable.stream())
+ .map(row -> (String) row[0])
+ .collect(Collectors.toList());
+
+ if (dataFiles.isEmpty()) {
+ throw new RuntimeException("No data files found for table " + tableName);
+ }
+
+ Schema schema = new Schema(optional(0, "id", Types.IntegerType.get()));
+ for (String filePath : dataFiles) {
+ assertThatThrownBy(
+ () ->
+ Parquet.read(localInput(filePath))
+ .project(schema)
+ .callInit()
+ .build()
+ .iterator()
+ .next())
+ .isInstanceOf(ParquetCryptoRuntimeException.class)
+ .hasMessageContaining("Trying to read file with encrypted footer. No
keys available");
+ }
+ }
+}
diff --git
a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestTableEncryption.java
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestTableEncryption.java
new file mode 100644
index 0000000000..695a41681b
--- /dev/null
+++
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestTableEncryption.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.sql;
+
+import static org.apache.iceberg.Files.localInput;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.Parameters;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.encryption.Ciphers;
+import org.apache.iceberg.encryption.UnitestKMS;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.SeekableInputStream;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.spark.CatalogTestBase;
+import org.apache.iceberg.spark.SparkCatalogConfig;
+import org.apache.iceberg.types.Types;
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+
+public class TestTableEncryption extends CatalogTestBase {
+ private static Map<String, String>
appendCatalogEncryptionProperties(Map<String, String> props) {
+ Map<String, String> newProps = Maps.newHashMap();
+ newProps.putAll(props);
+ newProps.put(CatalogProperties.ENCRYPTION_KMS_IMPL,
UnitestKMS.class.getCanonicalName());
+ return newProps;
+ }
+
+ @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}")
+ protected static Object[][] parameters() {
+ return new Object[][] {
+ {
+ SparkCatalogConfig.HIVE.catalogName(),
+ SparkCatalogConfig.HIVE.implementation(),
+ appendCatalogEncryptionProperties(SparkCatalogConfig.HIVE.properties())
+ }
+ };
+ }
+
+ @BeforeEach
+ public void createTables() {
+ sql(
+ "CREATE TABLE %s (id bigint, data string, float float) USING iceberg "
+ + "TBLPROPERTIES ( "
+ + "'encryption.key-id'='%s')",
+ tableName, UnitestKMS.MASTER_KEY_NAME1);
+
+ sql("INSERT INTO %s VALUES (1, 'a', 1.0), (2, 'b', 2.0), (3, 'c',
float('NaN'))", tableName);
+ }
+
+ @AfterEach
+ public void removeTables() {
+ sql("DROP TABLE IF EXISTS %s", tableName);
+ }
+
+ @TestTemplate
+ public void testSelect() {
+ List<Object[]> expected =
+ ImmutableList.of(row(1L, "a", 1.0F), row(2L, "b", 2.0F), row(3L, "c",
Float.NaN));
+
+ assertEquals("Should return all expected rows", expected, sql("SELECT *
FROM %s", tableName));
+ }
+
+ private static List<DataFile> currentDataFiles(Table table) {
+ return Streams.stream(table.newScan().planFiles())
+ .map(FileScanTask::file)
+ .collect(Collectors.toList());
+ }
+
+ @TestTemplate
+ public void testRefresh() {
+ catalog.initialize(catalogName, catalogConfig);
+ Table table = catalog.loadTable(tableIdent);
+
+ assertThat(currentDataFiles(table)).isNotEmpty();
+
+ sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0), (6, 'f',
float('NaN'))", tableName);
+
+ table.refresh();
+ assertThat(currentDataFiles(table)).isNotEmpty();
+ }
+
+ @TestTemplate
+ public void testInsertAndDelete() {
+ sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0), (6, 'f',
float('NaN'))", tableName);
+
+ List<Object[]> expected =
+ ImmutableList.of(
+ row(1L, "a", 1.0F),
+ row(2L, "b", 2.0F),
+ row(3L, "c", Float.NaN),
+ row(4L, "d", 4.0F),
+ row(5L, "e", 5.0F),
+ row(6L, "f", Float.NaN));
+
+ assertEquals(
+ "Should return all expected rows",
+ expected,
+ sql("SELECT * FROM %s ORDER BY id", tableName));
+
+ sql("DELETE FROM %s WHERE id < 4", tableName);
+
+ expected = ImmutableList.of(row(4L, "d", 4.0F), row(5L, "e", 5.0F),
row(6L, "f", Float.NaN));
+
+ assertEquals(
+ "Should return all expected rows",
+ expected,
+ sql("SELECT * FROM %s ORDER BY id", tableName));
+ }
+
+ @TestTemplate
+ public void testKeyDelete() {
+ assertThatThrownBy(
+ () -> sql("ALTER TABLE %s UNSET TBLPROPERTIES
(`encryption.key-id`)", tableName))
+ .hasMessageContaining("Cannot remove key in encrypted table");
+ }
+
+ @TestTemplate
+ public void testDirectDataFileRead() {
+ List<Object[]> dataFileTable =
+ sql("SELECT file_path FROM %s.%s", tableName,
MetadataTableType.ALL_DATA_FILES);
+ List<String> dataFiles =
+ Streams.concat(dataFileTable.stream())
+ .map(row -> (String) row[0])
+ .collect(Collectors.toList());
+
+ if (dataFiles.isEmpty()) {
+ throw new RuntimeException("No data files found for table " + tableName);
+ }
+
+ Schema schema = new Schema(optional(0, "id", Types.IntegerType.get()));
+ for (String filePath : dataFiles) {
+ assertThatThrownBy(
+ () ->
+ Parquet.read(localInput(filePath))
+ .project(schema)
+ .callInit()
+ .build()
+ .iterator()
+ .next())
+ .isInstanceOf(ParquetCryptoRuntimeException.class)
+ .hasMessageContaining("Trying to read file with encrypted footer. No
keys available");
+ }
+ }
+
+ @TestTemplate
+ public void testManifestEncryption() throws IOException {
+ List<Object[]> manifestFileTable =
+ sql("SELECT path FROM %s.%s", tableName, MetadataTableType.MANIFESTS);
+
+ List<String> manifestFiles =
+ Streams.concat(manifestFileTable.stream())
+ .map(row -> (String) row[0])
+ .collect(Collectors.toList());
+
+ if (manifestFiles.isEmpty()) {
+ throw new RuntimeException("No manifest files found for table " +
tableName);
+ }
+
+ String metadataFolderPath = null;
+
+ // Check encryption of manifest files
+ for (String manifestFilePath : manifestFiles) {
+ checkMetadataFileEncryption(localInput(manifestFilePath));
+
+ if (metadataFolderPath == null) {
+ metadataFolderPath = new
File(manifestFilePath).getParent().replaceFirst("file:", "");
+ }
+ }
+
+ if (metadataFolderPath == null) {
+ throw new RuntimeException("No metadata folder found for table " +
tableName);
+ }
+
+ // Find manifest list and metadata files; check their encryption
+ File[] listOfMetadataFiles = new File(metadataFolderPath).listFiles();
+ boolean foundManifestListFile = false;
+
+ for (File metadataFile : listOfMetadataFiles) {
+ if (metadataFile.getName().startsWith("snap-")) {
+ foundManifestListFile = true;
+ checkMetadataFileEncryption(localInput(metadataFile));
+ }
+ }
+
+ if (!foundManifestListFile) {
+ throw new RuntimeException("No manifest list files found for table " +
tableName);
+ }
+ }
+
+ private void checkMetadataFileEncryption(InputFile file) throws IOException {
+ SeekableInputStream stream = file.newStream();
+ byte[] magic = new byte[4];
+ stream.read(magic);
+ stream.close();
+
assertThat(magic).isEqualTo(Ciphers.GCM_STREAM_MAGIC_STRING.getBytes(StandardCharsets.UTF_8));
+ }
+}