This is an automated email from the ASF dual-hosted git repository.
ahmedabu98 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ef5f89f0508 [Iceberg] Fix manifest bounds being padded with trailing 0x00 bytes (#38580)
ef5f89f0508 is described below
commit ef5f89f050829cb9fc6e1c08ccf51c351a3bbb85
Author: Deji Ibrahim <[email protected]>
AuthorDate: Fri May 22 15:58:57 2026 +0100
[Iceberg] Fix manifest bounds being padded with trailing 0x00 bytes (#38580)
* [Iceberg] Fix manifest bounds being padded with trailing 0x00 bytes
* update changes.md
* test: encode bound ByteBuffers via Conversions.toByteBuffer
* trigger build
---
CHANGES.md | 1 +
.../beam/sdk/io/iceberg/SerializableDataFile.java | 14 ++++-
.../sdk/io/iceberg/SerializableDataFileTest.java | 65 ++++++++++++++++++++++
3 files changed, 79 insertions(+), 1 deletion(-)
diff --git a/CHANGES.md b/CHANGES.md
index 52475a99d8e..fa4fdb09660 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -98,6 +98,7 @@
* Fixed BigQueryEnrichmentHandler batch mode dropping earlier requests when multiple requests share the same enrichment key (Python) ([#38035](https://github.com/apache/beam/issues/38035)).
* Added `max_batch_duration_secs` passthrough support in Python Enrichment BigQuery and CloudSQL handlers so batching duration can be forwarded to `BatchElements` ([#38243](https://github.com/apache/beam/issues/38243)).
+* Fixed IcebergIO writing manifest column bounds padded with trailing `0x00` bytes, which broke equality predicate pushdown in some query engines (Java) ([#38580](https://github.com/apache/beam/issues/38580)).
## Security Fixes
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java
index 1f717b82c21..9e75be0a198 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java
@@ -205,11 +205,23 @@ abstract class SerializableDataFile {
}
Map<Integer, byte[]> output = new HashMap<>(input.size());
for (Map.Entry<Integer, ByteBuffer> e : input.entrySet()) {
- output.put(e.getKey(), e.getValue().array());
+ output.put(e.getKey(), toByteArray(e.getValue()));
}
return output;
}
+ // Copy only the bytes in [position, limit), taken through a slice. ByteBuffer.array()
+ // exposes the whole backing array, which can be longer than the content (e.g. trailing
+ // 0x00 bytes). Leaking those into manifest bounds shifts the lower bound above the real
+ // min and breaks equality predicate pushdown in some query engines. The slice has its own
+ // position, so buf is left untouched; read-only and direct buffers are handled as well.
+ private static byte[] toByteArray(ByteBuffer buf) {
+ ByteBuffer content = buf.slice();
+ byte[] copy = new byte[content.remaining()];
+ content.get(copy);
+ return copy;
+ }
+
private static @Nullable Map<Integer, ByteBuffer> toByteBufferMap(
@Nullable Map<Integer, byte[]> input) {
if (input == null) {
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/SerializableDataFileTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/SerializableDataFileTest.java
index 983f021fd7c..d4e7793718d 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/SerializableDataFileTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/SerializableDataFileTest.java
@@ -17,13 +17,27 @@
*/
package org.apache.beam.sdk.io.iceberg;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Types;
import org.junit.Test;
/**
@@ -73,4 +87,55 @@ public class SerializableDataFileTest {
+ "to this test class's FIELDS_SET.");
}
}
+
+ /**
+ * Bounds with {@code capacity > limit} must be copied by {@code [position, limit)}, not by {@link
+ * ByteBuffer#array()}. Otherwise trailing 0x00 bytes leak into the manifest bounds and break
+ * equality predicate pushdown in some query engines.
+ */
+ @Test
+ public void testBoundByteBufferIsCopiedByLimitNotBackingArrayLength() {
+ // Encode bounds as iceberg-parquet does, via Conversions.toByteBuffer(STRING, value). For 10+
+ // chars the JDK UTF-8 CharsetEncoder over-allocates ~10%, so capacity > limit (checked below).
+ int columnId = 3;
+ String lowerValue = "lower_bound_str";
+ String upperValue = "upper_bound_str";
+ byte[] expectedLower = lowerValue.getBytes(StandardCharsets.UTF_8);
+ byte[] expectedUpper = upperValue.getBytes(StandardCharsets.UTF_8);
+
+ ByteBuffer lower = Conversions.toByteBuffer(Types.StringType.get(), lowerValue);
+ ByteBuffer upper = Conversions.toByteBuffer(Types.StringType.get(), upperValue);
+ org.junit.Assert.assertTrue(
+ "precondition: encoded bounds must have capacity > limit or the test is vacuous",
+ lower.capacity() > lower.limit() && upper.capacity() > upper.limit());
+
+ Map<Integer, ByteBuffer> lowerBounds = new HashMap<>();
+ lowerBounds.put(columnId, lower);
+ Map<Integer, ByteBuffer> upperBounds = new HashMap<>();
+ upperBounds.put(columnId, upper);
+
+ Metrics metrics = new Metrics(1L, null, null, null, null, lowerBounds, upperBounds);
+ DataFile dataFile =
+ DataFiles.builder(PartitionSpec.unpartitioned())
+ .withFormat(FileFormat.PARQUET)
+ .withPath("gs://test-bucket/data/test-file.parquet")
+ .withFileSizeInBytes(1024L)
+ .withMetrics(metrics)
+ .build();
+
+ SerializableDataFile serialized = SerializableDataFile.from(dataFile, "");
+
+ byte[] serializedLower = serialized.getLowerBounds().get(columnId);
+ byte[] serializedUpper = serialized.getUpperBounds().get(columnId);
+ assertEquals(
+ "lower bound length must match content, not backing array",
+ expectedLower.length,
+ serializedLower.length);
+ assertEquals(
+ "upper bound length must match content, not backing array",
+ expectedUpper.length,
+ serializedUpper.length);
+ assertArrayEquals(expectedLower, serializedLower);
+ assertArrayEquals(expectedUpper, serializedUpper);
+ }
}