This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 211f5d550b Spark 3.5: Support encrypted output files (#9435)
211f5d550b is described below

commit 211f5d550b1d505d0c2da1e190551919448e0605
Author: ggershinsky <[email protected]>
AuthorDate: Fri Jan 12 02:20:50 2024 +0200

    Spark 3.5: Support encrypted output files (#9435)
---
 .../iceberg/spark/source/SparkAppenderFactory.java   | 20 +++++++++++++-------
 1 file changed, 13 insertions(+), 7 deletions(-)

diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
index 6372edde07..9df12fc060 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
@@ -31,6 +31,7 @@ import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.deletes.EqualityDeleteWriter;
 import org.apache.iceberg.deletes.PositionDeleteWriter;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.encryption.EncryptionUtil;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.io.DataWriter;
 import org.apache.iceberg.io.DeleteSchemaUtil;
@@ -162,6 +163,11 @@ class SparkAppenderFactory implements FileAppenderFactory<InternalRow> {
 
   @Override
   public FileAppender<InternalRow> newAppender(OutputFile file, FileFormat 
fileFormat) {
+    return newAppender(EncryptionUtil.plainAsEncryptedOutput(file), 
fileFormat);
+  }
+
+  @Override
+  public FileAppender<InternalRow> newAppender(EncryptedOutputFile file, 
FileFormat fileFormat) {
     MetricsConfig metricsConfig = MetricsConfig.fromProperties(properties);
     try {
       switch (fileFormat) {
@@ -203,7 +209,7 @@ class SparkAppenderFactory implements FileAppenderFactory<InternalRow> {
   public DataWriter<InternalRow> newDataWriter(
       EncryptedOutputFile file, FileFormat format, StructLike partition) {
     return new DataWriter<>(
-        newAppender(file.encryptingOutputFile(), format),
+        newAppender(file, format),
         format,
         file.encryptingOutputFile().location(),
         spec,
@@ -224,7 +230,7 @@ class SparkAppenderFactory implements FileAppenderFactory<InternalRow> {
     try {
       switch (format) {
         case PARQUET:
-          return Parquet.writeDeletes(file.encryptingOutputFile())
+          return Parquet.writeDeletes(file)
               .createWriterFunc(
                   msgType -> 
SparkParquetWriters.buildWriter(lazyEqDeleteSparkType(), msgType))
               .overwrite()
@@ -236,7 +242,7 @@ class SparkAppenderFactory implements FileAppenderFactory<InternalRow> {
               .buildEqualityWriter();
 
         case AVRO:
-          return Avro.writeDeletes(file.encryptingOutputFile())
+          return Avro.writeDeletes(file)
               .createWriterFunc(ignored -> new 
SparkAvroWriter(lazyEqDeleteSparkType()))
               .overwrite()
               .rowSchema(eqDeleteRowSchema)
@@ -247,7 +253,7 @@ class SparkAppenderFactory implements FileAppenderFactory<InternalRow> {
               .buildEqualityWriter();
 
         case ORC:
-          return ORC.writeDeletes(file.encryptingOutputFile())
+          return ORC.writeDeletes(file)
               .createWriterFunc(SparkOrcWriter::new)
               .overwrite()
               .rowSchema(eqDeleteRowSchema)
@@ -274,7 +280,7 @@ class SparkAppenderFactory implements FileAppenderFactory<InternalRow> {
         case PARQUET:
           StructType sparkPosDeleteSchema =
               
SparkSchemaUtil.convert(DeleteSchemaUtil.posDeleteSchema(posDeleteRowSchema));
-          return Parquet.writeDeletes(file.encryptingOutputFile())
+          return Parquet.writeDeletes(file)
               .createWriterFunc(
                   msgType -> 
SparkParquetWriters.buildWriter(sparkPosDeleteSchema, msgType))
               .overwrite()
@@ -286,7 +292,7 @@ class SparkAppenderFactory implements FileAppenderFactory<InternalRow> {
               .buildPositionWriter();
 
         case AVRO:
-          return Avro.writeDeletes(file.encryptingOutputFile())
+          return Avro.writeDeletes(file)
               .createWriterFunc(ignored -> new 
SparkAvroWriter(lazyPosDeleteSparkType()))
               .overwrite()
               .rowSchema(posDeleteRowSchema)
@@ -296,7 +302,7 @@ class SparkAppenderFactory implements FileAppenderFactory<InternalRow> {
               .buildPositionWriter();
 
         case ORC:
-          return ORC.writeDeletes(file.encryptingOutputFile())
+          return ORC.writeDeletes(file)
               .createWriterFunc(SparkOrcWriter::new)
               .overwrite()
               .rowSchema(posDeleteRowSchema)

Reply via email to