This is an automated email from the ASF dual-hosted git repository.

ahmedabu98 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ef5f89f0508 [Iceberg] Fix manifest bounds being padded with trailing 0x00 bytes (#38580)
ef5f89f0508 is described below

commit ef5f89f050829cb9fc6e1c08ccf51c351a3bbb85
Author: Deji Ibrahim <[email protected]>
AuthorDate: Fri May 22 15:58:57 2026 +0100

    [Iceberg] Fix manifest bounds being padded with trailing 0x00 bytes (#38580)
    
    * [Iceberg] Fix manifest bounds being padded with trailing 0x00 bytes
    
    * update changes.md
    
    * test: encode bound ByteBuffers via Conversions.toByteBuffer
    
    * trigger build
---
 CHANGES.md                                         |  1 +
 .../beam/sdk/io/iceberg/SerializableDataFile.java  | 14 ++++-
 .../sdk/io/iceberg/SerializableDataFileTest.java   | 65 ++++++++++++++++++++++
 3 files changed, 79 insertions(+), 1 deletion(-)

diff --git a/CHANGES.md b/CHANGES.md
index 52475a99d8e..fa4fdb09660 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -98,6 +98,7 @@
 
 * Fixed BigQueryEnrichmentHandler batch mode dropping earlier requests when multiple requests share the same enrichment key (Python) ([#38035](https://github.com/apache/beam/issues/38035)).
 * Added `max_batch_duration_secs` passthrough support in Python Enrichment BigQuery and CloudSQL handlers so batching duration can be forwarded to `BatchElements` ([#38243](https://github.com/apache/beam/issues/38243)).
+* Fixed IcebergIO writing manifest column bounds padded with trailing `0x00` bytes, which broke equality predicate pushdown in some query engines (Java) ([#38580](https://github.com/apache/beam/issues/38580)).
 
 ## Security Fixes
 
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java
index 1f717b82c21..9e75be0a198 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java
@@ -205,11 +205,23 @@ abstract class SerializableDataFile {
     }
     Map<Integer, byte[]> output = new HashMap<>(input.size());
     for (Map.Entry<Integer, ByteBuffer> e : input.entrySet()) {
-      output.put(e.getKey(), e.getValue().array());
+      output.put(e.getKey(), toByteArray(e.getValue()));
     }
     return output;
   }
 
+  // Copy only [position, limit). ByteBuffer.array() returns the full backing
+  // array, which is sometimes larger than the buffer's content (e.g. trailing
+  // 0x00 bytes). Leaking those into manifest bounds shifts the lower bound
+  // above the real min and breaks equality predicate pushdown in some query
+  // engines.
+  private static byte[] toByteArray(ByteBuffer buf) {
+    ByteBuffer view = buf.duplicate();
+    byte[] bytes = new byte[view.remaining()];
+    view.get(bytes);
+    return bytes;
+  }
+
   private static @Nullable Map<Integer, ByteBuffer> toByteBufferMap(
       @Nullable Map<Integer, byte[]> input) {
     if (input == null) {
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/SerializableDataFileTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/SerializableDataFileTest.java
index 983f021fd7c..d4e7793718d 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/SerializableDataFileTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/SerializableDataFileTest.java
@@ -17,13 +17,27 @@
  */
 package org.apache.beam.sdk.io.iceberg;
 
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
 import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Types;
 import org.junit.Test;
 
 /**
@@ -73,4 +87,55 @@ public class SerializableDataFileTest {
               + "to this test class's FIELDS_SET.");
     }
   }
+
+  /**
+   * Bounds with {@code capacity > limit} must be copied by {@code [position, limit)}, not by {@link
+   * ByteBuffer#array()}. Otherwise trailing 0x00 bytes leak into the manifest bounds and break
+   * equality predicate pushdown in some query engines.
+   */
+  @Test
+  public void testBoundByteBufferIsCopiedByLimitNotBackingArrayLength() {
+    // Encode bounds the same way iceberg-parquet does in the wild — via
+    // Conversions.toByteBuffer(STRING, value). For UTF-8 strings of 10+
+    // characters the underlying JDK CharsetEncoder over-allocates by ~10%
+    // and flips, producing a ByteBuffer with capacity > limit.
+    int columnId = 3;
+    String lowerValue = "lower_bound_str";
+    String upperValue = "upper_bound_str";
+    byte[] expectedLower = lowerValue.getBytes(StandardCharsets.UTF_8);
+    byte[] expectedUpper = upperValue.getBytes(StandardCharsets.UTF_8);
+
+    ByteBuffer lower = Conversions.toByteBuffer(Types.StringType.get(), lowerValue);
+    ByteBuffer upper = Conversions.toByteBuffer(Types.StringType.get(), upperValue);
+
+    Map<Integer, ByteBuffer> lowerBounds = new HashMap<>();
+    lowerBounds.put(columnId, lower);
+    Map<Integer, ByteBuffer> upperBounds = new HashMap<>();
+    upperBounds.put(columnId, upper);
+
+    Metrics metrics = new Metrics(1L, null, null, null, null, lowerBounds, upperBounds);
+
+    DataFile dataFile =
+        DataFiles.builder(PartitionSpec.unpartitioned())
+            .withFormat(FileFormat.PARQUET)
+            .withPath("gs://test-bucket/data/test-file.parquet")
+            .withFileSizeInBytes(1024L)
+            .withMetrics(metrics)
+            .build();
+
+    SerializableDataFile serialized = SerializableDataFile.from(dataFile, "");
+
+    byte[] serializedLower = serialized.getLowerBounds().get(columnId);
+    byte[] serializedUpper = serialized.getUpperBounds().get(columnId);
+    assertEquals(
+        "lower bound length must match content, not backing array",
+        expectedLower.length,
+        serializedLower.length);
+    assertEquals(
+        "upper bound length must match content, not backing array",
+        expectedUpper.length,
+        serializedUpper.length);
+    assertArrayEquals(expectedLower, serializedLower);
+    assertArrayEquals(expectedUpper, serializedUpper);
+  }
 }

Reply via email to