huaxingao commented on code in PR #11257:
URL: https://github.com/apache/iceberg/pull/11257#discussion_r1794322243
##########
spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java:
##########
@@ -108,14 +108,14 @@ public static Object[][] parameters() {
SparkCatalogConfig.SPARK.implementation(),
SparkCatalogConfig.SPARK.properties(),
PARQUET,
- ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_LEVEL, "1")
Review Comment:
The reason I wanted to move `ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_LEVEL, "1")` after the gzip case is that the new Hadoop version uses `CompressionLevel` to initialize the gzip compressor, so this `COMPRESSION_LEVEL` of "1" is carried over to gzip. However, "1" is not a valid `CompressionLevel` enum value for gzip, so it throws an exception:
```
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 6.0 failed 1 times, most recent failure: Lost task 1.0 in stage 6.0 (TID 7) (192.168.50.141 executor driver): java.lang.IllegalArgumentException: No enum constant org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel.1
    at java.base/java.lang.Enum.valueOf(Enum.java:273)
    at org.apache.hadoop.conf.Configuration.getEnum(Configuration.java:1786)
    at org.apache.hadoop.io.compress.zlib.ZlibFactory.getCompressionLevel(ZlibFactory.java:165)
    at org.apache.hadoop.io.compress.zlib.BuiltInGzipCompressor.init(BuiltInGzipCompressor.java:157)
    at org.apache.hadoop.io.compress.zlib.BuiltInGzipCompressor.<init>(BuiltInGzipCompressor.java:67)
    at org.apache.hadoop.io.compress.GzipCodec.createCompressor(GzipCodec.java:64)
    at org.apache.hadoop.io.compress.CodecPool.getCompressor(CodecPool.java:152)
    at org.apache.hadoop.io.compress.CodecPool.getCompressor(CodecPool.java:168)
    at org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.<init>(CodecFactory.java:157)
    at org.apache.parquet.hadoop.CodecFactory.createCompressor(CodecFactory.java:219)
    at org.apache.parquet.hadoop.CodecFactory.getCompressor(CodecFactory.java:202)
    at org.apache.iceberg.parquet.ParquetWriter.<init>(ParquetWriter.java:90)
    at org.apache.iceberg.parquet.Parquet$WriteBuilder.build(Parquet.java:360)
```
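For context, the failing lookup in the trace boils down to something like this (a sketch only; the `zlib.compress.level` key is my reading of `ZlibFactory`, so treat it as an assumption rather than something verified in this PR):
```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.zlib.ZlibFactory;

public class GzipLevelRepro {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Assumption: the stale Parquet COMPRESSION_LEVEL ends up under the zlib
    // level key that ZlibFactory reads.
    conf.set("zlib.compress.level", "1");
    // Configuration.getEnum -> Enum.valueOf fails, since "1" is not a constant
    // of ZlibCompressor.CompressionLevel (matching the trace above).
    ZlibFactory.getCompressionLevel(conf);
  }
}
```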
I thought this over; rather than switching the order, it's better to unset the COMPRESSION_CODEC and COMPRESSION_LEVEL for each test.
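Concretely, I'm thinking of something along these lines (a sketch only; it assumes COMPRESSION_CODEC and COMPRESSION_LEVEL are set as session-level Spark SQL confs by each parameterized case, and the hook name is mine):
```java
// Hypothetical per-test cleanup: clear any codec/level left on the shared
// Spark session so a stale COMPRESSION_LEVEL (e.g. "1" from the zstd case)
// cannot leak into the gzip case and break BuiltInGzipCompressor init.
@AfterEach
public void resetCompressionSettings() {
  spark.conf().unset(COMPRESSION_CODEC);
  spark.conf().unset(COMPRESSION_LEVEL);
}
```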
##########
spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java:
##########
@@ -302,4 +303,12 @@ private <T> MemoryStream<T> newMemoryStream(int id, SQLContext sqlContext, Encod
private <T> void send(List<T> records, MemoryStream<T> stream) {
stream.addData(JavaConverters.asScalaBuffer(records));
}
+
+ private void deleteFileAndCrc(File file) throws IOException {
+ File crcFile = new File(file.getParent(), "." + file.getName() + ".crc");
Review Comment:
`file.getAbsolutePath() + ".crc"` gives
```
/var/folders/lp/nhx1yjj90qb54xq7v_grpl9c0000gn/T/junit3203001525697015981/parquet/checkpoint/commits/1.crc
```
but I need the hidden CRC path
```
/var/folders/lp/nhx1yjj90qb54xq7v_grpl9c0000gn/T/junit3203001525697015981/parquet/checkpoint/commits/.1.crc
```
When deleting the commit file like this
```
// remove the last commit to force Spark to reprocess batch #1
File lastCommitFile = new File(checkpoint.toString() + "/commits/1");
assertThat(lastCommitFile.delete()).as("The commit file must be deleted").isTrue();
```
I got an exception, so I delete the crc file manually as well:
```
Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: Rename destination file:/var/folders/lp/nhx1yjj90qb54xq7v_grpl9c0000gn/T/junit16535101756839742517/parquet/checkpoint/commits/.1.crc already exists.
    at app//org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:876)
    at app//org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:807)
    at app//org.apache.hadoop.fs.ChecksumFs.renameInternal(ChecksumFs.java:519)
    at app//org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:807)
    at app//org.apache.hadoop.fs.FileContext.rename(FileContext.java:1044)
    at app//org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.renameTempFile(CheckpointFileManager.scala:376)
    at app//org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:156)
    at app//org.apache.spark.sql.execution.streaming.HDFSMetadataLog.write(HDFSMetadataLog.scala:207)
```