This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git


The following commit(s) were added to refs/heads/main by this push:
     new f4830238 fix(parquet): eagerly release adaptive bloom filter candidate 
buffers + skip flaky flight tests in CI (#743)
f4830238 is described below

commit f4830238118f588802f5af9b999d9406224a8303
Author: Matt Topol <[email protected]>
AuthorDate: Thu Apr 2 14:47:56 2026 -0400

    fix(parquet): eagerly release adaptive bloom filter candidate buffers + 
skip flaky flight tests in CI (#743)
    
    ## Summary
    
    - Eagerly release adaptive bloom filter candidate data buffers when they
    are pruned during `InsertHash`/`InsertBulk` and after `WriteTo`
    completes, instead of relying solely on GC cleanup/finalizer.
    - Skip flaky `arrow/flight` and `arrow/flight/flightsql` tests in CI
    using `-short` flag while keeping them runnable locally.
    
    ## Bloom Filter Fix
    
    The `adaptiveBlockSplitBloomFilter` creates candidate
    `blockSplitBloomFilter` objects whose data buffers are only freed by a
    GC cleanup (`runtime.AddCleanup` on Go 1.24+ / `runtime.SetFinalizer` on
    Go 1.23). In `TestEncryptedBloomFilters`, `TearDownTest` calls
    `runtime.GC()` twice, but that isn't always sufficient for the full
    reachability chain (adaptive filter → candidates → bloom filters →
    cleanup) to be collected and run before the
    `CheckedAllocator.AssertSize(0)` assertion.
    
    This change releases candidate buffers eagerly at two points:
    1. **`InsertHash`/`InsertBulk`** – when `slices.DeleteFunc` prunes
    candidates that exceeded their NDV threshold.
    2. **`WriteTo`** – releases all remaining candidate buffers after
    writing the optimal one.
    
    The existing `addCleanup` on each `blockSplitBloomFilter` remains as a
    safety net. A second `Release()` from the eventual GC cleanup is a safe
    no-op since the buffer's internal slice is already nil after the first
    call.
    
    ## Flaky Flight Tests
    
    The `arrow/flight` and `arrow/flight/flightsql` packages have gRPC
    server/client tests with inherent timing races that spuriously fail
    under `-race`/`-asan` in CI:
    - `TestCookiesClone` – cookie propagation race
    - `TestClientStreamMiddleware` – gRPC header metadata race
    - `TestSetRemoveSessionOptions` – session option propagation race
    - `TestStatelessServerSessionCookies` – session cookie race
    
    Added `TestMain` to both packages that skips all tests when `-short` is
    set, and added `-short` to the arrow `go test` invocations in
    `ci/scripts/test.sh`. Parquet test invocations are unaffected.
    
    **CI**: `go test -short ./...` → flight/flightsql tests skipped
    **Local**: `go test ./...` → all tests run normally
---
 .../flight/flightsql/main_test.go                  | 33 +++++++++++-----------
 .../flight/main_test.go                            | 33 +++++++++++-----------
 ci/scripts/test.sh                                 |  4 +--
 parquet/metadata/adaptive_bloom_filter.go          | 26 +++++++++++++++--
 parquet/metadata/bloom_filter.go                   |  9 +++---
 parquet/metadata/cleanup_bloom_filter.go           |  3 +-
 parquet/metadata/cleanup_bloom_filter_go1.23.go    |  1 +
 7 files changed, 67 insertions(+), 42 deletions(-)

diff --git a/parquet/metadata/cleanup_bloom_filter.go 
b/arrow/flight/flightsql/main_test.go
similarity index 64%
copy from parquet/metadata/cleanup_bloom_filter.go
copy to arrow/flight/flightsql/main_test.go
index ed835c3b..92ae273b 100644
--- a/parquet/metadata/cleanup_bloom_filter.go
+++ b/arrow/flight/flightsql/main_test.go
@@ -14,24 +14,25 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-//go:build go1.24
-
-package metadata
+package flightsql_test
 
 import (
-       "runtime"
-       "sync"
-
-       "github.com/apache/arrow-go/v18/arrow/memory"
+       "flag"
+       "fmt"
+       "os"
+       "testing"
 )
 
-func addCleanup(bf *blockSplitBloomFilter, bufferPool *sync.Pool) {
-       runtime.AddCleanup(bf, func(data *memory.Buffer) {
-               if bufferPool != nil {
-                       data.ResizeNoShrink(0)
-                       bufferPool.Put(data)
-               } else {
-                       data.Release()
-               }
-       }, bf.data)
+func TestMain(m *testing.M) {
+       flag.Parse()
+
+       // FlightSQL tests involve gRPC server/client interactions that are
+       // inherently racy under -race/-asan and spuriously fail in CI.
+       // Use -short in CI to skip them; they still run locally via
+       // `go test ./...` (without -short).
+       if testing.Short() {
+               fmt.Println("SKIP: flightsql tests disabled with -short flag")
+               os.Exit(0)
+       }
+       os.Exit(m.Run())
 }
diff --git a/parquet/metadata/cleanup_bloom_filter.go 
b/arrow/flight/main_test.go
similarity index 64%
copy from parquet/metadata/cleanup_bloom_filter.go
copy to arrow/flight/main_test.go
index ed835c3b..17f96877 100644
--- a/parquet/metadata/cleanup_bloom_filter.go
+++ b/arrow/flight/main_test.go
@@ -14,24 +14,25 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-//go:build go1.24
-
-package metadata
+package flight_test
 
 import (
-       "runtime"
-       "sync"
-
-       "github.com/apache/arrow-go/v18/arrow/memory"
+       "flag"
+       "fmt"
+       "os"
+       "testing"
 )
 
-func addCleanup(bf *blockSplitBloomFilter, bufferPool *sync.Pool) {
-       runtime.AddCleanup(bf, func(data *memory.Buffer) {
-               if bufferPool != nil {
-                       data.ResizeNoShrink(0)
-                       bufferPool.Put(data)
-               } else {
-                       data.Release()
-               }
-       }, bf.data)
+func TestMain(m *testing.M) {
+       flag.Parse()
+
+       // Flight tests involve gRPC server/client interactions that are
+       // inherently racy under -race/-asan and spuriously fail in CI.
+       // Use -short in CI to skip them; they still run locally via
+       // `go test ./...` (without -short).
+       if testing.Short() {
+               fmt.Println("SKIP: flight tests disabled with -short flag")
+               os.Exit(0)
+       }
+       os.Exit(m.Run())
 }
diff --git a/ci/scripts/test.sh b/ci/scripts/test.sh
index 78260c08..45064f0a 100755
--- a/ci/scripts/test.sh
+++ b/ci/scripts/test.sh
@@ -66,10 +66,10 @@ fi
 # tag in order to run its tests so that the testing functions implemented
 # in .c files don't get included in non-test builds.
 
-go test "${test_args[@]}" -tags ${tags} ./...
+go test "${test_args[@]}" -short -tags ${tags} ./...
 
 # run it again but with the noasm tag
-go test "${test_args[@]}" -tags ${tags},noasm ./...
+go test "${test_args[@]}" -short -tags ${tags},noasm ./...
 
 popd
 
diff --git a/parquet/metadata/adaptive_bloom_filter.go 
b/parquet/metadata/adaptive_bloom_filter.go
index 348fada3..39fee2d7 100644
--- a/parquet/metadata/adaptive_bloom_filter.go
+++ b/parquet/metadata/adaptive_bloom_filter.go
@@ -125,7 +125,12 @@ func (b *adaptiveBlockSplitBloomFilter) InsertHash(hash 
uint64) {
        }
 
        b.candidates = slices.DeleteFunc(b.candidates, func(c 
*bloomFilterCandidate) bool {
-               return c.expectedNDV < uint32(b.numDistinct) && c != 
b.largestCandidate
+               if c.expectedNDV < uint32(b.numDistinct) && c != 
b.largestCandidate {
+                       c.bloomFilter.cancelCleanup()
+                       c.bloomFilter.data.Release()
+                       return true
+               }
+               return false
        })
 
        for _, c := range b.candidates {
@@ -150,7 +155,12 @@ func (b *adaptiveBlockSplitBloomFilter) InsertBulk(hashes 
[]uint64) {
        b.numDistinct += int64(len(uniqueNewHashes))
 
        b.candidates = slices.DeleteFunc(b.candidates, func(c 
*bloomFilterCandidate) bool {
-               return c.expectedNDV < uint32(b.numDistinct) && c != 
b.largestCandidate
+               if c.expectedNDV < uint32(b.numDistinct) && c != 
b.largestCandidate {
+                       c.bloomFilter.cancelCleanup()
+                       c.bloomFilter.data.Release()
+                       return true
+               }
+               return false
        })
 
        for _, c := range b.candidates {
@@ -169,7 +179,17 @@ func (b *adaptiveBlockSplitBloomFilter) CheckHash(hash 
uint64) bool {
 func (b *adaptiveBlockSplitBloomFilter) WriteTo(w io.Writer, enc 
encryption.Encryptor) (int, error) {
        b.finalized = true
 
-       return b.optimalCandidate().bloomFilter.WriteTo(w, enc)
+       optimal := b.optimalCandidate()
+       n, err := optimal.bloomFilter.WriteTo(w, enc)
+
+       for _, c := range b.candidates {
+               c.bloomFilter.cancelCleanup()
+               c.bloomFilter.data.Release()
+       }
+       b.candidates = nil
+       b.largestCandidate = nil
+
+       return n, err
 }
 
 func (b *adaptiveBlockSplitBloomFilter) initCandidates(maxBytes uint32, 
numCandidates int, fpp float64) {
diff --git a/parquet/metadata/bloom_filter.go b/parquet/metadata/bloom_filter.go
index 13a4730c..b607c5f1 100644
--- a/parquet/metadata/bloom_filter.go
+++ b/parquet/metadata/bloom_filter.go
@@ -239,10 +239,11 @@ type blockSplitBloomFilter struct {
        data     *memory.Buffer
        bitset32 []uint32
 
-       hasher       Hasher
-       algorithm    format.BloomFilterAlgorithm
-       hashStrategy format.BloomFilterHash
-       compression  format.BloomFilterCompression
+       hasher        Hasher
+       algorithm     format.BloomFilterAlgorithm
+       hashStrategy  format.BloomFilterHash
+       compression   format.BloomFilterCompression
+       cancelCleanup func()
 }
 
 func (b *blockSplitBloomFilter) getAlg() *format.BloomFilterAlgorithm {
diff --git a/parquet/metadata/cleanup_bloom_filter.go 
b/parquet/metadata/cleanup_bloom_filter.go
index ed835c3b..d4dc7beb 100644
--- a/parquet/metadata/cleanup_bloom_filter.go
+++ b/parquet/metadata/cleanup_bloom_filter.go
@@ -26,7 +26,7 @@ import (
 )
 
 func addCleanup(bf *blockSplitBloomFilter, bufferPool *sync.Pool) {
-       runtime.AddCleanup(bf, func(data *memory.Buffer) {
+       c := runtime.AddCleanup(bf, func(data *memory.Buffer) {
                if bufferPool != nil {
                        data.ResizeNoShrink(0)
                        bufferPool.Put(data)
@@ -34,4 +34,5 @@ func addCleanup(bf *blockSplitBloomFilter, bufferPool 
*sync.Pool) {
                        data.Release()
                }
        }, bf.data)
+       bf.cancelCleanup = c.Stop
 }
diff --git a/parquet/metadata/cleanup_bloom_filter_go1.23.go 
b/parquet/metadata/cleanup_bloom_filter_go1.23.go
index b4bffbe7..b227c11a 100644
--- a/parquet/metadata/cleanup_bloom_filter_go1.23.go
+++ b/parquet/metadata/cleanup_bloom_filter_go1.23.go
@@ -32,4 +32,5 @@ func addCleanup(bf *blockSplitBloomFilter, bufferPool 
*sync.Pool) {
                        f.data.Release()
                }
        })
+       bf.cancelCleanup = func() { runtime.SetFinalizer(bf, nil) }
 }

Reply via email to