This is an automated email from the ASF dual-hosted git repository.
cvandermerwe pushed a commit to branch release-2.72
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.72 by this push:
new 2015c747a4c Add more ParquetIO write options (#37740) (#37790)
2015c747a4c is described below
commit 2015c747a4c36849e8d428d5a6c4b676481f781e
Author: claudevdm <[email protected]>
AuthorDate: Fri Mar 6 13:06:33 2026 -0500
Add more ParquetIO write options (#37740) (#37790)
* add more parquet options
* comments
* more tests and use default
---
.../org/apache/beam/sdk/io/parquet/ParquetIO.java | 60 +++++++++++++-
.../apache/beam/sdk/io/parquet/ParquetIOTest.java | 92 ++++++++++++++++++++++
2 files changed, 149 insertions(+), 3 deletions(-)
diff --git
a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
index 24c18f38281..e6e4e27d74f 100644
---
a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
+++
b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
@@ -69,6 +69,7 @@ import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.compat.FilterCompat.Filter;
@@ -1005,8 +1006,11 @@ public class ParquetIO {
return new AutoValue_ParquetIO_Sink.Builder()
.setJsonSchema(schema.toString())
.setCompressionCodec(CompressionCodecName.SNAPPY)
- // This resembles the default value for ParquetWriter.rowGroupSize.
.setRowGroupSize(ParquetWriter.DEFAULT_BLOCK_SIZE)
+ .setPageSize(ParquetWriter.DEFAULT_PAGE_SIZE)
+ .setEnableDictionary(ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED)
+ .setEnableBloomFilter(ParquetProperties.DEFAULT_BLOOM_FILTER_ENABLED)
+
.setMinRowCountForPageSizeCheck(ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK)
.build();
}
@@ -1022,6 +1026,14 @@ public class ParquetIO {
abstract int getRowGroupSize();
+ abstract int getPageSize();
+
+ abstract boolean getEnableDictionary();
+
+ abstract boolean getEnableBloomFilter();
+
+ abstract int getMinRowCountForPageSizeCheck();
+
abstract @Nullable Class<? extends GenericData> getAvroDataModelClass();
abstract Builder toBuilder();
@@ -1036,6 +1048,14 @@ public class ParquetIO {
abstract Builder setRowGroupSize(int rowGroupSize);
+ abstract Builder setPageSize(int pageSize);
+
+ abstract Builder setEnableDictionary(boolean enableDictionary);
+
+ abstract Builder setEnableBloomFilter(boolean enableBloomFilter);
+
+ abstract Builder setMinRowCountForPageSizeCheck(int
minRowCountForPageSizeCheck);
+
abstract Builder setAvroDataModelClass(Class<? extends GenericData>
modelClass);
abstract Sink build();
@@ -1064,6 +1084,34 @@ public class ParquetIO {
return toBuilder().setRowGroupSize(rowGroupSize).build();
}
+ /** Specify the page size for the Parquet writer. Defaults to {@code 1
MB}. */
+ public Sink withPageSize(int pageSize) {
+ checkArgument(pageSize > 0, "pageSize must be positive");
+ return toBuilder().setPageSize(pageSize).build();
+ }
+
+ /** Enable or disable dictionary encoding. Enabled by default. */
+ public Sink withDictionaryEncoding(boolean enableDictionary) {
+ return toBuilder().setEnableDictionary(enableDictionary).build();
+ }
+
+ /** Enable or disable bloom filters. Disabled by default. */
+ public Sink withBloomFilterEnabled(boolean enableBloomFilter) {
+ return toBuilder().setEnableBloomFilter(enableBloomFilter).build();
+ }
+
+ /**
+ * Specify the minimum number of rows to write before a page size check is
performed. The writer
+ * buffers at least this many rows before checking whether the page size
threshold has been
+ * reached. With large rows, the default ({@code 100}) can cause excessive
memory use; set a
+ * lower value (e.g. {@code 1}) to flush pages more frequently.
+ */
+ public Sink withMinRowCountForPageSizeCheck(int
minRowCountForPageSizeCheck) {
+ checkArgument(
+ minRowCountForPageSizeCheck > 0, "minRowCountForPageSizeCheck must
be positive");
+ return
toBuilder().setMinRowCountForPageSizeCheck(minRowCountForPageSizeCheck).build();
+ }
+
/**
* Define the Avro data model; see {@link
AvroParquetWriter.Builder#withDataModel(GenericData)}.
*/
@@ -1079,6 +1127,7 @@ public class ParquetIO {
Schema schema = new Schema.Parser().parse(getJsonSchema());
Class<? extends GenericData> modelClass = getAvroDataModelClass();
+ Configuration conf =
SerializableConfiguration.newConfiguration(getConfiguration());
BeamParquetOutputFile beamParquetOutputFile =
new BeamParquetOutputFile(Channels.newOutputStream(channel));
@@ -1088,8 +1137,13 @@ public class ParquetIO {
.withSchema(schema)
.withCompressionCodec(getCompressionCodec())
.withWriteMode(OVERWRITE)
-
.withConf(SerializableConfiguration.newConfiguration(getConfiguration()))
- .withRowGroupSize(getRowGroupSize());
+ .withConf(conf)
+ .withRowGroupSize(getRowGroupSize())
+ .withPageSize(getPageSize())
+ .withDictionaryEncoding(getEnableDictionary())
+ .withBloomFilterEnabled(getEnableBloomFilter())
+
.withMinRowCountForPageSizeCheck(getMinRowCountForPageSizeCheck());
+
if (modelClass != null) {
try {
builder.withDataModel(buildModelObject(modelClass));
diff --git
a/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java
b/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java
index 7ee3ec5050f..97f96452760 100644
---
a/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java
+++
b/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java
@@ -21,12 +21,14 @@ import static java.util.stream.Collectors.toList;
import static
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.ByteArrayOutputStream;
+import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
@@ -57,8 +59,11 @@ import org.apache.beam.sdk.values.Row;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.api.Binary;
import org.junit.Rule;
import org.junit.Test;
@@ -518,6 +523,93 @@ public class ParquetIOTest implements Serializable {
readPipeline.run().waitUntilFinish();
}
+ @Test
+ public void testWriteWithDefaultWriterProperties() throws Exception {
+ List<GenericRecord> records = generateGenericRecords(1000);
+
+ mainPipeline
+ .apply(Create.of(records).withCoder(AvroCoder.of(SCHEMA)))
+ .apply(
+ FileIO.<GenericRecord>write()
+ .via(ParquetIO.sink(SCHEMA))
+ .to(temporaryFolder.getRoot().getAbsolutePath()));
+ mainPipeline.run().waitUntilFinish();
+
+ File[] outputFiles = temporaryFolder.getRoot().listFiles((dir, name) ->
!name.startsWith("."));
+ assertTrue("Expected at least one output file", outputFiles != null &&
outputFiles.length > 0);
+
+ org.apache.hadoop.fs.Path hadoopPath = new
org.apache.hadoop.fs.Path(outputFiles[0].toURI());
+ try (ParquetFileReader reader =
+ ParquetFileReader.open(HadoopInputFile.fromPath(hadoopPath, new
Configuration()))) {
+ ParquetMetadata footer = reader.getFooter();
+
+ // Verify bloom filters are absent by default.
+ boolean hasBloomFilter =
+ footer.getBlocks().stream()
+ .flatMap(block -> block.getColumns().stream())
+ .anyMatch(col -> col.getBloomFilterOffset() >= 0);
+ assertFalse("Expected no bloom filters by default", hasBloomFilter);
+
+ // Verify dictionary encoding is enabled by default.
+ boolean hasDictionary =
+ footer.getBlocks().stream()
+ .flatMap(block -> block.getColumns().stream())
+ .anyMatch(col -> col.getDictionaryPageOffset() > 0);
+ assertTrue("Expected dictionary pages to be present by default",
hasDictionary);
+ }
+ }
+
+ @Test
+ public void testWriteWithWriterProperties() throws Exception {
+ List<GenericRecord> records = generateGenericRecords(1000);
+
+ mainPipeline
+ .apply(Create.of(records).withCoder(AvroCoder.of(SCHEMA)))
+ .apply(
+ FileIO.<GenericRecord>write()
+ .via(
+ ParquetIO.sink(SCHEMA)
+ .withPageSize(256 * 1024)
+ .withDictionaryEncoding(false)
+ .withBloomFilterEnabled(true)
+ .withMinRowCountForPageSizeCheck(5))
+ .to(temporaryFolder.getRoot().getAbsolutePath()));
+ mainPipeline.run().waitUntilFinish();
+
+ // Read back the file metadata and verify the settings took effect.
+ File[] outputFiles = temporaryFolder.getRoot().listFiles((dir, name) ->
!name.startsWith("."));
+ assertTrue("Expected at least one output file", outputFiles != null &&
outputFiles.length > 0);
+
+ org.apache.hadoop.fs.Path hadoopPath = new
org.apache.hadoop.fs.Path(outputFiles[0].toURI());
+ try (ParquetFileReader reader =
+ ParquetFileReader.open(HadoopInputFile.fromPath(hadoopPath, new
Configuration()))) {
+ ParquetMetadata footer = reader.getFooter();
+
+ // Verify bloom filters were written: at least one column should have a
bloom filter.
+ boolean hasBloomFilter =
+ footer.getBlocks().stream()
+ .flatMap(block -> block.getColumns().stream())
+ .anyMatch(col -> col.getBloomFilterOffset() >= 0);
+ assertTrue("Expected bloom filters to be present", hasBloomFilter);
+
+ // Verify dictionary encoding was disabled: no columns should have
dictionary pages.
+ // getDictionaryPageOffset() returns 0 when no dictionary page is
present.
+ boolean hasDictionary =
+ footer.getBlocks().stream()
+ .flatMap(block -> block.getColumns().stream())
+ .anyMatch(col -> col.getDictionaryPageOffset() > 0);
+ assertFalse(
+ "Expected no dictionary pages when dictionary encoding is disabled",
hasDictionary);
+ }
+
+ // Verify the data still round-trips correctly.
+ PCollection<GenericRecord> readBack =
+ readPipeline.apply(
+
ParquetIO.read(SCHEMA).from(temporaryFolder.getRoot().getAbsolutePath() +
"/*"));
+ PAssert.that(readBack).containsInAnyOrder(records);
+ readPipeline.run().waitUntilFinish();
+ }
+
/** Returns list of JSON representation of GenericRecords. */
private static List<String> convertRecordsToJson(List<GenericRecord>
records) {
return
records.stream().map(ParseGenericRecordAsJsonFn.create()::apply).collect(toList());