laskoviymishka commented on code in PR #891:
URL: https://github.com/apache/iceberg-go/pull/891#discussion_r3077872544
##########
table/internal/parquet_files.go:
##########
@@ -694,29 +709,50 @@ func (w wrapPqArrowReader) PrunedSchema(projectedIDs
map[int]struct{}, mapping i
}
func (w wrapPqArrowReader) GetRecords(ctx context.Context, cols []int, tester
any) (array.RecordReader, error) {
- var (
- testRg func(*metadata.RowGroupMetaData, []int) (bool, error)
- ok bool
- )
+ var rowGroupTester *ParquetRowGroupTester
if tester != nil {
- testRg, ok = tester.(func(*metadata.RowGroupMetaData, []int)
(bool, error))
+ var ok bool
+ rowGroupTester, ok = tester.(*ParquetRowGroupTester)
if !ok {
- return nil, fmt.Errorf("%w: invalid tester function",
iceberg.ErrInvalidArgument)
+ return nil, fmt.Errorf("%w: invalid tester type",
iceberg.ErrInvalidArgument)
}
}
var rgList []int
- if testRg != nil {
+ if rowGroupTester != nil && rowGroupTester.StatsFn != nil {
Review Comment:
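If `StatsFn` is nil but `BloomPreds` is non-empty, the entire pruning loop is
skipped — including the bloom filter checks. Bloom filter pruning should work
independently of stats pruning.
Split the guard: first widen the outer condition so the loop also runs when
only bloom predicates are set: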
```go
if rowGroupTester != nil && (rowGroupTester.StatsFn != nil ||
len(rowGroupTester.BloomPreds) > 0) {
```
Then inside the loop, only call `StatsFn` if non-nil (treat nil stats as
"keep"). This way bloom-only pruning still works.
##########
table/evaluators.go:
##########
@@ -1563,3 +1565,212 @@ func (m *strictMetricsEval) canContainNans(fieldID int)
bool {
return exists && cnt > 0
}
+
+// literalToPhysBytes converts an Iceberg literal to the physical byte
+// representation that the Parquet bloom filter writer hashes. The encoding
+// must match Arrow's internal getBytes[T] (little-endian for numerics, raw
+// bytes for byte-array types).
+func literalToPhysBytes(typ iceberg.PrimitiveType, lit iceberg.Literal)
([]byte, bool) {
+ switch typ.(type) {
+ case iceberg.Int32Type:
+ v := lit.(iceberg.TypedLiteral[int32]).Value()
+ b := make([]byte, 4)
+ binary.LittleEndian.PutUint32(b, uint32(v))
+
+ return b, true
+
+ case iceberg.DateType:
+ v := lit.(iceberg.TypedLiteral[iceberg.Date]).Value()
+ b := make([]byte, 4)
+ binary.LittleEndian.PutUint32(b, uint32(v))
+
+ return b, true
+
+ case iceberg.Int64Type:
+ v := lit.(iceberg.TypedLiteral[int64]).Value()
+ b := make([]byte, 8)
+ binary.LittleEndian.PutUint64(b, uint64(v))
+
+ return b, true
+
+ case iceberg.TimeType:
+ v := lit.(iceberg.TypedLiteral[iceberg.Time]).Value()
+ b := make([]byte, 8)
+ binary.LittleEndian.PutUint64(b, uint64(v))
+
+ return b, true
+
+ case iceberg.TimestampType, iceberg.TimestampTzType:
+ v := lit.(iceberg.TypedLiteral[iceberg.Timestamp]).Value()
+ b := make([]byte, 8)
+ binary.LittleEndian.PutUint64(b, uint64(v))
+
+ return b, true
+
+ case iceberg.Float32Type:
+ v := lit.(iceberg.TypedLiteral[float32]).Value()
+ b := make([]byte, 4)
+ binary.LittleEndian.PutUint32(b, math.Float32bits(v))
+
+ return b, true
+
+ case iceberg.Float64Type:
+ v := lit.(iceberg.TypedLiteral[float64]).Value()
+ b := make([]byte, 8)
+ binary.LittleEndian.PutUint64(b, math.Float64bits(v))
+
+ return b, true
Review Comment:
Parquet and Java normalize `-0.0` → `0.0` and `NaN` → canonical NaN before
bloom filter hashing. Without normalization, searching for `0.0` won't match a
row group where the writer stored `-0.0` (different bit pattern → different
hash). That's a false negative — the bloom filter says "not present" but the
value IS there. False negatives in bloom filters cause **incorrect query
results** (row group skipped, matching rows missing from output).
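Add normalization before encoding (shown for the `float32` case; the
`float64` case is the same without the `float32`/`float64` conversions):
```go
if v == 0 { v = 0 } // normalize -0.0 → 0.0 (-0.0 == 0 is true; assigning 0 stores +0.0)
if math.IsNaN(float64(v)) { v = float32(math.NaN()) } // canonical NaN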
```
This matches Java's `BloomFilterReader.hash()` behavior.
##########
table/internal/parquet_files.go:
##########
@@ -726,6 +762,65 @@ func (w wrapPqArrowReader) GetRecords(ctx context.Context,
cols []int, tester an
return w.GetRecordReader(ctx, cols, rgList)
}
+// buildFieldIDToColIdx maps each Iceberg field ID to its 0-based column index
in
+// the Parquet file schema. Used to translate bloom filter predicates (which
carry
+// field IDs) into the column positions required by BloomFilterReader.
+func buildFieldIDToColIdx(meta *metadata.FileMetaData) map[int]int {
+ sc := meta.Schema
+ result := make(map[int]int, sc.NumColumns())
+ for i := 0; i < sc.NumColumns(); i++ {
+ fieldID := int(sc.Column(i).SchemaNode().FieldID())
+ result[fieldID] = i
+ }
+
+ return result
+}
+
+// checkRowGroupBloomFilters checks each bloom predicate against the bloom
filter
+// for its column in row group rg. Returns false (skip) if ANY predicate has
none
+// of its values present in the bloom filter. Returns true (keep) on any error
or
+// missing bloom filter data — bloom filters are an optimisation, never a
+// correctness gate.
+func checkRowGroupBloomFilters(
+ bfReader *metadata.BloomFilterReader,
+ rg int,
+ fieldIDToColIdx map[int]int,
+ preds []RowGroupBloomPred,
+) (bool, error) {
+ rgBFReader, err := bfReader.RowGroup(rg)
+ if err != nil || rgBFReader == nil {
+ return true, nil
+ }
+
+ for _, pred := range preds {
+ colIdx, ok := fieldIDToColIdx[pred.FieldID]
+ if !ok {
+ continue
+ }
+
+ bf, err := rgBFReader.GetColumnBloomFilter(colIdx)
+ if err != nil || bf == nil {
+ continue
+ }
+
+ hasher := bf.Hasher()
+ anyFound := false
+ for _, physBytes := range pred.PhysBytes {
+ if bf.CheckHash(hasher.Sum64(physBytes)) {
+ anyFound = true
+
+ break
+ }
+ }
Review Comment:
Correct. For `In(a, {1, 2, 3})`: if the bloom filter says NONE of {1, 2, 3}
are present, the row group definitely doesn't contain any matching rows → skip.
If ANY value MIGHT be present, we must keep the row group (bloom filters
have false positives).
##########
table/evaluators.go:
##########
@@ -1563,3 +1565,212 @@ func (m *strictMetricsEval) canContainNans(fieldID int)
bool {
return exists && cnt > 0
}
+
+// literalToPhysBytes converts an Iceberg literal to the physical byte
+// representation that the Parquet bloom filter writer hashes. The encoding
+// must match Arrow's internal getBytes[T] (little-endian for numerics, raw
+// bytes for byte-array types).
+func literalToPhysBytes(typ iceberg.PrimitiveType, lit iceberg.Literal)
([]byte, bool) {
+ switch typ.(type) {
+ case iceberg.Int32Type:
+ v := lit.(iceberg.TypedLiteral[int32]).Value()
+ b := make([]byte, 4)
+ binary.LittleEndian.PutUint32(b, uint32(v))
+
+ return b, true
+
+ case iceberg.DateType:
+ v := lit.(iceberg.TypedLiteral[iceberg.Date]).Value()
+ b := make([]byte, 4)
+ binary.LittleEndian.PutUint32(b, uint32(v))
+
+ return b, true
+
+ case iceberg.Int64Type:
+ v := lit.(iceberg.TypedLiteral[int64]).Value()
+ b := make([]byte, 8)
+ binary.LittleEndian.PutUint64(b, uint64(v))
+
+ return b, true
+
+ case iceberg.TimeType:
+ v := lit.(iceberg.TypedLiteral[iceberg.Time]).Value()
+ b := make([]byte, 8)
+ binary.LittleEndian.PutUint64(b, uint64(v))
+
+ return b, true
+
+ case iceberg.TimestampType, iceberg.TimestampTzType:
+ v := lit.(iceberg.TypedLiteral[iceberg.Timestamp]).Value()
+ b := make([]byte, 8)
+ binary.LittleEndian.PutUint64(b, uint64(v))
+
+ return b, true
+
+ case iceberg.Float32Type:
+ v := lit.(iceberg.TypedLiteral[float32]).Value()
+ b := make([]byte, 4)
+ binary.LittleEndian.PutUint32(b, math.Float32bits(v))
+
+ return b, true
+
+ case iceberg.Float64Type:
+ v := lit.(iceberg.TypedLiteral[float64]).Value()
+ b := make([]byte, 8)
+ binary.LittleEndian.PutUint64(b, math.Float64bits(v))
+
+ return b, true
+
+ case iceberg.StringType:
+ v := lit.(iceberg.TypedLiteral[string]).Value()
+
+ return []byte(v), true
+
+ case iceberg.BinaryType:
+ v := lit.(iceberg.TypedLiteral[[]byte]).Value()
+
+ return v, true
+
+ case iceberg.UUIDType:
+ v := lit.(iceberg.TypedLiteral[uuid.UUID]).Value()
+
+ return v[:], true
+
+ case iceberg.FixedType:
+ v := lit.(iceberg.TypedLiteral[[]byte]).Value()
+
+ return v, true
Review Comment:
`FixedLiteral` is `type FixedLiteral []byte` with `Value() []byte`, so it
satisfies `TypedLiteral[[]byte]` — but only because `BinaryLiteral` and
`FixedLiteral` share the same underlying `[]byte` representation. The
single-value type assertion here is unchecked: if the expression machinery
ever passes a literal for a `FixedType` field that does not satisfy
`TypedLiteral[[]byte]`, it will panic at runtime instead of returning
`false`. A safe guard:
```go
case iceberg.FixedType:
switch b := lit.(type) {
case iceberg.TypedLiteral[[]byte]:
return b.Value(), true
case iceberg.FixedLiteral:
return []byte(b), true
}
return nil, false
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]