mengna-lin opened a new pull request, #16094:
URL: https://github.com/apache/iceberg/pull/16094

   Closes #16090
   
   Previously, all columns in a Parquet file were forced to use the same codec.
   This PR enables per-column Parquet compression.
   
   Changes
     1. Two new table property prefixes, write.parquet.compression-codec.column.<col> and write.parquet.compression-level.column.<col>. Columns without an override fall back to the global codec.
     2. ParquetWriter now holds a CompressionCodecFactory and a default codec instead of a single pre-resolved BytesInputCompressor, and passes them to the new ColumnChunkPageWriteStore.
     3. setColumnCompressionConfig in WriteBuilder maps Iceberg column names to Parquet dot-paths and calls withCompressionCodec/withCompressionLevel on ParquetProperties.Builder, wiring the table properties into the Parquet write configuration.
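   
   To illustrate the fallback rule in item 1, here is a minimal, dependency-free sketch of how a per-column codec could be resolved from a table-properties map. It is not the code in this PR: the class PerColumnCodecResolver, its methods, and the defaultCodec parameter are hypothetical names used only for illustration. Only the two property keys come from the description above.
   
   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;
   
   /** Hypothetical helper (not part of this PR): resolves a codec per column from table properties. */
   public class PerColumnCodecResolver {
     // Property names as given in the PR description.
     public static final String GLOBAL_CODEC = "write.parquet.compression-codec";
     public static final String COLUMN_CODEC_PREFIX = "write.parquet.compression-codec.column.";
   
     /** Returns the column's override if present, otherwise the global codec (or defaultCodec). */
     public static String codecFor(Map<String, String> props, String column, String defaultCodec) {
       String global = props.getOrDefault(GLOBAL_CODEC, defaultCodec);
       return props.getOrDefault(COLUMN_CODEC_PREFIX + column, global);
     }
   
     /** Collects every column-to-codec override by scanning keys that start with the prefix. */
     public static Map<String, String> overrides(Map<String, String> props) {
       Map<String, String> result = new LinkedHashMap<>();
       for (Map.Entry<String, String> entry : props.entrySet()) {
         if (entry.getKey().startsWith(COLUMN_CODEC_PREFIX)) {
           String column = entry.getKey().substring(COLUMN_CODEC_PREFIX.length());
           result.put(column, entry.getValue());
         }
       }
       return result;
     }
   
     public static void main(String[] args) {
       Map<String, String> props = new LinkedHashMap<>();
       props.put(GLOBAL_CODEC, "zstd");
       props.put(COLUMN_CODEC_PREFIX + "int_col", "snappy");
   
       System.out.println(codecFor(props, "int_col", "zstd"));    // snappy (override)
       System.out.println(codecFor(props, "string_col", "zstd")); // zstd (global fallback)
       System.out.println(overrides(props));                      // {int_col=snappy}
     }
   }
   ```
   
   The sketch keys overrides by the column name exactly as it appears after the prefix. Translating that name into a Parquet dot-path (item 3) is a separate step that is not shown here.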
   
   Tested with a Spark job
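   ```java
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.nio.file.Paths;
   import java.util.List;
   import org.apache.iceberg.DataFile;
   import org.apache.iceberg.ManifestFile;
   import org.apache.iceberg.ManifestFiles;
   import org.apache.iceberg.ManifestReader;
   import org.apache.iceberg.Table;
   import org.apache.iceberg.catalog.Catalog;
   import org.apache.iceberg.catalog.TableIdentifier;
   import org.apache.iceberg.hadoop.HadoopCatalog;
   import org.apache.parquet.hadoop.ParquetFileReader;
   import org.apache.parquet.hadoop.metadata.BlockMetaData;
   import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
   import org.apache.parquet.io.LocalInputFile;
   import org.apache.spark.sql.SparkSession;
   
   /**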
    * Runnable Spark job to manually verify per-column Parquet compression.
    *
    * <p>Creates a temporary Iceberg table with:
    * <ul>
    *   <li>Global codec: zstd
    *   <li>Per-column override for {@code int_col}: snappy
    * </ul>
    * Writes a few rows, then reads the Parquet footer and prints each column's
    * actual codec.
    */
   public class PerColumnCompressionMain {
   
     public static void main(String[] args) throws Exception {
       Path warehouse = Files.createTempDirectory("iceberg-warehouse");
   
       SparkSession spark =
           SparkSession.builder()
               .master("local[2]")
               .appName("PerColumnCompressionMain")
               .config(
                   "spark.sql.extensions",
                   "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
               .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
               .config("spark.sql.catalog.local.type", "hadoop")
               .config("spark.sql.catalog.local.warehouse", warehouse.toAbsolutePath().toString())
               .getOrCreate();
   
       try {
         spark.sql(
             "CREATE TABLE local.default.test_per_col ("
                 + "  int_col int,"
                 + "  string_col string"
                 + ") USING iceberg"
                 + " TBLPROPERTIES ("
                 + "  'write.parquet.compression-codec' = 'zstd',"
                 + "  'write.parquet.compression-codec.column.int_col' = 'snappy'"
                 + ")");
   
         spark.sql(
             "INSERT INTO local.default.test_per_col VALUES (1, 'a'), (2, 'b'), (3, 'c')");
   
         // Load the table and find the written data file
         Catalog catalog =
             new HadoopCatalog(
                 spark.sessionState().newHadoopConf(), warehouse.toAbsolutePath().toString());
         Table table = catalog.loadTable(TableIdentifier.of("default", "test_per_col"));
         List<ManifestFile> manifests = table.currentSnapshot().dataManifests(table.io());
   
         try (ManifestReader<DataFile> reader =
             ManifestFiles.read(manifests.get(0), table.io())) {
           DataFile file = reader.iterator().next();
           System.out.println("Data file: " + file.path());
   
           try (ParquetFileReader parquetReader =
               ParquetFileReader.open(
                   new LocalInputFile(Paths.get(file.path().toString())))) {
             System.out.println("\nColumn codecs:");
             for (BlockMetaData block : parquetReader.getFooter().getBlocks()) {
               for (ColumnChunkMetaData col : block.getColumns()) {
                 System.out.printf("  %-30s %s%n", col.getPath().toDotString(), col.getCodec());
               }
             }
           }
         }
   
         System.out.println("\nExpected:");
         System.out.println("  int_col                        SNAPPY  (per-column override)");
         System.out.println("  string_col                     ZSTD    (global fallback)");
   
       } finally {
         spark.sql("DROP TABLE IF EXISTS local.default.test_per_col");
         spark.stop();
       }
     }
   }
   ```
   
   Result
   ```
   Column codecs:
     int_col                        SNAPPY
     string_col                     ZSTD
   
   Expected:
     int_col                        SNAPPY  (per-column override)
     string_col                     ZSTD    (global fallback)
   ```