This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d00780f5 Parquet: Add option to set page row count limit (#5345)
3d00780f5 is described below

commit 3d00780f5f508ac7a0f475fd0e1ddfd736000c5b
Author: Bryan Keller <[email protected]>
AuthorDate: Thu Jul 28 10:59:05 2022 -0700

    Parquet: Add option to set page row count limit (#5345)
---
 .../java/org/apache/iceberg/TableProperties.java   |  4 ++++
 docs/configuration.md                              |  1 +
 .../java/org/apache/iceberg/parquet/Parquet.java   | 25 ++++++++++++++++++++++
 3 files changed, 30 insertions(+)

diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java
index a8ca36379..06960939c 100644
--- a/core/src/main/java/org/apache/iceberg/TableProperties.java
+++ b/core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -131,6 +131,10 @@ public class TableProperties {
       "write.delete.parquet.page-size-bytes";
   public static final int PARQUET_PAGE_SIZE_BYTES_DEFAULT = 1024 * 1024; // 1 MB
 
+  public static final String PARQUET_PAGE_ROW_LIMIT = "write.parquet.page-row-limit";
+  public static final String DELETE_PARQUET_PAGE_ROW_LIMIT = "write.delete.parquet.page-row-limit";
+  public static final int PARQUET_PAGE_ROW_LIMIT_DEFAULT = 20_000;
+
   public static final String PARQUET_DICT_SIZE_BYTES = 
"write.parquet.dict-size-bytes";
   public static final String DELETE_PARQUET_DICT_SIZE_BYTES =
       "write.delete.parquet.dict-size-bytes";
diff --git a/docs/configuration.md b/docs/configuration.md
index eab539ac5..a82ae0c81 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -52,6 +52,7 @@ Iceberg tables support table properties to configure table behavior, like the de
 | write.delete.format.default        | data file format   | Default delete 
file format for the table; parquet, avro, or orc |
 | write.parquet.row-group-size-bytes | 134217728 (128 MB) | Parquet row group 
size                             |
 | write.parquet.page-size-bytes      | 1048576 (1 MB)     | Parquet page size  
                                |
+| write.parquet.page-row-limit       | 20000              | Parquet page row limit                             |
 | write.parquet.dict-size-bytes      | 2097152 (2 MB)     | Parquet dictionary 
page size                       |
 | write.parquet.compression-codec    | gzip               | Parquet 
compression codec: zstd, brotli, lz4, gzip, snappy, uncompressed |
 | write.parquet.compression-level    | null               | Parquet 
compression level                          |
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index ac8856b92..856c8089b 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.parquet;
 import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION;
 import static 
org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION_LEVEL;
 import static 
org.apache.iceberg.TableProperties.DELETE_PARQUET_DICT_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.DELETE_PARQUET_PAGE_ROW_LIMIT;
 import static 
org.apache.iceberg.TableProperties.DELETE_PARQUET_PAGE_SIZE_BYTES;
 import static 
org.apache.iceberg.TableProperties.DELETE_PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT;
 import static 
org.apache.iceberg.TableProperties.DELETE_PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT;
@@ -34,6 +35,8 @@ import static 
org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL;
 import static 
org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL_DEFAULT;
 import static org.apache.iceberg.TableProperties.PARQUET_DICT_SIZE_BYTES;
 import static 
org.apache.iceberg.TableProperties.PARQUET_DICT_SIZE_BYTES_DEFAULT;
+import static org.apache.iceberg.TableProperties.PARQUET_PAGE_ROW_LIMIT;
+import static org.apache.iceberg.TableProperties.PARQUET_PAGE_ROW_LIMIT_DEFAULT;
 import static org.apache.iceberg.TableProperties.PARQUET_PAGE_SIZE_BYTES;
 import static 
org.apache.iceberg.TableProperties.PARQUET_PAGE_SIZE_BYTES_DEFAULT;
 import static 
org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT;
@@ -240,6 +243,7 @@ public class Parquet {
 
       int rowGroupSize = context.rowGroupSize();
       int pageSize = context.pageSize();
+      int pageRowLimit = context.pageRowLimit();
       int dictionaryPageSize = context.dictionaryPageSize();
       String compressionLevel = context.compressionLevel();
       CompressionCodecName codec = context.codec();
@@ -281,6 +285,7 @@ public class Parquet {
             ParquetProperties.builder()
                 .withWriterVersion(writerVersion)
                 .withPageSize(pageSize)
+                .withPageRowCountLimit(pageRowLimit)
                 .withDictionaryPageSize(dictionaryPageSize)
                 .withMinRowCountForPageSizeCheck(rowGroupCheckMinRecordCount)
                 .withMaxRowCountForPageSizeCheck(rowGroupCheckMaxRecordCount)
@@ -317,6 +322,7 @@ public class Parquet {
                 .withWriteMode(writeMode)
                 .withRowGroupSize(rowGroupSize)
                 .withPageSize(pageSize)
+                .withPageRowCountLimit(pageRowLimit)
                 .withDictionaryPageSize(dictionaryPageSize);
 
         for (Map.Entry<String, String> entry : 
columnBloomFilterEnabled.entrySet()) {
@@ -332,6 +338,7 @@ public class Parquet {
     private static class Context {
       private final int rowGroupSize;
       private final int pageSize;
+      private final int pageRowLimit;
       private final int dictionaryPageSize;
       private final CompressionCodecName codec;
       private final String compressionLevel;
@@ -343,6 +350,7 @@ public class Parquet {
       private Context(
           int rowGroupSize,
           int pageSize,
+          int pageRowLimit,
           int dictionaryPageSize,
           CompressionCodecName codec,
           String compressionLevel,
@@ -352,6 +360,7 @@ public class Parquet {
           Map<String, String> columnBloomFilterEnabled) {
         this.rowGroupSize = rowGroupSize;
         this.pageSize = pageSize;
+        this.pageRowLimit = pageRowLimit;
         this.dictionaryPageSize = dictionaryPageSize;
         this.codec = codec;
         this.compressionLevel = compressionLevel;
@@ -372,6 +381,11 @@ public class Parquet {
                 config, PARQUET_PAGE_SIZE_BYTES, 
PARQUET_PAGE_SIZE_BYTES_DEFAULT);
         Preconditions.checkArgument(pageSize > 0, "Page size must be > 0");
 
+        int pageRowLimit =
+            PropertyUtil.propertyAsInt(
+                config, PARQUET_PAGE_ROW_LIMIT, PARQUET_PAGE_ROW_LIMIT_DEFAULT);
+        Preconditions.checkArgument(pageRowLimit > 0, "Page row count limit must be > 0");
+
         int dictionaryPageSize =
             PropertyUtil.propertyAsInt(
                 config, PARQUET_DICT_SIZE_BYTES, 
PARQUET_DICT_SIZE_BYTES_DEFAULT);
@@ -414,6 +428,7 @@ public class Parquet {
         return new Context(
             rowGroupSize,
             pageSize,
+            pageRowLimit,
             dictionaryPageSize,
             codec,
             compressionLevel,
@@ -437,6 +452,11 @@ public class Parquet {
                 config, DELETE_PARQUET_PAGE_SIZE_BYTES, 
dataContext.pageSize());
         Preconditions.checkArgument(pageSize > 0, "Page size must be > 0");
 
+        int pageRowLimit =
+            PropertyUtil.propertyAsInt(
+                config, DELETE_PARQUET_PAGE_ROW_LIMIT, dataContext.pageRowLimit());
+        Preconditions.checkArgument(pageRowLimit > 0, "Page row count limit must be > 0");
+
         int dictionaryPageSize =
             PropertyUtil.propertyAsInt(
                 config, DELETE_PARQUET_DICT_SIZE_BYTES, 
dataContext.dictionaryPageSize());
@@ -479,6 +499,7 @@ public class Parquet {
         return new Context(
             rowGroupSize,
             pageSize,
+            pageRowLimit,
             dictionaryPageSize,
             codec,
             compressionLevel,
@@ -504,6 +525,10 @@ public class Parquet {
         return pageSize;
       }
 
+      int pageRowLimit() {
+        return pageRowLimit;
+      }
+
       int dictionaryPageSize() {
         return dictionaryPageSize;
       }

Reply via email to