This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 28957523 fix(arrow/flight/session): fix flaky race condition test (#687)
28957523 is described below

commit 28957523c7aee34ff8169512ea23eee651a0712f
Author: Matt Topol <[email protected]>
AuthorDate: Thu Mar 5 11:15:01 2026 -0500

    fix(arrow/flight/session): fix flaky race condition test (#687)
    
    ### Rationale for this change
    Address a crash caused by a race condition, as exemplified by
    https://github.com/apache/arrow-go/actions/runs/22678156759/job/65740770847
    
    When a session is closed, the server removes it from the store and sends
    a trailer with `Max-Age=0` to tell the client to remove the cookie. If
    the client makes a new request before processing this trailer, it sends
    the stale cookie with the closed session ID. Instead of returning
    `ErrNoSession`, the server-side session lookup failed with a
    `session not found` error.
    
    ### What changes are included in this PR?
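    Catch the `session not found` error from the store and return
    `ErrNoSession` so that a new session is created in this case. Also
    treat a session that is marked closed but not yet removed from the
    store as missing, guard `Close`/`Closed` with the session mutex, and
    only read headers/trailers in the client middleware when the stream
    finished without error or with `io.EOF`.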
    
    ### Are these changes tested?
    yes
    
    ### Are there any user-facing changes?
    no
---
 arrow/flight/client.go                   |  9 +++++++--
 arrow/flight/session/session.go          | 10 +++++++++-
 arrow/flight/session/stateful_session.go | 16 +++++++++++++++-
 3 files changed, 31 insertions(+), 4 deletions(-)

diff --git a/arrow/flight/client.go b/arrow/flight/client.go
index 9e5da87c..96eb0e6b 100644
--- a/arrow/flight/client.go
+++ b/arrow/flight/client.go
@@ -152,8 +152,13 @@ func CreateClientMiddleware(middleware 
CustomClientMiddleware) ClientMiddleware
                                        post.CallCompleted(csCtx, err)
                                }
                                if isHdrs {
-                                       hdrmd, _ := cs.Header()
-                                       hdrs.HeadersReceived(csCtx, 
metadata.Join(hdrmd, cs.Trailer()))
+                                       // Only retrieve headers and trailers 
if the stream completed successfully
+                                       // or with io.EOF. If the stream was 
cancelled or had an error before
+                                       // completion, headers/trailers may not 
be available.
+                                       if err == nil || errors.Is(err, io.EOF) 
{
+                                               hdrmd, _ := cs.Header()
+                                               hdrs.HeadersReceived(csCtx, 
metadata.Join(hdrmd, cs.Trailer()))
+                                       }
                                }
                        }
                        go func() {
diff --git a/arrow/flight/session/session.go b/arrow/flight/session/session.go
index a2969d9a..202de38b 100644
--- a/arrow/flight/session/session.go
+++ b/arrow/flight/session/session.go
@@ -141,12 +141,16 @@ func (session *serverSession) EraseSessionOption(name 
string) {
 }
 
 func (session *serverSession) Close() error {
+       session.mu.Lock()
+       defer session.mu.Unlock()
        session.options = nil
        session.closed = true
        return nil
 }
 
 func (session *serverSession) Closed() bool {
+       session.mu.RLock()
+       defer session.mu.RUnlock()
        return session.closed
 }
 
@@ -221,7 +225,11 @@ func (middleware *serverSessionMiddleware) 
CallCompleted(ctx context.Context, _
        }
 
        if session.Closed() {
-               // Invalidate the client's cookie
+               // Invalidate the client's cookie by sending back the incoming 
cookie
+               // with MaxAge=-1. This follows the HTTP cookie protocol for 
deletion.
+               // Note: For stateless sessions, there's an inherent race 
condition where
+               // the client might send the old cookie again before processing 
this trailer.
+               // This is acceptable as the session data is client-controlled 
anyway.
                clientCookie.MaxAge = -1
                grpc.SetTrailer(ctx, metadata.Pairs("Set-Cookie", 
clientCookie.String()))
 
diff --git a/arrow/flight/session/stateful_session.go 
b/arrow/flight/session/stateful_session.go
index 322f60db..ac4e700b 100644
--- a/arrow/flight/session/stateful_session.go
+++ b/arrow/flight/session/stateful_session.go
@@ -170,7 +170,21 @@ func (manager *statefulServerSessionManager) 
GetSession(ctx context.Context) (Se
 
        sessionID, err := getSessionIDFromIncomingCookie(ctx)
        if err == nil {
-               return manager.store.Get(sessionID)
+               session, err := manager.store.Get(sessionID)
+               if err != nil {
+                       // If the session isn't in the store (e.g., it was 
closed/removed),
+                       // treat it as if no session exists so a new one can be 
created.
+                       // This handles the race condition where a client sends 
a stale
+                       // cookie before processing the session deletion 
trailer.
+                       return nil, ErrNoSession
+               }
+               // Also check if the session has been marked as closed but not 
yet removed.
+               // This handles the race between CallCompleted removing the 
session and
+               // StartCall looking it up from the cookie.
+               if session.Closed() {
+                       return nil, ErrNoSession
+               }
+               return session, nil
        }
        if err == http.ErrNoCookie {
                return nil, ErrNoSession

Reply via email to