This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new bb50ab97d3 Core: Support Avro file encryption with AES GCM streams (#9436)
bb50ab97d3 is described below

commit bb50ab97d377476bfa3a743448e4afabe619e764
Author: ggershinsky <[email protected]>
AuthorDate: Tue Jan 16 19:55:06 2024 +0200

    Core: Support Avro file encryption with AES GCM streams (#9436)
---
 .../main/java/org/apache/iceberg/avro/Avro.java    |   9 -
 .../iceberg/avro/TestEncryptedAvroFileSplit.java   | 209 +++++++++++++++++++++
 .../iceberg/encryption/EncryptionTestHelpers.java  |  41 ++++
 .../org/apache/iceberg/encryption/UnitestKMS.java  |  38 ++++
 4 files changed, 288 insertions(+), 9 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/avro/Avro.java b/core/src/main/java/org/apache/iceberg/avro/Avro.java
index 6aa3241c36..0eaa3f2d24 100644
--- a/core/src/main/java/org/apache/iceberg/avro/Avro.java
+++ b/core/src/main/java/org/apache/iceberg/avro/Avro.java
@@ -94,9 +94,6 @@ public class Avro {
   }
 
   public static WriteBuilder write(EncryptedOutputFile file) {
-    Preconditions.checkState(
-        file.keyMetadata() == null || file.keyMetadata() == EncryptionKeyMetadata.EMPTY,
-        "Avro encryption is not supported");
     return new WriteBuilder(file.encryptingOutputFile());
   }
 
@@ -282,9 +279,6 @@ public class Avro {
   }
 
   public static DataWriteBuilder writeData(EncryptedOutputFile file) {
-    Preconditions.checkState(
-        file.keyMetadata() == null || file.keyMetadata() == EncryptionKeyMetadata.EMPTY,
-        "Avro encryption is not supported");
     return new DataWriteBuilder(file.encryptingOutputFile());
   }
 
@@ -385,9 +379,6 @@ public class Avro {
   }
 
   public static DeleteWriteBuilder writeDeletes(EncryptedOutputFile file) {
-    Preconditions.checkState(
-        file.keyMetadata() == null || file.keyMetadata() == EncryptionKeyMetadata.EMPTY,
-        "Avro encryption is not supported");
     return new DeleteWriteBuilder(file.encryptingOutputFile());
   }
 
diff --git a/core/src/test/java/org/apache/iceberg/avro/TestEncryptedAvroFileSplit.java b/core/src/test/java/org/apache/iceberg/avro/TestEncryptedAvroFileSplit.java
new file mode 100644
index 0000000000..9020a12302
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/avro/TestEncryptedAvroFileSplit.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.avro;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.UUID;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.avro.DataWriter;
+import org.apache.iceberg.encryption.EncryptedFiles;
+import org.apache.iceberg.encryption.EncryptedInputFile;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.encryption.EncryptionTestHelpers;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.NestedField;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public class TestEncryptedAvroFileSplit {
+  private static final Schema SCHEMA =
+      new Schema(
+          NestedField.required(1, "id", Types.LongType.get()),
+          NestedField.required(2, "data", Types.StringType.get()));
+
+  private static final EncryptionManager ENCRYPTION_MANAGER =
+      EncryptionTestHelpers.createEncryptionManager();
+
+  private static final int NUM_RECORDS = 100_000;
+
+  @TempDir Path temp;
+
+  public List<Record> expected = null;
+  public InputFile file = null;
+
+  @BeforeEach
+  public void writeDataFile() throws IOException {
+    this.expected = Lists.newArrayList();
+
+    OutputFile out = Files.localOutput(temp.toFile());
+
+    EncryptedOutputFile eOut = ENCRYPTION_MANAGER.encrypt(out);
+
+    try (FileAppender<Object> writer =
+        Avro.write(eOut)
+            .set(TableProperties.AVRO_COMPRESSION, "uncompressed")
+            .createWriterFunc(DataWriter::create)
+            .schema(SCHEMA)
+            .overwrite()
+            .build()) {
+
+      Record record = GenericRecord.create(SCHEMA);
+      for (long i = 0; i < NUM_RECORDS; i += 1) {
+        Record next = record.copy(ImmutableMap.of("id", i, "data", UUID.randomUUID().toString()));
+        expected.add(next);
+        writer.add(next);
+      }
+    }
+
+    EncryptedInputFile encryptedIn =
+        EncryptedFiles.encryptedInput(out.toInputFile(), eOut.keyMetadata());
+
+    this.file = ENCRYPTION_MANAGER.decrypt(encryptedIn);
+  }
+
+  @Test
+  public void testSplitDataSkipping() throws IOException {
+    long end = file.getLength();
+    long splitLocation = end / 2;
+
+    List<Record> firstHalf = readAvro(file, SCHEMA, 0, splitLocation);
+    assertThat(firstHalf.size()).as("First split should not be empty").isNotEqualTo(0);
+
+    List<Record> secondHalf = readAvro(file, SCHEMA, splitLocation + 1, end - splitLocation - 1);
+    assertThat(secondHalf.size()).as("Second split should not be empty").isNotEqualTo(0);
+
+    assertThat(firstHalf.size() + secondHalf.size())
+        .as("Total records should match expected")
+        .isEqualTo(expected.size());
+
+    for (int i = 0; i < firstHalf.size(); i += 1) {
+      assertThat(firstHalf.get(i)).isEqualTo(expected.get(i));
+    }
+
+    for (int i = 0; i < secondHalf.size(); i += 1) {
+      assertThat(secondHalf.get(i)).isEqualTo(expected.get(firstHalf.size() + i));
+    }
+  }
+
+  @Test
+  public void testPosField() throws IOException {
+    Schema projection =
+        new Schema(SCHEMA.columns().get(0), MetadataColumns.ROW_POSITION, SCHEMA.columns().get(1));
+
+    List<Record> records = readAvro(file, projection, 0, file.getLength());
+
+    for (int i = 0; i < expected.size(); i += 1) {
+      assertThat(records.get(i).getField(MetadataColumns.ROW_POSITION.name()))
+          .as("Field _pos should match")
+          .isEqualTo((long) i);
+
+      assertThat(records.get(i).getField("id"))
+          .as("Field id should match")
+          .isEqualTo(expected.get(i).getField("id"));
+
+      assertThat(records.get(i).getField("data"))
+          .as("Field data should match")
+          .isEqualTo(expected.get(i).getField("data"));
+    }
+  }
+
+  @Test
+  public void testPosFieldWithSplits() throws IOException {
+    Schema projection =
+        new Schema(SCHEMA.columns().get(0), MetadataColumns.ROW_POSITION, SCHEMA.columns().get(1));
+
+    long end = file.getLength();
+    long splitLocation = end / 2;
+
+    List<Record> secondHalf =
+        readAvro(file, projection, splitLocation + 1, end - splitLocation - 1);
+    assertThat(secondHalf.size()).as("Second split should not be empty").isNotEqualTo(0);
+
+    List<Record> firstHalf = readAvro(file, projection, 0, splitLocation);
+    assertThat(firstHalf.size()).as("First split should not be empty").isNotEqualTo(0);
+
+    assertThat(firstHalf.size() + secondHalf.size())
+        .as("Total records should match expected")
+        .isEqualTo(expected.size());
+
+    for (int i = 0; i < firstHalf.size(); i += 1) {
+      assertThat(firstHalf.get(i).getField(MetadataColumns.ROW_POSITION.name()))
+          .as("Field _pos should match")
+          .isEqualTo((long) i);
+      assertThat(firstHalf.get(i).getField("id"))
+          .as("Field id should match")
+          .isEqualTo(expected.get(i).getField("id"));
+      assertThat(firstHalf.get(i).getField("data"))
+          .as("Field data should match")
+          .isEqualTo(expected.get(i).getField("data"));
+    }
+
+    for (int i = 0; i < secondHalf.size(); i += 1) {
+      assertThat(secondHalf.get(i).getField(MetadataColumns.ROW_POSITION.name()))
+          .as("Field _pos should match")
+          .isEqualTo((long) (firstHalf.size() + i));
+      assertThat(secondHalf.get(i).getField("id"))
+          .as("Field id should match")
+          .isEqualTo(expected.get(firstHalf.size() + i).getField("id"));
+      assertThat(secondHalf.get(i).getField("data"))
+          .as("Field data should match")
+          .isEqualTo(expected.get(firstHalf.size() + i).getField("data"));
+    }
+  }
+
+  @Test
+  public void testPosWithEOFSplit() throws IOException {
+    Schema projection =
+        new Schema(SCHEMA.columns().get(0), MetadataColumns.ROW_POSITION, SCHEMA.columns().get(1));
+
+    long end = file.getLength();
+
+    List<Record> records = readAvro(file, projection, end - 10, 10);
+    assertThat(records.size()).as("Should not read any records").isEqualTo(0);
+  }
+
+  public List<Record> readAvro(InputFile in, Schema projection, long start, long length)
+      throws IOException {
+    try (AvroIterable<Record> reader =
+        Avro.read(in)
+            .createReaderFunc(DataReader::create)
+            .split(start, length)
+            .project(projection)
+            .build()) {
+      return Lists.newArrayList(reader);
+    }
+  }
+}
diff --git a/core/src/test/java/org/apache/iceberg/encryption/EncryptionTestHelpers.java b/core/src/test/java/org/apache/iceberg/encryption/EncryptionTestHelpers.java
new file mode 100644
index 0000000000..aa49e1c40f
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/encryption/EncryptionTestHelpers.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.encryption;
+
+import java.util.Map;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+public class EncryptionTestHelpers {
+
+  private EncryptionTestHelpers() {}
+
+  public static EncryptionManager createEncryptionManager() {
+    Map<String, String> catalogProperties = Maps.newHashMap();
+    catalogProperties.put(
+        CatalogProperties.ENCRYPTION_KMS_IMPL, UnitestKMS.class.getCanonicalName());
+    Map<String, String> tableProperties = Maps.newHashMap();
+    tableProperties.put(TableProperties.ENCRYPTION_TABLE_KEY, UnitestKMS.MASTER_KEY_NAME1);
+    tableProperties.put(TableProperties.FORMAT_VERSION, "2");
+
+    return EncryptionUtil.createEncryptionManager(
+        tableProperties, EncryptionUtil.createKmsClient(catalogProperties));
+  }
+}
diff --git a/core/src/test/java/org/apache/iceberg/encryption/UnitestKMS.java b/core/src/test/java/org/apache/iceberg/encryption/UnitestKMS.java
new file mode 100644
index 0000000000..52a0e36c00
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/encryption/UnitestKMS.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.encryption;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+
+public class UnitestKMS extends MemoryMockKMS {
+  public static final String MASTER_KEY_NAME1 = "keyA";
+  public static final byte[] MASTER_KEY1 = "0123456789012345".getBytes(StandardCharsets.UTF_8);
+  public static final String MASTER_KEY_NAME2 = "keyB";
+  public static final byte[] MASTER_KEY2 = "1123456789012345".getBytes(StandardCharsets.UTF_8);
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    masterKeys =
+        ImmutableMap.of(
+            MASTER_KEY_NAME1, MASTER_KEY1,
+            MASTER_KEY_NAME2, MASTER_KEY2);
+  }
+}

Reply via email to