This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new e47e04bcb feat(go/adbc/driver/flightsql): add logging for Flight SQL (#4322)
e47e04bcb is described below
commit e47e04bcbc7d55b79a46cf440f815ee85f26617d
Author: davidhcoe <[email protected]>
AuthorDate: Sun Jun 7 18:14:22 2026 -0400
feat(go/adbc/driver/flightsql): add logging for Flight SQL (#4322)
- Adds OpenTelemetry logging for Flight SQL
- Adds support for passing values as parameters to support TOML
  integration
---------
Co-authored-by: David Coe <>
---
c/driver_manager/adbc_driver_manager.cc | 12 +-
go/adbc/adbc.go | 22 ++
.../driver/flightsql/flightsql_adbc_server_test.go | 18 +-
go/adbc/driver/flightsql/flightsql_bulk_ingest.go | 38 +-
go/adbc/driver/flightsql/flightsql_connection.go | 132 ++++++-
go/adbc/driver/flightsql/flightsql_database.go | 96 ++++-
go/adbc/driver/flightsql/flightsql_driver.go | 11 +-
go/adbc/driver/flightsql/flightsql_statement.go | 91 ++++-
go/adbc/driver/flightsql/logging.go | 406 ++++++++++++++++++++-
go/adbc/driver/flightsql/logging_test.go | 334 +++++++++++++++++
go/adbc/driver/flightsql/record_reader.go | 121 +++++-
go/adbc/driver/flightsql/record_reader_test.go | 80 +++-
go/adbc/driver/flightsql/utils.go | 29 +-
go/adbc/driver/internal/driverbase/database.go | 56 ++-
go/adbc/driver/internal/driverbase/driver_test.go | 2 +-
go/adbc/drivermgr/adbc_driver_manager.cc | 12 +-
go/adbc/ext.go | 7 -
go/adbc/go.mod | 1 +
go/adbc/go.sum | 12 +
19 files changed, 1389 insertions(+), 91 deletions(-)
diff --git a/c/driver_manager/adbc_driver_manager.cc b/c/driver_manager/adbc_driver_manager.cc
index be20a9e3e..9ac56c83d 100644
--- a/c/driver_manager/adbc_driver_manager.cc
+++ b/c/driver_manager/adbc_driver_manager.cc
@@ -16,11 +16,19 @@
// under the License.
#if defined(_WIN32)
-#include <windows.h> // Must come first
-
+// These version macros gate which Win32 APIs the SDK headers declare. They MUST
+// be set before <windows.h> is included -- once windows.h pulls in winnt.h, the
+// internal API-availability macros are fixed and later #defines have no effect.
+// In particular, SHGetKnownFolderPath (used below) requires _WIN32_WINNT >= 0x0600
+// (Vista). Without this, builds with toolchains that default _WIN32_WINNT below
+// Vista (e.g. TDM-GCC 10.x) fail with "SHGetKnownFolderPath was not declared".
+#ifndef _WIN32_WINNT
+#define _WIN32_WINNT 0x0A00 // Windows 10
+#endif
#ifndef NTDDI_VERSION
#define NTDDI_VERSION 0x0A00000C // For SHGetKnownFolderPath in ShlObj_core.h in ShlObj.h
#endif
+#include <windows.h> // Must come first
#include <KnownFolders.h>
#include <ShlObj.h>
diff --git a/go/adbc/adbc.go b/go/adbc/adbc.go
index d66e143a3..7e85352b8 100644
--- a/go/adbc/adbc.go
+++ b/go/adbc/adbc.go
@@ -264,6 +264,28 @@ const (
OptionKeyPassword = "password"
// EXPERIMENTAL. Sets/Gets the trace parent on OpenTelemetry traces
OptionKeyTelemetryTraceParent = "adbc.telemetry.trace_parent"
+ // EXPERIMENTAL. Selects the OpenTelemetry traces exporter when the
+ // driver initializes its tracer provider. Accepts the same values as
+ // the OpenTelemetry "OTEL_TRACES_EXPORTER" environment variable (see
+ // the OptionTelemetryExporter constants below: "none", "otlp",
+ // "console", "adbcfile"). When this option is set on a database it
+ // takes precedence over the OTEL_TRACES_EXPORTER environment
+ // variable, which lets operators select an exporter via the ADBC
+ // driver manager / TOML profile mechanism without having to mutate
+ // the host process's environment. When neither this option nor the
+ // environment variable is set, the driver falls back to the
+ // process-global OpenTelemetry tracer provider.
+ OptionKeyTelemetryTracesExporter = "adbc.telemetry.traces_exporter"
+ // EXPERIMENTAL. Selects the on-disk folder used by the "adbcfile"
+ // traces exporter. When the exporter is "adbcfile" and this option
+ // is set, rotated trace files are written to the supplied folder
+ // (which is created if it does not exist) instead of the default
+ // "<user-config-dir>/.adbc/traces" path. The option is ignored for
+ // other exporters; it exists so an operator can route trace files
+ // to a location their support workflow already collects (e.g. a
+ // shared diagnostics folder) via the ADBC driver-manager / TOML
+ // profile mechanism.
+ OptionKeyTelemetryTracesFolderPath = "adbc.telemetry.traces_folder_path"
)
// EXPERIMENTAL. Traces Telemetry exporter option type
diff --git a/go/adbc/driver/flightsql/flightsql_adbc_server_test.go b/go/adbc/driver/flightsql/flightsql_adbc_server_test.go
index c10b38c88..57588a4f9 100644
--- a/go/adbc/driver/flightsql/flightsql_adbc_server_test.go
+++ b/go/adbc/driver/flightsql/flightsql_adbc_server_test.go
@@ -768,7 +768,14 @@ func (ts *ErrorDetailsTests) TestGetFlightInfo() {
ts.Equal(1, len(adbcErr.Details))
- wrapper := adbcErr.Details[0]
+ var wrapper adbc.ErrorDetail
+ for _, d := range adbcErr.Details {
+ if d.Key() == "grpc-status-details-bin" {
+ wrapper = d
+ break
+ }
+ }
+ ts.NotNil(wrapper, "grpc-status-details-bin detail not found")
ts.Equal("grpc-status-details-bin", wrapper.Key())
raw, err := wrapper.Serialize()
@@ -803,7 +810,14 @@ func (ts *ErrorDetailsTests) TestDoGet() {
ts.Equal(1, len(adbcErr.Details))
- wrapper := adbcErr.Details[0]
+ var wrapper adbc.ErrorDetail
+ for _, d := range adbcErr.Details {
+ if d.Key() == "grpc-status-details-bin" {
+ wrapper = d
+ break
+ }
+ }
+ ts.NotNil(wrapper, "grpc-status-details-bin detail not found")
ts.Equal("grpc-status-details-bin", wrapper.Key())
raw, err := wrapper.Serialize()
diff --git a/go/adbc/driver/flightsql/flightsql_bulk_ingest.go b/go/adbc/driver/flightsql/flightsql_bulk_ingest.go
index 9ae22ba84..fea5604d5 100644
--- a/go/adbc/driver/flightsql/flightsql_bulk_ingest.go
+++ b/go/adbc/driver/flightsql/flightsql_bulk_ingest.go
@@ -20,6 +20,8 @@ package flightsql
import (
"context"
"fmt"
+ "log/slog"
+ "time"
"github.com/apache/arrow-adbc/go/adbc"
"github.com/apache/arrow-go/v18/arrow"
@@ -111,6 +113,26 @@ func (s *statement) executeIngest(ctx context.Context)
(int64, error) {
}
}
+ startTime := time.Now()
+ catalogStr := ""
+ if s.catalog != nil {
+ catalogStr = *s.catalog
+ }
+ dbSchemaStr := ""
+ if s.dbSchema != nil {
+ dbSchemaStr = *s.dbSchema
+ }
+ startAttrs := []any{
+ slog.String("target_table", s.targetTable),
+ slog.String("mode", s.ingestMode),
+ slog.String("catalog", catalogStr),
+ slog.String("db_schema", dbSchemaStr),
+ slog.Bool("temporary", s.temporary),
+ slog.Bool("streamBind", s.streamBind != nil),
+ slog.Bool("recordBound", s.bound != nil),
+ }
+ s.log.InfoContext(ctx, "FlightSQL ExecuteIngest start", startAttrs...)
+
opts := ingestOptions{
targetTable: s.targetTable,
mode: s.ingestMode,
@@ -129,6 +151,10 @@ func (s *statement) executeIngest(ctx context.Context)
(int64, error) {
} else {
rdr, err = createRecordReaderFromBatch(s.bound)
if err != nil {
+ s.log.WarnContext(ctx, "FlightSQL ExecuteIngest
finished with error",
+ slog.Duration("duration",
time.Since(startTime)),
+ "err", err,
+ )
return -1, err
}
}
@@ -138,9 +164,19 @@ func (s *statement) executeIngest(ctx context.Context)
(int64, error) {
callOpts := append([]grpc.CallOption{}, grpc.Header(&header),
grpc.Trailer(&trailer), s.timeouts)
nRows, err := s.cnxn.cl.ExecuteIngest(ctx, rdr, ingestOpts, callOpts...)
+ finishAttrs := []any{
+ slog.Duration("duration", time.Since(startTime)),
+ slog.Int64("rowsIngested", nRows),
+ }
+ finishAttrs = append(finishAttrs, correlationHeaderAttrs(header)...)
+ finishAttrs = append(finishAttrs, correlationHeaderAttrs(trailer)...)
if err != nil {
- return -1, adbcFromFlightStatusWithDetails(err, header,
trailer, "ExecuteIngest")
+ wrapped := adbcFromFlightStatusWithDetails(err, header,
trailer, "ExecuteIngest")
+ finishAttrs = append(finishAttrs, "err", wrapped)
+ s.log.WarnContext(ctx, "FlightSQL ExecuteIngest finished with
error", finishAttrs...)
+ return -1, wrapped
}
+ s.log.InfoContext(ctx, "FlightSQL ExecuteIngest finished",
finishAttrs...)
return nRows, nil
}
diff --git a/go/adbc/driver/flightsql/flightsql_connection.go b/go/adbc/driver/flightsql/flightsql_connection.go
index 4f672d298..11d9a6473 100644
--- a/go/adbc/driver/flightsql/flightsql_connection.go
+++ b/go/adbc/driver/flightsql/flightsql_connection.go
@@ -23,8 +23,10 @@ import (
"encoding/json"
"fmt"
"io"
+ "log/slog"
"math"
"strings"
+ "time"
"github.com/apache/arrow-adbc/go/adbc"
"github.com/apache/arrow-adbc/go/adbc/driver/internal"
@@ -55,6 +57,14 @@ type connectionImpl struct {
timeouts timeoutOption
txn *flightsql.Txn
supportInfo support
+
+ // id is a short random identifier assigned at Open time and stamped
+ // onto every log record emitted by this connection.
+ id string
+
+ // openedAt is the wall-clock time at which Open() finished; used to
+ // log connection lifetime at Close.
+ openedAt time.Time
}
type flightSqlMetadata struct {
@@ -222,14 +232,29 @@ var adbcToFlightSQLInfo =
map[adbc.InfoCode]flightsql.SqlInfo{
adbc.InfoVendorSubstraitMaxVersion:
flightsql.SqlInfoFlightSqlServerSubstraitMaxVersion,
}
-func doGet(ctx context.Context, cl *flightsql.Client, endpoint
*flight.FlightEndpoint, clientCache gcache.Cache, opts ...grpc.CallOption) (rdr
*flight.Reader, err error) {
+// doGetWithLogger performs DoGet against an endpoint's locations, logging each
+// attempt and joining all per-location failures into the returned error so the
+// caller can see every location that was tried. logger may be nil.
+func doGetWithLogger(ctx context.Context, cl *flightsql.Client, endpoint
*flight.FlightEndpoint, clientCache gcache.Cache, logger *slog.Logger, opts
...grpc.CallOption) (rdr *flight.Reader, err error) {
+ log := safeLogger(logger)
if len(endpoint.Location) == 0 {
- return cl.DoGet(ctx, endpoint.Ticket, opts...)
+ log.DebugContext(ctx, "FlightSQL doGet",
+ "phase", "noLocations",
+ )
+ start := time.Now()
+ rdr, err = cl.DoGet(ctx, endpoint.Ticket, opts...)
+ log.DebugContext(ctx, "FlightSQL doGet",
+ "phase", "defaultClientResult",
+ "duration", time.Since(start),
+ "err", err,
+ )
+ return rdr, err
}
var (
- cc interface{}
- hasFallback bool
+ cc interface{}
+ hasFallback bool
+ attemptErrors []string
)
for _, loc := range endpoint.Location {
@@ -238,22 +263,59 @@ func doGet(ctx context.Context, cl *flightsql.Client,
endpoint *flight.FlightEnd
continue
}
+ start := time.Now()
cc, err = clientCache.Get(loc.Uri)
if err != nil {
+ attemptErrors = append(attemptErrors,
fmt.Sprintf("clientCache.Get(%q): %s", loc.Uri, err.Error()))
+ log.WarnContext(ctx, "FlightSQL doGet location attempt
failed",
+ "phase", "clientCacheGet",
+ "location", loc.Uri,
+ "duration", time.Since(start),
+ "err", err,
+ )
continue
}
conn := cc.(*flightsql.Client)
rdr, err = conn.DoGet(ctx, endpoint.Ticket, opts...)
if err != nil {
+ attemptErrors = append(attemptErrors,
fmt.Sprintf("DoGet(%q): %s", loc.Uri, err.Error()))
+ log.WarnContext(ctx, "FlightSQL doGet location attempt
failed",
+ "phase", "doGet",
+ "location", loc.Uri,
+ "duration", time.Since(start),
+ "err", err,
+ )
continue
}
+ log.DebugContext(ctx, "FlightSQL doGet succeeded",
+ "location", loc.Uri,
+ "duration", time.Since(start),
+ )
return
}
if hasFallback {
- return cl.DoGet(ctx, endpoint.Ticket, opts...)
+ start := time.Now()
+ rdr, err = cl.DoGet(ctx, endpoint.Ticket, opts...)
+ if err != nil {
+ attemptErrors = append(attemptErrors,
fmt.Sprintf("DoGet(fallback to default client): %s", err.Error()))
+ log.WarnContext(ctx, "FlightSQL doGet fallback to
default client failed",
+ "duration", time.Since(start),
+ "err", err,
+ )
+ return nil, fmt.Errorf("all DoGet attempts failed: %s;
final: %w", strings.Join(attemptErrors, "; "), err)
+ }
+ log.DebugContext(ctx, "FlightSQL doGet succeeded via default
client fallback",
+ "duration", time.Since(start),
+ )
+ return rdr, nil
+ }
+
+ if err != nil && len(attemptErrors) > 1 {
+ err = fmt.Errorf("all %d DoGet location(s) failed: %s; final:
%w",
+ len(attemptErrors), strings.Join(attemptErrors, "; "),
err)
}
return nil, err
@@ -642,7 +704,7 @@ func (c *connectionImpl) PrepareDriverInfo(ctx
context.Context, infoCodes []adbc
// No error, go get the SqlInfo from the server
for i, endpoint := range info.Endpoint {
var header, trailer metadata.MD
- rdr, err := doGet(ctx, c.cl, endpoint, c.clientCache,
grpc.Header(&header), grpc.Trailer(&trailer), c.timeouts)
+ rdr, err := doGetWithLogger(ctx, c.cl, endpoint, c.clientCache,
c.Logger, grpc.Header(&header), grpc.Trailer(&trailer), c.timeouts)
if err != nil {
return adbcFromFlightStatusWithDetails(err, header,
trailer, "GetInfo(DoGet): endpoint %d: %s", i, endpoint.Location)
}
@@ -701,7 +763,14 @@ func (c *connectionImpl) PrepareDriverInfo(ctx
context.Context, infoCodes []adbc
// Helper function to read and validate a metadata stream
func (c *connectionImpl) readInfo(ctx context.Context, expectedSchema
*arrow.Schema, info *flight.FlightInfo, opts ...grpc.CallOption)
(array.RecordReader, error) {
// use a default queueSize for the reader
- rdr, err := newRecordReader(ctx, c.db.Alloc, c.cl, info, c.clientCache,
5, opts...)
+ rdr, err := newRecordReader(ctx, recordReaderConfig{
+ alloc: c.db.Alloc,
+ cl: c.cl,
+ info: info,
+ clientCache: c.clientCache,
+ bufferSize: 5,
+ logger: c.Logger,
+ }, opts...)
if err != nil {
return nil, adbcFromFlightStatus(err, "DoGet")
}
@@ -898,7 +967,7 @@ func (c *connectionImpl) GetTableSchema(ctx
context.Context, catalog *string, db
header = metadata.MD{}
trailer = metadata.MD{}
- rdr, err := doGet(ctx, c.cl, info.Endpoint[0], c.clientCache,
c.timeouts, grpc.Header(&header), grpc.Trailer(&trailer))
+ rdr, err := doGetWithLogger(ctx, c.cl, info.Endpoint[0], c.clientCache,
c.Logger, c.timeouts, grpc.Header(&header), grpc.Trailer(&trailer))
if err != nil {
return nil, adbcFromFlightStatusWithDetails(err, header,
trailer, "GetTableSchema(DoGet)")
}
@@ -968,7 +1037,14 @@ func (c *connectionImpl) GetTableTypes(ctx
context.Context) (array.RecordReader,
return nil, adbcFromFlightStatusWithDetails(err, header,
trailer, "GetTableTypes")
}
- return newRecordReader(ctx, c.db.Alloc, c.cl, info, c.clientCache, 5)
+ return newRecordReader(ctx, recordReaderConfig{
+ alloc: c.db.Alloc,
+ cl: c.cl,
+ info: info,
+ clientCache: c.clientCache,
+ bufferSize: 5,
+ logger: c.Logger,
+ })
}
// Commit commits any pending transactions on this connection, it should
@@ -1019,6 +1095,13 @@ func (c *connectionImpl) Rollback(ctx context.Context)
error {
// NewStatement initializes a new statement object tied to this connection
func (c *connectionImpl) NewStatement() (adbc.Statement, error) {
+ id := newRandomID("stmt")
+ // Build a statement-scoped logger so every record emitted for this
+ // statement carries both connection_id (inherited from c.Logger via the
+ // With() called in databaseImpl.Open) and statement_id. The discard
+ // fallback in safeLogger keeps callers free of nil-checks if no logger
+ // is wired up by the host.
+ log := safeLogger(c.Logger).With("statement_id", id)
return &statement{
alloc: c.db.Alloc,
clientCache: c.clientCache,
@@ -1026,6 +1109,8 @@ func (c *connectionImpl) NewStatement() (adbc.Statement,
error) {
queueSize: 5,
timeouts: c.timeouts,
cnxn: c,
+ id: id,
+ log: log,
}, nil
}
@@ -1118,6 +1203,17 @@ func (c *connectionImpl) Close() error {
}
}
+ closeStart := time.Now()
+ // Snapshot fields before tearing down c.cl; log "closing" and
+ // "closed" separately so a hung CloseSession is still visible.
+ logger := safeLogger(c.Logger)
+ connID := c.id
+ openedAt := c.openedAt
+
+ logger.Info("FlightSQL connection closing",
+ "connection_id", connID,
+ )
+
ctx := metadata.NewOutgoingContext(context.Background(), c.hdrs)
var header, trailer metadata.MD
_, err := c.cl.CloseSession(ctx, &flight.CloseSessionRequest{},
grpc.Header(&header), grpc.Trailer(&trailer), c.timeouts)
@@ -1134,6 +1230,22 @@ func (c *connectionImpl) Close() error {
c.clientCache.Purge()
err = c.cl.Close()
c.cl = nil
+
+ args := []any{
+ "connection_id", connID,
+ "close_duration", time.Since(closeStart),
+ }
+ if !openedAt.IsZero() {
+ args = append(args, "lifetime", time.Since(openedAt))
+ }
+ if err != nil {
+ args = append(args, "err", err)
+ args = append(args, grpcStatusAttrs(err)...)
+ logger.Info("FlightSQL connection closed with error", args...)
+ } else {
+ logger.Info("FlightSQL connection closed", args...)
+ }
+
return adbcFromFlightStatus(err, "Close")
}
@@ -1159,7 +1271,7 @@ func (c *connectionImpl) ReadPartition(ctx
context.Context, serializedPartition
}
ctx = metadata.NewOutgoingContext(ctx, c.hdrs)
- rdr, err = doGet(ctx, c.cl, info.Endpoint[0], c.clientCache, c.timeouts)
+ rdr, err = doGetWithLogger(ctx, c.cl, info.Endpoint[0], c.clientCache,
c.Logger, c.timeouts)
if err != nil {
return nil, adbcFromFlightStatus(err, "ReadPartition(DoGet)")
}
diff --git a/go/adbc/driver/flightsql/flightsql_database.go b/go/adbc/driver/flightsql/flightsql_database.go
index 316ad778b..259e534ed 100644
--- a/go/adbc/driver/flightsql/flightsql_database.go
+++ b/go/adbc/driver/flightsql/flightsql_database.go
@@ -22,6 +22,7 @@ import (
"crypto/tls"
"crypto/x509"
"fmt"
+ "log/slog"
"net/url"
"strconv"
"strings"
@@ -367,6 +368,11 @@ func (d *databaseImpl) SetOptionDouble(key string, value
float64) error {
}
func (d *databaseImpl) Close() error {
+ if d.Logger != nil {
+ d.Logger.Info("FlightSQL database closed",
+ "target", d.uri.String(),
+ )
+ }
return nil
}
@@ -430,15 +436,37 @@ func getFlightClient(ctx context.Context, loc string, d
*databaseImpl, authMiddl
var authValue string
if d.user != "" || d.pass != "" {
+ authStart := time.Now()
+ d.Logger.InfoContext(ctx, "FlightSQL basic auth started",
+ "target", loc,
+ "user", d.user,
+ )
var header, trailer metadata.MD
ctx, err = cl.Client.AuthenticateBasicToken(ctx, d.user,
d.pass, grpc.Header(&header), grpc.Trailer(&trailer), d.timeout)
if err != nil {
+ args := []any{
+ "target", loc,
+ "user", d.user,
+ "duration", time.Since(authStart),
+ "err", err,
+ }
+ args = append(args, correlationHeaderAttrs(header)...)
+ args = append(args, correlationHeaderAttrs(trailer)...)
+ args = append(args, grpcStatusAttrs(err)...)
+ d.Logger.InfoContext(ctx, "FlightSQL basic auth
failed", args...)
return nil, adbcFromFlightStatusWithDetails(err,
header, trailer, "AuthenticateBasicToken")
}
if md, ok := metadata.FromOutgoingContext(ctx); ok {
authValue = md.Get("Authorization")[0]
}
+
+ d.Logger.InfoContext(ctx, "FlightSQL basic auth succeeded",
+ "target", loc,
+ "user", d.user,
+ "duration", time.Since(authStart),
+ "token_length", len(authValue),
+ )
}
if authValue != "" {
@@ -453,7 +481,7 @@ type support struct {
}
func (d *databaseImpl) Open(ctx context.Context) (adbc.Connection, error) {
- authMiddle := &bearerAuthMiddleware{hdrs: d.hdrs.Copy()}
+ authMiddle := &bearerAuthMiddleware{hdrs: d.hdrs.Copy(), logger:
safeLogger(d.Logger)}
var cookies flight.CookieMiddleware
if d.enableCookies {
cookies = flight.NewCookieMiddleware()
@@ -480,7 +508,7 @@ func (d *databaseImpl) Open(ctx context.Context)
(adbc.Connection, error) {
}
// use the existing auth token if there is one
cl, err := getFlightClient(context.Background(), uri, d,
- &bearerAuthMiddleware{hdrs:
authMiddle.hdrs.Copy()}, cookieMiddleware)
+ &bearerAuthMiddleware{hdrs:
authMiddle.hdrs.Copy(), logger: safeLogger(d.Logger)}, cookieMiddleware)
if err != nil {
return nil, err
}
@@ -510,7 +538,7 @@ func (d *databaseImpl) Open(ctx context.Context)
(adbc.Connection, error) {
const int32code = 3
for _, endpoint := range info.Endpoint {
- rdr, err := doGet(ctx, cl, endpoint, cache, d.timeout)
+ rdr, err := doGetWithLogger(ctx, cl, endpoint, cache,
d.Logger, d.timeout)
if err != nil {
continue
}
@@ -549,6 +577,16 @@ func (d *databaseImpl) Open(ctx context.Context)
(adbc.Connection, error) {
hdrs: make(metadata.MD), timeouts: d.timeout, supportInfo:
cnxnSupport,
ConnectionImplBase:
driverbase.NewConnectionImplBase(&d.DatabaseImplBase),
}
+ // Stamp a stable per-connection ID onto every log line emitted by
+ // this connection (and any statements derived from it).
+ conn.id = newRandomID("conn")
+ conn.openedAt = time.Now()
+ conn.Logger = safeLogger(conn.Logger).With("connection_id", conn.id)
+ conn.Logger.InfoContext(ctx, "FlightSQL connection opened",
+ "target", d.uri.String(),
+ "transactionsSupported", cnxnSupport.transactions,
+ "driver", infoDriverName,
+ )
return driverbase.NewConnectionBuilder(conn).
WithDriverInfoPreparer(conn).
@@ -560,6 +598,9 @@ func (d *databaseImpl) Open(ctx context.Context)
(adbc.Connection, error) {
type bearerAuthMiddleware struct {
mutex sync.RWMutex
hdrs metadata.MD
+ // logger, when non-nil, receives an Info event each time the bearer
+ // token is rotated. Only token lengths are logged, never values.
+ logger *slog.Logger
}
func (b *bearerAuthMiddleware) StartCall(ctx context.Context) context.Context {
@@ -569,18 +610,51 @@ func (b *bearerAuthMiddleware) StartCall(ctx
context.Context) context.Context {
return metadata.NewOutgoingContext(ctx, metadata.Join(md, b.hdrs))
}
+// rotateAuth atomically replaces the stored Authorization metadata and
+// returns the previous value plus the current logger. Callers invoke
+// the logger outside the critical section.
+func (b *bearerAuthMiddleware) rotateAuth(headers ...string) (previous
[]string, logger *slog.Logger) {
+ b.mutex.Lock()
+ defer b.mutex.Unlock()
+ previous = b.hdrs.Get("authorization")
+ b.hdrs.Set("authorization", headers...)
+ return previous, b.logger
+}
+
func (b *bearerAuthMiddleware) HeadersReceived(ctx context.Context, md
metadata.MD) {
// apache/arrow-adbc#584
headers := md.Get("authorization")
- if len(headers) > 0 {
- b.mutex.Lock()
- defer b.mutex.Unlock()
- b.hdrs.Set("authorization", headers...)
- }
+ if len(headers) == 0 {
+ return
+ }
+ previous, logger := b.rotateAuth(headers...)
+ if logger == nil {
+ return
+ }
+ // Log lengths, never values, so credentials never reach the log path.
+ var prevLen int
+ if len(previous) > 0 {
+ prevLen = len(previous[0])
+ }
+ logger.InfoContext(ctx, "FlightSQL bearer token rotated by server",
+ "previous_token_length", prevLen,
+ "new_token_length", len(headers[0]),
+ "source", "HeadersReceived",
+ )
}
func (b *bearerAuthMiddleware) SetHeader(authValue string) {
- b.mutex.Lock()
- defer b.mutex.Unlock()
- b.hdrs.Set("authorization", authValue)
+ previous, logger := b.rotateAuth(authValue)
+ if logger == nil {
+ return
+ }
+ var prevLen int
+ if len(previous) > 0 {
+ prevLen = len(previous[0])
+ }
+ logger.Info("FlightSQL bearer token rotated by client",
+ "previous_token_length", prevLen,
+ "new_token_length", len(authValue),
+ "source", "SetHeader",
+ )
}
diff --git a/go/adbc/driver/flightsql/flightsql_driver.go b/go/adbc/driver/flightsql/flightsql_driver.go
index 616a1ab09..4af7ff23e 100644
--- a/go/adbc/driver/flightsql/flightsql_driver.go
+++ b/go/adbc/driver/flightsql/flightsql_driver.go
@@ -131,7 +131,16 @@ func (d *driverImpl) NewDatabaseWithOptionsContext(ctx
context.Context, opts map
}
delete(opts, adbc.OptionKeyURI)
- dbBase, err := driverbase.NewDatabaseImplBase(ctx, &d.DriverImplBase)
+ tracesExporter := opts[adbc.OptionKeyTelemetryTracesExporter]
+ delete(opts, adbc.OptionKeyTelemetryTracesExporter)
+
+ tracesFolderPath := opts[adbc.OptionKeyTelemetryTracesFolderPath]
+ delete(opts, adbc.OptionKeyTelemetryTracesFolderPath)
+
+ dbBase, err := driverbase.NewDatabaseImplBase(ctx, &d.DriverImplBase,
driverbase.TracingOptions{
+ ExporterName: tracesExporter,
+ TracingFolderPath: tracesFolderPath,
+ })
if err != nil {
return nil, err
}
diff --git a/go/adbc/driver/flightsql/flightsql_statement.go b/go/adbc/driver/flightsql/flightsql_statement.go
index 08d92c44b..61705ae6f 100644
--- a/go/adbc/driver/flightsql/flightsql_statement.go
+++ b/go/adbc/driver/flightsql/flightsql_statement.go
@@ -20,6 +20,7 @@ package flightsql
import (
"context"
"fmt"
+ "log/slog"
"math"
"strconv"
"strings"
@@ -179,6 +180,8 @@ type statement struct {
// Bound data for bulk ingest
bound arrow.RecordBatch
streamBind array.RecordReader
+ id string
+ log *slog.Logger
}
func (s *statement) closePreparedStatement() error {
@@ -490,9 +493,25 @@ func (s *statement) SetSqlQuery(query string) error {
}
s.targetTable = ""
s.query.setSqlQuery(query)
+ if s.log != nil {
+ s.log.Debug("FlightSQL SetSqlQuery", s.queryAttrs()...)
+ }
return nil
}
+func (s *statement) queryAttrs() []any {
+ if s.query.sqlQuery != "" {
+ return queryFingerprintAttrs(s.query.sqlQuery)
+ }
+ if s.query.substraitPlan != nil {
+ return substraitFingerprintAttrs(s.query.substraitPlan,
s.query.substraitVersion)
+ }
+ if s.targetTable != "" {
+ return []any{slog.String("query_type", "ingest"),
slog.String("target_table", s.targetTable)}
+ }
+ return []any{slog.String("query_type", "none")}
+}
+
// ExecuteQuery executes the current query or prepared statement
// and returns a RecordReader for the results along with the number
// of rows affected if known, otherwise it will be -1.
@@ -517,6 +536,13 @@ func (s *statement) ExecuteQuery(ctx context.Context) (rdr
array.RecordReader, n
return nil, nrec, err
}
+ startTime := time.Now()
+ startAttrs := append([]any{
+ slog.Bool("prepared", s.prepared != nil),
+ slog.Bool("hasTxn", s.cnxn.txn != nil),
+ }, s.queryAttrs()...)
+ s.log.InfoContext(ctx, "FlightSQL ExecuteQuery start", startAttrs...)
+
ctx = metadata.NewOutgoingContext(ctx, s.hdrs)
var info *flight.FlightInfo
var header, trailer metadata.MD
@@ -527,12 +553,37 @@ func (s *statement) ExecuteQuery(ctx context.Context)
(rdr array.RecordReader, n
info, err = s.query.execute(ctx, s.cnxn, opts...)
}
+ defer func() {
+ finishAttrs := []any{
+ slog.Duration("duration", time.Since(startTime)),
+ slog.String("phase", "GetFlightInfo"),
+ }
+ if info != nil {
+ finishAttrs = append(finishAttrs,
flightInfoLogAttrs(info)...)
+ }
+ finishAttrs = append(finishAttrs,
correlationHeaderAttrs(header)...)
+ finishAttrs = append(finishAttrs,
correlationHeaderAttrs(trailer)...)
+ if err != nil {
+ finishAttrs = append(finishAttrs, "err", err)
+ s.log.WarnContext(ctx, "FlightSQL ExecuteQuery finished
with error", finishAttrs...)
+ } else {
+ s.log.InfoContext(ctx, "FlightSQL ExecuteQuery
finished", finishAttrs...)
+ }
+ }()
+
if err != nil {
return nil, -1, adbcFromFlightStatusWithDetails(err, header,
trailer, "ExecuteQuery")
}
nrec = info.TotalRecords
- rdr, err = newRecordReader(ctx, s.alloc, s.cnxn.cl, info,
s.clientCache, s.queueSize, s.timeouts)
+ rdr, err = newRecordReader(ctx, recordReaderConfig{
+ alloc: s.alloc,
+ cl: s.cnxn.cl,
+ info: info,
+ clientCache: s.clientCache,
+ bufferSize: s.queueSize,
+ logger: s.log,
+ }, s.timeouts)
return
}
@@ -556,6 +607,13 @@ func (s *statement) ExecuteUpdate(ctx context.Context) (n
int64, err error) {
return s.executeIngest(ctx)
}
+ startTime := time.Now()
+ startAttrs := append([]any{
+ slog.Bool("prepared", s.prepared != nil),
+ slog.Bool("hasTxn", s.cnxn.txn != nil),
+ }, s.queryAttrs()...)
+ s.log.InfoContext(ctx, "FlightSQL ExecuteUpdate start", startAttrs...)
+
ctx = metadata.NewOutgoingContext(ctx, s.hdrs)
var header, trailer metadata.MD
opts := append([]grpc.CallOption{}, grpc.Header(&header),
grpc.Trailer(&trailer), s.timeouts)
@@ -565,6 +623,21 @@ func (s *statement) ExecuteUpdate(ctx context.Context) (n
int64, err error) {
n, err = s.query.executeUpdate(ctx, s.cnxn, opts...)
}
+ defer func() {
+ finishAttrs := []any{
+ slog.Duration("duration", time.Since(startTime)),
+ slog.Int64("rowsAffected", n),
+ }
+ finishAttrs = append(finishAttrs,
correlationHeaderAttrs(header)...)
+ finishAttrs = append(finishAttrs,
correlationHeaderAttrs(trailer)...)
+ if err != nil {
+ finishAttrs = append(finishAttrs, "err", err)
+ s.log.WarnContext(ctx, "FlightSQL ExecuteUpdate
finished with error", finishAttrs...)
+ } else {
+ s.log.InfoContext(ctx, "FlightSQL ExecuteUpdate
finished", finishAttrs...)
+ }
+ }()
+
if err != nil {
err = adbcFromFlightStatusWithDetails(err, header, trailer,
"ExecuteQuery")
}
@@ -575,9 +648,25 @@ func (s *statement) ExecuteUpdate(ctx context.Context) (n
int64, err error) {
// Prepare turns this statement into a prepared statement to be executed
// multiple times. This invalidates any prior result sets.
func (s *statement) Prepare(ctx context.Context) error {
+ startTime := time.Now()
+ s.log.InfoContext(ctx, "FlightSQL Prepare start", s.queryAttrs()...)
+
ctx = metadata.NewOutgoingContext(ctx, s.hdrs)
var header, trailer metadata.MD
prep, err := s.query.prepare(ctx, s.cnxn, grpc.Header(&header),
grpc.Trailer(&trailer), s.timeouts)
+
+ defer func() {
+ finishAttrs := []any{slog.Duration("duration",
time.Since(startTime))}
+ finishAttrs = append(finishAttrs,
correlationHeaderAttrs(header)...)
+ finishAttrs = append(finishAttrs,
correlationHeaderAttrs(trailer)...)
+ if err != nil {
+ finishAttrs = append(finishAttrs, "err", err)
+ s.log.WarnContext(ctx, "FlightSQL Prepare finished with
error", finishAttrs...)
+ } else {
+ s.log.InfoContext(ctx, "FlightSQL Prepare finished",
finishAttrs...)
+ }
+ }()
+
if err != nil {
return adbcFromFlightStatusWithDetails(err, header, trailer,
"Prepare")
}
diff --git a/go/adbc/driver/flightsql/logging.go b/go/adbc/driver/flightsql/logging.go
index 4fb12c411..9dae400b8 100644
--- a/go/adbc/driver/flightsql/logging.go
+++ b/go/adbc/driver/flightsql/logging.go
@@ -19,16 +19,130 @@ package flightsql
import (
"context"
+ "crypto/rand"
+ "crypto/sha256"
+ "encoding/hex"
"io"
"log/slog"
+ "strconv"
"time"
+ "github.com/apache/arrow-go/v18/arrow/flight"
+ "go.opentelemetry.io/otel/trace"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
+ "google.golang.org/grpc/status"
)
+// safeLogger returns a non-nil *slog.Logger wrapped with otelTraceHandler
+// so records carry trace/span IDs when their context has an active span.
+// A nil logger becomes a discard logger; the wrap is idempotent.
+func safeLogger(logger *slog.Logger) *slog.Logger {
+ if logger == nil {
+ logger = slog.New(slog.NewTextHandler(io.Discard, nil))
+ }
+ return withOtelTraceContext(logger)
+}
+
+// maxLoggedBlobBytes caps how many bytes of opaque server-defined blobs
+// (descriptor commands, AppMetadata) are emitted in log records. Flight
+// tickets are not logged at all because they may carry sensitive data.
+const maxLoggedBlobBytes = 32
+
+// endpointLogAttrs builds slog attributes describing a Flight endpoint
+// (index, ticket length, locations) for per-endpoint log records. Ticket
+// contents are intentionally never logged.
+func endpointLogAttrs(endpointIndex, numEndpoints int, endpoint
*flight.FlightEndpoint) []any {
+ attrs := []any{
+ slog.Int("endpointIndex", endpointIndex),
+ slog.Int("numEndpoints", numEndpoints),
+ }
+ if endpoint == nil {
+ return attrs
+ }
+ if endpoint.Ticket != nil {
+ attrs = append(attrs, slog.Int("ticketBytes",
len(endpoint.Ticket.Ticket)))
+ }
+ if len(endpoint.Location) == 0 {
+ attrs = append(attrs, slog.String("locations", "<empty: using
default client connection>"))
+ } else {
+ uris := make([]string, 0, len(endpoint.Location))
+ for _, loc := range endpoint.Location {
+ uris = append(uris, loc.Uri)
+ }
+ attrs = append(attrs, slog.Any("locations", uris))
+ }
+ if endpoint.ExpirationTime != nil {
+ attrs = append(attrs, slog.Time("expirationTime",
endpoint.ExpirationTime.AsTime()))
+ }
+ return attrs
+}
+
+// streamProgress tracks per-endpoint streaming statistics for log records
+// and error messages emitted when a stream ends. Not safe for concurrent
+// use; intended to be owned by the goroutine driving one endpoint.
+type streamProgress struct {
+ start time.Time
+ firstBatchAt time.Time
+ lastBatchAt time.Time
+ batchesRead int64
+ recordsRead int64
+ bytesEstimate int64
+}
+
+func newStreamProgress() *streamProgress {
+ return &streamProgress{start: time.Now()}
+}
+
+// recordBatch updates the tracker after one Arrow record batch was received.
+func (p *streamProgress) recordBatch(rows int64, bytes int64) {
+ now := time.Now()
+ if p.batchesRead == 0 {
+ p.firstBatchAt = now
+ }
+ p.lastBatchAt = now
+ p.batchesRead++
+ p.recordsRead += rows
+ p.bytesEstimate += bytes
+}
+
+// logAttrs returns slog attributes summarizing this stream's progress.
+func (p *streamProgress) logAttrs() []any {
+ attrs := []any{
+ slog.Int64("batchesRead", p.batchesRead),
+ slog.Int64("recordsRead", p.recordsRead),
+ slog.Int64("approxBytesRead", p.bytesEstimate),
+ slog.Duration("elapsed", time.Since(p.start)),
+ }
+ if !p.firstBatchAt.IsZero() {
+ attrs = append(attrs, slog.Duration("timeToFirstBatch",
p.firstBatchAt.Sub(p.start)))
+ } else {
+ attrs = append(attrs, slog.String("timeToFirstBatch", "never"))
+ }
+ if !p.lastBatchAt.IsZero() {
+ attrs = append(attrs, slog.Duration("timeSinceLastBatch",
time.Since(p.lastBatchAt)))
+ }
+ return attrs
+}
+
+// summary returns a compact human-readable summary of the stream's progress
+// suitable for embedding into wrapped error messages.
+func (p *streamProgress) summary() string {
+ if p.batchesRead == 0 {
+ return "no batches received before failure; elapsed=" +
time.Since(p.start).String()
+ }
+ return "received " + formatInt(p.batchesRead) + " batch(es), " +
+ formatInt(p.recordsRead) + " row(s) before failure; elapsed=" +
time.Since(p.start).String() +
+ "; timeSinceLastBatch=" + time.Since(p.lastBatchAt).String()
+}
+
+// formatInt formats an int64 without pulling in fmt.
+func formatInt(n int64) string {
+ return strconv.FormatInt(n, 10)
+}
+
func makeUnaryLoggingInterceptor(logger *slog.Logger)
grpc.UnaryClientInterceptor {
interceptor := func(ctx context.Context, method string, req, reply any,
cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
start := time.Now()
@@ -36,11 +150,18 @@ func makeUnaryLoggingInterceptor(logger *slog.Logger)
grpc.UnaryClientIntercepto
outgoing, _ := metadata.FromOutgoingContext(ctx)
err := invoker(ctx, method, req, reply, cc, opts...)
if logger.Enabled(ctx, slog.LevelDebug) {
- logger.DebugContext(ctx, method, "target", cc.Target(),
"duration", time.Since(start), "err", err, "metadata", outgoing)
+ args := []any{"target", cc.Target(), "duration",
time.Since(start), "err", err, "metadata", outgoing}
+ args = append(args, outgoingCallHeaderAttrs(ctx)...)
+ args = append(args, grpcStatusAttrs(err)...)
+ logger.DebugContext(ctx, method, args...)
} else {
keys := maps.Keys(outgoing)
slices.Sort(keys)
- logger.InfoContext(ctx, method, "target", cc.Target(),
"duration", time.Since(start), "err", err, "metadata", keys)
+ args := []any{"target", cc.Target(), "duration",
time.Since(start), "err", err, "metadata", keys}
+ // Surface curated outbound correlation IDs regardless
of level.
+ args = append(args, outgoingCallHeaderAttrs(ctx)...)
+ args = append(args, grpcStatusAttrs(err)...)
+ logger.InfoContext(ctx, method, args...)
}
return err
}
@@ -54,7 +175,10 @@ func makeStreamLoggingInterceptor(logger *slog.Logger)
grpc.StreamClientIntercep
outgoing, _ := metadata.FromOutgoingContext(ctx)
stream, err := streamer(ctx, desc, cc, method, opts...)
if err != nil {
- logger.InfoContext(ctx, method, "target", cc.Target(),
"duration", time.Since(start), "err", err)
+ args := []any{"target", cc.Target(), "duration",
time.Since(start), "err", err}
+ args = append(args, outgoingCallHeaderAttrs(ctx)...)
+ args = append(args, grpcStatusAttrs(err)...)
+ logger.InfoContext(ctx, method, args...)
return stream, err
}
@@ -72,23 +196,277 @@ type loggedStream struct {
start time.Time
target string
outgoing metadata.MD
+
+ // recvCount tracks how many messages were received before the stream
+ // ended; logged on termination so EOFs on empty streams are
distinguishable
+ // from mid-stream failures.
+ recvCount int64
}
func (stream *loggedStream) RecvMsg(m any) error {
err := stream.ClientStream.RecvMsg(m)
- if err != nil {
- loggedErr := err
- if loggedErr == io.EOF {
- loggedErr = nil
- }
+ if err == nil {
+ stream.recvCount++
+ return nil
+ }
- if stream.logger.Enabled(stream.ctx, slog.LevelDebug) {
- stream.logger.DebugContext(stream.ctx, stream.method,
"target", stream.target, "duration", time.Since(stream.start), "err",
loggedErr, "metadata", stream.outgoing)
- } else {
- keys := maps.Keys(stream.outgoing)
- slices.Sort(keys)
- stream.logger.InfoContext(stream.ctx, stream.method,
"target", stream.target, "duration", time.Since(stream.start), "err",
loggedErr, "metadata", keys)
+ loggedErr := err
+ if loggedErr == io.EOF {
+ loggedErr = nil
+ }
+
+ // Capture trailers from the terminated stream; they often carry
+ // server-side diagnostic information for failure triage.
+ trailer := stream.Trailer()
+
+ if stream.logger.Enabled(stream.ctx, slog.LevelDebug) {
+ stream.logger.DebugContext(stream.ctx, stream.method,
+ "target", stream.target,
+ "duration", time.Since(stream.start),
+ "err", loggedErr,
+ "recvMessages", stream.recvCount,
+ "metadata", stream.outgoing,
+ "trailer", trailer,
+ )
+ } else {
+ keys := maps.Keys(stream.outgoing)
+ slices.Sort(keys)
+ trailerKeys := maps.Keys(trailer)
+ slices.Sort(trailerKeys)
+ args := []any{
+ "target", stream.target,
+ "duration", time.Since(stream.start),
+ "err", loggedErr,
+ "recvMessages", stream.recvCount,
+ "metadata", keys,
+ "trailer", trailerKeys,
+ }
+ // Promote curated correlation headers from the trailer.
+ args = append(args, correlationHeaderAttrs(trailer)...)
+ // Promote the outbound correlation IDs the caller supplied.
+ args = append(args, outgoingCallHeaderAttrs(stream.ctx)...)
+ // EOF is a clean close in Flight, so loggedErr was nil-ed
above;
+ // only attach status attrs for real errors.
+ if loggedErr != nil {
+ args = append(args, grpcStatusAttrs(loggedErr)...)
}
+ stream.logger.InfoContext(stream.ctx, stream.method, args...)
}
return err
}
+
+// wellKnownCorrelationHeaders is the curated allow-list of inbound gRPC
+// header/trailer keys that are surfaced verbatim into log records, for
+// cross-referencing client-side logs with server-side traces. Includes
+// the Microsoft / Power BI / Power Query family of correlation IDs.
+//
+// headerAttrsWithPrefix walks this slice in order and looks each key up
+// with md.Get, so the order below is the order the attributes are emitted.
+// The same list is also applied to outbound call metadata through
+// outgoingCallHeaderAttrs, not only to inbound headers/trailers.
+var wellKnownCorrelationHeaders = []string{
+ "x-request-id",
+ "x-correlation-id",
+ "x-trace-id",
+ "x-amzn-trace-id",
+ "x-b3-traceid",
+ "x-b3-spanid",
+ "traceparent",
+ "tracestate",
+ "x-arrow-flight-session-id",
+ "x-dremio-request-id",
+ "x-dremio-query-id",
+ "x-server-version",
+ "server",
+ // Microsoft / Power BI / Power Query family. gRPC's metadata package
+ // normalizes header names to lower case; both unprefixed and "x-ms-"
+ // variants are listed because Mashup's diagnostics record the former.
+ "activityid",
+ "activity-id",
+ "x-ms-activity-id",
+ "x-ms-client-request-id",
+ "x-ms-request-id",
+ "requestid",
+ "x-pbi-activity-id",
+}
+
+// headerAttrsWithPrefix is the shared implementation behind
+// correlationHeaderAttrs (incoming) and outgoingCallHeaderAttrs (outbound).
+// For every key in wellKnownCorrelationHeaders that has at least one value
+// in md it emits slog.Any(prefix+key, values), in allow-list order. Headers
+// outside the allow-list are never emitted.
+//
+// Returns nil (not an empty slice) both when md is empty and when md holds
+// no allow-listed header, so the result can always be spread into append.
+func headerAttrsWithPrefix(md metadata.MD, prefix string) []any {
+	if len(md) == 0 {
+		return nil
+	}
+	// Start from a nil slice so "nothing matched" is reported as nil, as
+	// documented; append allocates on the first match.
+	var out []any
+	for _, k := range wellKnownCorrelationHeaders {
+		if vals := md.Get(k); len(vals) > 0 {
+			out = append(out, slog.Any(prefix+k, vals))
+		}
+	}
+	return out
+}
+
+// correlationHeaderAttrs extracts the allow-listed correlation headers from
+// md (typically incoming headers or trailers) as slog attributes whose keys
+// carry the "hdr_" prefix. Headers outside the allow-list are dropped.
+func correlationHeaderAttrs(md metadata.MD) []any {
+	const incomingPrefix = "hdr_"
+	return headerAttrsWithPrefix(md, incomingPrefix)
+}
+
+// outgoingCallHeaderAttrs extracts the allow-listed correlation headers from
+// the outbound gRPC metadata attached to ctx, as slog attributes whose keys
+// carry the "out_hdr_" prefix. It yields nil for a nil context and for a
+// context that has no outbound metadata.
+func outgoingCallHeaderAttrs(ctx context.Context) []any {
+	if ctx == nil {
+		return nil
+	}
+	if md, ok := metadata.FromOutgoingContext(ctx); ok {
+		return headerAttrsWithPrefix(md, "out_hdr_")
+	}
+	return nil
+}
+
+// grpcStatusAttrs promotes the gRPC status carried by err into two slog
+// attributes, "grpc_code" and "grpc_message". It yields nil when err is nil
+// or when status.FromError reports that err carries no gRPC status.
+func grpcStatusAttrs(err error) []any {
+	if err == nil {
+		return nil
+	}
+	if st, ok := status.FromError(err); ok {
+		code, message := st.Code().String(), st.Message()
+		return []any{
+			slog.String("grpc_code", code),
+			slog.String("grpc_message", message),
+		}
+	}
+	return nil
+}
+
+// otelTraceHandler decorates another slog.Handler. Whenever the context
+// passed to Handle carries a valid OpenTelemetry span context, the record is
+// stamped with "trace_id" and "span_id" before being forwarded; otherwise
+// the record is forwarded untouched.
+//
+// NOTE(review): the IDs are added as ordinary record attributes, so they are
+// presumably nested under any group opened through WithGroup — confirm this
+// is acceptable for downstream log search.
+type otelTraceHandler struct {
+	inner slog.Handler
+}
+
+// Enabled defers the level decision to the wrapped handler.
+func (h *otelTraceHandler) Enabled(ctx context.Context, level slog.Level) bool {
+	return h.inner.Enabled(ctx, level)
+}
+
+// Handle adds the trace and span IDs when ctx holds a valid span context,
+// then hands the record to the wrapped handler and returns its error.
+func (h *otelTraceHandler) Handle(ctx context.Context, r slog.Record) error {
+	if ctx == nil {
+		return h.inner.Handle(ctx, r)
+	}
+	spanCtx := trace.SpanFromContext(ctx).SpanContext()
+	if !spanCtx.IsValid() {
+		return h.inner.Handle(ctx, r)
+	}
+	traceID := slog.String("trace_id", spanCtx.TraceID().String())
+	spanID := slog.String("span_id", spanCtx.SpanID().String())
+	r.AddAttrs(traceID, spanID)
+	return h.inner.Handle(ctx, r)
+}
+
+// WithAttrs re-wraps the derived handler so the trace bridge is preserved.
+func (h *otelTraceHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
+	derived := h.inner.WithAttrs(attrs)
+	return &otelTraceHandler{inner: derived}
+}
+
+// WithGroup re-wraps the derived handler so the trace bridge is preserved.
+func (h *otelTraceHandler) WithGroup(name string) slog.Handler {
+	derived := h.inner.WithGroup(name)
+	return &otelTraceHandler{inner: derived}
+}
+
+// withOtelTraceContext returns a logger whose handler is an
+// otelTraceHandler, so records gain "trace_id" and "span_id" attributes from
+// any OpenTelemetry span on the record's context. A logger that is already
+// wrapped, and a nil logger, are returned as-is.
+func withOtelTraceContext(logger *slog.Logger) *slog.Logger {
+	if logger == nil {
+		return logger
+	}
+	handler := logger.Handler()
+	switch handler.(type) {
+	case *otelTraceHandler:
+		// Already bridged: wrapping again would only stack handlers.
+		return logger
+	default:
+		return slog.New(&otelTraceHandler{inner: handler})
+	}
+}
+
+// newRandomID builds a short "<prefix>-<hex>" identifier for tagging log
+// records and error details. The hex part encodes 6 random bytes from
+// crypto/rand; if reading them fails, the current time in nanoseconds
+// (base 16) is used instead.
+func newRandomID(prefix string) string {
+	buf := make([]byte, 6)
+	_, err := rand.Read(buf)
+	if err == nil {
+		return prefix + "-" + hex.EncodeToString(buf)
+	}
+	return prefix + "-" + strconv.FormatInt(time.Now().UnixNano(), 16)
+}
+
+// queryFingerprintAttrs describes a SQL query without revealing it. An
+// empty query yields only query_type="empty"; any other query yields
+// query_type="sql", its length in bytes, and the hex form of the first 8
+// bytes of its SHA-256 digest. The query text itself is never emitted
+// because it can embed end-user PII as literals.
+func queryFingerprintAttrs(query string) []any {
+	length := len(query)
+	if length == 0 {
+		return []any{slog.String("query_type", "empty")}
+	}
+	digest := sha256.Sum256([]byte(query))
+	digestPrefix := hex.EncodeToString(digest[:8])
+	return []any{
+		slog.String("query_type", "sql"),
+		slog.Int("query_length", length),
+		slog.String("query_sha256_prefix", digestPrefix),
+	}
+}
+
+// substraitFingerprintAttrs describes a Substrait plan without revealing
+// it. An empty plan yields only query_type="substrait_empty"; otherwise the
+// result carries query_type="substrait", the plan size in bytes, the hex
+// form of the first 8 bytes of its SHA-256 digest and, when version is
+// non-empty, the protocol version. Plan bytes are never emitted.
+func substraitFingerprintAttrs(plan []byte, version string) []any {
+	size := len(plan)
+	if size == 0 {
+		return []any{slog.String("query_type", "substrait_empty")}
+	}
+
+	digest := sha256.Sum256(plan)
+	result := []any{
+		slog.String("query_type", "substrait"),
+		slog.Int("substrait_plan_bytes", size),
+		slog.String("substrait_plan_sha256_prefix", hex.EncodeToString(digest[:8])),
+	}
+	if version == "" {
+		return result
+	}
+	return append(result, slog.String("substrait_version", version))
+}
+
+// flightInfoLogAttrs summarizes a FlightInfo as slog attributes: endpoint
+// count, the advisory record/byte totals, whether a schema was included,
+// and, when present, the descriptor type, a size plus hex prefix of the
+// descriptor command, the descriptor path, and a size plus hex prefix of
+// AppMetadata (some backends embed a server-side query handle there).
+// Hex prefixes are capped at maxLoggedBlobBytes. Returns nil for a nil info.
+//
+// NOTE(review): the descriptor command may embed query text for some
+// descriptor kinds — confirm maxLoggedBlobBytes is small enough that the
+// hex prefix stays consistent with the "never log the query" policy.
+func flightInfoLogAttrs(info *flight.FlightInfo) []any {
+	if info == nil {
+		return nil
+	}
+
+	// hexPrefix renders at most maxLoggedBlobBytes leading bytes as hex.
+	hexPrefix := func(blob []byte) string {
+		if len(blob) > maxLoggedBlobBytes {
+			blob = blob[:maxLoggedBlobBytes]
+		}
+		return hex.EncodeToString(blob)
+	}
+
+	attrs := []any{
+		slog.Int("numEndpoints", len(info.Endpoint)),
+		slog.Int64("totalRecords", info.TotalRecords),
+		slog.Int64("totalBytes", info.TotalBytes),
+		slog.Bool("haveSchemaInFlightInfo", len(info.Schema) > 0),
+	}
+
+	if desc := info.FlightDescriptor; desc != nil {
+		attrs = append(attrs, slog.String("descriptorType", desc.Type.String()))
+		if cmdLen := len(desc.Cmd); cmdLen > 0 {
+			attrs = append(attrs,
+				slog.Int("descriptorCmdBytes", cmdLen),
+				slog.String("descriptorCmdPrefixHex", hexPrefix(desc.Cmd)),
+			)
+		}
+		if len(desc.Path) > 0 {
+			attrs = append(attrs, slog.Any("descriptorPath", desc.Path))
+		}
+	}
+
+	if metaLen := len(info.AppMetadata); metaLen > 0 {
+		attrs = append(attrs,
+			slog.Int("appMetadataBytes", metaLen),
+			slog.String("appMetadataPrefixHex", hexPrefix(info.AppMetadata)),
+		)
+	}
+	return attrs
+}
diff --git a/go/adbc/driver/flightsql/logging_test.go
b/go/adbc/driver/flightsql/logging_test.go
new file mode 100644
index 000000000..4673a8c2c
--- /dev/null
+++ b/go/adbc/driver/flightsql/logging_test.go
@@ -0,0 +1,334 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package flightsql
+
+import (
+ "bytes"
+ "context"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "log/slog"
+ "slices"
+ "testing"
+
+ "go.opentelemetry.io/otel/trace"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/metadata"
+ "google.golang.org/grpc/status"
+)
+
+// TestHeaderAttrsWithPrefix_AllowAndDeny checks the curated allow-list that
+// backs both correlationHeaderAttrs ("hdr_" prefix, received headers and
+// trailers) and outgoingCallHeaderAttrs ("out_hdr_" prefix, call-time
+// outgoing metadata): allow-listed headers are emitted, everything else is
+// dropped.
+func TestHeaderAttrsWithPrefix_AllowAndDeny(t *testing.T) {
+	md := metadata.New(map[string]string{
+		// Allow-listed exact match.
+		"x-request-id": "req-1",
+		// Microsoft / PBI allow-listed exact matches.
+		"activityid":        "act-1",
+		"x-pbi-activity-id": "pbi-act-1",
+		// Not on the allow-list (no suffix-match fallback).
+		"x-vendor-request-id": "vreq-1",
+		// Credential header.
+		"authorization": "Bearer SECRET",
+		// Random header.
+		"x-random-header": "noise",
+	})
+
+	// Every element of the result is a slog.Attr; index the attributes by
+	// key so the assertions below do not depend on emission order.
+	gotMap := slogAttrsToMap(t, headerAttrsWithPrefix(md, "hdr_"))
+
+	expectations := []struct {
+		key     string
+		present bool
+	}{
+		{"hdr_x-request-id", true},
+		{"hdr_activityid", true},
+		{"hdr_x-pbi-activity-id", true},
+		{"hdr_x-vendor-request-id", false},
+		{"hdr_authorization", false},
+		{"hdr_x-random-header", false},
+	}
+	for _, want := range expectations {
+		_, found := gotMap[want.key]
+		switch {
+		case want.present && !found:
+			t.Errorf("expected attribute %q in headerAttrsWithPrefix result, got keys=%v",
+				want.key, sortedKeys(gotMap))
+		case !want.present && found:
+			t.Errorf("unexpected attribute %q in headerAttrsWithPrefix result (must be filtered)",
+				want.key)
+		}
+	}
+}
+
+// TestHeaderAttrsWithPrefix_EmptyMetadata checks that nil and empty metadata
+// both produce a nil result (not an empty slice), so callers can spread the
+// result into append(...) without adding a placeholder entry.
+func TestHeaderAttrsWithPrefix_EmptyMetadata(t *testing.T) {
+	inputs := []struct {
+		label string
+		md    metadata.MD
+	}{
+		{"nil", nil},
+		{"empty", metadata.MD{}},
+	}
+	for _, in := range inputs {
+		if got := headerAttrsWithPrefix(in.md, "hdr_"); got != nil {
+			t.Fatalf("headerAttrsWithPrefix(%s, _) = %v, want nil", in.label, got)
+		}
+	}
+}
+
+// TestCorrelationVsOutgoingPrefix checks that the incoming and outgoing
+// wrappers use different key prefixes, so a received and a sent copy of the
+// same header cannot collide in one log line.
+func TestCorrelationVsOutgoingPrefix(t *testing.T) {
+	md := metadata.New(map[string]string{"activityid": "act-99"})
+
+	incoming := slogAttrsToMap(t, correlationHeaderAttrs(md))
+	if _, found := incoming["hdr_activityid"]; !found {
+		t.Errorf("correlationHeaderAttrs did not emit hdr_activityid; got %v",
+			sortedKeys(incoming))
+	}
+
+	outCtx := metadata.NewOutgoingContext(context.Background(), md)
+	outgoing := slogAttrsToMap(t, outgoingCallHeaderAttrs(outCtx))
+	if _, found := outgoing["out_hdr_activityid"]; !found {
+		t.Errorf("outgoingCallHeaderAttrs did not emit out_hdr_activityid; got %v",
+			sortedKeys(outgoing))
+	}
+}
+
+// TestOutgoingCallHeaderAttrs_NilOrMissingContext covers the two safety
+// paths (nil context, context without outbound metadata) so the call sites
+// in the unary/stream interceptors need no nil-guards of their own.
+func TestOutgoingCallHeaderAttrs_NilOrMissingContext(t *testing.T) {
+	// A typed nil context.Context variable is used instead of the literal
+	// nil: staticcheck SA1012 flags passing literal nil as a context, while
+	// a nil interface variable still compares equal to nil and therefore
+	// exercises the function's `ctx == nil` guard.
+	var nilCtx context.Context
+	fromNil := outgoingCallHeaderAttrs(nilCtx)
+	if fromNil != nil {
+		t.Fatalf("outgoingCallHeaderAttrs(nil) = %v, want nil", fromNil)
+	}
+
+	fromBackground := outgoingCallHeaderAttrs(context.Background())
+	if fromBackground != nil {
+		t.Fatalf("outgoingCallHeaderAttrs(context.Background()) = %v, want nil "+
+			"(no outbound metadata set)", fromBackground)
+	}
+}
+
+// TestGrpcStatusAttrs covers the helper that promotes a gRPC status to the
+// structured "grpc_code"/"grpc_message" log fields, for four inputs: a nil
+// error, a plain Go error, a gRPC status error, and a gRPC status error
+// wrapped with fmt.Errorf("%w", ...).
+func TestGrpcStatusAttrs(t *testing.T) {
+	t.Run("nil_error", func(t *testing.T) {
+		got := grpcStatusAttrs(nil)
+		if got != nil {
+			t.Fatalf("grpcStatusAttrs(nil) = %v, want nil", got)
+		}
+	})
+
+	t.Run("plain_error", func(t *testing.T) {
+		// A plain errors.New value carries no gRPC status, so the helper
+		// must return nil rather than synthesize a code.
+		got := grpcStatusAttrs(errors.New("boom"))
+		if got != nil {
+			t.Fatalf("grpcStatusAttrs(errors.New) = %v, want nil", got)
+		}
+	})
+
+	t.Run("grpc_status_error", func(t *testing.T) {
+		const wantCode, wantMessage = "Unavailable", "DoGet: endpoint 0"
+		attrs := slogAttrsToMap(t, grpcStatusAttrs(status.Error(codes.Unavailable, wantMessage)))
+		if code := attrs["grpc_code"]; code != wantCode {
+			t.Errorf("grpc_code = %q, want %q", code, wantCode)
+		}
+		if message := attrs["grpc_message"]; message != wantMessage {
+			t.Errorf("grpc_message = %q, want %q", message, wantMessage)
+		}
+	})
+
+	t.Run("wrapped_grpc_status_error", func(t *testing.T) {
+		const wantCode = "DeadlineExceeded"
+		wrapped := fmt.Errorf("outer: %w", status.Error(codes.DeadlineExceeded, "timeout"))
+		attrs := slogAttrsToMap(t, grpcStatusAttrs(wrapped))
+		if code := attrs["grpc_code"]; code != wantCode {
+			t.Errorf("grpc_code = %q, want %q", code, wantCode)
+		}
+	})
+}
+
+// TestOtelTraceHandler_InjectsTraceIDs builds the handler chain
+// "otelTraceHandler -> JSON handler -> buffer", logs one record with a
+// context carrying a known SpanContext, and checks that the output carries
+// matching "trace_id" and "span_id" fields.
+func TestOtelTraceHandler_InjectsTraceIDs(t *testing.T) {
+	const (
+		tidHex = "4bf92f3577b34da6a3ce929d0e0e4736"
+		sidHex = "00f067aa0ba902b7"
+	)
+
+	tid, err := trace.TraceIDFromHex(tidHex)
+	if err != nil {
+		t.Fatalf("TraceIDFromHex: %v", err)
+	}
+	sid, err := trace.SpanIDFromHex(sidHex)
+	if err != nil {
+		t.Fatalf("SpanIDFromHex: %v", err)
+	}
+	ctx := trace.ContextWithSpanContext(context.Background(),
+		trace.NewSpanContext(trace.SpanContextConfig{
+			TraceID:    tid,
+			SpanID:     sid,
+			TraceFlags: trace.FlagsSampled,
+			Remote:     true,
+		}))
+
+	var buf bytes.Buffer
+	jsonHandler := slog.NewJSONHandler(&buf, &slog.HandlerOptions{Level: slog.LevelDebug})
+	slog.New(&otelTraceHandler{inner: jsonHandler}).InfoContext(ctx, "test event")
+
+	rec := decodeFirstLogLine(t, buf.Bytes())
+	for _, field := range []struct{ key, want string }{
+		{"trace_id", tidHex},
+		{"span_id", sidHex},
+	} {
+		if got := rec[field.key]; got != field.want {
+			t.Errorf("%s = %q, want %q (full record: %v)", field.key, got, field.want, rec)
+		}
+	}
+}
+
+// TestOtelTraceHandler_NoSpanLeavesRecordUnchanged checks that the handler
+// adds nothing when the context has no valid SpanContext. Placeholder IDs
+// must not be invented, or downstream log search would match unrelated
+// records.
+func TestOtelTraceHandler_NoSpanLeavesRecordUnchanged(t *testing.T) {
+	var buf bytes.Buffer
+	jsonHandler := slog.NewJSONHandler(&buf, &slog.HandlerOptions{Level: slog.LevelDebug})
+	slog.New(&otelTraceHandler{inner: jsonHandler}).InfoContext(context.Background(), "no span")
+
+	rec := decodeFirstLogLine(t, buf.Bytes())
+	for _, key := range []string{"trace_id", "span_id"} {
+		if _, present := rec[key]; present {
+			t.Errorf("unexpected %s in record without active span: %v", key, rec)
+		}
+	}
+}
+
+// TestWithOtelTraceContext_Idempotent checks that wrapping twice does not
+// stack handlers (otherwise each derivation step would add another wrapper
+// to the chain) and that a nil logger stays nil.
+func TestWithOtelTraceContext_Idempotent(t *testing.T) {
+	if got := withOtelTraceContext(nil); got != nil {
+		t.Fatalf("withOtelTraceContext(nil) = %v, want nil", got)
+	}
+
+	plain := slog.New(slog.NewJSONHandler(&bytes.Buffer{}, nil))
+	twice := withOtelTraceContext(withOtelTraceContext(plain))
+
+	outer, ok := twice.Handler().(*otelTraceHandler)
+	if !ok {
+		t.Fatalf("expected outer handler to be *otelTraceHandler after double-wrap")
+	}
+	// The handler directly inside the wrapper must not itself be a wrapper;
+	// that would mean the helper stacked instead of de-duplicating.
+	if _, stacked := outer.inner.(*otelTraceHandler); stacked {
+		t.Fatalf("withOtelTraceContext stacked handlers on repeated calls")
+	}
+}
+
+// TestSafeLogger_AlwaysWrapsOtel checks that safeLogger returns a non-nil
+// logger whose handler is an otelTraceHandler, for both a nil and a real
+// input logger, so callers do not have to add the bridge themselves.
+func TestSafeLogger_AlwaysWrapsOtel(t *testing.T) {
+	t.Run("nil_input", func(t *testing.T) {
+		logger := safeLogger(nil)
+		if logger == nil {
+			t.Fatal("safeLogger(nil) returned nil")
+		}
+		handler := logger.Handler()
+		if _, wrapped := handler.(*otelTraceHandler); !wrapped {
+			t.Errorf("safeLogger(nil) did not wrap with otelTraceHandler; got %T",
+				handler)
+		}
+	})
+	t.Run("real_input", func(t *testing.T) {
+		base := slog.New(slog.NewJSONHandler(&bytes.Buffer{}, nil))
+		handler := safeLogger(base).Handler()
+		if _, wrapped := handler.(*otelTraceHandler); !wrapped {
+			t.Errorf("safeLogger(base) did not wrap with otelTraceHandler; got %T",
+				handler)
+		}
+	})
+}
+
+// ---------- test helpers ----------
+
+// slogAttrsToMap turns the []any returned by the "...Attrs" helpers into a
+// map from attribute key to the string form of its value. Every element
+// must be a slog.Attr (what slog.Any / slog.String produce); anything else
+// fails the test. Values come from slog.Value.String(), so callers can use
+// plain string equality without unwrapping value kinds.
+func slogAttrsToMap(t *testing.T, attrs []any) map[string]string {
+	t.Helper()
+	byKey := make(map[string]string, len(attrs))
+	for idx := range attrs {
+		attr, isAttr := attrs[idx].(slog.Attr)
+		if !isAttr {
+			t.Fatalf("attrs[%d] is %T, want slog.Attr (value=%v)", idx, attrs[idx], attrs[idx])
+		}
+		byKey[attr.Key] = attr.Value.String()
+	}
+	return byKey
+}
+
+// sortedKeys returns the keys of m in ascending lexical order, so that
+// failure messages listing them are stable from run to run (Go map
+// iteration order is randomized).
+func sortedKeys(m map[string]string) []string {
+	out := make([]string, 0, len(m))
+	for k := range m {
+		out = append(out, k)
+	}
+	// Without this the function only collected keys in random map order.
+	slices.Sort(out)
+	return out
+}
+
+// decodeFirstLogLine trims surrounding whitespace from b, takes everything
+// up to the first newline, and JSON-decodes it into a generic map. The test
+// fails if there is no line to decode or the line is not valid JSON.
+func decodeFirstLogLine(t *testing.T, b []byte) map[string]any {
+	t.Helper()
+	first, _, _ := bytes.Cut(bytes.TrimSpace(b), []byte{'\n'})
+	if len(first) == 0 {
+		t.Fatalf("no log lines captured")
+	}
+	decoded := make(map[string]any)
+	if err := json.Unmarshal(first, &decoded); err != nil {
+		t.Fatalf("failed to decode log line %q: %v", first, err)
+	}
+	return decoded
+}
diff --git a/go/adbc/driver/flightsql/record_reader.go
b/go/adbc/driver/flightsql/record_reader.go
index a0c2cc481..071cd1880 100644
--- a/go/adbc/driver/flightsql/record_reader.go
+++ b/go/adbc/driver/flightsql/record_reader.go
@@ -20,6 +20,7 @@ package flightsql
import (
"context"
"fmt"
+ "log/slog"
"sync/atomic"
"github.com/apache/arrow-adbc/go/adbc"
@@ -29,6 +30,7 @@ import (
"github.com/apache/arrow-go/v18/arrow/flight"
"github.com/apache/arrow-go/v18/arrow/flight/flightsql"
"github.com/apache/arrow-go/v18/arrow/memory"
+ "github.com/apache/arrow-go/v18/arrow/util"
"github.com/bluele/gcache"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
@@ -46,9 +48,23 @@ type reader struct {
cancelFn context.CancelFunc
}
-// kicks off a goroutine for each endpoint and returns a reader which
-// gathers all of the records as they come in.
-func newRecordReader(ctx context.Context, alloc memory.Allocator, cl
*flightsql.Client, info *flight.FlightInfo, clCache gcache.Cache, bufferSize
int, opts ...grpc.CallOption) (rdr array.RecordReader, err error) {
+// recordReaderConfig bundles the dependencies that newRecordReader
+// needs to spin up its per-endpoint goroutines.
+type recordReaderConfig struct {
+ // alloc is the allocator handed to flight.DeserializeSchema.
+ alloc memory.Allocator
+ // cl is the Flight SQL client passed to the per-endpoint DoGet calls.
+ cl *flightsql.Client
+ // info is the FlightInfo whose schema and endpoints are read.
+ info *flight.FlightInfo
+ // clientCache is forwarded to the per-endpoint DoGet calls.
+ // NOTE(review): presumably caches clients for other endpoint locations — confirm.
+ clientCache gcache.Cache
+ // bufferSize is the capacity of each record-batch channel.
+ bufferSize int
+ // logger receives diagnostics; may be nil (newRecordReader passes it
+ // through safeLogger before use).
+ logger *slog.Logger
+}
+
+// newRecordReader kicks off a goroutine for each endpoint and returns a
+// reader which gathers all of the records as they come in. cfg.logger
+// may be nil.
+func newRecordReader(ctx context.Context, cfg recordReaderConfig, opts
...grpc.CallOption) (rdr array.RecordReader, err error) {
+ log := safeLogger(cfg.logger)
+ info := cfg.info
endpoints := info.Endpoint
var header, trailer metadata.MD
opts = append(append([]grpc.CallOption{}, opts...),
grpc.Header(&header), grpc.Trailer(&trailer))
@@ -60,7 +76,7 @@ func newRecordReader(ctx context.Context, alloc
memory.Allocator, cl *flightsql.
Code: adbc.StatusInternal,
}
}
- schema, err = flight.DeserializeSchema(info.Schema, alloc)
+ schema, err = flight.DeserializeSchema(info.Schema, cfg.alloc)
if err != nil {
return nil, adbc.Error{
Msg: "Server returned FlightInfo with invalid
schema and no endpoints, cannot read stream",
@@ -70,12 +86,18 @@ func newRecordReader(ctx context.Context, alloc
memory.Allocator, cl *flightsql.
return array.NewRecordReader(schema, []arrow.RecordBatch{})
}
- ch := make(chan arrow.RecordBatch, bufferSize)
+ ch := make(chan arrow.RecordBatch, cfg.bufferSize)
group, ctx := errgroup.WithContext(ctx)
ctx, cancelFn := context.WithCancel(ctx)
// We may mutate endpoints below
numEndpoints := len(endpoints)
+ log.DebugContext(ctx, "FlightSQL newRecordReader start",
+ append([]any{
+ slog.Int("bufferSize", cfg.bufferSize),
+ }, flightInfoLogAttrs(info)...)...,
+ )
+
defer func() {
if err != nil {
close(ch)
@@ -84,7 +106,7 @@ func newRecordReader(ctx context.Context, alloc
memory.Allocator, cl *flightsql.
}()
if info.Schema != nil {
- schema, err = flight.DeserializeSchema(info.Schema, alloc)
+ schema, err = flight.DeserializeSchema(info.Schema, cfg.alloc)
if err != nil {
return nil, adbc.Error{
Msg: err.Error(),
@@ -92,9 +114,19 @@ func newRecordReader(ctx context.Context, alloc
memory.Allocator, cl *flightsql.
}
} else {
firstEndpoint := endpoints[0]
- rdr, err := doGet(ctx, cl, firstEndpoint, clCache, opts...)
+ epAttrs := endpointLogAttrs(0, numEndpoints, firstEndpoint)
+ log.DebugContext(ctx, "FlightSQL endpoint stream opening
(schema discovery)", epAttrs...)
+ startSchemaFetch := newStreamProgress()
+ rdr, err := doGetWithLogger(ctx, cfg.cl, firstEndpoint,
cfg.clientCache, log, opts...)
if err != nil {
- return nil, adbcFromFlightStatusWithDetails(err,
header, trailer, "DoGet: endpoint 0: remote: %s", firstEndpoint.Location)
+ log.ErrorContext(ctx, "FlightSQL endpoint DoGet failed
(schema discovery)",
+ append(append([]any{}, epAttrs...),
+ "err", err,
+ "elapsed", startSchemaFetch.summary(),
+ )...,
+ )
+ return nil, adbcFromFlightStatusWithDetails(err,
header, trailer,
+ "DoGet: endpoint 0: remote: %s",
firstEndpoint.Location)
}
schema = rdr.Schema()
group.Go(func() error {
@@ -103,14 +135,27 @@ func newRecordReader(ctx context.Context, alloc
memory.Allocator, cl *flightsql.
defer close(ch)
}
+ progress := newStreamProgress()
for rdr.Next() && ctx.Err() == nil {
rec := rdr.RecordBatch()
+ progress.recordBatch(rec.NumRows(),
util.TotalRecordSize(rec))
rec.Retain()
ch <- rec
}
if err := checkContext(rdr.Err(), ctx); err != nil {
- return adbcFromFlightStatusWithDetails(err,
header, trailer, "DoGet: endpoint 0: remote: %s", firstEndpoint.Location)
+ log.ErrorContext(ctx, "FlightSQL endpoint
stream ended with error",
+ append(append([]any{},
endpointLogAttrs(0, numEndpoints, firstEndpoint)...),
+ append([]any{"err", err},
progress.logAttrs()...)...,
+ )...,
+ )
+ return adbcFromFlightStatusWithDetails(err,
header, trailer,
+ "DoGet: endpoint 0: remote: %s",
firstEndpoint.Location)
}
+ log.DebugContext(ctx, "FlightSQL endpoint stream
completed",
+ append(append([]any{}, endpointLogAttrs(0,
numEndpoints, firstEndpoint)...),
+ progress.logAttrs()...,
+ )...,
+ )
return nil
})
@@ -133,39 +178,85 @@ func newRecordReader(ctx context.Context, alloc
memory.Allocator, cl *flightsql.
for i, ep := range endpoints {
endpoint := ep
endpointIndex := i
- chs[endpointIndex] = make(chan arrow.RecordBatch, bufferSize)
+ // Offset the endpoint index for the log records to account for
endpoint 0
+ // having been processed above when info.Schema was unset.
+ logEndpointIndex := endpointIndex
+ if info.Schema == nil {
+ logEndpointIndex = endpointIndex + 1
+ }
+ chs[endpointIndex] = make(chan arrow.RecordBatch,
cfg.bufferSize)
group.Go(func() error {
// Close channels (except the last) so that Next can
move on to the next channel properly
if endpointIndex != lastChannelIndex {
defer close(chs[endpointIndex])
}
- rdr, err := doGet(ctx, cl, endpoint, clCache, opts...)
+ epAttrs := endpointLogAttrs(logEndpointIndex,
numEndpoints, endpoint)
+ log.DebugContext(ctx, "FlightSQL endpoint stream
opening", epAttrs...)
+ doGetStart := newStreamProgress()
+ rdr, err := doGetWithLogger(ctx, cfg.cl, endpoint,
cfg.clientCache, log, opts...)
if err != nil {
- return adbcFromFlightStatusWithDetails(err,
header, trailer, "DoGet: endpoint %d: %s", endpointIndex, endpoint.Location)
+ log.ErrorContext(ctx, "FlightSQL endpoint DoGet
failed",
+ append(append([]any{}, epAttrs...),
+ "err", err,
+ "elapsed", doGetStart.summary(),
+ )...,
+ )
+ return adbcFromFlightStatusWithDetails(err,
header, trailer,
+ "DoGet: endpoint %d: %s",
logEndpointIndex, endpoint.Location)
}
defer rdr.Release()
streamSchema := utils.RemoveSchemaMetadata(rdr.Schema())
if !streamSchema.Equal(referenceSchema) {
- return fmt.Errorf("endpoint %d returned
inconsistent schema: expected %s but got %s", endpointIndex,
referenceSchema.String(), streamSchema.String())
+ log.ErrorContext(ctx, "FlightSQL endpoint
returned inconsistent schema",
+ append(append([]any{}, epAttrs...),
+ "expectedSchema",
referenceSchema.String(),
+ "actualSchema",
streamSchema.String(),
+ )...,
+ )
+ return fmt.Errorf("endpoint %d returned
inconsistent schema: expected %s but got %s", logEndpointIndex,
referenceSchema.String(), streamSchema.String())
}
+ progress := newStreamProgress()
for rdr.Next() && ctx.Err() == nil {
rec := rdr.RecordBatch()
+ progress.recordBatch(rec.NumRows(),
util.TotalRecordSize(rec))
rec.Retain()
chs[endpointIndex] <- rec
}
if err := checkContext(rdr.Err(), ctx); err != nil {
- return adbcFromFlightStatusWithDetails(err,
header, trailer, "DoGet: endpoint %d: %s", endpointIndex, endpoint.Location)
+ log.ErrorContext(ctx, "FlightSQL endpoint
stream ended with error",
+ append(append([]any{}, epAttrs...),
+ append([]any{"err", err},
progress.logAttrs()...)...,
+ )...,
+ )
+ return adbcFromFlightStatusWithDetails(err,
header, trailer,
+ "DoGet: endpoint %d: %s",
logEndpointIndex, endpoint.Location)
}
+ log.DebugContext(ctx, "FlightSQL endpoint stream
completed",
+ append(append([]any{}, epAttrs...),
+ progress.logAttrs()...,
+ )...,
+ )
return nil
})
}
go func() {
- reader.err = group.Wait()
+ err := group.Wait()
+ reader.err = err
+ if reader.err != nil {
+ log.WarnContext(ctx, "FlightSQL record reader finished
with error",
+ "err", reader.err,
+ "numEndpoints", numEndpoints,
+ )
+ } else {
+ log.DebugContext(ctx, "FlightSQL record reader finished
successfully",
+ "numEndpoints", numEndpoints,
+ )
+ }
// Don't close the last channel until after the group is
finished, so that
// Next() can only return after reader.err may have been set
close(chs[lastChannelIndex])
diff --git a/go/adbc/driver/flightsql/record_reader_test.go
b/go/adbc/driver/flightsql/record_reader_test.go
index ab11e0553..ab7b5f179 100644
--- a/go/adbc/driver/flightsql/record_reader_test.go
+++ b/go/adbc/driver/flightsql/record_reader_test.go
@@ -160,7 +160,13 @@ func (suite *RecordReaderTests)
TestFallbackFailedConnection() {
},
}
- reader, err := newRecordReader(context.Background(), suite.alloc,
suite.cl, &info, suite.clCache, 3)
+ reader, err := newRecordReader(context.Background(), recordReaderConfig{
+ alloc: suite.alloc,
+ cl: suite.cl,
+ info: &info,
+ clientCache: suite.clCache,
+ bufferSize: 3,
+ })
suite.NoError(err)
defer reader.Release()
@@ -190,7 +196,13 @@ func (suite *RecordReaderTests) TestFallbackFailedDoGet() {
},
}
- reader, err := newRecordReader(context.Background(), suite.alloc,
suite.cl, &info, suite.clCache, 3)
+ reader, err := newRecordReader(context.Background(), recordReaderConfig{
+ alloc: suite.alloc,
+ cl: suite.cl,
+ info: &info,
+ clientCache: suite.clCache,
+ bufferSize: 3,
+ })
suite.NoError(err)
defer reader.Release()
@@ -204,7 +216,13 @@ func (suite *RecordReaderTests) TestFallbackFailedDoGet() {
// Not enough retries
suite.service.failureCount = 4
- reader, err = newRecordReader(context.Background(), suite.alloc,
suite.cl, &info, suite.clCache, 3)
+ reader, err = newRecordReader(context.Background(), recordReaderConfig{
+ alloc: suite.alloc,
+ cl: suite.cl,
+ info: &info,
+ clientCache: suite.clCache,
+ bufferSize: 3,
+ })
suite.NoError(err)
defer reader.Release()
suite.False(reader.Next())
@@ -223,7 +241,13 @@ func (suite *RecordReaderTests) TestFallbackFailed() {
},
}
- reader, err := newRecordReader(context.Background(), suite.alloc,
suite.cl, &info, suite.clCache, 3)
+ reader, err := newRecordReader(context.Background(), recordReaderConfig{
+ alloc: suite.alloc,
+ cl: suite.cl,
+ info: &info,
+ clientCache: suite.clCache,
+ bufferSize: 3,
+ })
suite.NoError(err)
defer reader.Release()
@@ -236,7 +260,13 @@ func (suite *RecordReaderTests) TestNoEndpoints() {
Schema: flight.SerializeSchema(orderingSchema(), suite.alloc),
}
- reader, err := newRecordReader(context.Background(), suite.alloc,
suite.cl, &info, suite.clCache, 3)
+ reader, err := newRecordReader(context.Background(), recordReaderConfig{
+ alloc: suite.alloc,
+ cl: suite.cl,
+ info: &info,
+ clientCache: suite.clCache,
+ bufferSize: 3,
+ })
suite.NoError(err)
defer reader.Release()
@@ -248,7 +278,13 @@ func (suite *RecordReaderTests) TestNoEndpoints() {
func (suite *RecordReaderTests) TestNoEndpointsNoSchema() {
info := flight.FlightInfo{}
- _, err := newRecordReader(context.Background(), suite.alloc, suite.cl,
&info, suite.clCache, 3)
+ _, err := newRecordReader(context.Background(), recordReaderConfig{
+ alloc: suite.alloc,
+ cl: suite.cl,
+ info: &info,
+ clientCache: suite.clCache,
+ bufferSize: 3,
+ })
suite.ErrorContains(err, "Server returned FlightInfo with no schema and
no endpoints, cannot read stream")
}
@@ -257,7 +293,13 @@ func (suite *RecordReaderTests)
TestNoEndpointsInvalidSchema() {
Schema: []byte("f"),
}
- _, err := newRecordReader(context.Background(), suite.alloc, suite.cl,
&info, suite.clCache, 3)
+ _, err := newRecordReader(context.Background(), recordReaderConfig{
+ alloc: suite.alloc,
+ cl: suite.cl,
+ info: &info,
+ clientCache: suite.clCache,
+ bufferSize: 3,
+ })
suite.ErrorContains(err, "Server returned FlightInfo with invalid
schema and no endpoints, cannot read stream")
}
@@ -272,7 +314,13 @@ func (suite *RecordReaderTests) TestNoSchema() {
},
}
- reader, err := newRecordReader(context.Background(), suite.alloc,
suite.cl, &info, suite.clCache, 3)
+ reader, err := newRecordReader(context.Background(), recordReaderConfig{
+ alloc: suite.alloc,
+ cl: suite.cl,
+ info: &info,
+ clientCache: suite.clCache,
+ bufferSize: 3,
+ })
suite.NoError(err)
defer reader.Release()
@@ -305,7 +353,13 @@ func (suite *RecordReaderTests)
TestSchemaEndpointMismatch() {
},
}
- reader, err := newRecordReader(context.Background(), suite.alloc,
suite.cl, &info, suite.clCache, 3)
+ reader, err := newRecordReader(context.Background(), recordReaderConfig{
+ alloc: suite.alloc,
+ cl: suite.cl,
+ info: &info,
+ clientCache: suite.clCache,
+ bufferSize: 3,
+ })
suite.NoError(err)
defer reader.Release()
@@ -339,7 +393,13 @@ func (suite *RecordReaderTests) TestOrdering() {
},
}
- reader, err := newRecordReader(context.Background(), suite.alloc,
suite.cl, &info, suite.clCache, 3)
+ reader, err := newRecordReader(context.Background(), recordReaderConfig{
+ alloc: suite.alloc,
+ cl: suite.cl,
+ info: &info,
+ clientCache: suite.clCache,
+ bufferSize: 3,
+ })
suite.NoError(err)
defer reader.Release()
diff --git a/go/adbc/driver/flightsql/utils.go
b/go/adbc/driver/flightsql/utils.go
index 4eeddd8eb..98dddadd9 100644
--- a/go/adbc/driver/flightsql/utils.go
+++ b/go/adbc/driver/flightsql/utils.go
@@ -135,13 +135,40 @@ func adbcFromFlightStatusWithDetails(err error, header,
trailer metadata.MD, con
return adbc.Error{
// People don't read error messages, so backload the context
and frontload the server error
- Msg: fmt.Sprintf("[FlightSQL] %s (%s; %s)",
grpcStatus.Message(), grpcStatus.Code(), fmt.Sprintf(context, args...)),
+ Msg: fmt.Sprintf("[FlightSQL] %s (%s; %s%s)",
grpcStatus.Message(), grpcStatus.Code(), fmt.Sprintf(context, args...),
eofHint(err, grpcStatus.Code())),
Code: adbcCode,
VendorCode: int32(grpcStatus.Code()),
Details: details,
}
}
+// eofHint returns a short diagnostic hint that is appended to the error
+// message produced by adbcFromFlightStatusWithDetails when the underlying
+// failure looks like a server-side stream termination (a gRPC Unavailable
+// code whose message contains "error reading from server: EOF" or
+// "transport is closing"). The hint lists common operator-actionable root
+// causes so that the message left in logs is self-describing without
+// requiring source-code lookups. It returns an empty string for any other
+// error so that normal error messages are not polluted.
+func eofHint(err error, code codes.Code) string {
+ if err == nil {
+ return ""
+ }
+ msg := err.Error()
+ if code != codes.Unavailable {
+ return ""
+ }
+ // Match on the substring rather than equality because the gRPC error
+ // chain wraps the underlying transport error in its own message format
+ // (e.g. "rpc error: code = Unavailable desc = error reading from
server: EOF").
+ if !strings.Contains(msg, "error reading from server: EOF") &&
!strings.Contains(msg, "transport is closing") {
+ return ""
+ }
+ return "; possible causes: server crashed or restarted, server-side
timeout/idle disconnect, " +
+ "load balancer or proxy idle timeout, network interruption,
server out-of-memory while serving the stream, " +
+ "or client read timeout shorter than server processing time"
+}
+
func checkContext(maybeErr error, ctx context.Context) error {
if maybeErr != nil && !errors.Is(maybeErr, io.EOF) {
return maybeErr
diff --git a/go/adbc/driver/internal/driverbase/database.go
b/go/adbc/driver/internal/driverbase/database.go
index 16a4edb80..5cff37a84 100644
--- a/go/adbc/driver/internal/driverbase/database.go
+++ b/go/adbc/driver/internal/driverbase/database.go
@@ -93,7 +93,6 @@ type Database interface {
adbc.Database
adbc.GetSetOptions
adbc.DatabaseLogging
- adbc.OTelTracingInit
}
// DatabaseImplBase is a struct that provides default implementations of the
@@ -110,11 +109,23 @@ type DatabaseImplBase struct {
traceParent string
}
-// NewDatabaseImplBase instantiates DatabaseImplBase.
+type TracingOptions struct {
+ // ExporterName overrides the OTEL_TRACES_EXPORTER environment
+ // variable. Must be one of "none", "otlp", "console", or "adbcfile".
+ ExporterName string
+
+ // TracingFolderPath overrides the default on-disk folder used by
+ // the "adbcfile" exporter. Ignored for other exporters.
+ TracingFolderPath string
+}
+
+// NewDatabaseImplBase instantiates DatabaseImplBase and initializes its
+// OpenTelemetry tracer using the supplied TracingOptions. Empty fields
+// fall back to the defaults documented on TracingOptions.
//
// - driver is a DriverImplBase containing the common resources from the
parent
// driver, allowing the Arrow allocator and error handler to be reused.
-func NewDatabaseImplBase(ctx context.Context, driver *DriverImplBase)
(DatabaseImplBase, error) {
+func NewDatabaseImplBase(ctx context.Context, driver *DriverImplBase, opts
TracingOptions) (DatabaseImplBase, error) {
database := DatabaseImplBase{
Alloc: driver.Alloc,
ErrorHelper: driver.ErrorHelper,
@@ -122,7 +133,12 @@ func NewDatabaseImplBase(ctx context.Context, driver
*DriverImplBase) (DatabaseI
Logger: nilLogger(),
Tracer: nilTracer(),
}
- err := database.InitTracing(ctx, driver.DriverInfo.GetName(),
getDriverVersion(driver.DriverInfo))
+ err := database.InitTracing(
+ ctx,
+ driver.DriverInfo.GetName(),
+ getDriverVersion(driver.DriverInfo),
+ opts,
+ )
return database, err
}
@@ -228,14 +244,22 @@ func (db *database) SetLogger(logger *slog.Logger) {
}
}
-func (base *database) InitTracing(ctx context.Context, driverName string,
driverVersion string) error {
- return base.Base().InitTracing(ctx, driverName, driverVersion)
-}
-
-func (base *DatabaseImplBase) InitTracing(ctx context.Context, driverName
string, driverVersion string) (err error) {
+// InitTracing initializes the database's OpenTelemetry tracer using the
+// supplied TracingOptions. Empty fields fall back to the defaults
+// documented on TracingOptions.
+func (base *DatabaseImplBase) InitTracing(
+ ctx context.Context,
+ driverName string,
+ driverVersion string,
+ opts TracingOptions,
+) (err error) {
fullyQualifiedDriverName := driverNamespace + "." + driverName
- exporterName := getExporterName()
+ // opts.ExporterName takes precedence over OTEL_TRACES_EXPORTER.
+ exporterName := opts.ExporterName
+ if exporterName == "" {
+ exporterName = getExporterName()
+ }
// Empty exporter
if exporterName == "" {
@@ -253,6 +277,7 @@ func (base *DatabaseImplBase) InitTracing(ctx
context.Context, driverName string
exporterName,
base,
driverName,
+ opts,
)
if err != nil {
return
@@ -280,6 +305,7 @@ func getExporters(
exporterName string,
base *DatabaseImplBase,
driverName string,
+ opts TracingOptions,
) (exporters []sdktrace.SpanExporter, exporterType traceExporterType, err
error) {
var exporter sdktrace.SpanExporter
exporterType, ok := tryParseTraceExporterType(exporterName)
@@ -307,7 +333,7 @@ func getExporters(
return
}
case TraceExporterAdbcFile:
- exporter, err = newAdbcFileExporter(driverName)
+ exporter, err = newAdbcFileExporter(driverName,
opts.TracingFolderPath)
if err != nil {
return
}
@@ -397,9 +423,13 @@ func newOtlpTraceExporters(ctx context.Context)
([]sdktrace.SpanExporter, error)
return []sdktrace.SpanExporter{grpcExporter, httpExporter}, nil
}
-func newAdbcFileExporter(driverName string) (*stdouttrace.Exporter, error) {
+func newAdbcFileExporter(driverName, folderPath string)
(*stdouttrace.Exporter, error) {
fullyQualifiedDriverName := strings.ToLower(driverNamespace + "." +
driverName)
- fileWriter, err :=
NewRotatingFileWriter(WithLogNamePrefix(fullyQualifiedDriverName))
+ writerOpts :=
[]rotatingFileWriterOption{WithLogNamePrefix(fullyQualifiedDriverName)}
+ if strings.TrimSpace(folderPath) != "" {
+ writerOpts = append(writerOpts,
WithTracingFolderPath(folderPath))
+ }
+ fileWriter, err := NewRotatingFileWriter(writerOpts...)
if err != nil {
return nil, err
}
diff --git a/go/adbc/driver/internal/driverbase/driver_test.go
b/go/adbc/driver/internal/driverbase/driver_test.go
index 1f1a24f5f..ab8efb4c8 100644
--- a/go/adbc/driver/internal/driverbase/driver_test.go
+++ b/go/adbc/driver/internal/driverbase/driver_test.go
@@ -449,7 +449,7 @@ func (drv *driverImpl) NewDatabase(opts map[string]string)
(adbc.Database, error
}
func (drv *driverImpl) NewDatabaseWithContext(ctx context.Context, opts
map[string]string) (adbc.Database, error) {
- dbBase, err := driverbase.NewDatabaseImplBase(ctx, &drv.DriverImplBase)
+ dbBase, err := driverbase.NewDatabaseImplBase(ctx, &drv.DriverImplBase,
driverbase.TracingOptions{})
if err != nil {
return nil, err
}
diff --git a/go/adbc/drivermgr/adbc_driver_manager.cc
b/go/adbc/drivermgr/adbc_driver_manager.cc
index be20a9e3e..9ac56c83d 100644
--- a/go/adbc/drivermgr/adbc_driver_manager.cc
+++ b/go/adbc/drivermgr/adbc_driver_manager.cc
@@ -16,11 +16,19 @@
// under the License.
#if defined(_WIN32)
-#include <windows.h> // Must come first
-
+// These version macros gate which Win32 APIs the SDK headers declare. They
MUST
+// be set before <windows.h> is included -- once windows.h pulls in winnt.h,
the
+// internal API-availability macros are fixed and later #defines have no
effect.
+// In particular, SHGetKnownFolderPath (used below) requires _WIN32_WINNT >=
0x0600
+// (Vista). Without this, builds with toolchains that default _WIN32_WINNT
below
+// Vista (e.g. TDM-GCC 10.x) fail with "SHGetKnownFolderPath was not declared".
+#ifndef _WIN32_WINNT
+#define _WIN32_WINNT 0x0A00 // Windows 10
+#endif
#ifndef NTDDI_VERSION
#define NTDDI_VERSION 0x0A00000C // For SHGetKnownFolderPath in ShlObj_core.h
in ShlObj.h
#endif
+#include <windows.h> // Must come first
#include <KnownFolders.h>
#include <ShlObj.h>
diff --git a/go/adbc/ext.go b/go/adbc/ext.go
index 4f0df0462..13fdfc686 100644
--- a/go/adbc/ext.go
+++ b/go/adbc/ext.go
@@ -38,13 +38,6 @@ type DatabaseLogging interface {
SetLogger(*slog.Logger)
}
-// OTelTracingInit is a Database that also supports OpenTelemetry tracing.
-//
-// EXPERIMENTAL. Not formally part of the ADBC APIs.
-type OTelTracingInit interface {
- InitTracing(ctx context.Context, driverName string, driverVersion
string) error
-}
-
// OTelTracing is an interface that supports instrumentation of
[OpenTelementry tracing].
//
// EXPERIMENTAL. Not formally part of the ADBC APIs.
diff --git a/go/adbc/go.mod b/go/adbc/go.mod
index b962ab0a0..e18f0cc09 100644
--- a/go/adbc/go.mod
+++ b/go/adbc/go.mod
@@ -62,6 +62,7 @@ require (
github.com/pierrec/lz4/v4 v4.1.26 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 //
indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec //
indirect
+ github.com/stoewer/go-strcase v1.3.1 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/zeebo/xxh3 v1.1.0 // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
diff --git a/go/adbc/go.sum b/go/adbc/go.sum
index b09195c9e..47b62ad61 100644
--- a/go/adbc/go.sum
+++ b/go/adbc/go.sum
@@ -12,6 +12,8 @@ github.com/cenkalti/backoff/v5 v5.0.3
h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1x
github.com/cenkalti/backoff/v5 v5.0.3/go.mod
h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw=
github.com/cespare/xxhash/v2 v2.3.0
h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod
h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
+github.com/davecgh/go-spew v1.1.0/go.mod
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1/go.mod
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dustin/go-humanize v1.0.1
h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
@@ -51,14 +53,23 @@ github.com/ncruces/go-strftime v1.0.0
h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOF
github.com/ncruces/go-strftime v1.0.0/go.mod
h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
github.com/pierrec/lz4/v4 v4.1.26
h1:GrpZw1gZttORinvzBdXPUXATeqlJjqUG/D87TKMnhjY=
github.com/pierrec/lz4/v4 v4.1.26/go.mod
h1:EoQMVJgeeEOMsCqCzqFm2O0cJvljX2nGZjcRIPL34O4=
+github.com/pmezard/go-difflib v1.0.0/go.mod
h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2
h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod
h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec
h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod
h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/rogpeppe/go-internal v1.14.1
h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
github.com/rogpeppe/go-internal v1.14.1/go.mod
h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
+github.com/stoewer/go-strcase v1.3.1
h1:iS0MdW+kVTxgMoE1LAZyMiYJFKlOzLooE4MxjirtkAs=
+github.com/stoewer/go-strcase v1.3.1/go.mod
h1:fAH5hQ5pehh+j3nZfvwdk2RgEgQjAoM8wodgtPmh1xo=
+github.com/stretchr/objx v0.1.0/go.mod
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/objx v0.4.0/go.mod
h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
+github.com/stretchr/objx v0.5.0/go.mod
h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/objx v0.5.2/go.mod
h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
+github.com/stretchr/testify v1.7.1/go.mod
h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.8.0/go.mod
h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
+github.com/stretchr/testify v1.8.1/go.mod
h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.11.1
h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod
h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
@@ -119,6 +130,7 @@ google.golang.org/protobuf v1.36.11/go.mod
h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c
h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod
h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod
h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
modernc.org/cc/v4 v4.28.2 h1:3tQ0lf2ADtoby2EtSP+J7IE2SHwEJdP8ioR59wx7XpY=