This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 211f5d550b Spark 3.5: Support encrypted output files (#9435)
211f5d550b is described below
commit 211f5d550b1d505d0c2da1e190551919448e0605
Author: ggershinsky <[email protected]>
AuthorDate: Fri Jan 12 02:20:50 2024 +0200
Spark 3.5: Support encrypted output files (#9435)
---
.../iceberg/spark/source/SparkAppenderFactory.java | 20 +++++++++++++-------
1 file changed, 13 insertions(+), 7 deletions(-)
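The diff below adds a newAppender overload that accepts an EncryptedOutputFile:
the existing plain OutputFile overload now wraps its argument with
EncryptionUtil.plainAsEncryptedOutput and delegates to the new overload, and the
data/delete writer methods pass the EncryptedOutputFile straight to the Parquet,
Avro, and ORC builders instead of unwrapping it with encryptingOutputFile().
A rough caller-side sketch (not part of the patch) is shown here; it assumes an
already-constructed FileAppenderFactory<InternalRow> bound to a variable named
factory, and the choice of PARQUET is only illustrative.

// Illustrative sketch only, not part of this commit.
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptionUtil;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.OutputFile;
import org.apache.spark.sql.catalyst.InternalRow;

class AppenderOverloadSketch {
  FileAppender<InternalRow> openAppender(
      FileAppenderFactory<InternalRow> factory, OutputFile plainFile) {
    // Wrap the plain file in a pass-through encryption envelope, the same call
    // the patched OutputFile overload now makes internally ...
    EncryptedOutputFile wrapped = EncryptionUtil.plainAsEncryptedOutput(plainFile);

    // ... and hand it to the EncryptedOutputFile overload added by this commit.
    // factory.newAppender(plainFile, FileFormat.PARQUET) should yield an
    // equivalent appender, since that overload now delegates to this one.
    return factory.newAppender(wrapped, FileFormat.PARQUET);
  }
}
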
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
index 6372edde07..9df12fc060 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
@@ -31,6 +31,7 @@ import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.deletes.EqualityDeleteWriter;
 import org.apache.iceberg.deletes.PositionDeleteWriter;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.encryption.EncryptionUtil;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.io.DataWriter;
 import org.apache.iceberg.io.DeleteSchemaUtil;
@@ -162,6 +163,11 @@ class SparkAppenderFactory implements FileAppenderFactory<InternalRow> {
 
   @Override
   public FileAppender<InternalRow> newAppender(OutputFile file, FileFormat fileFormat) {
+    return newAppender(EncryptionUtil.plainAsEncryptedOutput(file), fileFormat);
+  }
+
+  @Override
+  public FileAppender<InternalRow> newAppender(EncryptedOutputFile file, FileFormat fileFormat) {
     MetricsConfig metricsConfig = MetricsConfig.fromProperties(properties);
     try {
       switch (fileFormat) {
@@ -203,7 +209,7 @@ class SparkAppenderFactory implements FileAppenderFactory<InternalRow> {
   public DataWriter<InternalRow> newDataWriter(
       EncryptedOutputFile file, FileFormat format, StructLike partition) {
     return new DataWriter<>(
-        newAppender(file.encryptingOutputFile(), format),
+        newAppender(file, format),
         format,
         file.encryptingOutputFile().location(),
         spec,
@@ -224,7 +230,7 @@ class SparkAppenderFactory implements FileAppenderFactory<InternalRow> {
     try {
       switch (format) {
         case PARQUET:
-          return Parquet.writeDeletes(file.encryptingOutputFile())
+          return Parquet.writeDeletes(file)
               .createWriterFunc(
                   msgType -> SparkParquetWriters.buildWriter(lazyEqDeleteSparkType(), msgType))
               .overwrite()
@@ -236,7 +242,7 @@ class SparkAppenderFactory implements FileAppenderFactory<InternalRow> {
               .buildEqualityWriter();
 
         case AVRO:
-          return Avro.writeDeletes(file.encryptingOutputFile())
+          return Avro.writeDeletes(file)
               .createWriterFunc(ignored -> new SparkAvroWriter(lazyEqDeleteSparkType()))
               .overwrite()
               .rowSchema(eqDeleteRowSchema)
@@ -247,7 +253,7 @@ class SparkAppenderFactory implements FileAppenderFactory<InternalRow> {
               .buildEqualityWriter();
 
         case ORC:
-          return ORC.writeDeletes(file.encryptingOutputFile())
+          return ORC.writeDeletes(file)
               .createWriterFunc(SparkOrcWriter::new)
               .overwrite()
               .rowSchema(eqDeleteRowSchema)
@@ -274,7 +280,7 @@ class SparkAppenderFactory implements FileAppenderFactory<InternalRow> {
         case PARQUET:
           StructType sparkPosDeleteSchema =
               SparkSchemaUtil.convert(DeleteSchemaUtil.posDeleteSchema(posDeleteRowSchema));
-          return Parquet.writeDeletes(file.encryptingOutputFile())
+          return Parquet.writeDeletes(file)
               .createWriterFunc(
                   msgType -> SparkParquetWriters.buildWriter(sparkPosDeleteSchema, msgType))
               .overwrite()
@@ -286,7 +292,7 @@ class SparkAppenderFactory implements FileAppenderFactory<InternalRow> {
               .buildPositionWriter();
 
         case AVRO:
-          return Avro.writeDeletes(file.encryptingOutputFile())
+          return Avro.writeDeletes(file)
               .createWriterFunc(ignored -> new SparkAvroWriter(lazyPosDeleteSparkType()))
               .overwrite()
               .rowSchema(posDeleteRowSchema)
@@ -296,7 +302,7 @@ class SparkAppenderFactory implements FileAppenderFactory<InternalRow> {
               .buildPositionWriter();
 
         case ORC:
-          return ORC.writeDeletes(file.encryptingOutputFile())
+          return ORC.writeDeletes(file)
              .createWriterFunc(SparkOrcWriter::new)
              .overwrite()
              .rowSchema(posDeleteRowSchema)
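
The later hunks make the same adjustment for the data and delete writers:
newDataWriter and the Parquet/Avro/ORC writeDeletes builders now receive the
EncryptedOutputFile itself rather than the result of encryptingOutputFile(),
so the format builders can handle encryption themselves. A minimal sketch under
the same caveat follows; table, factory, outputFile, and partition are assumed
to be supplied by the caller, and PARQUET is again only illustrative.

// Illustrative sketch only, not part of this commit.
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.OutputFile;
import org.apache.spark.sql.catalyst.InternalRow;

class DataWriterSketch {
  DataWriter<InternalRow> openDataWriter(
      Table table,
      FileAppenderFactory<InternalRow> factory,
      OutputFile outputFile,
      StructLike partition) {
    // The table's EncryptionManager decides whether the file is actually
    // encrypted; for an unencrypted table this is a plain pass-through wrapper.
    EncryptedOutputFile encrypted = table.encryption().encrypt(outputFile);

    // After this commit the factory keeps the EncryptedOutputFile intact and
    // lets the Parquet/Avro/ORC builders unwrap or encrypt it themselves.
    return factory.newDataWriter(encrypted, FileFormat.PARQUET, partition);
  }
}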