This is an automated email from the ASF dual-hosted git repository.

xiazcy pushed a commit to branch go-http-fix
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit e8ac84d18445642f94b289628715c4a53206942b
Author: Yang Xia <[email protected]>
AuthorDate: Tue Mar 24 23:34:51 2026 -0700

    added wg to ensure in-flight goroutines complete & added response body 
drain before close to prevent TCP RST errors
---
 gremlin-go/driver/connection.go      | 13 ++++++++++++-
 gremlin-go/driver/connection_test.go | 24 ++++++++++++++++++++++++
 2 files changed, 36 insertions(+), 1 deletion(-)

diff --git a/gremlin-go/driver/connection.go b/gremlin-go/driver/connection.go
index 5396b6ab43..54def5821a 100644
--- a/gremlin-go/driver/connection.go
+++ b/gremlin-go/driver/connection.go
@@ -29,6 +29,7 @@ import (
        "net"
        "net/http"
        "strings"
+       "sync"
        "time"
 )
 
@@ -53,6 +54,7 @@ type connection struct {
        logHandler   *logHandler
        serializer   *GraphBinarySerializer
        interceptors []RequestInterceptor
+       wg           sync.WaitGroup
 }
 
 // Connection pool defaults aligned with Java driver
@@ -121,7 +123,11 @@ func (c *connection) AddInterceptor(interceptor 
RequestInterceptor) {
 func (c *connection) submit(req *RequestMessage) (ResultSet, error) {
        rs := newChannelResultSet()
 
-       go c.executeAndStream(req, rs)
+       c.wg.Add(1)
+       go func() {
+               defer c.wg.Done()
+               c.executeAndStream(req, rs)
+       }()
 
        return rs, nil
 }
@@ -206,6 +212,10 @@ func (c *connection) executeAndStream(req *RequestMessage, 
rs ResultSet) {
                return
        }
        defer func() {
+               // Drain any unread bytes so the connection can be reused 
gracefully.
+               // Without this, Go's HTTP client sends a TCP RST instead of 
FIN,
+               // causing "Connection reset by peer" errors on the server.
+               io.Copy(io.Discard, resp.Body)
                if err := resp.Body.Close(); err != nil {
                        c.logHandler.logf(Debug, failedToCloseResponseBody, 
err.Error())
                }
@@ -328,5 +338,6 @@ func tryExtractJSONError(body string) string {
 }
 
 func (c *connection) close() {
+       c.wg.Wait()
        c.httpClient.CloseIdleConnections()
 }
diff --git a/gremlin-go/driver/connection_test.go 
b/gremlin-go/driver/connection_test.go
index 980040be5d..7d95192211 100644
--- a/gremlin-go/driver/connection_test.go
+++ b/gremlin-go/driver/connection_test.go
@@ -1093,6 +1093,30 @@ func TestConnectionWithMockServer(t *testing.T) {
                assert.Equal(t, graphBinaryMimeType, 
interceptorHeaders.Get("Content-Type"))
                assert.Equal(t, graphBinaryMimeType, 
interceptorHeaders.Get("Accept"))
        })
+
+       t.Run("close waits for in-flight requests", func(t *testing.T) {
+               server := httptest.NewServer(http.HandlerFunc(func(w 
http.ResponseWriter, r *http.Request) {
+                       time.Sleep(200 * time.Millisecond)
+                       w.WriteHeader(http.StatusOK)
+               }))
+               defer server.Close()
+
+               conn := newConnection(newTestLogHandler(), server.URL, 
&connectionSettings{})
+
+               rs, err := conn.submit(&RequestMessage{Gremlin: "g.V()", 
Fields: map[string]interface{}{}})
+               require.NoError(t, err)
+
+               start := time.Now()
+               conn.close()
+               elapsed := time.Since(start)
+
+               // close() should have waited for the in-flight goroutine
+               assert.GreaterOrEqual(t, elapsed.Milliseconds(), int64(150),
+                       "close() should wait for in-flight requests to 
complete")
+
+               // ResultSet should be closed (goroutine finished)
+               _, _ = rs.All()
+       })
 }
 
 // Tests for connection pool configuration settings

Reply via email to