This is an automated email from the ASF dual-hosted git repository.

cvandermerwe pushed a commit to branch release-2.72
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.72 by this push:
     new 2015c747a4c Add more ParquetIo write options (#37740) (#37790)
2015c747a4c is described below

commit 2015c747a4c36849e8d428d5a6c4b676481f781e
Author: claudevdm <[email protected]>
AuthorDate: Fri Mar 6 13:06:33 2026 -0500

    Add more ParquetIo write options (#37740) (#37790)
    
    * add more parquet options
    
    * comments
    
    * more tests and use default
---
 .../org/apache/beam/sdk/io/parquet/ParquetIO.java  | 60 +++++++++++++-
 .../apache/beam/sdk/io/parquet/ParquetIOTest.java  | 92 ++++++++++++++++++++++
 2 files changed, 149 insertions(+), 3 deletions(-)

diff --git a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
index 24c18f38281..e6e4e27d74f 100644
--- a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
+++ b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
@@ -69,6 +69,7 @@ import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.avro.AvroParquetReader;
 import org.apache.parquet.avro.AvroParquetWriter;
 import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.filter2.compat.FilterCompat.Filter;
@@ -1005,8 +1006,11 @@ public class ParquetIO {
     return new AutoValue_ParquetIO_Sink.Builder()
         .setJsonSchema(schema.toString())
         .setCompressionCodec(CompressionCodecName.SNAPPY)
-        // This resembles the default value for ParquetWriter.rowGroupSize.
        .setRowGroupSize(ParquetWriter.DEFAULT_BLOCK_SIZE)
+        .setPageSize(ParquetWriter.DEFAULT_PAGE_SIZE)
+        .setEnableDictionary(ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED)
+        .setEnableBloomFilter(ParquetProperties.DEFAULT_BLOOM_FILTER_ENABLED)
+        .setMinRowCountForPageSizeCheck(ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK)
        .build();
  }
 
@@ -1022,6 +1026,14 @@ public class ParquetIO {
 
     abstract int getRowGroupSize();
 
+    abstract int getPageSize();
+
+    abstract boolean getEnableDictionary();
+
+    abstract boolean getEnableBloomFilter();
+
+    abstract int getMinRowCountForPageSizeCheck();
+
     abstract @Nullable Class<? extends GenericData> getAvroDataModelClass();
 
     abstract Builder toBuilder();
@@ -1036,6 +1048,14 @@ public class ParquetIO {
 
      abstract Builder setRowGroupSize(int rowGroupSize);
 
+      abstract Builder setPageSize(int pageSize);
+
+      abstract Builder setEnableDictionary(boolean enableDictionary);
+
+      abstract Builder setEnableBloomFilter(boolean enableBloomFilter);
+
+      abstract Builder setMinRowCountForPageSizeCheck(int minRowCountForPageSizeCheck);
+
      abstract Builder setAvroDataModelClass(Class<? extends GenericData> modelClass);
 
      abstract Sink build();
@@ -1064,6 +1084,34 @@ public class ParquetIO {
      return toBuilder().setRowGroupSize(rowGroupSize).build();
    }
 
+    /** Specify the page size for the Parquet writer. Defaults to {@code 1 MB}. */
+    public Sink withPageSize(int pageSize) {
+      checkArgument(pageSize > 0, "pageSize must be positive");
+      return toBuilder().setPageSize(pageSize).build();
+    }
+
+    /** Enable or disable dictionary encoding. Enabled by default. */
+    public Sink withDictionaryEncoding(boolean enableDictionary) {
+      return toBuilder().setEnableDictionary(enableDictionary).build();
+    }
+
+    /** Enable or disable bloom filters. Disabled by default. */
+    public Sink withBloomFilterEnabled(boolean enableBloomFilter) {
+      return toBuilder().setEnableBloomFilter(enableBloomFilter).build();
+    }
+
+    /**
+     * Specify the minimum number of rows to write before a page size check is performed. The writer
+     * buffers at least this many rows before checking whether the page size threshold has been
+     * reached. With large rows, the default ({@code 100}) can cause excessive memory use; set a
+     * lower value (e.g. {@code 1}) to flush pages more frequently.
+     */
+    public Sink withMinRowCountForPageSizeCheck(int minRowCountForPageSizeCheck) {
+      checkArgument(
+          minRowCountForPageSizeCheck > 0, "minRowCountForPageSizeCheck must be positive");
+      return toBuilder().setMinRowCountForPageSizeCheck(minRowCountForPageSizeCheck).build();
+    }
+
    /**
     * Define the Avro data model; see {@link AvroParquetWriter.Builder#withDataModel(GenericData)}.
     */
@@ -1079,6 +1127,7 @@ public class ParquetIO {
 
      Schema schema = new Schema.Parser().parse(getJsonSchema());
      Class<? extends GenericData> modelClass = getAvroDataModelClass();
+      Configuration conf = SerializableConfiguration.newConfiguration(getConfiguration());
 
      BeamParquetOutputFile beamParquetOutputFile =
          new BeamParquetOutputFile(Channels.newOutputStream(channel));
@@ -1088,8 +1137,13 @@ public class ParquetIO {
              .withSchema(schema)
              .withCompressionCodec(getCompressionCodec())
              .withWriteMode(OVERWRITE)
-              .withConf(SerializableConfiguration.newConfiguration(getConfiguration()))
-              .withRowGroupSize(getRowGroupSize());
+              .withConf(conf)
+              .withRowGroupSize(getRowGroupSize())
+              .withPageSize(getPageSize())
+              .withDictionaryEncoding(getEnableDictionary())
+              .withBloomFilterEnabled(getEnableBloomFilter())
+              .withMinRowCountForPageSizeCheck(getMinRowCountForPageSizeCheck());
+
       if (modelClass != null) {
         try {
           builder.withDataModel(buildModelObject(modelClass));
diff --git a/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java b/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java
index 7ee3ec5050f..97f96452760 100644
--- a/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java
+++ b/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java
@@ -21,12 +21,14 @@ import static java.util.stream.Collectors.toList;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.ByteArrayOutputStream;
+import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.io.UnsupportedEncodingException;
@@ -57,8 +59,11 @@ import org.apache.beam.sdk.values.Row;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.filter2.predicate.FilterApi;
 import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.ParquetInputFormat;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
 import org.apache.parquet.io.api.Binary;
 import org.junit.Rule;
 import org.junit.Test;
@@ -518,6 +523,93 @@ public class ParquetIOTest implements Serializable {
     readPipeline.run().waitUntilFinish();
   }
 
+  @Test
+  public void testWriteWithDefaultWriterProperties() throws Exception {
+    List<GenericRecord> records = generateGenericRecords(1000);
+
+    mainPipeline
+        .apply(Create.of(records).withCoder(AvroCoder.of(SCHEMA)))
+        .apply(
+            FileIO.<GenericRecord>write()
+                .via(ParquetIO.sink(SCHEMA))
+                .to(temporaryFolder.getRoot().getAbsolutePath()));
+    mainPipeline.run().waitUntilFinish();
+
+    File[] outputFiles = temporaryFolder.getRoot().listFiles((dir, name) -> !name.startsWith("."));
+    assertTrue("Expected at least one output file", outputFiles != null && outputFiles.length > 0);
+
+    org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(outputFiles[0].toURI());
+    try (ParquetFileReader reader =
+        ParquetFileReader.open(HadoopInputFile.fromPath(hadoopPath, new Configuration()))) {
+      ParquetMetadata footer = reader.getFooter();
+
+      // Verify bloom filters are absent by default.
+      boolean hasBloomFilter =
+          footer.getBlocks().stream()
+              .flatMap(block -> block.getColumns().stream())
+              .anyMatch(col -> col.getBloomFilterOffset() >= 0);
+      assertFalse("Expected no bloom filters by default", hasBloomFilter);
+
+      // Verify dictionary encoding is enabled by default.
+      boolean hasDictionary =
+          footer.getBlocks().stream()
+              .flatMap(block -> block.getColumns().stream())
+              .anyMatch(col -> col.getDictionaryPageOffset() > 0);
+      assertTrue("Expected dictionary pages to be present by default", hasDictionary);
+    }
+  }
+
+
+  @Test
+  public void testWriteWithWriterProperties() throws Exception {
+    List<GenericRecord> records = generateGenericRecords(1000);
+
+    mainPipeline
+        .apply(Create.of(records).withCoder(AvroCoder.of(SCHEMA)))
+        .apply(
+            FileIO.<GenericRecord>write()
+                .via(
+                    ParquetIO.sink(SCHEMA)
+                        .withPageSize(256 * 1024)
+                        .withDictionaryEncoding(false)
+                        .withBloomFilterEnabled(true)
+                        .withMinRowCountForPageSizeCheck(5))
+                .to(temporaryFolder.getRoot().getAbsolutePath()));
+    mainPipeline.run().waitUntilFinish();
+
+    // Read back the file metadata and verify the settings took effect.
+    File[] outputFiles = temporaryFolder.getRoot().listFiles((dir, name) -> !name.startsWith("."));
+    assertTrue("Expected at least one output file", outputFiles != null && outputFiles.length > 0);
+
+    org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(outputFiles[0].toURI());
+    try (ParquetFileReader reader =
+        ParquetFileReader.open(HadoopInputFile.fromPath(hadoopPath, new Configuration()))) {
+      ParquetMetadata footer = reader.getFooter();
+
+      // Verify bloom filters were written: at least one column should have a bloom filter.
+      boolean hasBloomFilter =
+          footer.getBlocks().stream()
+              .flatMap(block -> block.getColumns().stream())
+              .anyMatch(col -> col.getBloomFilterOffset() >= 0);
+      assertTrue("Expected bloom filters to be present", hasBloomFilter);
+
+      // Verify dictionary encoding was disabled: no columns should have dictionary pages.
+      // getDictionaryPageOffset() returns 0 when no dictionary page is present.
+      boolean hasDictionary =
+          footer.getBlocks().stream()
+              .flatMap(block -> block.getColumns().stream())
+              .anyMatch(col -> col.getDictionaryPageOffset() > 0);
+      assertFalse(
+          "Expected no dictionary pages when dictionary encoding is disabled", hasDictionary);
+    }
+
+    // Verify the data still round-trips correctly.
+    PCollection<GenericRecord> readBack =
+        readPipeline.apply(
+            ParquetIO.read(SCHEMA).from(temporaryFolder.getRoot().getAbsolutePath() + "/*"));
+    PAssert.that(readBack).containsInAnyOrder(records);
+    readPipeline.run().waitUntilFinish();
+  }
+
+
   /** Returns list of JSON representation of GenericRecords. */
  private static List<String> convertRecordsToJson(List<GenericRecord> records) {
    return records.stream().map(ParseGenericRecordAsJsonFn.create()::apply).collect(toList());

Reply via email to