mengna-lin opened a new pull request, #16094:
URL: https://github.com/apache/iceberg/pull/16094
Closes #16090
Previously, all columns in a Parquet file were forced to use the same codec.
This PR enables per-column compression for Parquet writes.
Changes
1. Two new table property prefixes, write.parquet.compression-codec.column.<col> and write.parquet.compression-level.column.<col>. Columns without an override fall back to the global codec.
2. ParquetWriter now holds a CompressionCodecFactory and a default codec instead of a single pre-resolved BytesInputCompressor, and passes them to the new ColumnChunkPageWriteStore.
3. setColumnCompressionConfig in WriteBuilder maps Iceberg column names to Parquet dot-paths and calls withCompressionCodec/withCompressionLevel on ParquetProperties.Builder, wiring the table properties into the Parquet write configuration.
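The fallback rule in item 1 can be illustrated with a small standalone sketch. This is not the code in this PR: the class ColumnCodecResolver and the method codecFor are hypothetical names used only for illustration, and the default applied when no global codec is set (zstd here) is an assumption.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative sketch only; a hypothetical helper, not the implementation in this PR. */
public class ColumnCodecResolver {
  // Property names as given in the PR description.
  public static final String GLOBAL_CODEC = "write.parquet.compression-codec";
  public static final String COLUMN_CODEC_PREFIX = "write.parquet.compression-codec.column.";
  // Assumption: codec used when no global codec is configured; the real default may differ.
  public static final String ASSUMED_DEFAULT = "zstd";

  /** Returns the per-column override if one is set, otherwise the global codec. */
  public static String codecFor(Map<String, String> props, String column) {
    String override = props.get(COLUMN_CODEC_PREFIX + column);
    if (override != null) {
      return override;
    }
    return props.getOrDefault(GLOBAL_CODEC, ASSUMED_DEFAULT);
  }

  public static void main(String[] args) {
    Map<String, String> props = new LinkedHashMap<>();
    props.put(GLOBAL_CODEC, "zstd");
    props.put(COLUMN_CODEC_PREFIX + "int_col", "snappy");

    // int_col has an override; string_col falls back to the global codec.
    System.out.println("int_col -> " + codecFor(props, "int_col"));
    System.out.println("string_col -> " + codecFor(props, "string_col"));
  }
}
```

With the properties above, int_col resolves to snappy and string_col to zstd, which matches the table configuration used in the Spark job below.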
Tested with a Spark job
```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestReader;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.io.LocalInputFile;
import org.apache.spark.sql.SparkSession;

/**
 * Runnable Spark job to manually verify per-column Parquet compression.
 *
 * <p>Creates a temporary Iceberg table with:
 *
 * <ul>
 *   <li>Global codec: zstd
 *   <li>Per-column override for {@code int_col}: snappy
 * </ul>
 *
 * Writes a few rows, then reads the Parquet footer and prints each column's actual codec.
 */
public class PerColumnCompressionMain {
  public static void main(String[] args) throws Exception {
    Path warehouse = Files.createTempDirectory("iceberg-warehouse");
    SparkSession spark =
        SparkSession.builder()
            .master("local[2]")
            .appName("PerColumnCompressionMain")
            .config(
                "spark.sql.extensions",
                "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
            .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
            .config("spark.sql.catalog.local.type", "hadoop")
            .config("spark.sql.catalog.local.warehouse", warehouse.toAbsolutePath().toString())
            .getOrCreate();
    try {
      // Global codec is zstd; int_col is overridden to snappy.
      spark.sql(
          "CREATE TABLE local.default.test_per_col ("
              + " int_col int,"
              + " string_col string"
              + ") USING iceberg"
              + " TBLPROPERTIES ("
              + " 'write.parquet.compression-codec' = 'zstd',"
              + " 'write.parquet.compression-codec.column.int_col' = 'snappy'"
              + ")");
      spark.sql("INSERT INTO local.default.test_per_col VALUES (1, 'a'), (2, 'b'), (3, 'c')");

      // Load the table and find the written data file
      Catalog catalog =
          new HadoopCatalog(
              spark.sessionState().newHadoopConf(), warehouse.toAbsolutePath().toString());
      Table table = catalog.loadTable(TableIdentifier.of("default", "test_per_col"));
      List<ManifestFile> manifests = table.currentSnapshot().dataManifests(table.io());
      try (ManifestReader<DataFile> reader = ManifestFiles.read(manifests.get(0), table.io())) {
        DataFile file = reader.iterator().next();
        System.out.println("Data file: " + file.path());

        // Read the Parquet footer and print the codec recorded for each column chunk
        try (ParquetFileReader parquetReader =
            ParquetFileReader.open(new LocalInputFile(Paths.get(file.path().toString())))) {
          System.out.println("\nColumn codecs:");
          for (BlockMetaData block : parquetReader.getFooter().getBlocks()) {
            for (ColumnChunkMetaData col : block.getColumns()) {
              System.out.printf(" %-30s %s%n", col.getPath().toDotString(), col.getCodec());
            }
          }
        }
      }

      System.out.println("\nExpected:");
      System.out.println(" int_col SNAPPY (per-column override)");
      System.out.println(" string_col ZSTD (global fallback)");
    } finally {
      spark.sql("DROP TABLE IF EXISTS local.default.test_per_col");
      spark.stop();
    }
  }
}
```
Result
```
Column codecs:
int_col SNAPPY
string_col ZSTD
Expected:
int_col SNAPPY (per-column override)
string_col ZSTD (global fallback)
```